    Dorus
    @Dorus
    If you work with the previously mentioned PublishSubject<Observable<T>>, you can just call PublishSubject.OnNext(t) inside put. First subscribe to the PublishSubject with PublishSubject.asObservable().Merge().Subscribe(...) (it doesn't strictly need the asObservable if you subscribe locally). Just make sure you don't expose the Subject anywhere.
    Abhinav Solan
    @eyeced
    PublishSubject seems the way ..
    Dorus
    @Dorus
    The general consensus is to avoid Subjects, but I still haven't found a way to avoid them in these situations. As long as you keep the Subject private and local, it shouldn't become a mess.
    Dorus
    @Dorus
    I mean, i would love to call Observer.Create(), but that doesn't allow you to call merge anymore. So you kinda need
    private Observer ob;
    public constructor() {
        PublishSubject<Observable<T>> sub = PublishSubject.create();
        sub.merge().subscribe(...);
        ob = sub.asObserver();
    }
    public void put(Observable<T> t) {
        ob.onNext(t);
    }
    (Please anybody correct me if i'm wrong)
    Abhinav Solan
    @eyeced
    Thanks @Dorus, this worked beautifully. Here is the implementation I did:

    public class SubjectTest {
    private static final SubjectTest INSTANCE = new SubjectTest();
    private PublishSubject<Integer> subject;

    public SubjectTest() {
        subject = PublishSubject.create();
        subject.buffer(10).subscribe(System.out::println);
    }
    
    void put(Observable<Integer> obs) {
        obs.subscribe(integer -> subject.onNext(integer));
    }
    
    public static void main(String[] args) {
        ExecutorService executor = Executors.newWorkStealingPool(10);
        Observable.range(1, 100).subscribe(integer -> {
            executor.execute(() -> INSTANCE.put(Observable.just(integer)));
        });
    }

    }

    Dorus
    @Dorus
    you're missing a few spaces in front.
    (you can edit)
    Anyway, it looks good, but aren't you missing a merge() before buffer?
    Abhinav Solan
    @eyeced
    in RxJava there is no merge() call for the subject
    Dorus
    @Dorus
    really?
    Abhinav Solan
    @eyeced
    There is a mergeWith(Observable<T> t) but I don't think that's what you meant
    Dorus
    @Dorus
    http://reactivex.io/RxJava/javadoc/rx/subjects/PublishSubject.html -> Methods inherited from class rx.Observable -> merge.
    No, I'm looking for the merge() that goes from Observable<Observable<Integer>> to Observable<Integer>. Hmm.
    I do see merge(Observable<? extends Observable<? extends T>> source), but I'm not 100% sure how you call it.
    Abhinav Solan
    @eyeced
    ohh that's on class level only
    Dorus
    @Dorus
    The docs I'm reading here are pretty clear that you can do merge() on an Observable of Observables. Did you find it?

    http://reactivex.io/documentation/operators/merge.html

    Instead of passing multiple Observables (up to nine) into merge, you could also pass in a List<> (or other Iterable) of Observables, an Array of Observables, or even an Observable that emits Observables, and merge will merge their output into the output of a single Observable:
    If you pass in an Observable of Observables, you have the option of also passing in a value indicating to merge the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.

    Javadoc: merge(Observable<Observable>)
    Javadoc: merge(Observable<Observable>,int)

    Abhinav Solan
    @eyeced
    OK, let me try it using this then. But in the code snippet you sent, you were calling merge on the created subject instance; these are static methods, not instance methods.
    Dorus
    @Dorus
    In C# it's an extension method. Those are static but work on instances. Not 100% sure how that flies in Java; I don't have it set up here to test.
    Dorus
    @Dorus
    I think it's rx.Observable.merge(subject).buffer(10).subscribe(System.out::println);
    Indeed, just a static call.
    That was me writing in C# style before :sweat_smile:
    Abhinav Solan
    @eyeced
    Using the previous method I am able to print out all the numbers, but using merge shows only 90 of the 100 elements.
    Dorus
    @Dorus
        public class App {
            private Observer<Observable<Integer>> ob;

            public App() {
                PublishSubject<Observable<Integer>> sub = PublishSubject.create();
                rx.Observable.merge(sub, 10).subscribe(System.out::println);
                ob = sub;
            }

            public void put(Observable<Integer> t) {
                ob.onNext(t);
            }

            public static void main(String[] args) {
                App app = new App();
                Observable.range(1, 100).subscribe(integer -> {
                    app.put(Observable.just(integer));
                });
            }
        }
    That works for me
    Abhinav Solan
    @eyeced
    yup .. seems instead of buffer merge with count would work here
    Dorus
    @Dorus
    public class App {
        private Observer<Observable<Integer>> ob;
    
        public App() {
            PublishSubject<Observable<Integer>> sub = PublishSubject.create();
            rx.Observable.merge(sub).buffer(10).subscribe(System.out::println);
            ob = sub;
        }
    
        public void put(Observable<Integer> t) {
            ob.onNext(t);
        }
    
        public static void main(String[] args) {
            App app = new App();
            Observable.range(1, 100).subscribe(integer -> {
                app.put(Observable.just(integer));
            });
        }
    }
    Works also
    Abhinav Solan
    @eyeced
    yes .. thanks a lot @Dorus
    Dorus
    @Dorus
    No problem. I finally did something with RxJava. yay :D
    Abhinav Solan
    @eyeced
    I got to learn Subjects yay
    Dorus
    @Dorus
    nah, avoid them like the plague :)
    Observe how I kept the subject as hidden as possible in my snippet.
    Abhinav Solan
    @eyeced
    Yes, I'm going to use them only like that.
    Dorus
    @Dorus
    Most of the time there are methods that do the same but hide the underlying subject; always prefer those over actual Subjects. For example, .replay() instead of ReplaySubject. (Again C# stuff here, but I'm sure RxJava has something similar.)
    The case we had here is pretty much the only one where I don't know how to avoid the subject.
    Abhinav Solan
    @eyeced
    replay is there
    Dorus
    @Dorus
    Oh btw, my example loses the Subscription returned from subscribe(). You probably want to store that one somewhere. Again, not something I'm overly familiar with (but in Rx.Net we tend to keep track of our IDisposables).
    Dorus
    @Dorus
    @eyeced Another thing I was thinking about: I'm not 100% sure how safe it is for merge to have onNext called from multiple threads. You might need to use serialize() -> rx.Observable.merge(sub.serialize()) just to be totally safe.
    Abhinav Solan
    @eyeced
    Ahh yes have to use that also thanks @Dorus
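The thread-safety concern raised above can be illustrated without RxJava. This is only a plain-JDK sketch of the idea behind serializing calls, not RxJava's implementation: `SerializedSink` and `pushConcurrently` are hypothetical names, and the lock simply ensures that only one thread at a time delivers a value to a receiver that is not thread-safe on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an observer whose storage is not thread-safe.
class SerializedSink {
    private final List<Integer> received = new ArrayList<>(); // not thread-safe
    private final Object lock = new Object();

    // Only one caller at a time may deliver a value.
    void onNext(int value) {
        synchronized (lock) {
            received.add(value);
        }
    }

    int count() {
        synchronized (lock) {
            return received.size();
        }
    }

    // Pushes n values from a small thread pool and returns how many arrived.
    static int pushConcurrently(int n) {
        SerializedSink sink = new SerializedSink();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 1; i <= n; i++) {
            final int value = i;
            pool.execute(() -> sink.onNext(value));
        }
        pool.shutdown(); // already submitted tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink.count();
    }

    public static void main(String[] args) {
        System.out.println(pushConcurrently(100));
    }
}
```

Without the `synchronized` block, concurrent `add` calls on the `ArrayList` could drop values or throw, which is the kind of problem serializing the calls is meant to rule out.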
    David Stemmer
    @weefbellington
    this is a pretty active gitter channel -- are there any other Java/Android channels that get a lot of traffic?
    Matt Langston
    @mattblang
    @weefbellington #android-dev on freenode IRC is extremely active
    David Stemmer
    @weefbellington
    I've heard that there's an android-study-group slack channel that's pretty active too...can't seem to find somebody who can get me an invite though
    @mattblang I haven't used IRC in years. I guess I'll find a new client :D
    Abhinav Solan
    @eyeced
    This is really good stuff .. covers almost everything in Rx with examples on where to use https://github.com/Froussios/Intro-To-RxJava
    arman yessenamanov
    @yesenarman

    Hello, everyone.
    Can someone please explain why the following two snippets of code behave differently?

    Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber))
        .flatMap(
            b -> Observable.<Integer>create(subscriber -> {
                   subscriber.onNext(1);
                   subscriber.onCompleted();
            }).subscribeOn(Schedulers.io()) // to imitate async request
        )
        .subscribe(System.out::println);

    Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted))
        .flatMap(
            b -> Observable.<Integer>create(subscriber -> {
                   subscriber.onNext(1);
                   subscriber.onCompleted();
            }).subscribeOn(Schedulers.io()) // to imitate async request
        )
        .subscribe(System.out::println);

    When I run the first one it doesn’t print anything to the console, but the second one prints 1 as expected.

    Aaron Tull
    @stealthcode
    @yesenarman your first example has a race condition. When your observable chain subscribes to the subscribeOn operator, a new thread on the io scheduler subscribes upward to the flatMap, but your main thread returns from the subscribe and terminates the program. That's why you don't see it print: the other thread hasn't called onNext yet.
    You can fix the race condition by using a TestSubscriber and calling awaitTerminalEventAndUnsubscribeOnTimeout(long, TimeUnit), or simply by adding a CountDownLatch and awaiting it after your subscribe.
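A minimal sketch of the latch approach mentioned above, written with the JDK only so it runs without RxJava. `LatchDemo` and `runAndAwait` are hypothetical names. The daemon worker thread stands in for the scheduler thread, on the assumption (implied by the explanation above) that it does not keep the JVM alive once main returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; nothing here is RxJava API.
class LatchDemo {

    // Starts async work on a daemon thread, then blocks until the work
    // signals completion or the timeout elapses.
    // Returns the produced value, or -1 on timeout or interruption.
    static int runAndAwait(long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicInteger result = new AtomicInteger(-1);

        Thread worker = new Thread(() -> {
            result.set(1);    // stands in for onNext(1)
            done.countDown(); // stands in for onCompleted()
        });
        worker.setDaemon(true); // daemon threads do not keep the JVM alive
        worker.start();

        try {
            // Without this await, the caller could return before the worker runs.
            boolean finished = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return finished ? result.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(1000));
    }
}
```

In the RxJava snippets, the equivalent would be to count the latch down from the onCompleted and onError callbacks and to await it right after subscribe(...).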