scala.concurrent.blocking
scala.concurrent.blocking
directly an not have to make sure I am in a scala Future ?
@fizzy33 blocking
gives a hint to your current thread pool to potentially create an extra thread for other computations. AFAIK many ExecutionContext
s and Monix Scheduler
s supports that but it's not always the case.
If you're using Task
, I would recommend to explicitly run this code on a dedicated thread pool (e.g. Scheduler.io
) with executeOn
Hello, community!
Can you help me please to solve the following task?
val obs1 = Observable
.fromIterable(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
.doOnNext(e => Task(println(e)))
.takeWhileInclusive(_ < 5)
val obs2 = obs1.last.concatMap {
case 5 => Observable.fromIterable(List(10, 11, 12, 13, 14))
case _ => Observable.empty
}
(obs1 ++ obs2).toListL.runToFuture.foreach(l => println(l.mkString(",")))
Thread.sleep(10.seconds.toMillis)
We have some obs1
, when it finishes I need to check if the last element satisfies some predicate I need to append another Observable to it including the last element. Else do not append anything. So the result should be 1,2,3,4,5,10,11,12,13,14
This code works, but with unwanted behavior.
We have doOnNext(e => Task(println(e)))
and every element prints twice here.
In real code, there's some logic that shouldn't be evaluated more than once.
I tried another solution
val obs1 = Observable
.fromIterable(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
.doOnNext(e => Task(println(e)))
.takeWhileInclusive(_ < 5)
.publishSelector { obs2 =>
val last = obs2.last.concatMap {
case 5 => Observable.fromIterable(List(10, 11, 12, 13, 14))
case _ => Observable.empty
}
obs2 ++ last
}
But here I have 1,2,3,4,5
only.
Any solution for this task please? =)
class ObservableOps[A](delegate: Observable[A]) {
def concatWithLast(fn: A=>Observable[A]): Observable[A] = {
Observable.unsafeCreate[A] { subscriber =>
var lastOpt: Option[A] = None
var c0 = Cancelable.empty
def subscribeWithLast(): Unit = {
lastOpt match {
case None =>
subscriber.onComplete()
case Some(last) =>
c0 = fn(last).subscribe(subscriber)
}
}
val c1 =
delegate.subscribe(
new Subscriber[A] {
override implicit def scheduler: Scheduler =
subscriber.scheduler
override def onNext(elem: A): Future[Ack] = {
last = Some(elem)
subscriber.onNext(elem)
}
override def onError(ex: Throwable): Unit = {
subscriber.onError(ex)
}
override def onComplete(): Unit = {
subscribeWithLast()
}
}
)
Cancelable.cancelAll(Iterable(c0, c1))
}
}
}
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import scala.concurrent.Future
class ObservableOps[A](delegate: Observable[A]) {
def concatWithLast(fn: A=>Observable[A]): Observable[A] = {
Observable.unsafeCreate[A] { subscriber =>
var lastOpt: Option[A] = None
var c0 = Cancelable.empty
def subscribeWithLast(): Unit = {
lastOpt match {
case None =>
subscriber.onComplete()
case Some(last) =>
c0 = fn(last).subscribe(subscriber)
}
}
val c1 =
delegate.subscribe(
new Subscriber[A] {
override implicit def scheduler: Scheduler =
subscriber.scheduler
override def onNext(elem: A): Future[Ack] = {
lastOpt = Some(elem)
subscriber.onNext(elem)
}
override def onError(ex: Throwable): Unit = {
subscriber.onError(ex)
}
override def onComplete(): Unit = {
subscribeWithLast()
}
}
)
Cancelable.collection(c0, c1)
}
}
}
def existingMessages: Observable[Message] = ...
def tailObserver: Observable[Message] = ...
existingMessages ++ tailObserver
Seems like @squadgazzz use case is the same
my case is similar, but I know when to stop and this means I know what’s the last element.
val obs1 = Observable
.fromIterable(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
.doOnNext(e => Task(println(e)))
.takeWhileInclusive(_ < 5)
.flatMap {
case 5 => Observable.pure(5) ++ Observable.fromIterable(List(10, 11, 12, 13, 14))
case e => Observable.pure(e)
}
In case I don’t know the last element we should properly use publishSelector
somehow
concatWithLast
is a very mild hack for the gains it promises
concatWithLast
is incredibly and admittedly naive
Can anyone please explain what's going on here?
val obs = Observable.fromIterable(1 to 2)
val result = obs.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
}
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5.seconds.toMillis)
result: [1,2,11,12]
val obs = Observable.fromIterable(1 to 2)
val result = obs.publishSelector { o =>
o ++ Observable.fromIterable(11 to 12)
} ++ Observable.fromIterable(101 to 102)
result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
Thread.sleep(5.seconds.toMillis)
result: [11,12,101,102]
Where did obs
's elements gone?