concatWithLast is incredibly and admittedly naive
Can anyone please explain what's going on here?
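import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

val obs = Observable.fromIterable(1 to 2)
val result = obs.publishSelector { o =>
  o ++ Observable.fromIterable(11 to 12)
}
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5.seconds.toMillis) // keep the main thread alive until the Future completes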
result: [1,2,11,12]
val obs = Observable.fromIterable(1 to 2)
val result = obs.publishSelector { o =>
  o ++ Observable.fromIterable(11 to 12)
} ++ Observable.fromIterable(101 to 102)
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5.seconds.toMillis)
result: [11,12,101,102]
Where did obs's elements go?
publishSelector works with a PublishSubject underneath, and it might lose initial events if the underlying implementation subscribes too early.
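To illustrate the "subscribes too early" point, here is a minimal toy sketch of a hot, publish-style subject that does no buffering. ToySubject and LateSubscriberDemo are made-up names for illustration only; this is not Monix's PublishSubject or how publishSelector is actually implemented, just the general behaviour being described: a subscriber that arrives late never sees the events emitted before it subscribed.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy stand-in for a hot subject (NOT Monix's PublishSubject).
final class ToySubject[A] {
  private val listeners = ListBuffer.empty[A => Unit]

  def subscribe(f: A => Unit): Unit = {
    listeners += f
    ()
  }

  // Delivers only to listeners registered at this moment; nothing is buffered.
  def emit(a: A): Unit = listeners.foreach(f => f(a))
}

object LateSubscriberDemo {
  // Returns what the early and the late subscriber each received.
  def run(): (List[Int], List[Int]) = {
    val subject = new ToySubject[Int]
    val early   = ListBuffer.empty[Int]
    val late    = ListBuffer.empty[Int]

    subject.subscribe(x => { early += x; () })
    subject.emit(1)
    subject.emit(2)
    // The late subscriber registers after 1 and 2 were already emitted.
    subject.subscribe(x => { late += x; () })
    subject.emit(3)

    (early.toList, late.toList)
  }
}
```

In this toy model the early subscriber receives 1, 2 and 3, while the late one receives only 3, which is the same kind of loss as obs's elements disappearing in the second example.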
val obs = Observable.fromIterable(1 to 2)
val result =
  obs
    .publishSelector { o =>
      o ++ Observable.fromIterable(11 to 12)
    }
    .publishSelector(_ ++ Observable.fromIterable(101 to 102))
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5000)
publishSelector call.
val buf = new OutputStream {
  override def write(b: Int): Unit = {
    /* previously bytes were emitted to the next stage of the flow */
  }
}
Hello, I'm using the Monix circuit breaker. All in all it works great, but I'd like some exceptions to still be thrown without counting towards failures. Does anyone have any hints on how to achieve that?
Currently it looks similar to:
circuitBreaker
  .protect { Task... }
  .toAsync(Async[M], eff)
  .recoverWith {
    case t: ErrIDontWantToCount => Effect[M].raiseError(t)
    case NonFatal(ex) if (n - 1) > 0 =>
      retry(n - 1)
    case t => Effect[M].raiseError(FailedLastRetry(t))
  }
and ErrIDontWantToCount is handled correctly, but it still counts towards failures.
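One possible direction, sketched with plain scala.util.Try and a toy failure counter rather than the real Monix API: turn the error you don't want counted into a successful value inside the protected call, then turn it back into a failure outside of it. ToyBreaker and protectIgnoring are hypothetical names, and ErrIDontWantToCount is redefined here only so the snippet compiles on its own; whether the same shape maps cleanly onto circuitBreaker.protect would need checking against the Monix version in use.

```scala
import scala.util.{Failure, Success, Try}

// Stand-in for the error type from the question (redefined here for self-containment).
final class ErrIDontWantToCount(msg: String) extends RuntimeException(msg)

// Hypothetical toy failure counter standing in for a circuit breaker
// (NOT Monix's CircuitBreaker): it only counts failed results.
final class ToyBreaker {
  var failures: Int = 0

  def protect[A](body: => Try[A]): Try[A] = {
    val result = body
    if (result.isFailure) failures += 1
    result
  }
}

object IgnoreSomeErrors {
  // Inside protect, the ignorable error becomes a successful Left, so the
  // breaker sees no failure; afterwards it is converted back into a failure.
  def protectIgnoring[A](breaker: ToyBreaker)(body: => Try[A]): Try[A] =
    breaker
      .protect[Either[ErrIDontWantToCount, A]] {
        body
          .map[Either[ErrIDontWantToCount, A]](a => Right(a))
          .recover { case e: ErrIDontWantToCount => Left(e) }
      }
      .flatMap[A] {
        case Right(a) => Success(a)
        case Left(e)  => Failure(e)
      }
}
```

With this shape the caller still gets the ErrIDontWantToCount failure back, but only other errors reach the breaker's failure count.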