    Bartłomiej Hołota
    @bholota
    ok, one more thing: when I do Observable.from(...).map(...).subscribe(...), will subscribe in such a chain catch all emitted items? It doesn't have to be deferred? (it's a hot observable, right?)
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    1. It's not a hot observable
    2. The subscriber will catch all items
    3. It needs to be deferred only if, in from(something), something should be computed only after subscription to the observable
    ah, sorry for indexes, markdown…
    About defer: imagine you have Observable.just(networkRequest())
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    the problem is that in Java all arguments are computed before the function executes, so you cannot schedule networkRequest() on another thread; it will be computed in the thread that called Observable.just(networkRequest()).
    To avoid that you need a function that wraps your call and defers it:
    Observable
      .defer(() -> Observable.just(networkRequest()))
      .map()
      .subscribe()
    this will be a cold observable and networkRequest() will be executed only after subscription.
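    For reference, a minimal runnable sketch of that pattern, assuming RxJava 1.x; networkRequest() is a hypothetical blocking call, and subscribeOn is added so it runs off the calling thread:

    import rx.Observable;
    import rx.schedulers.Schedulers;

    public class DeferExample {
        // hypothetical blocking call, standing in for networkRequest() above
        static String networkRequest() {
            return "response";
        }

        public static void main(String[] args) throws InterruptedException {
            // nothing executes here yet: the lambda runs only on subscription
            Observable<String> cold = Observable
                    .defer(() -> Observable.just(networkRequest()))
                    .subscribeOn(Schedulers.io()); // networkRequest() runs on an io thread

            cold.map(String::length)
                .subscribe(len -> System.out.println("length = " + len));

            Thread.sleep(1000); // keep the demo alive until the io thread finishes
        }
    }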
    Bartłomiej Hołota
    @bholota
    Ok, thanks, I need to read more about it.
    Abhinav Solan
    @eyeced
    I am doing inserts into a DB which are single-value inserts, mostly coming from numerous clients; these are around 50k values coming in from different sources every 3 seconds, and I am not able to do a batch insert into the DB. Is there a way with observables that, before calling insert, I can combine all of these into a single observable stream .. and all the other observables push into this defined observable?
    David Stemmer
    @weefbellington
    @eyeced how often do you want to do the batch insert?
    is there something that triggers an end-of-stream -- like a sentinel value that says "this is the last thing to be inserted into the database"?
    or some sort of event that would allow you to figure out when the batch insert should occur?
    Abhinav Solan
    @eyeced
    These are continuous values coming in, there's no end of stream
    David Stemmer
    @weefbellington
    @eyeced alright, so what I might try is sample -- should allow you to sample a continuous stream of values at a specified interval, returning the values since the last sample as a Collection
    Abhinav Solan
    @eyeced
    Should I create one observable, like a batcher, in the class, then merge all the incoming observables into it, use the window / batch option with time on the batcher observable, and keep doing batch inserts using that .. would this be a good solution ?
    David Stemmer
    @weefbellington
    alternatively you could try buffer, which allows you to split the observable stream into fixed-size chunks, for example if you wanted to do a batch update each time 1000 events were queued up
    looks like buffer supports sampling as well so that might be what you want
    @eyeced ^
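    As a rough illustration of that suggestion (a sketch assuming RxJava 1.x; the simulated source and the 3-second window are made up for the example):

    import rx.Observable;
    import java.util.concurrent.TimeUnit;

    public class BufferSketch {
        public static void main(String[] args) throws InterruptedException {
            // simulate a continuous stream of incoming values
            Observable<Long> values = Observable.interval(10, TimeUnit.MILLISECONDS);

            // collect everything that arrived in each 3-second window into a List
            values.buffer(3, TimeUnit.SECONDS)
                  .subscribe(batch -> System.out.println("batch insert of " + batch.size() + " rows"));

            Thread.sleep(10000); // let a few windows go by in this demo
        }
    }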
    Abhinav Solan
    @eyeced
    So should I just create a common observable, merge all the incoming observables into that .. and use buffer with that ...
    David Stemmer
    @weefbellington
    yep
    Abhinav Solan
    @eyeced
    thanks a lot @weefbellington
    David Stemmer
    @weefbellington
    sure
    Abhinav Solan
    @eyeced
    I did a setup for a Cassandra cluster which was giving around 60k inserts performance .. when I used single inserts it came down to 20k inserts .. I will try out this suggestion .. it might put it back to the original performance
    60k / sec
    David Stemmer
    @weefbellington
    @eyeced you could use a PublishSubject as your source observable if you are OK with your observable being "hot"
    each time you need to push an event to the stream, you would call PublishSubject#onNext
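    A minimal sketch of that idea, assuming RxJava 1.x (the event values here are placeholders):

    import rx.subjects.PublishSubject;

    public class PushSketch {
        public static void main(String[] args) {
            // hot source: subscribers only see events pushed after they subscribe
            PublishSubject<String> events = PublishSubject.create();

            events.subscribe(e -> System.out.println("received: " + e));

            // each time a new value arrives, push it into the stream
            events.onNext("first");
            events.onNext("second");
            events.onCompleted();
        }
    }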
    Abhinav Solan
    @eyeced
    Ok, I will try that out too ..
    David Stemmer
    @weefbellington
    personally I don't recommend Subjects ("hot" observables) in the general case (they can be problematic because you can start pushing data to them before anything is subscribed to them) but for some scenarios they can be useful
    Dorus
    @Dorus
    I could swear Sample only returns the latest value (if any) every time interval, while Buffer and Window allow you to group all events in a certain timespan together (probably Buffer in this case, as that returns the collected values in a collection, while Window keeps them in a nested observable, so you would still need to aggregate them). But I'm coming from Rx.NET, so it might be different here with RxJava.
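    A small sketch of that difference, assuming RxJava 1.x: sample keeps only the most recent item per interval, buffer emits everything from the interval as a List, and window emits the same grouping as a nested Observable:

    import rx.Observable;
    import java.util.concurrent.TimeUnit;

    public class SampleVsBufferVsWindow {
        public static void main(String[] args) throws InterruptedException {
            Observable<Long> ticks = Observable.interval(100, TimeUnit.MILLISECONDS);

            // sample: one value (the latest) per second, the rest are dropped
            ticks.sample(1, TimeUnit.SECONDS)
                 .subscribe(v -> System.out.println("sample: " + v));

            // buffer: all values from each second, collected into a List
            ticks.buffer(1, TimeUnit.SECONDS)
                 .subscribe(list -> System.out.println("buffer: " + list));

            // window: same grouping, but as a nested Observable that still needs aggregating
            ticks.window(1, TimeUnit.SECONDS)
                 .flatMap(w -> w.toList())
                 .subscribe(list -> System.out.println("window: " + list));

            Thread.sleep(3000);
        }
    }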
    David Stemmer
    @weefbellington
    @Dorus yeah looks like you're right, buffer is what he wants here
    I usually use sample to manage backpressure when I don't care about receiving every update
    Dorus
    @Dorus
    yeah exactly
    Dorus
    @Dorus
    Also, combining streams is simply Merge, as long as the clients don't emit errors. I see there is even a mergeDelayError in case you want to wait for all the other clients to complete first. If a bulk insert from a single client is easier than combining events from different clients, do the buffer operation before you merge.
    Or connect to the clients inside the sequence with flatMap.
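    For reference, a sketch of that combining step, assuming RxJava 1.x and two hypothetical per-client streams:

    import rx.Observable;
    import java.util.concurrent.TimeUnit;

    public class MergeSketch {
        public static void main(String[] args) throws InterruptedException {
            // hypothetical per-client streams of values to insert
            Observable<Long> clientA = Observable.interval(100, TimeUnit.MILLISECONDS);
            Observable<Long> clientB = Observable.interval(150, TimeUnit.MILLISECONDS).map(l -> l + 1000);

            // merge interleaves both client streams into one; mergeDelayError would
            // postpone a client's error until the other streams have completed
            Observable.merge(clientA, clientB)
                      .buffer(1, TimeUnit.SECONDS)
                      .subscribe(batch -> System.out.println("batch: " + batch));

            Thread.sleep(3000);
        }
    }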
    Abhinav Solan
    @eyeced
    I am exposing an API so I would not be able to control the clients .. they are just going to call "put(Observable<T> t)", and in our case it would be just one value most of the time, so while doing the insert in the put call I want to merge all the incoming observables.
    Dorus
    @Dorus
    If you work with the previously mentioned PublishSubject<Observable<T>>, you can just call PublishSubject.OnNext(t) inside put. First subscribe to the PublishSubject with PublishSubject.asObservable().Merge().Subscribe(...) (it doesn't strictly need the asObservable if you subscribe locally). Just make sure you don't expose the Subject anywhere.
    Abhinav Solan
    @eyeced
    PublishSubject seems the way to go ..
    Dorus
    @Dorus
    The general consensus is to avoid Subjects, but I still haven't found a way to avoid them in these situations; as long as you keep the Subject private and local it shouldn't become a mess.
    Dorus
    @Dorus
    I mean, I would love to call Observer.Create(), but that doesn't allow you to call merge anymore. So you kinda need
    private Observer ob;
    public constructor() {
        PublishSubject<Observable<T>> sub = PublishSubject.create();
        sub.merge().subscribe(...);
        ob = sub.asObserver();
    }
    public void put(Observable<T> t) {
        ob.onNext(t);
    }
    (Please, anybody correct me if I'm wrong)
    Abhinav Solan
    @eyeced
    Thanks @Dorus, this worked beautifully; here is the implementation I did:

    public class SubjectTest {
        private static final SubjectTest INSTANCE = new SubjectTest();
        private PublishSubject<Integer> subject;

        public SubjectTest() {
            subject = PublishSubject.create();
            subject.buffer(10).subscribe(System.out::println);
        }

        void put(Observable<Integer> obs) {
            obs.subscribe(integer -> subject.onNext(integer));
        }

        public static void main(String[] args) {
            ExecutorService executor = Executors.newWorkStealingPool(10);
            Observable.range(1, 100).subscribe(integer -> {
                executor.execute(() -> INSTANCE.put(Observable.just(integer)));
            });
        }
    }

    Dorus
    @Dorus
    you're missing a few spaces in front.
    (you can edit)
    Anyway looks good, but aren't you missing a merge() before buffer?
    Abhinav Solan
    @eyeced
    in RxJava there is no merge() call for the subject
    Dorus
    @Dorus
    really?
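    A possible clarification of that last point, assuming RxJava 1.x: a Subject has no instance merge(); merging an Observable of Observables is done with the static Observable.merge, so a PublishSubject<Observable<T>> source would look roughly like this:

    import rx.Observable;
    import rx.subjects.PublishSubject;

    public class StaticMergeSketch {
        public static void main(String[] args) {
            // subject that carries whole observables, as in the earlier sketch
            PublishSubject<Observable<Integer>> subject = PublishSubject.create();

            // there is no subject.merge() in RxJava; use the static form instead
            Observable.merge(subject)
                      .buffer(10)
                      .subscribe(System.out::println);

            for (int i = 1; i <= 20; i++) {
                subject.onNext(Observable.just(i));
            }
            subject.onCompleted();
        }
    }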