@phongngtuan sorry for not replying that long, I’m on a vacation right now.
your implementation using
Observable
only worksEither
right?
No, you can use it for any A
. The Either
part is for handling errors-or-results produced by .attempt
.
In your Task[Option[A]]
case you’d have something like this.
Observable.fromTask(t.attempt).collect { case Right(Some(a)) => a }
This observable will emit a
for Some(a)
and will complete empty on None
or an error.
Now, when you combine these observables with merge
, you’d have your tasks running in parallel, emitting results as soon as they arrive.
Finally, taking firstOptionL
on the merged observable will give you a Task[Option[A]]
which will emit the first result if it’s available, or None
.
To summarise, try this
val tasks: List[Task[Option[A]]] = ???
val observables = tasks map { t =>
Observable.fromTask(t.attempt).collect { case Right(Some(a)) => a }
}
Observable.fromIterable(observables).merge.firstOptionL
Observable.fromIterable(observables).merge.firstOptionL
won't give any result if observables
all return None, didn't know that it will also return None if the all the underlying Observable are "finished". Am I right?
@phongngtuan yup. Quoting the scaladoc
Returns an Option because the source can be empty.
FYI, there is also .firstL: Task[A]
that fails with NoSuchElementException
on empty source.
shift
-ing after every call xD
hi guys, I started working on #690 and can't figure out why my test is not starting :(
here is the code
monix/monix@b827aaf
reactiveJVM/testOnly *TypeClassLawsForObservableSuite
<- works as expectedreactiveJVM/testOnly *TypeClassLawsForSubjectSuite
<- doesn't do anything like there no test
???
is just a placeholder
window.setTimeout(..., 0)
has a small delay. Would it be possible to use setImmediate
using a polyfill instead? I'd like to know if it makes a difference for my application. Is it correct that I have to reimplement AsyncScheduler using setImmediate
?
Hi everyone,
It’s possible to cancel a pure computation task ?
Example of what I’m trying to do:
val heuristic: Task[Option[A]] = ???
val bruteForce: Task[Option[A]] = ???
Task
.racePair(heuristic, bruteForce)
.flatMap {
case Left((Some(a), slower)) => slower.cancel.map(_ => Some(a))
case Left((None, slower)) => slower.join
case Right((slower, Some(b))) => slower.cancel.map(_ => Some(b))
case Right((slower, None)) => slower.join
}
Task { inline hot loop }
, it's effectively not cancelable at all
This is not cancellable:
Task.delay { … }
while, this, is:
Task.delay { … }.shift
??
but here
for {
a <- Task { ... A ... }
_ <- Task.shift
b <- Task { ... B ...}
} yield a + b
Either nothing executes, or A is executed, or A and B are both executed
With this:
heuristic.flatMap {
case Some(a) => Task.now(Some(a))
case _ => bruteForce
}
My benchmark gives me ~4000 ops/s
While with this
bruteForce.flatMap {
case Some(a) => Task.now(Some(a))
case _ => heuristic
}
The same benchmark gives me ~5 ops/s
So I don’t understand why with this code:
Task
.racePair(heuristic, bruteForce)
.flatMap {
case Left((Some(a), _)) => Task.now(Some(a)) // I don’t cancel anymore here
case Left((None, slower)) => slower.join
case Right((_, Some(b))) => Task.now(Some(b)) // I don’t cancel anymore here
case Right((slower, None)) => slower.join
}
The same benchmark gives me ~40 ops/s
With this last impl, I was expecting to get the 4000 ops/s.