existingMessages ++ tailObserver
Seems like @squadgazzz use case is the same
my case is similar, but I know when to stop and this means I know what’s the last element.
val obs1 = Observable
.fromIterable(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
.doOnNext(e => Task(println(e)))
.takeWhileInclusive(_ < 5)
.flatMap {
case 5 => Observable.pure(5) ++ Observable.fromIterable(List(10, 11, 12, 13, 14))
case e => Observable.pure(e)
}
In case I don’t know the last element we should properly use publishSelector
somehow
concatWithLast
is a very mild hack for the gains it promises
concatWithLast
is incredibly and admittedly naive
Can anyone please explain what's going on here?
val obs = Observable.fromIterable(1 to 2)
val result = obs.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
}
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5.seconds.toMillis)
result: [1,2,11,12]
val obs = Observable.fromIterable(1 to 2)
val result = obs.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
} ++ Observable.fromIterable(101 to 102)
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5.seconds.toMillis)
result: [11,12,101,102]
Where did obs
's elements gone?
publishSelector
works with a PublishSubject
underneath, and it might lose initial events if the underlying implementation subscribes too early.
val obs = Observable.fromIterable(1 to 2)
val result =
obs
.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
}
.publishSelector(_ ++ Observable.fromIterable(101 to 102))
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5000)
publishSelector
call.
val buf = new OutputStream {
override def write(b: Int): Unit = {
/* previously bytes were emitted to the next stage of the flow */
}
}
Hello, I'm using monix circuit breaker. All in all it works great, but I'd like to let some of the exceptions happen and still be thrown without counting towards failures. Does anyone have any hints on how to achieve that?
Currently it looks similar to:
circuitBreaker
.protect { Task... }
.toAsync(Async[M], eff)
.recoverWith {
case t: ErrIDontWantToCount => Effect[M].raiseError(t)
case NonFatal(ex) if (n - 1) > 0 =>
retry(n - 1)
case t => Effect[M].raiseError(FailedLastRetry(t))
}
and ErrIDontWantToCount
is handled correctly, but counts towards failures