def existingMessages: Observable[Message] = ...
def tailObserver: Observable[Message] = ...
existingMessages ++ tailObserver
Seems like @squadgazzz's use case is the same.
My case is similar, but I know when to stop, which means I know what the last element is.
val obs1 = Observable
.fromIterable(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
.doOnNext(e => Task(println(e)))
.takeWhileInclusive(_ < 5)
.flatMap {
case 5 => Observable.pure(5) ++ Observable.fromIterable(List(10, 11, 12, 13, 14))
case e => Observable.pure(e)
}
If I don't know the last element, we should probably use publishSelector somehow.
concatWithLast is a very mild hack for the gains it promises.
concatWithLast is incredibly and admittedly naive.
Can anyone please explain what's going on here?
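import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._
val obs = Observable.fromIterable(1 to 2)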
val result = obs.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
}
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5.seconds.toMillis)
result: [1,2,11,12]
val obs = Observable.fromIterable(1 to 2)
val result = obs.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
} ++ Observable.fromIterable(101 to 102)
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5.seconds.toMillis)
result: [11,12,101,102]
Where did obs's elements go?
publishSelector works with a PublishSubject underneath, and it might lose initial events if the underlying implementation subscribes too early.
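To illustrate the point about lost initial events, here is a minimal sketch using a PublishSubject directly. It is not what publishSelector does internally; it only shows that a PublishSubject does not replay events emitted before a subscriber attaches. The object name LateSubscriberDemo and the helper run are made up for this example, and it assumes Monix 3.x with the global scheduler.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.subjects.PublishSubject

import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of Monix.
object LateSubscriberDemo {

  def run(): List[Int] = {
    val subject = PublishSubject[Int]()
    val seen    = ListBuffer.empty[Int]

    // No subscriber is attached yet, so this event has nowhere to go.
    subject.onNext(1)

    // Attach a subscriber; it only observes events emitted from now on.
    val done = subject.foreach { x => seen += x; () }

    subject.onNext(2)
    subject.onNext(3)
    subject.onComplete()

    // Wait for the subscriber to see the completion signal.
    Await.result(done, 5.seconds)
    seen.toList
  }

  def main(args: Array[String]): Unit =
    println(run().mkString("[", ",", "]"))
}
```

The first element is emitted before anything listens, so the late subscriber never receives it. That is the same kind of loss as the missing 1 and 2 in the second snippet above, although the exact subscription order inside publishSelector is not shown here.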
val obs = Observable.fromIterable(1 to 2)
val result =
obs
.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
}
.publishSelector(_ ++ Observable.fromIterable(101 to 102))
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5000)
publishSelector call.
val buf = new OutputStream {
override def write(b: Int): Unit = {
/* previously bytes were emitted to the next stage of the flow */
}
}