These are chat archives for ReactiveX/RxJava

22nd
Mar 2016
Tushar Mathur
@tusharmath
Mar 22 2016 17:40
I have the following code:
import rx.lang.scala.Observable
import scala.concurrent.duration._

object Main {
  def main(args: Array[String]): Unit = {
    val a = Observable.interval(1000 millis).map(_ => "A")
    val d = Observable.interval(3000 millis).map(_ => "D")
    val e = Observable.interval(5000 millis).map(_ => "E")
    a.merge(d).merge(e)
      .take(100)
      .subscribe(x => println(x))
  }
}
But when I do sbt run it outputs nothing
Can someone tell me what's wrong? Why is my process ending without waiting for 100 events?
cavemansspa
@cavemansspa
Mar 22 2016 18:07
@tusharmath -- why are you doing multiple merges -- don't you want Observable.merge(a, d, e).take(100)?
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Mar 22 2016 18:07
@tusharmath you're not blocking main thread
Dmitriy Zaitsev
@DmitriyZaitsev
Mar 22 2016 18:08
@tusharmath you have to synchronize your code
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Mar 22 2016 18:09
Observable.interval() works on Schedulers.computation() by default, so your chain runs on 3 threads (each Observable.interval() on a separate thread) and the main thread ends before the chain is processed
Dorus
@Dorus
Mar 22 2016 18:09
I know you guys always use toBlocking and toList stuff to make queries blocking, but that's just tutorial usage. In real-world scenarios your thread usually won't kill your application if you allow it to continue to do stuff. Better to follow the query with a readLine or so.
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Mar 22 2016 18:10
Right, but for tutorial/sample usage like this it's ok to use toBlocking() I guess
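A minimal sketch of the toBlocking() route for the code in the question, assuming RxScala is on the classpath. The object name BlockingMain and the `unit` and `count` parameters are hypothetical, added only so the sketch can be run with a shorter base period.

```scala
import rx.lang.scala.Observable
import scala.concurrent.duration._

object BlockingMain {
  // Same three streams as in the question, scaled by a base period `unit`.
  def events(unit: FiniteDuration, count: Int): List[String] = {
    val a = Observable.interval(unit).map(_ => "A")
    val d = Observable.interval(unit * 3).map(_ => "D")
    val e = Observable.interval(unit * 5).map(_ => "E")
    a.merge(d).merge(e)
      .take(count)
      .doOnNext(x => println(x)) // print each event as it arrives
      .toBlocking                // block the calling thread until completion
      .toList
  }

  def main(args: Array[String]): Unit = {
    events(1000.millis, 100)
    ()
  }
}
```

Because the calling thread waits inside toBlocking, main does not return until the requested number of events has been emitted.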
Dorus
@Dorus
Mar 22 2016 18:10
Or add synchronization to the onCompleted method of your subscription.
If you are writing a testcase, you can rely on testScheduler to block.
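For the test-case route, a rough sketch of driving interval() with RxScala's TestScheduler so that time is virtual and nothing needs to block. The object VirtualTimeSketch and the method emittedAfter are hypothetical names; only two of the three streams are used to keep it short.

```scala
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.TestScheduler
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._

object VirtualTimeSketch {
  // Merges a 1-second and a 3-second interval on a TestScheduler and
  // returns everything emitted after advancing virtual time by n seconds.
  def emittedAfter(n: Int): List[String] = {
    val scheduler = TestScheduler()
    val received = ListBuffer.empty[String]
    val a = Observable.interval(1.second, scheduler).map(_ => "A")
    val d = Observable.interval(3.seconds, scheduler).map(_ => "D")
    a.merge(d).subscribe(x => received += x)
    // Runs every action scheduled up to the new virtual time, synchronously
    scheduler.advanceTimeBy(n.seconds)
    received.toList
  }
}
```

Everything runs on the calling thread, so the test thread never has to wait on real timers.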
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Mar 22 2016 18:11
which is basically manual toBlocking() :) (it was about synchronization)
Dmitriy Zaitsev
@DmitriyZaitsev
Mar 22 2016 18:26
@tusharmath try something like this:
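import rx.lang.scala.{Observable, Subscriber}
import scala.concurrent.duration._

object Main {
  def main(args: Array[String]): Unit = {
    val lock: AnyRef = new AnyRef
    var done = false // guarded by lock; protects against spurious wakeups
    val a = Observable.interval(1000 millis).map(_ => "A")
    val d = Observable.interval(3000 millis).map(_ => "D")
    val e = Observable.interval(5000 millis).map(_ => "E")
    // Wake the main thread once the stream terminates, normally or with an error
    def finish(): Unit = lock synchronized {
      done = true
      lock.notifyAll()
    }
    a.merge(d).merge(e)
      .take(100)
      .subscribe(new Subscriber[String] {
        override def onCompleted(): Unit = finish()
        override def onError(throwable: Throwable): Unit = {
          throwable.printStackTrace()
          finish()
        }
        override def onNext(s: String): Unit = println(s)
      })
    // Block main until the subscriber signals termination
    lock synchronized {
      while (!done) lock.wait()
    }
  }
}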
Dorus
@Dorus
Mar 22 2016 18:27
excellent
systemfreund
@systemfreund
Mar 22 2016 19:33
i am using an io scheduler's createWorker method to do work with a worker... but apparently the created (daemon-)threads are never terminated, as the thread-count keeps rising. but i am pretty sure that the code which does the work is actually terminating
Dorus
@Dorus
Mar 22 2016 19:46
Workers have an unsubscribe method
@systemfreund Use that.
systemfreund
@systemfreund
Mar 22 2016 19:47
i do, but it didn't change much... every time i trigger that worker the thread-count still keeps getting higher
Dorus
@Dorus
Mar 22 2016 19:47
I kinda dislike the interface, shame there is no 'schedule final' that lets all current work finish and then kills the worker thread. I guess you could use worker.schedule(() => worker.unsubscribe())
systemfreund
@systemfreund
Mar 22 2016 19:48
yes, that's basically what i am doing ... () -> doRealWork(); worker.unsubscribe();
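A rough RxScala sketch of that pattern, with the unsubscribe placed in a finally block so the worker is released even if the task throws. doRealWork here is a hypothetical stand-in for the real task, and WorkerSketch and runOnce are made-up names. Each createWorker call needs its own matching unsubscribe.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import rx.lang.scala.schedulers.IOScheduler

object WorkerSketch {
  // Hypothetical stand-in for the real task.
  def doRealWork(): Unit =
    println("working on " + Thread.currentThread.getName)

  // Runs one task on a fresh IO worker and releases the worker afterwards.
  // Returns true if the task finished and the worker reports itself unsubscribed.
  def runOnce(): Boolean = {
    val worker = IOScheduler().createWorker
    val done = new CountDownLatch(1)
    worker.schedule {
      try doRealWork()
      finally {
        worker.unsubscribe() // release the worker even if the task fails
        done.countDown()
      }
    }
    done.await(5, TimeUnit.SECONDS) && worker.isUnsubscribed
  }
}
```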
Dorus
@Dorus
Mar 22 2016 19:48
Mm then i'm clueless :S
systemfreund
@systemfreund
Mar 22 2016 21:02
@Dorus looks like i forgot to unsubscribe at another place :) now everything appears to be fine