These are chat archives for ReactiveX/RxJava

11th
Jun 2018
Murtuza Chhil
@chhil
Jun 11 2018 07:35
Hi.
I have a ConnectableFlowable that reads records from a file and have 2 subscribers, each subscriber writes to a file after some transformations.
When I run the subscribers individually using the ConnectableFlowable, one takes 5 seconds and the other 32 seconds.
When I run the 2 together they take close to 32 seconds each.
I cannot get my head around why the quicker one now takes longer. Any pointers will be appreciated.
Murtuza Chhil
@chhil
Jun 11 2018 08:30
When I create 2 instances of the connectable and each subscriber subscribes to one, the behavior is as expected: the 2 are processed in 5 and 32 seconds. So it appears that when the observable emits to multiple subscribers it slows down to the pace of the slowest subscriber. Is there a way to have a single Connectable whose emissions are not slowed down by the slowest subscriber?
David Karnok
@akarnokd
Jun 11 2018 08:50
ConnectableFlowable runs at the speed of the slowest consumer. You have to unbound the slower consumer, for example, via onBackpressureBuffer and moving the processing to another thread.
Murtuza Chhil
@chhil
Jun 11 2018 09:59
Thanks, will give it a try.
Murtuza Chhil
@chhil
Jun 11 2018 10:14
@akarnokd This doesn't work, but is this the general idea you were trying to convey?
        return flowable
                .observeOn(Schedulers.io())
                .filter(x -> all.test(x))
                .onBackpressureBuffer()
                .subscribe(fsdMsg -> {
                    Runnable r = () -> {
                        ps.println(String.format("[offset:%15d][Record Number :%10d]", offset + (record * 1400),
                                                 record));
                        fsdMsg.dump(ps, "");
                    };
                    new Thread(r).start();
                },........
Murtuza Chhil
@chhil
Jun 11 2018 10:19
By "doesn't work" I mean the file is incomplete and the order of lines is scrambled. The line order is understandable, but I don't get why the data in the file is incomplete.
David Karnok
@akarnokd
Jun 11 2018 11:10

Please see how the words in my sentence match the operator applications in the code:

source.onBackpressureBuffer().observeOn(Schedulers.io()).subscribe(fsdMsg -> {
    ps.println(String.format("[offset:%15d][Record Number :%10d]", offset + (record * 1400), record));
    fsdMsg.dump(ps, "");
});

source -> unbound -> move to another thread.

Murtuza Chhil
@chhil
Jun 11 2018 12:38
@akarnokd Resolved. Thank you.
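
The fix discussed above (source -> unbound -> move to another thread) can be sketched end to end as follows. This is only a minimal sketch, assuming RxJava 2.x (the io.reactivex packages): Flowable.range stands in for the file reader, Thread.sleep stands in for the slow file writer, and the class and method names are hypothetical, not taken from the original code.

```java
import io.reactivex.Flowable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original conversation.
class UnboundSlowConsumer {

    // Shares one source between a fast and a slow subscriber and
    // returns {records seen by fast, records seen by slow}.
    static int[] run(int records) throws InterruptedException {
        // Stand-in for the file-reading ConnectableFlowable.
        ConnectableFlowable<Integer> source = Flowable.range(1, records).publish();

        AtomicInteger fast = new AtomicInteger();
        AtomicInteger slow = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(2);

        // Fast subscriber: bounded requests via observeOn set the pace.
        source.observeOn(Schedulers.io())
              .subscribe(
                  r -> fast.incrementAndGet(),
                  e -> { e.printStackTrace(); done.countDown(); },
                  done::countDown);

        // Slow subscriber: unbound first, then move to another thread.
        source.onBackpressureBuffer()
              .observeOn(Schedulers.io())
              .subscribe(
                  r -> { Thread.sleep(1); slow.incrementAndGet(); }, // simulated slow write
                  e -> { e.printStackTrace(); done.countDown(); },
                  done::countDown);

        // Start emitting only after both subscribers are attached.
        source.connect();

        // Wait until both subscribers have terminated.
        done.await();
        return new int[] { fast.get(), slow.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = run(500);
        System.out.println("fast=" + counts[0] + " slow=" + counts[1]);
    }
}
```

Because the slow subscriber requests an unbounded amount through onBackpressureBuffer, the shared source is paced only by the fast subscriber. The trade-off is that pending records accumulate in memory until the slow subscriber catches up. Output order is preserved because observeOn delivers items one at a time on a single worker, unlike starting a new Thread per record.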