    Dorus
    @Dorus
    That was me writing in C# style before :sweat_smile:
    Abhinav Solan
    @eyeced
    Using the previous method I am able to print out all numbers .. but with merge only 90 of the 100 elements show up
    Dorus
    @Dorus
        public class App {
            private Observer<Observable<Integer>> ob;
    
            public App() {
                PublishSubject<Observable<Integer>> sub = PublishSubject.create();
                rx.Observable.merge(sub, 10).subscribe(System.out::println);
                ob = sub;
            }
    
            public void put(Observable<Integer> t) {
                ob.onNext(t);
            }
    
            public static void main(String[] args) {
                App app = new App();
                Observable.range(1, 100).subscribe(integer -> {
                    app.put(Observable.just(integer));
                });
            }
        }
    That works for me
    Abhinav Solan
    @eyeced
    yup .. seems that instead of buffer, merge with a count would work here
    Dorus
    @Dorus
    public class App {
        private Observer<Observable<Integer>> ob;
    
        public App() {
            PublishSubject<Observable<Integer>> sub = PublishSubject.create();
            rx.Observable.merge(sub).buffer(10).subscribe(System.out::println);
            ob = sub;
        }
    
        public void put(Observable<Integer> t) {
            ob.onNext(t);
        }
    
        public static void main(String[] args) {
            App app = new App();
            Observable.range(1, 100).subscribe(integer -> {
                app.put(Observable.just(integer));
            });
        }
    }
    Works also
    Abhinav Solan
    @eyeced
    yes .. thanks a lot @Dorus
    Dorus
    @Dorus
    No problem. I finally did something with RxJava. yay :D
    Abhinav Solan
    @eyeced
    I got to learn Subjects yay
    Dorus
    @Dorus
    nah, avoid them like the plague :)
    observe how i kept the subject as hidden as possible in my snippet
    Abhinav Solan
    @eyeced
    yes .. going to use it only like that
    Dorus
    @Dorus
    Most of the time there are methods that do the same but hide the underlying subject; always prefer those over actual subjects. For example .replay() instead of ReplaySubject. (Again C# stuff here, but i'm sure RxJava has something similar).
    The case we had here is pretty much the only one where i don't know how to avoid the subject.
    Abhinav Solan
    @eyeced
    replay is there
    Dorus
    @Dorus
    oh btw, my example loses the Subscription from subscribe(). You probably want to store that one somewhere. Again not something i'm overly familiar with (but in Rx.Net we tend to keep track of our IDisposables)
    Dorus
    @Dorus
    @eyeced Another thing i was thinking about: I'm not 100% sure how safe it is for merge to have onNext called from multiple threads. Might need to use serialize() -> rx.Observable.merge(sub.serialize()) just to be totally safe.
    Abhinav Solan
    @eyeced
    Ahh yes have to use that also thanks @Dorus
    David Stemmer
    @weefbellington
    this is a pretty active gitter channel -- are there any other Java/Android channels that get a lot of traffic?
    Matt Langston
    @mattblang
    @weefbellington #android-dev on freenode IRC is extremely active
    David Stemmer
    @weefbellington
    I've heard that there's an android-study-group slack channel that's pretty active too...can't seem to find somebody who can get me an invite though
    @mattblang I haven't used IRC in years, I guess I'll find a new client :D
    Abhinav Solan
    @eyeced
    This is really good stuff .. covers almost everything in Rx with examples on where to use https://github.com/Froussios/Intro-To-RxJava
    arman yessenamanov
    @yesenarman

    Hello, everyone.
    Can someone please explain why the following two snippets of code behave differently?

    Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber))
        .flatMap(
            b -> Observable.<Integer>create(subscriber -> {
                subscriber.onNext(1);
                subscriber.onCompleted();
            }).subscribeOn(Schedulers.io()) // to imitate async request
        )
        .subscribe(System.out::println);

    Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted))
        .flatMap(
            b -> Observable.<Integer>create(subscriber -> {
                subscriber.onNext(1);
                subscriber.onCompleted();
            }).subscribeOn(Schedulers.io()) // to imitate async request
        )
        .subscribe(System.out::println);

    When I run the first one it doesn’t print anything to the console, but the second one prints 1 as expected.

    Aaron Tull
    @stealthcode
    @yesenarman your first example has a race condition. When your observable chain subscribes to the subscribeOn operator, a new thread on the io scheduler subscribes upward to the flatMap, but your main thread returns from the subscribe and terminates the program. That's why you don't see it print: the other thread hasn't called onNext yet.
    You can fix the race condition by using a TestSubscriber and calling awaitTerminalEventAndUnsubscribeOnTimeout(long, TimeUnit), or simply by adding a CountDownLatch and awaiting it after your subscribe.
    Aaron Tull
    @stealthcode
    @yesenarman oh and in production code you can call .toBlocking() to get access to operators that await a terminal event.
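The latch suggestion above could look roughly like the following. This is a minimal plain-Java sketch that does not use RxJava: a daemon thread stands in for the scheduler's worker, and the names `AwaitAsyncResult` and `runAndAwait` are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitAsyncResult {

    // Hypothetical helper: starts the "async request" and blocks until it signals completion.
    static int runAndAwait() {
        CountDownLatch done = new CountDownLatch(1);
        AtomicInteger received = new AtomicInteger();

        Thread worker = new Thread(() -> {
            received.set(1);   // plays the role of onNext(1)
            done.countDown();  // plays the role of onCompleted()
        });
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive
        worker.start();

        try {
            // Without this await, the caller could return before the worker has run.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the worker");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait()); // prints 1
    }
}
```

In an RxJava chain the `countDown()` call would presumably go in the terminal callbacks of the subscriber, with the `await` placed right after `subscribe(...)`.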
    Samuel Tardieu
    @samueltardieu
    Hi. Is there an equivalent to delaySubscription but for unsubscription? I'd like to be able to have the subscriber be effectively unsubscribed, but the observable should not be unsubscribed immediately (and sent data should be sent to oblivion). Use case: a GPS observable that takes time to acquire a fix initially that is share()d. A delayed unsubscription would keep the GPS active for a few seconds in case another subscriber gets interested fast enough. I guess that could be a variant of refCount() too, that would take a delay before unsubscribing.
    Samuel Tardieu
    @samueltardieu
    (I cooked up an operator for doing this if anyone is interested: https://gist.github.com/samueltardieu/12db44dc01340f22e9da)
    Dave Moten
    @davidmoten
    @samueltardieu I had a look, don't you want the scheduled unsubscribe to be cancelled on a new subscription as well?
    Samuel Tardieu
    @samueltardieu
    @davidmoten It doesn't matter if this occurs after a refCount(), since it won't unsubscribe if a new subscription has arrived in the meantime as the subscriber count will not go down to zero.
    (I can use it as newObservable = observable.share().lift(new DelayedUnsubscription(5, TimeUnit.SECONDS)), and I then subscribe to newObservable)
    Dave Moten
    @davidmoten
    @samueltardieu that makes sense, looks good, is it doing the job for you?
    Samuel Tardieu
    @samueltardieu
    @davidmoten It seems to be, although I don't use it in production yet.
    Samuel Tardieu
    @samueltardieu
    It is now used in production in cgeo nightly builds.
    Justin Hall
    @wKovacs64
    Is there a way to clear the cache from a caching Observable (.cache()) once the cached value has been emitted (observed by a subscriber)?
    David Stemmer
    @weefbellington
    @wKovacs64 is there a reason you can't simply create a new Observable?
    Justin Hall
    @wKovacs64
    @weefbellington I might be able to, although it could be messy due to where the Observable lives vs. the Subscription actions. That might be what I have to do if there isn't a native way to clear it.
    Dave Moten
    @davidmoten
    @wKovacs64 I answered that on stack overflow a while back. See http://stackoverflow.com/questions/31733455/rxjava-observable-cache-invalidate/31738646#31738646
    Justin Hall
    @wKovacs64
    @davidmoten Thanks.
    Justin Hall
    @wKovacs64
    That post is actually the exact same scenario I'm in, heh.
    David Stemmer
    @weefbellington
    does anybody know what effect onNext will have on a PublishSubject after PublishSubject#onCompleted has been called?
    will the onNext item be emitted, will it fail to be emitted silently, or will the call fail with an exception?
    Dorus
    @Dorus
    Try it? If the subject is implemented properly, it should silently ignore the onNext().
    http://reactivex.io/RxJava/javadoc/rx/subjects/PublishSubject.html#onNext%28T%29
    onNext [...] The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
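The "silently ignore" behaviour described above can be pictured with a toy subject. This is a sketch under stated assumptions, not RxJava's PublishSubject: `TinySubject` and `demo` are hypothetical names, and the class is single-threaded and has no error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a publish-style subject.
public class TinySubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private boolean terminated = false;

    public void subscribe(Consumer<T> observer) {
        if (!terminated) {
            observers.add(observer);
        }
    }

    public void onNext(T value) {
        if (terminated) {
            return; // after the terminal event, values are dropped without an exception
        }
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }

    public void onCompleted() {
        terminated = true;
        observers.clear();
    }

    // Pushes "a", completes, then pushes "b"; returns what the observer saw.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        TinySubject<String> subject = new TinySubject<>();
        subject.subscribe(seen::add);
        subject.onNext("a");
        subject.onCompleted();
        subject.onNext("b"); // ignored
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a]
    }
}
```

Whether the real PublishSubject behaves the same way is best confirmed by trying it, as suggested above.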
    Ilya Arkhanhelsky
    @iarkhanhelsky

    Hi, I have a flow which I can't resolve with the provided operators. Or I just don't see something important.
    I have sequence A which produces sequence B. Later I combine them with the combineLatest operator, and get two notifications on each A update. But I want only one, the latest.
    Short illustration:

            Observable<Long> o1 = Observable.interval(1, TimeUnit.SECONDS).map(x -> x + 1);
            Observable<Long> o2 = o1.map(v -> -v);
            Observable.combineLatest(o1, o2, (v1, v2) -> "o1: " + v1 + ", o2: " + v2)
                   .subscribe(System.out::println);

    Provides output like this:

    o1: 1, o2: -1
    o1: 1, o2: -2
    o1: 2, o2: -2
    o1: 3, o2: -2
    o1: 3, o2: -3
    o1: 3, o2: -4
    o1: 4, o2: -4
    o1: 4, o2: -5
    o1: 5, o2: -5
    Dorus
    @Dorus
    Try zip
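To see why zip fits here, the two pairing rules can be compared on a fixed event list. This is a plain-Java sketch of the rules only, not RxJava code: `PairingRules`, `combineLatestRule`, `zipRule` and the `EVENTS` data are made up for illustration, and the events are assumed to arrive strictly alternating between o1 and o2.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PairingRules {

    // Each event is {source, value}; source 1 stands for o1, source 2 for o2.
    static final long[][] EVENTS = { {1, 1}, {2, -1}, {1, 2}, {2, -2}, {1, 3}, {2, -3} };

    // combineLatest rule: emit on every update once both sides have a value.
    static List<String> combineLatestRule(long[][] events) {
        List<String> out = new ArrayList<>();
        Long latest1 = null;
        Long latest2 = null;
        for (long[] e : events) {
            if (e[0] == 1) latest1 = e[1]; else latest2 = e[1];
            if (latest1 != null && latest2 != null) {
                out.add("o1: " + latest1 + ", o2: " + latest2);
            }
        }
        return out;
    }

    // zip rule: pair the n-th value of one side with the n-th value of the other.
    static List<String> zipRule(long[][] events) {
        List<String> out = new ArrayList<>();
        Deque<Long> q1 = new ArrayDeque<>();
        Deque<Long> q2 = new ArrayDeque<>();
        for (long[] e : events) {
            if (e[0] == 1) q1.add(e[1]); else q2.add(e[1]);
            while (!q1.isEmpty() && !q2.isEmpty()) {
                out.add("o1: " + q1.poll() + ", o2: " + q2.poll());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        combineLatestRule(EVENTS).forEach(System.out::println); // 5 lines, some with mismatched pairs
        System.out.println("---");
        zipRule(EVENTS).forEach(System.out::println);           // 3 lines: 1/-1, 2/-2, 3/-3
    }
}
```

With the zip rule each o1 value is emitted exactly once, together with the o2 value derived from it.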