ClementWee
@ClementWee
And I get the first log message "DataHolder Listener Event"
So the source is emitting until that point
Is there an issue with chaining doOnNext?
Piotr Gawryś
@Avasil

Is there an issue with chaining doOnNext?

No, it's fine

ClementWee
@ClementWee
hmm

my source Observables are defined like this (following the website tutorial):

val massgn = MultiAssignCancelable()

Observable.create[String](Unbounded) { sub =>
  implicit val scheduler = sub.scheduler
  val c = BooleanCancelable(() => off())
  massgn := c
  onListen(() => sub.onNext(item.value()).syncOnStopOrFailure(_ => c.cancel()))
  c
}

Is there anything I am missing here?

Oleg Pyzhcov
@oleg-py
Well, this looks quite convoluted so I'm not even sure what you're trying to do. Usually my js wrappers are something along the lines of
Observable.create[String](Unbounded) { sub =>
  onListen(item => sub.onNext(item.value()))
  Cancelable(() => off())
}
You'll have to minimize your problem to advance, there's nothing immediately obvious from the pieces you've posted but they don't provide complete information.
ClementWee
@ClementWee
Hmm ... my general concept is that I am taking all the UI events and using Observable.merge to merge them into a single Observable
This Observable is then processed into a new DataState and used to reload the UI.
The complication is that parts of this UI are dropdowns. The dropdowns need to be deactivated when updating their value so they don't re-trigger the whole long process.
Glen Marchesani
@fizzy33
I am integrating with an existing synchronous Java API. Reading the docs at https://monix.io/docs/current/eval/task.html#blocking-for-a-result makes me think the way to do that is to do the blocking in a Future with scala.concurrent.blocking
or can I just use scala.concurrent.blocking directly and not have to make sure I am in a Scala Future?
ClementWee
@ClementWee
in what sense is your API blocking?
Piotr Gawryś
@Avasil

@fizzy33 blocking gives a hint to your current thread pool that it may need to create an extra thread for other computations. AFAIK many ExecutionContexts and Monix Schedulers support that, but it's not always the case.

If you're using Task, I would recommend explicitly running this code on a dedicated thread pool (e.g. Scheduler.io) with executeOn
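
A minimal sketch of that suggestion, assuming Monix 3.x. The names `legacyRead`, `BlockingOnIo` and the pool name "blocking-io" are hypothetical stand-ins, not part of the API discussed above:

```scala
import monix.eval.Task
import monix.execution.Scheduler

import scala.concurrent.Await
import scala.concurrent.duration._

object BlockingOnIo {
  // Hypothetical stand-in for a call into a synchronous Java API.
  def legacyRead(): String = {
    Thread.sleep(100)
    "result"
  }

  // Dedicated scheduler meant for blocking I/O, kept separate from the
  // pool that runs the rest of the application.
  val io: Scheduler = Scheduler.io(name = "blocking-io")

  // The blocking call is evaluated on `io`; stages chained after it shift
  // back to the scheduler the task was started with.
  val read: Task[String] = Task(legacyRead()).executeOn(io)

  def run(): String = {
    import monix.execution.Scheduler.Implicits.global
    Await.result(read.runToFuture, 5.seconds)
  }
}
```

The Await here only exists to get a value out of the demo; in application code the Task would normally be composed further rather than blocked on.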

Glen Marchesani
@fizzy33
great and as always thanks @Avasil
Ilya
@squadgazzz

Hello, community!
Can you please help me solve the following task?

  val obs1 = Observable
    .fromIterable(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
    .doOnNext(e => Task(println(e)))
    .takeWhileInclusive(_ < 5)
  val obs2 = obs1.last.concatMap {
    case 5 => Observable.fromIterable(List(10, 11, 12, 13, 14))
    case _ => Observable.empty
  }
  (obs1 ++ obs2).toListL.runToFuture.foreach(l => println(l.mkString(",")))
  Thread.sleep(10.seconds.toMillis)

We have some obs1. When it finishes, I need to check whether its last element satisfies some predicate. If it does, I need to append another Observable to it, keeping the last element in the output; otherwise nothing should be appended. So the result should be 1,2,3,4,5,10,11,12,13,14
This code works, but with unwanted behavior.
We have doOnNext(e => Task(println(e))) and every element prints twice here.
In real code, there's some logic that shouldn't be evaluated more than once.
I tried another solution

  val obs1 = Observable
    .fromIterable(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
    .doOnNext(e => Task(println(e)))
    .takeWhileInclusive(_ < 5)
    .publishSelector { obs2 =>
      val last = obs2.last.concatMap {
        case 5 => Observable.fromIterable(List(10, 11, 12, 13, 14))
        case _ => Observable.empty
      }
      obs2 ++ last
    }

But here I have 1,2,3,4,5 only.
Any solution for this task please? =)
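
One possible way around the double evaluation, offered as a sketch rather than as anything suggested in this conversation: subscribe to the source only once, append an end-of-stream marker, and carry the last element along with scan. This assumes Monix 3.x; `concatWithLast`, `ConcatWithLastSketch` and `demo` are illustrative names.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object ConcatWithLastSketch {
  def concatWithLast[A](source: Observable[A])(fn: A => Observable[A]): Observable[A] = {
    // Wrap elements in Some and append a single None once the source completes.
    val marked: Observable[Option[A]] =
      source.map[Option[A]](a => Some(a)) ++ Observable.now(Option.empty[A])

    marked
      // State is (event, last element seen so far); scan does not emit the seed.
      .scan((Option.empty[A], Option.empty[A])) {
        case (_, Some(a))      => (Some(a), Some(a))
        case ((_, last), None) => (None, last)
      }
      .concatMap {
        case (Some(a), _)       => Observable.now(a)  // regular element
        case (None, Some(last)) => fn(last)           // marker: continue from last
        case (None, None)       => Observable.empty   // source was empty
      }
  }

  def demo(): List[Int] = {
    val obs1 = Observable
      .fromIterable(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
      .doOnNext(e => Task(println(e)))
      .takeWhileInclusive(_ < 5)

    concatWithLast(obs1) {
      case 5 => Observable.fromIterable(List(10, 11, 12, 13, 14))
      case _ => Observable.empty
    }.toListL.runSyncUnsafe()
  }
}
```

Because obs1 is subscribed to a single time, the side effect in doOnNext should run once per element, and the marker is only reached after the source has completed.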

Glen Marchesani
@fizzy33
@Avasil what do you think of this for the use case of having an observable concatenated with the last element and a fn

class ObservableOps[A](delegate: Observable[A]) {

  def concatWithLast(fn: A=>Observable[A]): Observable[A] = {
    Observable.unsafeCreate[A] { subscriber =>

      var lastOpt: Option[A] = None

      var c0 = Cancelable.empty

      def subscribeWithLast(): Unit = {
        lastOpt match {
          case None =>
            subscriber.onComplete()
          case Some(last) =>
            c0 = fn(last).subscribe(subscriber)
        }
      }

      val c1 =
        delegate.subscribe(
          new Subscriber[A] {

            override implicit def scheduler: Scheduler =
              subscriber.scheduler

            override def onNext(elem: A): Future[Ack] = {
              last = Some(elem)
              subscriber.onNext(elem)
            }

            override def onError(ex: Throwable): Unit = {
              subscriber.onError(ex)
            }

            override def onComplete(): Unit = {
              subscribeWithLast()
            }
          }
        )

      Cancelable.cancelAll(Iterable(c0, c1))

    }

  }

}
Glen Marchesani
@fizzy33
ooof
a few fixes after testing

import monix.execution.{Ack, Cancelable, Scheduler}
import monix.execution.cancelables.SerialCancelable
import monix.reactive.Observable
import monix.reactive.observers.Subscriber

import scala.concurrent.Future

class ObservableOps[A](delegate: Observable[A]) {

  def concatWithLast(fn: A => Observable[A]): Observable[A] = {
    Observable.unsafeCreate[A] { subscriber =>

      // Last element emitted by the source, if any.
      var lastOpt: Option[A] = None

      // Holds the follow-up subscription. It is assigned only after the
      // source completes, so a plain var captured by Cancelable.collection
      // would still point at Cancelable.empty when cancel is called.
      val c0 = SerialCancelable()

      def subscribeWithLast(): Unit = {
        lastOpt match {
          case None =>
            subscriber.onComplete()
          case Some(last) =>
            c0 := fn(last).subscribe(subscriber)
        }
      }

      val c1 =
        delegate.subscribe(
          new Subscriber[A] {

            override implicit def scheduler: Scheduler =
              subscriber.scheduler

            override def onNext(elem: A): Future[Ack] = {
              lastOpt = Some(elem)
              subscriber.onNext(elem)
            }

            override def onError(ex: Throwable): Unit = {
              subscriber.onError(ex)
            }

            // The source is done here, so only now subscribe to fn(last).
            override def onComplete(): Unit = {
              subscribeWithLast()
            }
          }
        )

      Cancelable.collection(c0, c1)

    }

  }

}
Piotr Gawryś
@Avasil

@fizzy33 Did you try:

  left
    .pipeThroughSelector(Pipe.behavior[Int](0), { source: Observable[Int] =>
      source ++ source.last.flatMap(right)
    })

that I suggested earlier?

Seems like @squadgazzz use case is the same
Glen Marchesani
@fizzy33
@Avasil I did try that and it wasn't working...
the source wasn't closed
I can try it again as I fixed a bunch of stuff that "may" have caused that
I mean the source didn't have onComplete eagerly called
which is needed
the use case is reading the tail of a message log
so we want existing messages
then switch to a hot observable that is the tail
so it is in effect
def existingMessages: Observable[Message] = ...
def tailObserver: Observable[Message] = ...
existingMessages ++ tailObserver
where we need to get the onComplete on existingMessages, otherwise readers hang around forever
fwiw Monix has made this much much simpler
I plan to create a simple example of what you recommended to show what I ran into
just too many things needed before we release
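
A small sketch of the ordering described above, assuming Monix 3.x: with ++, the second observable is subscribed to only after the first one has completed. Both observables are cold stand-ins here (the real tail is hot), and `ConcatOrderSketch`, `record` and the message values are made up for illustration.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.collection.mutable.ListBuffer

object ConcatOrderSketch {
  def demo(): (List[String], List[String]) = {
    val events = ListBuffer.empty[String]

    // Records lifecycle events; synchronized because callbacks may run on pool threads.
    def record(s: String): Task[Unit] =
      Task(events.synchronized { events += s; () })

    // Stand-in for the reader-backed observable of stored messages.
    val existingMessages = Observable
      .fromIterable(List("m1", "m2"))
      .doOnComplete(record("existing completed"))

    // Stand-in for the tail of new messages.
    val tailObserver = Observable
      .fromIterable(List("m3"))
      .doOnSubscribe(record("tail subscribed"))

    val messages = (existingMessages ++ tailObserver).toListL.runSyncUnsafe()
    (messages, events.synchronized(events.toList))
  }
}
```

The recorded events should show the completion of existingMessages before the subscription to the tail, which is the point at which a reader behind existingMessages can be released.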
Ilya
@squadgazzz

Seems like @squadgazzz use case is the same

my case is similar, but I know when to stop and this means I know what’s the last element.

  val obs1 = Observable
    .fromIterable(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
    .doOnNext(e => Task(println(e)))
    .takeWhileInclusive(_ < 5)
    .flatMap {
      case 5 => Observable.pure(5) ++ Observable.fromIterable(List(10, 11, 12, 13, 14))
      case e => Observable.pure(e)
    }

If I didn't know the last element, we would probably have to use publishSelector somehow

Glen Marchesani
@fizzy33
@squadgazzz yeah I use that same cheat (knowing when to stop)
Ilya
@squadgazzz

@squadgazzz yeah I use that same cheat (knowing when to stop)

So why not use flatMap?

Glen Marchesani
@fizzy33
I have two observables. One is the existingMessages (which is backed by a reader of the data store) and the tailObserver for new messages
I need the existingMessages observable to have onComplete eagerly called
the flatMap means the source (existingMessages in my case) never has onComplete called
so it leaves lots of readers hanging out there
Ilya
@squadgazzz
oh I see. let me think too)
Glen Marchesani
@fizzy33
Sort of the whole reason I did concatWithLast was to eagerly call onComplete on the original observable
the cool thing for me is I now have an in memory hot path for messages so the latency goes way way down
and the amount of code to do that is almost nil
I just get it for free