    dwursteisen
    @dwursteisen
    (i.e. I expect more of a cache than a buffer, so I can avoid a List<Item> if possible)
    dwursteisen
    @dwursteisen
    Our current solution:
    public <T1, T2, R> Observable.Transformer<T1, R> waitToCombineWith(Observable<T2> other, Func2<T2, T1, R> lambda) {
            return source -> {
                Observable<T1> s = source.replay(1).autoConnect();
                return Observable.concat(other, Observable.never()).buffer(s)
                                 .flatMapIterable(i -> i)
                                 .withLatestFrom(s, lambda);
            };
        }
    => Force buffering (concat the observable with never()) until the other observable finishes (buffer(s)), then combine.
    Dorus
    @Dorus
    Wait, now you do withLatestFrom(source); wasn't that supposed to be withLatestFrom(other)?
    I'm also not sure what you get out of .buffer(s).
    autoConnect seems dangerous here in case source yields 2 items right away.
    @dwursteisen
    dwursteisen
    @dwursteisen
    You mean replay(1) is dangerous, no?
    Dorus
    @Dorus
    No, I mean autoConnect. I prefer to do .publish(); subscribe(); .connect() like I did above,
    because you subscribe multiple times, and the second subscription might miss items if the source yields really quickly.
    Mmm, thinking about it, that's not really an issue here. The biggest thing is that all values from other are stored in the buffer until source yields, while that's only required the very first time.
    Do I also understand correctly that you intend source to be wait and other to be data?
    dwursteisen
    @dwursteisen
    We just updated our code, as it wasn't clear enough. See below:
    public <T1, T2, R> Observable.Transformer<T1, R> waitToCombineWith(Observable<T2> other, Func2<T1, T2, R> lambda) {
        return source -> {
            Observable<T2> s = other.replay(1).autoConnect();
            return Observable.concat(source, Observable.never()).buffer(s)
                             .flatMapIterable(i -> i)
                             .withLatestFrom(s, lambda);
        };
    }

    Observable<Long> wait = Observable.timer(1, TimeUnit.SECONDS)
                                      .doOnSubscribe(() -> System.out.println("PERFORM NETWORK CALL"))
                                      .doOnNext(i -> System.out.println("NETWORK RESPONSE"));

    Observable<String> data = Observable.just(1, 2, 3, 4, 5)
                                        .doOnNext(System.out::println)
                                        .compose(waitToCombineWith(wait, (a, b) -> a + " -> " + b));

    data.subscribe(System.out::println);
    Dorus
    @Dorus
    @dwursteisen I understand why you use .never(), but right now your source won't ever complete, even when both wait and data have completed, right? That's a bit of a leak.
    dwursteisen
    @dwursteisen
    It will complete after the last buffer
    Vadym A. Dragan
    @ardoramor
    Hello! Can anyone suggest an approach to the following? I am collecting data from several sensors on Android, along with location points. Location is collected less frequently. I'm trying to figure out if there is an operator that would allow me to buffer the other sensor data between location onNext emissions. This buffer operator would be applied to each sensor, then min, max and avg would be calculated and combined with the location data.
    I know that there are buffer and a few other operators like window, but they all rely on time segments
    This has a bit of a different requirement
    Dorus
    @Dorus

    @ardoramor Something like

    Observable.zip(
      location,
      sensor1.window(location).flatMap(window -> window.reduce(...)),
      sensor2.window(location).flatMap(window -> window.reduce(...)),
      (location, sensor1, sensor2) -> ...)

    ?
    Might be a good idea to add a little extra logic that ensures every window has at least some data, like the previous value if the current window is empty.
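
The empty-window fallback suggested above can be illustrated without RxJava. The following is only a plain-Java sketch under stated assumptions: the class name `WindowWithFallback`, the method `averagePerTick`, and the timestamp arrays are hypothetical stand-ins for the location ticks and sensor readings, and averaging stands in for whatever reduce step is used. Readings are grouped by the location tick that closes their window; an empty window repeats the previous result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; no RxJava types are involved.
class WindowWithFallback {

    // For each tick, averages the readings with a timestamp <= tick that were
    // not already consumed by an earlier tick. readingTimes must be sorted
    // ascending and parallel to readingValues. An empty window repeats the
    // previous average; before any reading has arrived the result is NaN.
    static List<Double> averagePerTick(long[] ticks, long[] readingTimes, double[] readingValues) {
        List<Double> out = new ArrayList<>();
        double last = Double.NaN;
        int i = 0;
        for (long tick : ticks) {
            double sum = 0;
            int n = 0;
            while (i < readingTimes.length && readingTimes[i] <= tick) {
                sum += readingValues[i];
                n++;
                i++;
            }
            if (n > 0) {
                last = sum / n; // window had data: compute a fresh average
            }
            out.add(last);      // otherwise fall back to the previous value
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ticks = {1000, 2000, 3000};   // location emissions
        long[] times = {100, 600, 2500};     // sensor reading timestamps
        double[] values = {1.0, 3.0, 10.0};
        System.out.println(averagePerTick(ticks, times, values)); // [2.0, 2.0, 10.0]
    }
}
```

The second tick has no readings of its own, so it reuses 2.0 from the first window. In an Rx pipeline the same effect would have to come from extra logic around each window, which is what the suggestion above hints at.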

    Vadym A. Dragan
    @ardoramor
    @Dorus wow, that's brilliant! I missed it because I'm coming from rxjs and remembered only one flavor of window. Thank you!
    Dorus
    @Dorus
    RxJs has the same flavours
    Vadym A. Dragan
    @ardoramor
    I meant that I used only one and it got stuck in my head XD
    Dorus
    @Dorus
    It's always good to learn new tricks :)
    Ozzy Osborne
    @BarDweller
    Hi.. I'm trying to build a custom Observable, using AsyncOnSubscribe.createStateless, and it seems to work.. (yay).. until I try to unsubscribe.. and my unsubscribe handler is never called.. I'm new to RxJava, so may have got this completely upside down.. but if I try something like Subscription sub = myClass.getObserver().doOnUnsubscribe( () -> System.out.println("Fish") ).subscribe( s -> System.out.println(s) ); .. Fish is printed immediately.. which is odd, because my Observable is still emitting events.. calling sub.unsubscribe() does nothing.. when I'd expect it to invoke the unsubscribe action I passed as the 2nd arg to createStateless ...
    David Karnok
    @akarnokd
    @BarDweller could you post a unit test that demonstrates this problem?
    Ozzy Osborne
    @BarDweller
    Hi.. I think it's related to my lack of understanding of RxJava's threading model
    I had my AsyncOnSubscribe 'call' method using another thread to obtain data, and the other thread invoked observer.onNext .. instead I now obtain the data in the same thread that invoked call, and use Observable.subscribeOn(...) to unblock my subscriber
    so now .. if I create my Observable like... Observable<T> o = Observable.create(AsyncOnSubscribe.createStateless(myDataProvider)).doOnUnsubscribe(() -> System.out.println("Fish")).subscribeOn(Schedulers.from(myExecutor)) ... it all works.
    although most interestingly.. I cannot get an unsubscribe handler to work if I pass it as a 2nd arg to createStateless..
    so maybe I'm still doing something wrong there.. but then AsyncOnSubscribe is still @Experimental
    Ozzy Osborne
    @BarDweller
    (although also the only real offered way to implement an observable with backpressure handling etc)
    David Karnok
    @akarnokd
    This test passes:
            Action0 action = mock(Action0.class);
    
            TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    
            AsyncOnSubscribe.createStateless(new Action2<Long, Observer<Observable<? extends Integer>>>() {
                @Override
                public void call(Long t1, Observer<Observable<? extends Integer>> t2) {
                    t2.onCompleted();
                }
            }, action).call(ts);
    
            verify(action).call();
    Ozzy Osborne
    @BarDweller
    I wrote myself a testcase, convinced it wasn't working this morning.. the test case passed (you'll be glad to hear) .. my real code still appeared to fail.. I dug further.. here are the results of the canadian jury.. ;p
    Ozzy Osborne
    @BarDweller
    If you build an AsyncOnSubscribe via createStateless and do not pass an unsubscribe hook, but instead wire up the same hook via Observable.create(AsyncOnSubscribe.createStateless(callback)).doOnUnsubscribe(unsubscribeHook) .. then as soon as a user invokes unsubscribe, you get your callback, and you can do whatever you need to unblock / free up the 'callback' function that may be looping calling a blocking fn with a timeout, and testing a 'do we still need to do this' flag..
    But.. if you take that same unsubscribe hook, and instead pass it as part of the createStateless call like.. Observable.create(AsyncOnSubscribe.createStateless(callback, unsubscribeHook)) then your users' unsubscribe invocations will not be passed on to your unsubscribeHook until your blocking call method has returned.. (which, if you were relying on the unsubscribe to shut it down.. may happen a lot 'later')
    That's what was confusing me so much yesterday, because my callback is polling for messages from a queue, which are infrequent.. so most of my unsubscribe manual experiments never saw unsubscribe called, because the blocking call to the callback never returned.
    As an aside, for an Observable like this (I think it's a hot observable, as it always emits events) .. my usage pattern is pretty much to just subscribe/unsubscribe from the observable.. does onCompleted have any meaning in this scenario? the underlying messages never have an 'ending' as such.
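
The pattern described above (a callback that loops on a blocking call with a timeout and tests a "do we still need to do this" flag, with the unsubscribe hook flipping that flag) can be sketched in plain Java. This is a minimal illustration, not the code from the conversation: `CancellablePoller`, `cancel` and `drainUntilCancelled` are hypothetical names, and no RxJava types are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; no RxJava types are involved.
class CancellablePoller {
    private final BlockingQueue<String> queue;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    CancellablePoller(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    // Plays the role of the unsubscribe hook: it only flips the flag.
    void cancel() {
        cancelled.set(true);
    }

    // Plays the role of the generator callback: polls with a short timeout so
    // the flag is re-checked regularly even when no message arrives.
    List<String> drainUntilCancelled(long pollMillis) {
        List<String> received = new ArrayList<>();
        while (!cancelled.get()) {
            try {
                String msg = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    received.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        q.add("a");
        CancellablePoller poller = new CancellablePoller(q);
        Thread canceller = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
                // fall through and cancel anyway
            }
            poller.cancel();
        });
        canceller.start();
        System.out.println(poller.drainUntilCancelled(50));
        canceller.join();
    }
}
```

The flag is only useful if it can be set while the loop is running. If the hook is invoked only after the blocking callback has returned, as described for the two-argument createStateless, the loop never sees the cancellation in time.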
    David Karnok
    @akarnokd
    Okay, I see. AsyncOnSubscribe does in-sequence unsubscription, as you specified. I can't really tell whether this can be changed to independent unsubscription; since these are user-provided functions, an async cancel may interfere with a concurrently blocking/running generator callback. The using operator, as far as I remember, does anytime unsubscription to allow, for example, closing an InputStream and breaking out of the blocking read().
    Vadym
    @dragantl
    Can anyone propose a more elegant way of keeping track of emission count for observable than below? I need to emit a structure along with its sequence counter so that it looks like so (it eventually gets serialized to json):
       {
          count: 5,
          data: {...}
       }
    Observable<Data> dataObservable = DataProvider.getDataObservable();
    // Should I use something like Observable.just(1).window(dataObservable).flatMap(window -> ...).scan(0, (sum, value) -> sum + value);
    Observable<Integer> countObservable = dataObservable.map(data -> 1).scan(0, (sum, value) -> sum + value);
    
    Observable<Structure> structureObservable = Observable.zip(dataObservable, countObservable, (data, count) -> new Structure(count, data));
    
    structureObservable.subscribe(structure -> {
       ...
    });
    Simon Baslé
    @simonbasle
    @dragantl is the dataObservable bounded? maybe you could simply zip it with an Observable.range(1, Integer.MAX_VALUE) (or whatever max expected length you might have)?
    Lalit Maganti
    @tilal6991
    @dragantl https://github.com/davidmoten/rxjava-extras - mapWithIndex may come in handy
    Dorus
    @Dorus
    @dragantl I filed #3602 before but it was rejected. So you have to stick with the zip thing.
    Vadym
    @dragantl
    @simonbasle, @tilal6991, @Dorus thank you very much! It is a bounded observable. Also, thanks for pointing me to rxjava-extras and the issue.
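
As a rough picture of what zipping a bounded source with a 1-based range produces, here is a plain-Java sketch over an Iterable. The names `WithCount`, `Counted` and `number` are hypothetical (they are not RxJava or rxjava-extras API), and `Counted` merely stands in for the Structure class from the question. Note that the sketch counts from 1, like the suggested range(1, ...), whereas scan with a seed of 0 emits the seed first, so the scan-based version pairs the first item with 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; no RxJava types are involved.
class WithCount {

    // Stand-in for the Structure from the question: a counter plus the data.
    static final class Counted<T> {
        final int count;
        final T data;

        Counted(int count, T data) {
            this.count = count;
            this.data = data;
        }

        @Override
        public String toString() {
            return "{count: " + count + ", data: " + data + "}";
        }
    }

    // Pairs each item with its 1-based position in the sequence.
    static <T> List<Counted<T>> number(Iterable<T> items) {
        List<Counted<T>> out = new ArrayList<>();
        int next = 1;
        for (T item : items) {
            out.add(new Counted<>(next++, item));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(number(Arrays.asList("a", "b", "c")));
    }
}
```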
    Vadym
    @dragantl
    @Dorus wanted to refer back to your answer https://gitter.im/ReactiveX/RxJava?at=572913d30149d6bb04b86b7b. I've been working on this for some time today, trying to figure out one issue which you had pointed out: dealing with empty windows. More specifically, location emits every second. Some sensors emit fast, so their window is usually not empty. However, some sensors emit less frequently, which results in an empty window. An empty window then results in Observable.zip not emitting until the slow sensor emits again.
    Sorry, I'm not sure how to link directly to the message timestamp
    What I've been trying to do is get hold of the latest emission, but I keep getting into situations where nothing good comes of it >_< Right now, I'm looking into BlockingObservable.latest()
    Dorus
    @Dorus
    @dragantl Might have time to look at this tomorrow :)
    Vadym
    @dragantl
    @Dorus oh man, I think withLatestFrom would have solved it, but it looks like RxJava only implemented it for a single other observable and not for variable arguments like RxJS has :(
    Vadym
    @dragantl

    @Dorus I think I've got it but I'd still love to hear your thoughts. I've created a SensorHolder object that holds the found sensor types that depend upon location to be triggered. I do the following:

    Observable<SensorHolder> sensorObserver = Observable.combineLatest(
       sensor1, sensor2, sensor3, sensor4, SensorHolder::new);
    
    Observable<SensorPoint> pointerObserver = location.withLatestFrom(sensorObserver, (loc, holder) -> new SensorPoint(
       loc, holder.sensor1, holder.sensor2, holder.sensor3, holder.sensor4));

    That seems to work pretty well. Shame for the useless holder though but at least it works. Do you think there is a way to optimize it further?

    Dorus
    @Dorus
    @dragantl If you need multiple withLatestFrom, do something like Observable.zip(source.withLatestFrom(b, (a, b) -> b), source.withLatestFrom(c, (a, c) -> c), ...). zip does support a variable number of arguments, right? (sorry, getting creative here)
    Or, source.withLatestFrom(b.withLatestFrom(c.withLatestFrom(d)))?