Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Alex Reisberg
    @a-reisberg
    ah, that makes sense
    but the main thing is still confusing
    wait, but that's still weird, since the first subscribe is also from a just, but maybe the oncomplete doesn't have time to run yet
    Dorus
    @Dorus
    yeah, serializedsubject probably tries to put all onNext msgs trough before it calls onCompleted.
    Thats probably why everything runs on main. Main is the first thing that hits the publishSubject.
    What if you subscribe to the PublishSubjectinstead?
    Alex Reisberg
    @a-reisberg
    why doesn't it put everything on the computation thread instead?
    Dorus
    @Dorus
    object Launcher extends App {
      val pubSub = PublishSubject.create[Int];
      val serSub = new SerializedSubject(pubSub)
      val observable = Observable.just(1, 2).subscribeOn(Schedulers.computation())
    
      serSub.subscribe(new Action1[Int] {
        override def call(t: Int): Unit =
          println(s"Subject subscribeOn $t, ${Thread.currentThread().getName}")
      })
    
      observable.subscribe(pubSub)
    
      Observable.just(6).subscribe(pubSub)
    
      while (true) {}
    }
    Alex Reisberg
    @a-reisberg
    then it behaves as expected
    run on different threads
    Dorus
    @Dorus
    aaah
    Alex Reisberg
    @a-reisberg
    that makes sense right? it's just that there's some magic of SerializedSubject that confuses me
    Dorus
    @Dorus
    I think that's how you need to use SerializedSubject. The serSub is for subscribing and the pubSub for calls to onNext.
    Alex Reisberg
    @a-reisberg
    oh, I misread your code, let me rerun it
    yeah, that's correct
    weird!
    why isn't there a place that wrote this lol
    thanks man. that's great! :)
    Dorus
    @Dorus
    i'm not sure. Subjects are confusing. I usually tend to avoid them.
    Even in this case i'm still in doubt if it's a bug or if we're doing something wrong.
    Alex Reisberg
    @a-reisberg
    :/
    so what I'm trying to achieve is to create an eventbus
    using Rx
    what's the alternative?
    Dorus
    @Dorus
    @a-reisberg i'm back. For one thing i would only expose the observer or the observable. Preferably have a static entry point for your events and if you really do need to expose both, expose one half as an observer and another as observable.
    You really do not want an subject that's reachable all over your code, that would be a huge global state and quickly degenerate into spaghetti code.
    Alex Reisberg
    @a-reisberg
    @Dorus Thanks. To extract an Observable you can do subject.asObservable. How about extracting the Observer?
    Dorus
    @Dorus
    @a-reisberg Options are limited, either cast it to Observer, or my favorite, expose the onNext method trough a (static) function.
    Samuel Tardieu
    @samueltardieu
    Any idea on how to do elegantly what I describe in https://github.com/cgeo/cgeo/issues/5533#issuecomment-197284324 with RxJava?
    We have a list of geocaches that we want to refresh (in a geocaching application), and checking if they do have static maps stored requires a disk access. But we want to favor refresh of caches without static maps because the Google Maps API quota is quite low, and we want to give a chance to those caches. And we want to do all that with a minimum delay, and limit disk accesses if the user cancels the operation. With Akka streams, it would be very easy to build a graph as described with the Graph DSL. With RxJava, it looks less straightforward.
    Edoardo Vacchi
    @evacchi
    Hi all; problem: I have a (remote) datasource represented as an iterator that I want to share between different computations; I also need that each of these computations consume the data at their own pace, dropping on backpressure; in other words, the iterator should be consumed at maximum speed (the iterator itself will synchronously block when no data is available). This is what I've come up with https://gist.github.com/evacchi/cf75d4199a8d6af285f6
    I expected to see 1) and 2) produce different output, instead they seem to be running round-robin at the speed of the slow computation...
    (rx java 2.x branch)
    dwursteisen
    @dwursteisen

    @samueltardieu

     obs = geoCaches.share();
    withMap = obs.filter(c -> c.withStaticMap()).buffer(5);
    withoutMap = obs.filter(c -> c.withoutStaticMap()).buffer();
    result = withMap.mergeWith(withoutMap);

    Something like this ?

    Samuel Tardieu
    @samueltardieu
    @dwursteisen Will mergeWith prefer withMap over withoutMap?
    Also, I can't filter as easily as the information with/without static maps is not in the geocache object. It requires a disk access, so that would be: geocaches -> (geoCache, hasStaticMaps?) -> cache -> filter…
    (first enrich the geocache with the static maps information to do it only once)
    But the important think is how backpressure and input preference is handled by mergeWith. In Akka streams, there is an explicit mergePreferred which favours one entry over the others.
    dwursteisen
    @dwursteisen
    @samueltardieu : mergeWith will subscribe sequentially I think. So I will first emit items from withMap.
    For the backpressure part, I don't know
    Samuel Tardieu
    @samueltardieu
    @dwursteisen Yes I know, but it won't do what I want: I want the first source to be preferred if elements are available, and the other one to be used if not. The idea is to have one of the source be a priority source. Backpressure is important here, as we have to wait for the subscriber to request elements before looking if the priority source has some to give.
    For example, in my case, the subscriber does network requests, and only a few of them can be done in parallel. As soon as a slot is available because a previous request has terminated, then the subscriber will request a new element from upstream to start a new request. That's when I want to use the priority source if it has an element ready, or the other one otherwise.
    dwursteisen
    @dwursteisen
    hum; ok.
    if elements are available : when do you know it's available ? Have you check switch operators ?
    Samuel Tardieu
    @samueltardieu
    I should not have to know. Look at the definition of mergePreferred in Akka streams: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stages-overview.html#mergepreferred
    When downstream (the subscriber) requests an element, this request is forwarded to the various sources (observables). If several sources have an element ready, then the one from the preferred source is sent to the subscriber (and another one is queried for later). Elements from the non-preferred sources are only sent when no elements from the preferred source are available and the subscriber requests some.
    It's not easy to explain when we are at the boundary of two systems with different vocabularies :-) We have source/observable, flow/operator, sink/subscriber
    dwursteisen
    @dwursteisen
    ok. I see the problem. But I don't know (yet) how to write it using RxJava.
    hum. Your (prefered) Observable should be the first to emit an item, in fact ?
    prefered.mergeWith(other) will (defacto) emit items from prefered as it be subscribed first. It should be pretty closed to your description ? (but the mergefrom Akka can't be done out of the box with Rx, I think, as Rx don't do random subscription order)