These are chat archives for ReactiveX/RxJava

4th
Apr 2016
Tushar Mathur
@tusharmath
Apr 04 2016 13:03
@cavemansspa Is there a method for Observable.merge?
@DmitriyZaitsev Sorry for the late response. Though your code works just perfectly find, I am unable to understand what is lock and how is it working here?
whst notifyAll doing?
Dmitriy Zaitsev
@DmitriyZaitsev
Apr 04 2016 14:05
@tusharmath lock.wait() causes the current thread to wait until another thread invokes lock.notify() or lock.notifyAll().
Since the reactive code executes asynchronously on a different thread, method main() can not be finished until rx.Observable emits elements.
As soon as the Observable is completed, it unlocks the lock object, which will cause the main thread to continue till the end.
Tushar Mathur
@tusharmath
Apr 04 2016 15:48
is it documented somewhere?
Dorus
@Dorus
Apr 04 2016 15:49
You mean the java language specs? yeah, those are documented.
Tushar Mathur
@tusharmath
Apr 04 2016 15:50
I mean this particular lock.wait thingy... sorry if it sounds stupid, I am new to scala
Samuel Tardieu
@samueltardieu
Apr 04 2016 16:00
@tusharmath Those are Java methods available on Object and thus any Java (or Scala) object, as toString() is for example.
Tushar Mathur
@tusharmath
Apr 04 2016 16:04
okay
Dorus
@Dorus
Apr 04 2016 16:10
Ah sorry, i got distracted. But looks like you already have an answer. I was going to point at the Java guarded blocks tutorial as these are Java methods available on every Object. The only scala part is using new AnyRef instead of new Object().
Tushar Mathur
@tusharmath
Apr 04 2016 19:26
I thought Rx was single threaded?
How many threads would I have created in the above code?
is it one per interval call?
Dmitriy Zaitsev
@DmitriyZaitsev
Apr 04 2016 20:05
interval implicitly subscribes on computation scheduler
your observables a, b, c emit elements on different threads
Tushar Mathur
@tusharmath
Apr 04 2016 20:10
But the actual excution is still happening on the main thread right?
every interval creates a new thread, which is quite essentially idle and waiting for a timeout. Eventually when it fires it goes to the main thread to get consumed?
Dorus
@Dorus
Apr 04 2016 20:28
I always understood it runs on a threadpool, so most of the time it will actually run singlethreaded, as not all timeouts are expected to happen at the same time.
However, the only thing it guarantees is that events run serialized, different events might still run parallel for different steps, and flatMap might even run everything concurrent (just that it starts serialized but if the inner observable has a delay, it already starts the next one).
Im not sure what the computation does, but @DmitriyZaitsev might be right that it starts multiple threads.
As that's often optimal for computation work.
Tushar Mathur
@tusharmath
Apr 04 2016 20:33
Which part would be pushed to the thread pool ?
In the above example
Will the map functions be called parallely ?
or would it be done on the main thread?
Or is it something we can control?
Dorus
@Dorus
Apr 04 2016 20:34
Can you point back at the example?
You can give it an scheduler yes
Tushar Mathur
@tusharmath
Apr 04 2016 20:34
  def main(args: Array[String]) {
    val lock: AnyRef = new AnyRef
    val a = Observable.interval(1000 millis).map(_ => "A")
    val d = Observable.interval(3000 millis).map(_ => "D")
    val e = Observable.interval(5000 millis).map(_ => "E")
    a.merge(d).merge(e)
      .take(100)
      .subscribe(new Subscriber[String]() {
        def onCompleted() {
          lock synchronized {
            lock.notifyAll()
          }
        }
        def onError(throwable: Throwable) {}
        def onNext(s: String) = println(s)
      })
    lock synchronized {
      lock.wait()
    }
  }
}
Dorus
@Dorus
Apr 04 2016 20:35
What part needs to run on one thread?
There are multiple options, but 90% of the time the default will perform best and adding too many hints isn't optimal.
Tushar Mathur
@tusharmath
Apr 04 2016 20:36
I don't know how this would actually work
what is off loaded to the thread pool and what is not
For instance — in this case
Dorus
@Dorus
Apr 04 2016 20:37
You can set the scheduler on interval, or use observeOn (and subscribeOn) to give hints, gurantee some parts are on the IO scheduler etc.
Tushar Mathur
@tusharmath
Apr 04 2016 20:37
  1. Scheduling could be done on a different thread
  2. The map functions could be called in parallel too
