    Abhinav Solan
    @eyeced
    n
    Dave Moten
    @davidmoten
    @samueltardieu that makes sense, looks good, is it doing the job for you?
    Samuel Tardieu
    @samueltardieu
    @davidmoten It seems to be, although I don't use it in production yet.
    Samuel Tardieu
    @samueltardieu
    It is now used in production in cgeo nightly builds.
    Justin Hall
    @wKovacs64
    Is there a way to clear the cache from a caching Observable (.cache()) once the cached value has been emitted (observed by a subscriber)?
    David Stemmer
    @weefbellington
    @wKovacs64 is there a reason you can't simply create a new Observable?
    Justin Hall
    @wKovacs64
    @weefbellington I might be able to, although it could be messy due to where the Observable lives vs. the Subscription actions. That might be what I have to do if there isn't a native way to clear it.
    Dave Moten
    @davidmoten
    @wKovacs64 I answered that on stack overflow a while back. See http://stackoverflow.com/questions/31733455/rxjava-observable-cache-invalidate/31738646#31738646
    Justin Hall
    @wKovacs64
    @davidmoten Thanks.
    Justin Hall
    @wKovacs64
    That post is actually the exact same scenario I'm in, heh.
    David Stemmer
    @weefbellington
    does anybody know what effect onNext will have on a PublishSubject after PublishSubject#onCompleted has been called?
    will the onNext item be emitted, will it fail to be emitted silently, or will the call fail with an exception?
    Dorus
    @Dorus
    Try it? If the subject is implemented properly, it should silently ignore the onNext().
    http://reactivex.io/RxJava/javadoc/rx/subjects/PublishSubject.html#onNext%28T%29
    onNext [...] The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
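To make the "silently ignore" behaviour described above concrete, here is a minimal plain-Java model of a subject that drops items after completion. `MiniSubject` and `demo` are hypothetical names for illustration only. This is not RxJava's `PublishSubject` implementation, just a sketch of the contract discussed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a subject; NOT RxJava's PublishSubject.
class MiniSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private boolean terminated = false;

    void subscribe(Consumer<T> observer) {
        if (!terminated) observers.add(observer);
    }

    void onNext(T value) {
        if (terminated) return; // dropped silently: no emission, no exception
        for (Consumer<T> observer : observers) observer.accept(value);
    }

    void onCompleted() {
        terminated = true;
        observers.clear();
    }

    // Emits 1, completes, then tries to emit 2; returns what the subscriber saw.
    static List<Integer> demo() {
        MiniSubject<Integer> subject = new MiniSubject<>();
        List<Integer> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        subject.onNext(1);
        subject.onCompleted();
        subject.onNext(2); // ignored in this model
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1]
    }
}
```

Running the real thing against `PublishSubject` is still the quickest way to confirm, as suggested above.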
    Ilya Arkhanhelsky
    @iarkhanhelsky

    Hi, I have a flow which I can't resolve with the provided operators. Or I just don't see something important.
    I have sequence A which produces sequence B. Later I combine them with the combineLatest operator and get two notifications on each A update, but I want only one, the latest.
    Short illustration:

            Observable<Long> o1 = Observable.interval(1, TimeUnit.SECONDS).map(x -> x + 1);
            Observable<Long> o2 = o1.map(v -> -v);
            Observable.combineLatest(o1, o2, (v1, v2) -> "o1: " + v1 + ", o2: " + v2)
                   .subscribe(System.out::println);

    Provides output like this:

    o1: 1, o2: -1
    o1: 1, o2: -2
    o1: 2, o2: -2
    o1: 3, o2: -2
    o1: 3, o2: -3
    o1: 3, o2: -4
    o1: 4, o2: -4
    o1: 4, o2: -5
    o1: 5, o2: -5
    Dorus
    @Dorus
    Try zip
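The difference between the two operators can be sketched with a plain-Java model of their pairing rules. Everything here (`CombineVsZip`, the `EVENTS` trace) is hypothetical and only imitates the semantics. The trace is an assumed interleaving chosen to reproduce the first five output lines above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of the two pairing rules; hypothetical names, not RxJava code.
class CombineVsZip {
    // Each event is {source, value}: source 0 stands for o1, source 1 for o2.
    static final long[][] EVENTS = {
        {0, 1}, {1, -1}, {1, -2}, {0, 2}, {0, 3}, {1, -3}
    };

    static String format(long v1, long v2) {
        return "o1: " + v1 + ", o2: " + v2;
    }

    // combineLatest rule: once both sides have a value, every event emits.
    static List<String> combineLatest(long[][] events) {
        Long latest1 = null;
        Long latest2 = null;
        List<String> out = new ArrayList<>();
        for (long[] e : events) {
            if (e[0] == 0) latest1 = e[1]; else latest2 = e[1];
            if (latest1 != null && latest2 != null) out.add(format(latest1, latest2));
        }
        return out;
    }

    // zip rule: values are queued per side and emitted strictly in pairs.
    static List<String> zip(long[][] events) {
        Deque<Long> queue1 = new ArrayDeque<>();
        Deque<Long> queue2 = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        for (long[] e : events) {
            if (e[0] == 0) queue1.add(e[1]); else queue2.add(e[1]);
            while (!queue1.isEmpty() && !queue2.isEmpty()) {
                out.add(format(queue1.poll(), queue2.poll()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        combineLatest(EVENTS).forEach(System.out::println); // 5 lines
        zip(EVENTS).forEach(System.out::println);           // 3 lines, one per pair
    }
}
```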
    Ilya Arkhanhelsky
    @iarkhanhelsky

    Ok. I missed one detail. B can be merged with C

            Observable<Long> a = Observable.interval(1, TimeUnit.SECONDS).map(x -> x + 1);
            Observable<Long> b = a.map(v -> -v);
            Observable<Long> c = Observable.interval(500, TimeUnit.MILLISECONDS).map(x -> x + 1000);
            Observable.zip(a, Observable.merge(b, c), (v1, v2) -> "o1: " + v1 + ", o2: " + v2)
                    .subscribe(System.out::println);

    Then zip does the wrong job too

    Dorus
    @Dorus
    zip a with b, combineLatest a with c, and then merge.
    Alternatively, you could use debounce to filter out the double results when a and b both yield, at the risk of losing c when it yields too soon after b.
    Oh, and I just realized you need something like combineLatestRight. Does that exist in RxJava?
    Dorus
    @Dorus
    -> zip a with b, combineLatestRight a with c, and then merge.
    Ilya Arkhanhelsky
    @iarkhanhelsky
    works nice
    thanks a lot!
    Jason Martens
    @jasonmartens
    Hello All, Is there a takeEvery(N) operation? For instance, to take every Nth element from a stream?
    Maybe Scan?
    Jason Martens
    @jasonmartens
    I have a pre-existing set of timeseries data, and what I really want is Sample, but using the timestamps in the existing set
    Dorus
    @Dorus
    Use Where (filter in RxJava) and then timestamp mod 1 min < 10s. Or, if you want to base it on the previously accepted element, use scan to retain that element (and use distinct or similar to filter out duplicates). You can also take 1, skip 9 with buffer(1,10).
    That's what I can come up with in 15 seconds ;)
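Two of the ideas above can be sketched over an in-memory list, which is roughly the situation with a pre-existing time series. This is a plain-Java model with hypothetical names (`EveryNth`, `everyNth`, `sampleByTimestamp`), not RxJava operators. It assumes the timestamps are epoch milliseconds sorted in ascending order.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch; hypothetical helper names, not part of RxJava.
class EveryNth {
    // "Take 1, skip n - 1": keeps the items at indices 0, n, 2n, ...
    static <T> List<T> everyNth(List<T> items, int n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        List<T> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += n) out.add(items.get(i));
        return out;
    }

    // Scan-like pass: remember the previously accepted timestamp and accept
    // the next one only when at least periodMillis has elapsed since it.
    static List<Long> sampleByTimestamp(List<Long> sortedTimestamps, long periodMillis) {
        List<Long> out = new ArrayList<>();
        Long lastKept = null;
        for (Long t : sortedTimestamps) {
            if (lastKept == null || t - lastKept >= periodMillis) {
                out.add(t);
                lastKept = t;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(everyNth(List.of(1, 2, 3, 4, 5, 6, 7), 3));
        // prints [1, 4, 7]
        System.out.println(sampleByTimestamp(List.of(0L, 400L, 1000L, 1500L, 2100L), 1000L));
        // prints [0, 1000, 2100]
    }
}
```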
    Jason Martens
    @jasonmartens
    phew, thanks for the ideas. ;-)
    Dorus
    @Dorus
    I'm not 100% sure this is possible, though I suppose it should be: you can also use window or groupBy to combine all elements within 1 minute and then take only the first N elements from the resulting sequences. Doing that with live data is easy, but I'm sure it's possible with pre-existing timestamp data too.
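For pre-existing data, the window idea might look like the following plain-Java sketch: bucket each timestamp into a fixed-size window and keep only the first N per bucket. `FirstPerWindow` and `firstNPerWindow` are hypothetical names, and timestamps are assumed to be epoch milliseconds. This models the grouping logic only and does not use RxJava's window or groupBy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of "group by window, take the first n"; hypothetical names.
class FirstPerWindow {
    static List<Long> firstNPerWindow(List<Long> timestamps, long windowMillis, int n) {
        Map<Long, Integer> seenPerWindow = new HashMap<>();
        List<Long> out = new ArrayList<>();
        for (long t : timestamps) {
            long window = Math.floorDiv(t, windowMillis); // bucket index for this timestamp
            int seen = seenPerWindow.merge(window, 1, Integer::sum);
            if (seen <= n) out.add(t); // keep only the first n items of each bucket
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> data = List.of(0L, 10_000L, 59_000L, 60_000L, 61_000L, 125_000L);
        System.out.println(firstNPerWindow(data, 60_000L, 2));
        // prints [0, 10000, 60000, 61000, 125000]
    }
}
```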
    Dorus
    @Dorus
    @iarkhanhelsky I remember again how to do combineLatestRight. You can use
     Observable.switchOnNext(a.map(v1 -> Observable.merge(b, c).map(v2 -> "o1: " + v1 + ", o2: " + v2)))
    Ilya Arkhanhelsky
    @iarkhanhelsky
    I have another question. If I map something to something, the map runs for each signal and each subscription. That's OK in the sense of immutability. But what is the proper way of mapping and subscribing to signals with stateful objects, the Rx way?
    Dorus
    @Dorus
    Scan + Publish?
    It does depend on what state you had in mind
    Ilya Arkhanhelsky
    @iarkhanhelsky
    I map some stateless object to a stateful container. Then (the way I do it now) I make 2 subscriptions: the 1st subscription updates the object (paired with another observable), and the 2nd reads the state (paired with a completely different observable).
    Dorus
    @Dorus
    OK, I didn't understand that.
    Subscriptions only receive events, they do not update the source.
    Ilya Arkhanhelsky
    @iarkhanhelsky
            PublishSubject<Integer> seq = PublishSubject.create();
    
            Observable<Integer> seq2= seq.map(x ->{
                timesCalled += 1;
                return x * 2;
            });
    
    
            seq2.subscribe(e -> System.out.println("Hello 1: " + e));
            seq2.subscribe(e -> System.out.println("Hello 2: " + e));
    
            seq.onNext(1);
    
            System.out.println("Times map called: " + timesCalled);
    This code shows that map is called twice in this case. (Is that an error or not?) And I get 2 different objects in each subscription, where I expected the same one.
    If it helps to understand my problem
    Dorus
    @Dorus
    Yes that's expected behavior
    I assume timesCalled is declared outside the scope of map, that's where the problem lies.
    Unless that was intended
    Ilya Arkhanhelsky
    @iarkhanhelsky
    No, that's for example
    Dorus
    @Dorus
    You can use Observable.create to be able to declare a local variable instead
    Or publish seq2 so that side effects are shared between the observables instead.
    Ilya Arkhanhelsky
    @iarkhanhelsky
    It works. But as far as I understand the docs, publish creates a ConnectableObservable, which waits for connect before emitting something. That sounds like it's not really aimed at the current problem, right? Or do I have it wrong? Or do I need publish just to create a new observable that keeps the needed objects?
    Until I map it again, by the way.
    Dorus
    @Dorus
    You can use refCount, or do both subscriptions and then connect() yes.
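Why the map function runs twice in the snippet above, and what sharing changes, can be sketched with a deliberately simplified plain-Java model. `SharedMapSketch` and its methods are hypothetical. The model captures only the idea that an unshared chain re-runs the mapping per subscriber while a shared (published) chain maps once and fans the result out. It says nothing about connect or refCount timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of per-subscriber vs shared mapping; not RxJava code.
class SharedMapSketch {
    static int timesCalled = 0;

    static int doubleIt(int x) {
        timesCalled += 1; // side effect, as in the snippet under discussion
        return x * 2;
    }

    // Unshared: every subscriber gets its own run of the map step.
    static int unshared(int value, List<Consumer<Integer>> subscribers) {
        timesCalled = 0;
        for (Consumer<Integer> s : subscribers) s.accept(doubleIt(value));
        return timesCalled;
    }

    // Shared: the map step runs once and its result is multicast.
    static int shared(int value, List<Consumer<Integer>> subscribers) {
        timesCalled = 0;
        int mapped = doubleIt(value);
        for (Consumer<Integer> s : subscribers) s.accept(mapped);
        return timesCalled;
    }

    // Runs both variants with two subscribers; returns {unshared, shared} call counts.
    static int[] demo() {
        List<Consumer<Integer>> subscribers = new ArrayList<>();
        subscribers.add(e -> System.out.println("Hello 1: " + e));
        subscribers.add(e -> System.out.println("Hello 2: " + e));
        return new int[] { unshared(1, subscribers), shared(1, subscribers) };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("Unshared map calls: " + counts[0]); // 2
        System.out.println("Shared map calls: " + counts[1]);   // 1
    }
}
```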
    Simon Baslé
    @simonbasle
    hi guys, can someone shed some light on how flatMap(Func1<T, Observable<R>> f, int maxConcurrent) works?