These are chat archives for ReactiveX/RxJava

13th
Aug 2015
Bartłomiej Hołota
@bholota
Aug 13 2015 05:52
Ok, thanks, I need to read more about it.
Abhinav Solan
@eyeced
Aug 13 2015 20:09
I am doing an insert into DB wihich are single value inserts mostly coming from numerous clients, these are around 50k values coming in from different sources every 3 seconds, I am not able to do a batch insert into the db. Is their a way with obsevables that before calling on insert I can combine all of these into a single observable stream .. and all other observables are pushing into this defined observable ?
David Stemmer
@weefbellington
Aug 13 2015 20:10
@eyeced how often do you want do do the batch insert?
is there something that triggers an end-of-stream -- like a sentinal value that says "this is the last thing to be inserted into the database"?
or some sort of event that would allow you to figure out when the batch insert should occur?
Abhinav Solan
@eyeced
Aug 13 2015 20:12
These are continuos values coming no end of stream
David Stemmer
@weefbellington
Aug 13 2015 20:15
@eyeced alright, so what I might try is sample -- should allow you to sample a continuous stream of values at a specified interval, returning the values since the last sample as a Collection
Abhinav Solan
@eyeced
Aug 13 2015 20:15
Should I create one observable like batcher in the class and then call on merge for all incoming observables and use the window / batch option with time for the batcher observable and keep on doing batch insert using that .. would this be a good solution ?
David Stemmer
@weefbellington
Aug 13 2015 20:16
alternatively you could try buffer which allows you split the observable stream into fixed-sized chunks, for example if you wanted to do a batch update each time 1000 events were queued up
looks like buffer supports sampling as well so that might be what you want
@eyeced ^
Abhinav Solan
@eyeced
Aug 13 2015 20:19
So is should I just create common observable and merge in all the incoming observables in that .. and use buffer with that ...
David Stemmer
@weefbellington
Aug 13 2015 20:19
yep
Abhinav Solan
@eyeced
Aug 13 2015 20:20
thanks a lot @weefbellington
David Stemmer
@weefbellington
Aug 13 2015 20:20
sure
Abhinav Solan
@eyeced
Aug 13 2015 20:21
I did a setup for cassandra cluster which was giving around 60k inserts performance .. when I used single insertes it came down 20k inserts .. I will try out with this suggestion .. this might put it back to original performance
60k / sec
David Stemmer
@weefbellington
Aug 13 2015 20:23
@eyeced you could use a PublishSubject as your source observable if you are OK with your observable being "hot"
each time you need to push an event to the stream, you would call PublishSubject#onNext
Abhinav Solan
@eyeced
Aug 13 2015 20:25
Ok i will try out with that too ..
David Stemmer
@weefbellington
Aug 13 2015 20:27
personally I don't recommend Subjects ("hot" observables) in the general case (they can be problematic because you can start pushing data to them before anything is subscribed to them) but for some scenarios they can be useful
Dorus
@Dorus
Aug 13 2015 20:27
I could swear Sample only returns the latest value (if any) every time interval, while Buffer and Window allow you to group all events in a certain timespan together (Probably buffer in this case, as that returns the collected values in a collection, while window keeps them in a nested observable so you would still need to aggregate them). But i'm coming from Rx.NET, so it might be different here with RxJava.
David Stemmer
@weefbellington
Aug 13 2015 20:29
@Dorus yeah looks like you're right, buffer is what he wants here
I usually use sample to manage backpressure when I don't care about receiving every update
Dorus
@Dorus
Aug 13 2015 20:30
yeah exactly
Dorus
@Dorus
Aug 13 2015 20:37
Also, combining streams is simply Merge, as long as the clients dont emit errors. I see there is even a mergeDelayError in case you want to wait for all other clients to complete first. If bulk insert from a single client is easier than combining events from different clients, do the buffer operation before you merge.
Or connect to the clients inside the sequence with flatMap.
Abhinav Solan
@eyeced
Aug 13 2015 20:51
I am exposing an API so I would not be able to control the client .. they are just going to call "put(Observable<T> t) " and in our case it would be just one value most of the time so while doing the insert in put call I want to merge all the incoming observables.
Dorus
@Dorus
Aug 13 2015 21:00
If you work with the previous mentioned PublishSubject<Observable<T>>, you can just call PublishSubject.OnNext(t)inside put. Do first subscribe to the PublishSubject with PublishSubject.asObservable().Merge().Subscribe(...) (doesn't strictly need the asObservable if you subscribe locally). Just make sure you don't expose the Subject anywhere.
Abhinav Solan
@eyeced
Aug 13 2015 21:07
PublishSubject seems the way ..
Dorus
@Dorus
Aug 13 2015 21:12
The general concession is to avoid Subjects, but i still haven't found a way to avoid them in these situations, just as long as you keep the Subject private and local it shouldn't become a mess.
Dorus
@Dorus
Aug 13 2015 21:22
I mean, i would love to call Observer.Create(), but that doesn't allow you to call merge anymore. So you kinda need
private Observer ob;
public constructor() {
    PublishSubject<Observable<T>> sub = PublishSubject.create();
    sub.merge().subscribe(...);
    ob = sub.asObserver();
}
public void put(Observable<T> t) {
    ob.onNext(t);
}
(Please anybody correct me if i'm wrong)
Abhinav Solan
@eyeced
Aug 13 2015 21:43
Thanks @Dorus this worked beautifully here is the implementation I did

public class SubjectTest {
private static final SubjectTest INSTANCE = new SubjectTest();
private PublishSubject<Integer> subject;

public SubjectTest() {
    subject = PublishSubject.create();
    subject.buffer(10).subscribe(System.out::println);
}

void put(Observable<Integer> obs) {
    obs.subscribe(integer -> subject.onNext(integer));
}

public static void main(String[] args) {
    ExecutorService executor = Executors.newWorkStealingPool(10);
    Observable.range(1, 100).subscribe(integer -> {
        executor.execute(() -> INSTANCE.put(Observable.just(integer)));
    });
}

}

Dorus
@Dorus
Aug 13 2015 21:46
you're missing a few spaces in front.
(you can edit)
Anyway looks good, but arn't you missing a merge() before buffer?
Abhinav Solan
@eyeced
Aug 13 2015 21:47
in RxJava there is no merge() call for the subject
Dorus
@Dorus
Aug 13 2015 21:47
really?
Abhinav Solan
@eyeced
Aug 13 2015 21:48
There is a mergeWith(Observable<T> t) but I don't think that's what you meant
Dorus
@Dorus
Aug 13 2015 21:48
http://reactivex.io/RxJava/javadoc/rx/subjects/PublishSubject.html -> Methods inherited from class rx.Observable -> merge.
no i'm looking for the merge() that goes from Observable<Observable<Integer>> to Observable<Integer>. Mmm
I do see merge(Observable<? extends Observable<? extends T>> source), but i'm not 100% sure how you call it.
Abhinav Solan
@eyeced
Aug 13 2015 21:51
ohh that's on class level only
Dorus
@Dorus
Aug 13 2015 21:53
The docs i'm reading here are pretty clear you can do merge() on a observable of observables. Did you find it?

http://reactivex.io/documentation/operators/merge.html

Instead of passing multiple Observables (up to nine) into merge, you could also pass in a List<> (or other Iterable) of Observables, an Array of Observables, or even an Observable that emits Observables, and merge will merge their output into the output of a single Observable:
If you pass in an Observable of Observables, you have the option of also passing in a value indicating to merge the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.

Javadoc: merge(Observable<Observable>)
Javadoc: merge(Observable<Observable>,int)

Abhinav Solan
@eyeced
Aug 13 2015 21:58
Ok let me try it using this then .. but from the code snippet you send out you were calling merge only on the created subject instance only .. these are static methods not for the created instance
Dorus
@Dorus
Aug 13 2015 22:00
In C# it's a extension method. Those are static but work on instances. Not 100% sure how that flies in Java, don't have it set up here to test.
Dorus
@Dorus
Aug 13 2015 22:17
I think it's rx.Observable.merge(subject).buffer(10).subscribe(System.out::println);
Indeed, just a static call.
That was me writing in C# style before :sweat_smile:
Abhinav Solan
@eyeced
Aug 13 2015 22:25
Using the previous method I am able to print out all numbers .. using it merge is showing up o nly 90 elements from the 100
Dorus
@Dorus
Aug 13 2015 22:28
    public class App {
        private Observer<Observable<Integer>> ob;

        public App() {
            PublishSubject<Observable<Integer>> sub = PublishSubject.create();
            rx.Observable.merge(sub, 10).subscribe(System.out::println);
            ob = sub;
        }

        public void put(Observable<Integer> t) {
            ob.onNext(t);
        }

        public static void main(String[] args) {
       App app = new App();
        Observable.range(1, 100).subscribe(integer -> {
            app.put(Observable.just(integer));
        });
        }
    }
That works for me
Abhinav Solan
@eyeced
Aug 13 2015 22:29
yup .. seems instead of buffer merge with count would work here
Dorus
@Dorus
Aug 13 2015 22:29
public class App {
    private Observer<Observable<Integer>> ob;

    public App() {
        PublishSubject<Observable<Integer>> sub = PublishSubject.create();
        rx.Observable.merge(sub).buffer(10).subscribe(System.out::println);
        ob = sub;
    }

    public void put(Observable<Integer> t) {
        ob.onNext(t);
    }

    public static void main(String[] args) {
        App app = new App();
        Observable.range(1, 100).subscribe(integer -> {
            app.put(Observable.just(integer));
        });
    }
}
Works also
Abhinav Solan
@eyeced
Aug 13 2015 22:30
yes .. thanks a lot @Dorus
Dorus
@Dorus
Aug 13 2015 22:31
No problem. I finally did something with RxJava. yay :D
Abhinav Solan
@eyeced
Aug 13 2015 22:31
I got to learn Subjects yay
Dorus
@Dorus
Aug 13 2015 22:31
nah, avoid them like the plague :)
observe how i kept the subject as hidden as possible in my snipped
Abhinav Solan
@eyeced
Aug 13 2015 22:32
yes .. going to only use that like that only
Dorus
@Dorus
Aug 13 2015 22:34
Most of the time there are methods that do the same but hide the underlying subject, always prefer those above actual subjects. For example .replay() instead of replaySubject. (Again C# stuff here, but i'm sure RxJava has something similar).
This case we had here is pretty much the only one where i dont know how to avoid the subject.
Abhinav Solan
@eyeced
Aug 13 2015 22:34
replay is there
Dorus
@Dorus
Aug 13 2015 22:40
oh btw, my example loses the Subscription from subscribe(). You probably want to store that one somewhere. Again not something i'm overly familiar with (but in Rx.Net we tend to keep track of our IDisposables)