Task is represented as: (A => Unit) => Unit? IO is () => (A => Unit) => Unit?
async) has a shape similar to that. I'm not really aware (contributors please correct me if wrong) of a detailed explanation of Monix internals specifically. I touch on some aspects in my talk on Fibers (they are for an abstract IO, so most things should be relevant), so I would recommend starting from there.
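To make the callback shape from the question concrete, here is a minimal sketch using only the standard library. It is not how Monix or cats-effect implement Task/IO internally; the names Register, pure, map and flatMap are made up for illustration.

```scala
object CallbackShapes {
  // Hypothetical alias for the shape discussed above: an async computation
  // is "something that accepts a callback and eventually calls it".
  type Register[A] = (A => Unit) => Unit

  // Immediately hand an already-known value to the callback.
  def pure[A](a: A): Register[A] = cb => cb(a)

  // Transform the value before it reaches the caller's callback.
  def map[A, B](fa: Register[A])(f: A => B): Register[B] =
    cb => fa(a => cb(f(a)))

  // Chain a second callback-based step onto the first one.
  def flatMap[A, B](fa: Register[A])(f: A => Register[B]): Register[B] =
    cb => fa(a => f(a)(cb))
}
```

Nothing runs until a callback is finally passed in, which is roughly why the `() =>` prefix in the second shape is about delaying registration rather than changing what the callback sees.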
IO/Task store it in suspended form in a data structure, and it does not get evaluated until interpretation, whereas Future sends it to a thread pool almost straight away (in the implementation of transform), and that's the bit that breaks referential transparency for Future. Unfortunately, in both implementations you need to jump through some hoops (various methods) to get to the exact place.
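A small sketch of that difference, using the standard library Future and a hand-rolled stand-in for a suspended effect. Suspended is a hypothetical toy type, not the real IO/Task encoding, and the counters exist only to make the number of evaluations visible.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LazyVsEager {
  // Hypothetical toy effect: the computation is only stored as a thunk.
  final case class Suspended[A](run: () => A) {
    def map[B](f: A => B): Suspended[B] = Suspended(() => f(run()))
    def flatMap[B](f: A => Suspended[B]): Suspended[B] =
      Suspended(() => f(run()).run())
  }

  // Future: the body is submitted when `fut` is created, so reusing the
  // value reuses the single, already-started computation.
  def futureTwice(): (Int, Int) = {
    val counter = new AtomicInteger(0)
    val fut = Future(counter.incrementAndGet())
    Await.result(for { a <- fut; b <- fut } yield (a, b), 5.seconds)
  }

  // Suspended: nothing happens until run(), and each use runs the body again.
  // Returns (count before run, first result, second result).
  def suspendedTwice(): (Int, Int, Int) = {
    val counter = new AtomicInteger(0)
    val eff = Suspended(() => counter.incrementAndGet())
    val program = for { a <- eff; b <- eff } yield (a, b)
    val before = counter.get()
    val (a, b) = program.run()
    (before, a, b)
  }
}
```

With Future, replacing `fut` by its definition at both use sites would change the result, which is the referential transparency problem; with the suspended value it would not.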
@Garnek20 cats-effect compatible types usually stop running the next steps once canceled, checking at certain points (e.g. after an async boundary or every several flatMaps).
But we also require effect types to provide a cancelable builder that can execute arbitrary user-supplied logic for cancellation. It can be used, for instance, to make a call with Java thread interruption, or to lift CompletableFuture from Java 8 with its cancel logic.
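For the CompletableFuture case, a rough sketch with Monix 3's Task.cancelable could look like this. fromCompletableFuture and its start parameter are names invented for this example, and it assumes monix-eval is on the classpath.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import monix.eval.Task

object CancelableLift {
  // `start` is a thunk so that the Java future is only created when the
  // Task actually runs (hypothetical helper, not part of Monix).
  def fromCompletableFuture[A](start: () => CompletableFuture[A]): Task[A] =
    Task.cancelable[A] { cb =>
      val cf = start()
      // Forward the Java future's outcome to the Task callback.
      cf.whenComplete(new BiConsumer[A, Throwable] {
        def accept(value: A, error: Throwable): Unit =
          if (error != null) cb.onError(error) else cb.onSuccess(value)
      })
      // User-supplied cancellation logic, executed if the Task is canceled.
      // Note: CompletableFuture documents the boolean argument as having no effect.
      Task { cf.cancel(true); () }
    }
}
```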
Hello everyone, I've been working on a bifunctor implementation for Monix Task lately (sorry for ignoring issues on monix/monix :'D I needed to focus) and I'm ready to share my progress: https://github.com/monix/monix-bio
Hi guys, how do I restart a Monix Observable with backoff?
I found that there is
  final def onErrorRestart(maxRetries: Long): Observable[A] = {
    require(maxRetries >= 0, "maxRetries should be positive")
    new OnErrorRetryCountedObservable(self, maxRetries)
  }
I think that it would be sufficient to add .delay on self, but unfortunately OnErrorRetryCountedObservable is package private. Is there any other way to do that?
observable.doOnNext(el => ref.update(...))
A bit ugly but should work:
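  import monix.eval.Task
  import monix.reactive.Observable
  import scala.concurrent.duration.FiniteDuration

  def retryBackoffNew[A](source: Observable[A],
      maxRetries: Int, firstDelay: FiniteDuration): Observable[A] = {
    // Set once a (re)started source emits its first element, so that the
    // next failure starts again from firstDelay.
    @volatile var resetDelay = false

    def loop(source: Observable[A], retriesLeft: Int, currentDelay: FiniteDuration): Observable[A] =
      source.onErrorHandleWith {
        case ex: Exception =>
          if (retriesLeft > 0) {
            val newDelay = if (resetDelay) {
              resetDelay = false
              firstDelay
            } else {
              currentDelay
            }
            // Resubscribe after newDelay; the retry after that waits 100x longer.
            loop(source, retriesLeft - 1, newDelay * 100)
              .delayExecution(newDelay)
          }
          else
            Observable.raiseError(ex)
      }

    loop(source.doOnStart(_ => Task { resetDelay = true }), maxRetries, firstDelay)
  }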
Perhaps there is a better way to write this. Another option is a custom operator like OnErrorRetryCountedObservable.
cats-retry so I don't know if it supports Observable (it doesn't have Sync)
redeemWith that does both. If you don't want it to change the type, you can use guaranteeCase; it will also work for cancelation.
guaranteeCase won't have access to the value. There's also continual https://github.com/typelevel/cats-effect/blob/master/core/shared/src/main/scala/cats/effect/Concurrent.scala#L392 which should be available via cats-effect.
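A short sketch of the difference between the two, assuming Monix 3 with cats-effect 2 on the classpath. describe and audited are hypothetical helper names, and the log parameter is only there to make the finalizer observable.

```scala
import cats.effect.ExitCase
import monix.eval.Task

object RedeemVsGuarantee {
  // redeemWith handles both the error and the success branch, and is free
  // to change the result type (Int to String here).
  def describe(task: Task[Int]): Task[String] =
    task.redeemWith(
      error => Task.now(s"failed: ${error.getMessage}"),
      value => Task.now(s"ok: $value")
    )

  // guaranteeCase keeps the original type; the finalizer only learns how
  // the task ended (including cancelation), not the produced value.
  def audited(task: Task[Int], log: String => Unit): Task[Int] =
    task.guaranteeCase {
      case ExitCase.Completed => Task(log("completed"))
      case ExitCase.Error(e)  => Task(log(s"error: ${e.getMessage}"))
      case ExitCase.Canceled  => Task(log("canceled"))
    }
}
```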
Task because that's what I think of when I hear Monix, but I feel like it is more important to have a simple migration and keep it an alias to BIO[Throwable, A]