ContextShift[Task] from a Scheduler instance. If I understood correctly, you can do Scheduler#timer and Scheduler#contextShift provided that the right implicits are imported. However, when trying to create a ContextShift from a scheduler instance (https://github.com/47deg/fetch/blob/4328bc61627eaf580624ec825cd63bbe17526659/examples/src/test/scala/MonixExample.scala#L44-L46) I get the following error: value contextShift is not a member of monix.execution.Scheduler
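One possible way around that error, as a minimal sketch: assuming Monix 3.x with cats-effect 2.x, the Task companion offers builders that take a Scheduler, so no extension syntax on Scheduler is needed. The object name MonixInstances is hypothetical, and whether these builders are available depends on the Monix version in use.

```scala
import cats.effect.{ContextShift, Timer}
import monix.eval.Task
import monix.execution.Scheduler

// Hypothetical holder object; assumes Monix 3.x on the classpath.
object MonixInstances {
  val scheduler: Scheduler = Scheduler.global

  // Build the cats-effect instances for Task from the given Scheduler.
  implicit val contextShift: ContextShift[Task] = Task.contextShift(scheduler)
  implicit val timer: Timer[Task] = Task.timer(scheduler)
}
```

With these in implicit scope, code that asks for ContextShift[Task] or Timer[Task] should resolve them without touching the Scheduler directly.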
Hey guys! I'm working on a personal pet project to learn tagless final and functional programming. Until now I've been using Monix through its direct API.
Currently, I'm trying to abstract over Task, so that the choice of the effect monad is deferred until the end of the world.
I've run into a problem: I can't express some functionality using cats-effect:
Task(...).onErrorRestartLoop(MaxRetries) { (ex, retries, retry) =>
  if (retries > 0) {
    retry(retries - 1).delayExecution(1.second)
  } else {
    onError(ex)
    Task.raiseError(ex)
  }
}
This is a simple retry mechanism with a maximum number of retries and a delay before each retry.
import cats.MonadError
import cats.effect.Timer
import cats.syntax.all._
import scala.concurrent.duration._

def retry[F[_], A](fa: F[A])(retries: Int)(implicit F: MonadError[F, Throwable], T: Timer[F]): F[A] =
  fa.handleErrorWith {
    case _ if retries > 0 =>
      T.sleep(1.second) >> retry(fa)(retries - 1)
    case e =>
      F.raiseError(e)
  }
Example of use:
// not sure if Task.raiseError exists, maybe Task.eval(throw exception) will do
val task = Task.eval(println("foo")) *> Task.raiseError(new Exception("boom"))
retry(task)(5)
Output:
foo
foo
foo
foo
foo
foo
java.lang.Exception: boom
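A variant that stays closer to the original onErrorRestartLoop snippet might look like the sketch below. It assumes cats-effect 2.x (where Timer exists); the names retryWithDelay and onGiveUp are hypothetical, with onGiveUp standing in for the onError(ex) call that runs once the retries are exhausted.

```scala
import cats.MonadError
import cats.effect.{IO, Timer}
import cats.syntax.all._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object RetrySketch {
  // Retries `fa` up to `maxRetries` times, sleeping `delay` before each retry.
  // When no retries are left, runs `onGiveUp` and re-raises the last error.
  def retryWithDelay[F[_], A](fa: F[A], maxRetries: Int, delay: FiniteDuration)(
      onGiveUp: Throwable => F[Unit]
  )(implicit F: MonadError[F, Throwable], T: Timer[F]): F[A] =
    fa.handleErrorWith { ex =>
      if (maxRetries > 0)
        T.sleep(delay) >> retryWithDelay(fa, maxRetries - 1, delay)(onGiveUp)
      else
        onGiveUp(ex) >> F.raiseError[A](ex)
    }

  // Usage with cats-effect IO; a mutable counter is used only for illustration.
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
  var attempts = 0
  val failing: IO[Int] = IO { attempts += 1; throw new RuntimeException("boom") }

  def run(): Either[Throwable, Int] =
    retryWithDelay(failing, maxRetries = 2, delay = 10.millis)(_ => IO.unit)
      .attempt
      .unsafeRunSync()
}
```

Since the function only asks for MonadError and Timer, the same code should also work with Task once a Timer[Task] is in scope.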
type Dataset = (DataSpecification, DataSetRef)

val clients = Nested[Task, Either[DataLakeRepositoryError, ?], Seq[Client]](dlRegistry.listClients())

def clientDatasets(client: String) =
  Nested[Task, Either[DataLakeRepositoryError, ?], Seq[Dataset]](dlRegistry.listDatasets(client))

val r: Task[_] = (clients map { cls: Seq[Client] ⇒
  cls map { client ⇒
    (clientDatasets(client.id) map { dsds: Seq[Dataset] ⇒
      dsds map { ds ⇒
        ds._2.shortName.pp("\n\n"); ds
      }
    }).value
  }
}).value

s"r is ${await(r)}".pp
r is Right(List(Task.Map$995060245, Task.Map$958851644, Task.Map$119297310, Task.Map$713902690, Task.Map$1792185060, Task.Map$1688986248))
Task.sequence and other things, but the result is always something other than what I want.
cats.data.Nested, see https://typelevel.org/cats/datatypes/nested.html
def listClients(): Task[Either[DataLakeRepositoryError, Seq[Client]]]
def listDatasets(clientId: String): Task[Either[DataLakeRepositoryError, Seq[(DataSpecification, DataSetRef)]]]
Task.Suspend and did not know what to do with them.
Task.Suspend is internal; that's not what matters. What matters is the type you're expecting.
F[List[Client]] and String => F[List[(DataSpec, DataRef)]]: .flatMap to get to List[Client], then traverse(c => listDataSets(c.id)), or flatTraverse
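The flatMap plus traverse / flatTraverse suggestion could be sketched roughly as follows. Client and Dataset here are hypothetical stand-ins for the real types, and the Either error layer from the original signatures is left out to keep the shape visible (cats.data.EitherT is one way to bring it back).

```scala
import cats.{Id, Monad}
import cats.implicits._

object DatasetsSketch {
  // Hypothetical stand-ins for the types in the discussion.
  final case class Client(id: String)
  final case class Dataset(shortName: String)

  // traverse keeps one inner list per client: F[List[List[Dataset]]]
  def datasetsPerClient[F[_]: Monad](
      listClients: F[List[Client]],
      listDatasets: String => F[List[Dataset]]
  ): F[List[List[Dataset]]] =
    listClients.flatMap(clients => clients.traverse(c => listDatasets(c.id)))

  // flatTraverse also flattens the inner lists: F[List[Dataset]]
  def allDatasets[F[_]: Monad](
      listClients: F[List[Client]],
      listDatasets: String => F[List[Dataset]]
  ): F[List[Dataset]] =
    listClients.flatMap(clients => clients.flatTraverse(c => listDatasets(c.id)))

  // Usage with cats.Id as the effect, only to show the resulting shapes.
  val clients: Id[List[Client]] = List(Client("a"), Client("b"))
  val lookup: String => Id[List[Dataset]] =
    id => List(Dataset(id + "-1"), Dataset(id + "-2"))

  val nested: List[List[Dataset]] = datasetsPerClient[Id](clients, lookup)
  val flat: List[Dataset] = allDatasets[Id](clients, lookup)
}
```

The key difference from mapping inside Nested is that traverse runs the inner effect and collects the results, instead of leaving unevaluated Task values inside the list.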