import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import scala.concurrent.Future
class ObservableOps[A](delegate: Observable[A]) {
def concatWithLast(fn: A=>Observable[A]): Observable[A] = {
Observable.unsafeCreate[A] { subscriber =>
var lastOpt: Option[A] = None
var c0 = Cancelable.empty
def subscribeWithLast(): Unit = {
lastOpt match {
case None =>
subscriber.onComplete()
case Some(last) =>
c0 = fn(last).subscribe(subscriber)
}
}
val c1 =
delegate.subscribe(
new Subscriber[A] {
override implicit def scheduler: Scheduler =
subscriber.scheduler
override def onNext(elem: A): Future[Ack] = {
lastOpt = Some(elem)
subscriber.onNext(elem)
}
override def onError(ex: Throwable): Unit = {
subscriber.onError(ex)
}
override def onComplete(): Unit = {
subscribeWithLast()
}
}
)
Cancelable.collection(c0, c1)
}
}
}
def existingMessages: Observable[Message] = ...
def tailObserver: Observable[Message] = ...
existingMessages ++ tailObserver
Seems like @squadgazzz use case is the same
my case is similar, but I know when to stop and this means I know what’s the last element.
val obs1 = Observable
.fromIterable(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
.doOnNext(e => Task(println(e)))
.takeWhileInclusive(_ < 5)
.flatMap {
case 5 => Observable.pure(5) ++ Observable.fromIterable(List(10, 11, 12, 13, 14))
case e => Observable.pure(e)
}
In case I don’t know the last element we should properly use publishSelector
somehow
concatWithLast
is a very mild hack for the gains it promises
concatWithLast
is incredibly and admittedly naive
Can anyone please explain what's going on here?
val obs = Observable.fromIterable(1 to 2)
val result = obs.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
}
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5.seconds.toMillis)
result: [1,2,11,12]
val obs = Observable.fromIterable(1 to 2)
val result = obs.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
} ++ Observable.fromIterable(101 to 102)
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5.seconds.toMillis)
result: [11,12,101,102]
Where did obs
's elements gone?
publishSelector
works with a PublishSubject
underneath, and it might lose initial events if the underlying implementation subscribes too early.
val obs = Observable.fromIterable(1 to 2)
val result =
obs
.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
}
.publishSelector(_ ++ Observable.fromIterable(101 to 102))
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5000)