Dorus
@Dorus
Apr 04 2016 20:37
Pretty much everything runs on the threadpool since interval has an delay.
Tushar Mathur
@tusharmath
Apr 04 2016 20:38
So if the map function was doing something computationally expensive, would it be automatically moved on a different thread?
Dorus
@Dorus
Apr 04 2016 20:39
Well, the other 2 intervals will keep running on different threads yes.
However, the next interval of the one that's slow wont fire.
I'm not 100% sure if it plays catch up later on or not.
Tushar Mathur
@tusharmath
Apr 04 2016 20:39
what about the map()
?
Dorus
@Dorus
Apr 04 2016 20:40
Map never runs parallel with itself, that's a propertie of Rx.
Part of the Rx contract.
If you need the stuff inside Map to run paralele, you should have used flatMap
However, the 3 sources do run parallel.
(or could, they're not guaranteed to do so, they only will if they fire at the same time ofcourse.)
Tbh Rx is more suited for IO work than computation heavy work.
But you can still work with it
Tushar Mathur
@tusharmath
Apr 04 2016 20:45
i see
Dorus
@Dorus
Apr 04 2016 20:47
The only way to make things run parallel in Rx is by calling them async. Operations like merge you use here, or flatMap, do run concurrent.
At least, everything before merge runs concurrent. After merge the pipeline is serialized.
Hope this makes sense.
Things in onNext will run serialized, but the 3 calls to map can run parallele.
Tushar Mathur
@tusharmath
Apr 04 2016 20:49
How does one know what part of the code is being executed in parallel?
Dorus
@Dorus
Apr 04 2016 20:49
Just that the events from a single interval source here wont run parallel.
Observable.interval(1000 millis) This spawns a thread that calls observable.onNext and then schedule itself to do so again in 1 second. If onNext takes 900ms to return, it will wait for 100ms.
The thread that spawned comes from the scheduler.
The call to onNext will then run the code inside .map(...), continue to run the code in .merge(...), continue with take(100) and then subscribe. It returns and sleeps 100ms.
Observable.interval(3000 millis) does the same, because thread1 is bussy, the scheduler gives it a new thread. This thread will call onNext runs map in parallel with a and then get to merge. Merge is already bussy with a at this time (assuming is arrives there just after a did), so it schedules it's event and returns. It will then schedule itself to call onNext again 3 seconds after the first event.
The thread from a will then probably be used to call onNext on take(100) and subscribe, before returning to a.
Dorus
@Dorus
Apr 04 2016 20:54
You get the picture?
Tushar Mathur
@tusharmath
Apr 04 2016 20:55
i think i do
I am trying to draw parallels between RxJS and RxScala.
thanks a ton for the explaination
:)
Dorus
@Dorus
Apr 04 2016 21:00
Mmm i dont think this is different between the two. I'm just talking about Rx here.
(pc crashed so i was a bit slow)
James Moore
@banshee
Apr 04 2016 21:53
What's the right way to do nested observables? Something like Observable<Observable<String>>, where the inner observable should terminate if the outer observable calls onComplete or onError?
Something like switchMap that doesn't flatten out the inner observable
Dorus
@Dorus
Apr 04 2016 21:54
RxJava already does that for onError i think.
Not sure what to do about onCompeted, guess you can write an operator that does that.
James Moore
@banshee
Apr 04 2016 21:58
Got it - sounds like there's nothing builtin to do this that I'm not seeing
Dorus
@Dorus
Apr 04 2016 21:58
Not that i know of no.