    Tushar Mathur
    @tusharmath
    What about the map()?
    Dorus
    @Dorus
    Map never runs in parallel with itself; that's a property of Rx.
    Part of the Rx contract.
    If you need the work inside map to run in parallel, you should use flatMap.
    However, the 3 sources do run in parallel.
    (Or could; they're not guaranteed to, they only will if they fire at the same time, of course.)
    Tbh Rx is more suited for IO work than computation-heavy work.
    But you can still work with it
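To make the map versus flatMap point concrete, here is a rough sketch in plain Java using `java.util.concurrent` rather than Rx itself. The class and method names are made up for illustration. `mapSequential` transforms one item at a time on the calling thread, the way `map` behaves on a single source, while `flatMapConcurrent` gives each item its own asynchronous task, which is roughly what `flatMap` with an async inner observable does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical illustration; this is not the Rx API.
class MapVsFlatMapSketch {

    // Like map on one source: items are transformed one after another on the caller's thread.
    static <T, R> List<R> mapSequential(List<T> items, Function<T, R> f) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.add(f.apply(item));
        }
        return out;
    }

    // Like flatMap with an async inner source: every item gets its own task on the pool.
    static <T, R> List<R> flatMapConcurrent(List<T> items, Function<T, R> f, ExecutorService pool) {
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(CompletableFuture.supplyAsync(() -> f.apply(item), pool));
        }
        List<R> out = new ArrayList<>();
        for (CompletableFuture<R> future : futures) {
            // Collected in submission order to keep the sketch simple;
            // a real flatMap emits results in completion order.
            out.add(future.join());
        }
        return out;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Integer> items = Arrays.asList(1, 2, 3);
            System.out.println(mapSequential(items, x -> x * 10));
            System.out.println(flatMapConcurrent(items, x -> x * 10, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```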
    Tushar Mathur
    @tusharmath
    i see
    Dorus
    @Dorus
    The only way to make things run in parallel in Rx is by calling them asynchronously. Operators like merge, which you use here, or flatMap, do run concurrently.
    At least, everything before merge runs concurrently. After merge the pipeline is serialized.
    Hope this makes sense.
    Things in onNext will run serialized, but the 3 calls to map can run in parallel.
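A small sketch of that shape, again in plain Java with made-up names rather than real Rx: several source threads each run their own `mapStep`, which may overlap, and then hand the result to a shared `onNext` that admits one event at a time. The sketch uses a lock for brevity, so a second source blocks while the first is inside `onNext`. That is an assumption of this example and not how a real merge is implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration using plain threads; not the Rx implementation of merge.
class MergeSerializationSketch {
    private final AtomicInteger activeDownstream = new AtomicInteger();
    private final AtomicInteger maxActiveDownstream = new AtomicInteger();
    private final AtomicInteger received = new AtomicInteger();

    // Stand-in for the per-source map step; each source thread runs it independently.
    private static int mapStep(int value) {
        return value * 2;
    }

    // Stand-in for everything after merge; synchronized so only one event is inside at a time.
    private synchronized void onNext(int value) {
        int active = activeDownstream.incrementAndGet();
        maxActiveDownstream.accumulateAndGet(active, Math::max);
        received.incrementAndGet();
        activeDownstream.decrementAndGet();
    }

    // Runs `sources` threads emitting `eventsPerSource` values each.
    // Returns {events received, highest number of events inside onNext at once}.
    static int[] run(int sources, int eventsPerSource) {
        MergeSerializationSketch sketch = new MergeSerializationSketch();
        ExecutorService pool = Executors.newFixedThreadPool(sources);
        CountDownLatch done = new CountDownLatch(sources);
        for (int s = 0; s < sources; s++) {
            pool.execute(() -> {
                for (int i = 0; i < eventsPerSource; i++) {
                    sketch.onNext(mapStep(i)); // mapStep may overlap across sources, onNext never does
                }
                done.countDown();
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new int[] {sketch.received.get(), sketch.maxActiveDownstream.get()};
    }

    public static void main(String[] args) {
        int[] result = run(3, 1000);
        System.out.println("received=" + result[0] + " maxActiveDownstream=" + result[1]);
    }
}
```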
    Tushar Mathur
    @tusharmath
    How does one know what part of the code is being executed in parallel?
    Dorus
    @Dorus
    Just that the events from a single interval source here won't run in parallel.
    Observable.interval(1000 millis) spawns a thread that calls observable.onNext and then schedules itself to do so again in 1 second. If onNext takes 900 ms to return, it will wait for 100 ms.
    The thread that is spawned comes from the scheduler.
    The call to onNext will then run the code inside .map(...), continue to run the code in .merge(...), continue with take(100) and then subscribe. It returns and sleeps 100 ms.
    Observable.interval(3000 millis) does the same; because thread 1 is busy, the scheduler gives it a new thread. This thread will call onNext, run map in parallel with a, and then get to merge. Merge is already busy with a at this time (assuming it arrives there just after a did), so it schedules its event and returns. It will then schedule itself to call onNext again 3 seconds after the first event.
    The thread from a will then probably be used to call onNext on take(100) and subscribe, before returning to a.
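The "events from one interval source never overlap" behaviour can be observed with a plain `ScheduledExecutorService`, whose `scheduleAtFixedRate` likewise never runs the same task concurrently, even when one run takes longer than the period. This is only an analogy under that assumption, not the Rx scheduler itself, and `IntervalSketch` is a made-up name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; mimics a single interval source with a JDK scheduler.
class IntervalSketch {

    // Fires an event every `periodMs`, each taking `workMs`, until at least `ticks` have run.
    // Returns the highest number of events that were in progress at the same time.
    static int maxOverlap(int ticks, long periodMs, long workMs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);
        scheduler.scheduleAtFixedRate(() -> {
            maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
            try {
                Thread.sleep(workMs); // stand-in for the work done inside onNext
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
                done.countDown();
            }
        }, 0, periodMs, TimeUnit.MILLISECONDS);
        try {
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return maxActive.get();
    }

    public static void main(String[] args) {
        // Work (30 ms) is longer than the period (20 ms), yet events still do not overlap.
        System.out.println("max overlap: " + maxOverlap(5, 20, 30));
    }
}
```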
    Dorus
    @Dorus
    You get the picture?
    Tushar Mathur
    @tusharmath
    i think i do
    I am trying to draw parallels between RxJS and RxScala.
    thanks a ton for the explanation
    :)
    Dorus
    @Dorus
    Mmm, I don't think this is different between the two. I'm just talking about Rx here.
    (pc crashed so i was a bit slow)
    James Moore
    @banshee
    What's the right way to do nested observables? Something like Observable<Observable<String>>, where the inner observable should terminate if the outer observable calls onComplete or onError?
    Something like switchMap that doesn't flatten out the inner observable
    Dorus
    @Dorus
    RxJava already does that for onError i think.
    Not sure what to do about onCompleted; guess you can write an operator that does that.
    James Moore
    @banshee
    Got it - sounds like there's nothing built in to do this that I'm not seeing
    Dorus
    @Dorus
    Not that I know of, no.
    nicu marasoiu
    @nmarasoiu
    Hi, I used delaySubscription both with and without another Scheduler. Neither actually induces any delay on the subscribers: for instance, starting 30 observables with delays of up to many seconds, merging them into a new one, and subscribing to the merged one, they all start within 22 ms.
    Dorus
    @Dorus
    @nmarasoiu Are you sure you are measuring the startup time, and not the execution time of subscribe? Because it should subscribe almost instantly (async code), but only do the actual work after the specified delay.
    I'm clueless otherwise. Perhaps share some code?
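The difference between "how long the subscribe call takes" and "when the work actually starts" can be shown with a plain JDK scheduler. This is a sketch of the measurement idea only, with made-up names, and it does not use `delaySubscription` itself: scheduling returns right away, while the scheduled work starts no earlier than the requested delay.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration; a JDK scheduler stands in for a delayed subscription.
class DelaySubscriptionSketch {

    // Returns {ms spent in the "subscribe" call, ms until the delayed work started}.
    static long[] measure(long delayMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicLong workStartedAt = new AtomicLong();
        CountDownLatch started = new CountDownLatch(1);

        long t0 = System.nanoTime();
        // "subscribe": only registers the delayed work and returns immediately
        scheduler.schedule(() -> {
            workStartedAt.set(System.nanoTime());
            started.countDown();
        }, delayMs, TimeUnit.MILLISECONDS);
        long subscribeReturnedAt = System.nanoTime();

        try {
            started.await(delayMs + 10_000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdown();
        }
        return new long[] {
            TimeUnit.NANOSECONDS.toMillis(subscribeReturnedAt - t0),
            TimeUnit.NANOSECONDS.toMillis(workStartedAt.get() - t0)
        };
    }

    public static void main(String[] args) {
        long[] result = measure(200);
        System.out.println("subscribe call took " + result[0] + " ms, work started after " + result[1] + " ms");
    }
}
```

A test that times only the first number will report a few milliseconds regardless of the configured delay.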
    nicu marasoiu
    @nmarasoiu
    We have a method m1 returning an observable whose lambda makes a request to an Ignite grid (a remoting call). We want a retry scheme in which we create 3 such observables with .delaySubscription of exponentially larger wait times, and merge them into a single error-delaying observable that consumes from Ignite, original calls or retries, as needed.
    But the first observables are being consumed at the same time, and only the other observables truly delay subscribers, and then by only a few ms compared to the hundreds of ms specified.
    Dorus
    @Dorus
    @nmarasoiu I tried to set up what you describe, and it seems to work, more or less.
    Really bad code, but I whacked it together very quickly.
    nicu marasoiu
    @nmarasoiu
    @Dorus Thanks! We solved the issue; it was in the test, apparently.
    nicu marasoiu
    @nmarasoiu
    Hi, is it possible, when merging Observables, to delay some types of errors but not others?
    Tushar Mathur
    @tusharmath
    Use filter
    and then merge
    Dorus
    @Dorus
    I think he wants to delay onError
    nicu marasoiu
    @nmarasoiu
    probably a healthy solution is to incorporate the non-fatal "errors" as part of the value
    as value types, not as error types
    and remain with mergeDelayError
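A minimal sketch of the "errors as values" idea in plain Java. `Outcome`, `RecoverableException` and `collect` are hypothetical names for this example, not part of RxJava, and the sources are modelled as simple suppliers instead of observables. Recoverable failures are wrapped and passed along like any other value, while every other exception still propagates the way a real onError would.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration; none of these types come from RxJava.
class ErrorsAsValuesSketch {

    // Marker for failures that should not terminate the merged stream.
    static class RecoverableException extends RuntimeException {
        RecoverableException(String message) {
            super(message);
        }
    }

    // Either a successful value or a recoverable failure, carried as an ordinary item.
    static final class Outcome<T> {
        final T value;
        final RecoverableException error;

        private Outcome(T value, RecoverableException error) {
            this.value = value;
            this.error = error;
        }

        static <T> Outcome<T> ok(T value) {
            return new Outcome<>(value, null);
        }

        static <T> Outcome<T> failed(RecoverableException error) {
            return new Outcome<>(null, error);
        }

        boolean isOk() {
            return error == null;
        }
    }

    // Evaluates each source; recoverable failures become values, anything else propagates.
    static <T> List<Outcome<T>> collect(List<Supplier<T>> sources) {
        List<Outcome<T>> out = new ArrayList<>();
        for (Supplier<T> source : sources) {
            try {
                out.add(Outcome.ok(source.get()));
            } catch (RecoverableException e) {
                out.add(Outcome.failed(e)); // carried downstream as a value
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Supplier<String>> sources = new ArrayList<>();
        sources.add(() -> "first");
        sources.add(() -> { throw new RecoverableException("timeout"); });
        sources.add(() -> "third");
        for (Outcome<String> outcome : collect(sources)) {
            System.out.println(outcome.isOk() ? outcome.value : "recoverable: " + outcome.error.getMessage());
        }
    }
}
```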
    Dorus
    @Dorus
    True, especially in RxJava where errors tend to 'skip ahead' and kill any value in front of them.
    nicu marasoiu
    @nmarasoiu
    Thanks