    Dorus
    @Dorus
    Can you point me back to the example?
    You can give it a scheduler, yes.
    Tushar Mathur
    @tusharmath
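      import rx.lang.scala.{Observable, Subscriber}
      import scala.concurrent.duration._

      // Enclosing object name is assumed; the original paste started at main.
      object Main {
        def main(args: Array[String]): Unit = {
          val lock = new AnyRef
          val a = Observable.interval(1000.millis).map(_ => "A")
          val d = Observable.interval(3000.millis).map(_ => "D")
          val e = Observable.interval(5000.millis).map(_ => "E")
          a.merge(d).merge(e)
            .take(100)
            .subscribe(new Subscriber[String] {
              // Wake up the main thread once 100 items have been delivered.
              override def onCompleted(): Unit = lock.synchronized(lock.notifyAll())
              override def onError(throwable: Throwable): Unit = {}
              override def onNext(s: String): Unit = println(s)
            })
          // Block main so the JVM stays alive while the timers fire.
          lock.synchronized {
            lock.wait()
          }
        }
      }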
    Dorus
    @Dorus
    What part needs to run on one thread?
    There are multiple options, but 90% of the time the default will perform best and adding too many hints isn't optimal.
    Tushar Mathur
    @tusharmath
    I don't know how this would actually work:
    what is offloaded to the thread pool and what is not.
    For instance, in this case:
    Dorus
    @Dorus
    You can set the scheduler on interval, or use observeOn (and subscribeOn) to give hints, guarantee some parts run on the IO scheduler, etc.
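To make the scheduler hints above concrete, here is a minimal sketch, assuming RxScala (`rx.lang.scala`) is on the classpath. The object name `SchedulerHints` and the 100 ms period are made up for illustration. It passes a scheduler to `interval` and then uses `observeOn` to move the downstream work to the IO scheduler, recording which thread ran each step.

```scala
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.{ComputationScheduler, IOScheduler}
import scala.concurrent.duration._

object SchedulerHints {
  // Returns, per item, (thread that ran the first map, thread that ran the second map).
  def run(): List[(String, String)] =
    Observable
      .interval(100.millis, ComputationScheduler()) // timer ticks on the computation pool
      .map(_ => Thread.currentThread().getName)     // runs on the thread that emitted the tick
      .observeOn(IOScheduler())                     // steps below are moved to the IO pool
      .map(upstream => (upstream, Thread.currentThread().getName))
      .take(3)
      .toBlocking
      .toList

  def main(args: Array[String]): Unit =
    run().foreach { case (up, down) => println(s"map ran on $up, downstream ran on $down") }
}
```

The exact thread names depend on the RxJava version underneath, so the sketch only prints them rather than assuming a particular naming scheme.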
    Tushar Mathur
    @tusharmath
    1. Scheduling could be done on a different thread
    2. The map functions could be called in parallel too
    Dorus
    @Dorus
    Pretty much everything runs on the thread pool, since interval has a delay.
    Tushar Mathur
    @tusharmath
    So if the map function was doing something computationally expensive, would it be automatically moved to a different thread?
    Dorus
    @Dorus
    Well, the other 2 intervals will keep running on different threads, yes.
    However, the next tick of the interval that's slow won't fire until the slow call returns.
    I'm not 100% sure if it plays catch-up later on or not.
    Tushar Mathur
    @tusharmath
    what about the map()
    ?
    Dorus
    @Dorus
    Map never runs in parallel with itself, that's a property of Rx.
    Part of the Rx contract.
    If you need the stuff inside map to run in parallel, you should use flatMap.
    However, the 3 sources do run in parallel.
    (Or could, they're not guaranteed to do so. They only will if they fire at the same time, of course.)
    Tbh Rx is more suited for IO work than computation heavy work.
    But you can still work with it
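A minimal sketch of the flatMap suggestion above, assuming RxScala is on the classpath. `slowSquare` is a hypothetical stand-in for an expensive computation, and the object name is made up. Each inner observable is subscribed on the computation scheduler, so the calls may overlap on a multi-core machine. The order of results is not guaranteed, which is why the example sorts them.

```scala
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.ComputationScheduler

object ParallelWithFlatMap {
  // Hypothetical stand-in for a computationally expensive function.
  def slowSquare(n: Int): Int = { Thread.sleep(200); n * n }

  def run(): List[Int] =
    Observable.from(1 to 4)
      .flatMap { n =>
        // Each inner observable gets its own worker from the scheduler,
        // so the four slowSquare calls are allowed to overlap.
        Observable.just(n).subscribeOn(ComputationScheduler()).map(slowSquare)
      }
      .toBlocking
      .toList

  def main(args: Array[String]): Unit =
    println(run().sorted) // prints List(1, 4, 9, 16)
}
```

A plain `.map(slowSquare)` on the outer observable would give the same values but would process the items one after another.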
    Tushar Mathur
    @tusharmath
    i see
    Dorus
    @Dorus
    The only way to make things run in parallel in Rx is by calling them async. Operations like merge, which you use here, or flatMap, do run concurrently.
    At least, everything before merge runs concurrently. After merge the pipeline is serialized.
    Hope this makes sense.
    Things in onNext will run serialized, but the 3 calls to map can run in parallel.
    Tushar Mathur
    @tusharmath
    How does one know what part of the code is being executed in parallel?
    Dorus
    @Dorus
    Just that the events from a single interval source here won't run in parallel.
    Observable.interval(1000 millis): this spawns a thread that calls observable.onNext and then schedules itself to do so again in 1 second. If onNext takes 900ms to return, it will wait for 100ms.
    The thread that is spawned comes from the scheduler.
    The call to onNext will then run the code inside .map(...), continue to run the code in .merge(...), continue with take(100) and then subscribe. It returns and sleeps 100ms.
    Observable.interval(3000 millis) does the same. Because thread1 is busy, the scheduler gives it a new thread. This thread will call onNext, run map in parallel with a, and then get to merge. Merge is already busy with a at this time (assuming it arrives there just after a did), so it schedules its event and returns. It will then schedule itself to call onNext again 3 seconds after the first event.
    The thread from a will then probably be used to call onNext on take(100) and subscribe, before returning to a.
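One way to see the walkthrough above in action is to record the thread name on both sides of merge. This is a rough sketch, assuming RxScala is on the classpath. The helper names `source` and `WhoRunsWhat` are made up, and the periods are shortened to 100/300/500 ms so it finishes quickly.

```scala
import rx.lang.scala.Observable
import scala.concurrent.duration._

object WhoRunsWhat {
  private def thread: String = Thread.currentThread().getName

  // Tags each tick with its label and the thread that ran its map function.
  def source(label: String, period: Duration): Observable[String] =
    Observable.interval(period).map(_ => s"$label mapped on $thread")

  def run(): List[String] =
    source("A", 100.millis)
      .merge(source("D", 300.millis))
      .merge(source("E", 500.millis))
      // After merge, events are delivered one at a time.
      .map(s => s"$s, delivered on $thread")
      .take(10)
      .toBlocking
      .toList

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Which thread delivers a given event after merge depends on timing, so the output can differ between runs.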
    Dorus
    @Dorus
    You get the picture?
    Tushar Mathur
    @tusharmath
    i think i do
    I am trying to draw parallels between RxJS and RxScala.
    thanks a ton for the explanation
    :)
    Dorus
    @Dorus
    Mmm, I don't think this is different between the two. I'm just talking about Rx here.
    (pc crashed so i was a bit slow)
    James Moore
    @banshee
    What's the right way to do nested observables? Something like Observable<Observable<String>>, where the inner observable should terminate if the outer observable calls onComplete or onError?
    Something like switchMap that doesn't flatten out the inner observable
    Dorus
    @Dorus
    RxJava already does that for onError, I think.
    Not sure what to do about onCompleted, guess you can write an operator that does that.
    James Moore
    @banshee
    Got it - sounds like there's nothing builtin to do this that I'm not seeing
    Dorus
    @Dorus
    Not that I know of, no.
    nicu marasoiu
    @nmarasoiu
    Hi, I used delaySubscription both with and without another Scheduler. Neither actually induces any delay on the subscribers: for instance, starting 30 observables with delays of up to many seconds, merging them into a new one, and subscribing to the merged one, they all start in 22 ms.
    Dorus
    @Dorus
    @nmarasoiu Are you sure you are measuring the startup time, and not the execution time of subscribe? Because it should subscribe almost instantly (async code), but only does the actual work after the said delay.
    I'm clueless otherwise. Perhaps share some code?
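The distinction raised above (how long subscribe takes to return versus when the work actually starts) can be checked with a small measurement. This is a sketch under assumptions, not a reproduction of the original code: it uses a single observable with a made-up 500 ms delay and assumes RxScala is on the classpath.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicLong
import rx.lang.scala.Observable
import scala.concurrent.duration._

object DelayMeasurement {
  // Returns (ms until subscribe returned, ms until the first item arrived).
  def run(): (Long, Long) = {
    val latch = new CountDownLatch(1)
    val firstItemMs = new AtomicLong(-1L)
    val start = System.nanoTime()
    def elapsedMs: Long = (System.nanoTime() - start) / 1000000L

    Observable.just("x")
      .delaySubscription(500.millis) // assumed delay, for illustration only
      .subscribe { _ =>
        firstItemMs.set(elapsedMs)
        latch.countDown()
      }

    val subscribeReturnedMs = elapsedMs // subscribe itself does not block for the delay
    latch.await()                       // wait until the delayed item has arrived
    (subscribeReturnedMs, firstItemMs.get)
  }

  def main(args: Array[String]): Unit = {
    val (subscribed, firstItem) = run()
    println(s"subscribe returned after $subscribed ms, first item after $firstItem ms")
  }
}
```

If only the first number is measured, the delay appears to have no effect, because the timer runs on a scheduler thread after subscribe has already returned.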