These are chat archives for ReactiveX/RxJava

2nd
Feb 2018
pbagchi
@pbagchi
Feb 02 2018 19:02
@akarnokd - action0.call() is starting an infinite loop and taking it out is just making it hang and not complete.
David Karnok
@akarnokd
Feb 02 2018 20:11
Can you provide something standalone? How many items do you have? It is possible each item will be scheduled separately.
pbagchi
@pbagchi
Feb 02 2018 20:25
Standalone, sorry I don't understand? I have three services that I am reaching out to. Two on computation scheduler and one on IO.
I would like them all scheduled independently and in parallel not waiting on each other.
rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());

Observable.zip(worldReplyObservable, teamReplyObservable, memberReplyObservable, new Func3<Greeter.GreeterReply, Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
                @Override
                public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply, Greeter.GreeterReply memberReply) {
David Karnok
@akarnokd
Feb 02 2018 21:34
Your "infinite loop" comment doesn't make sense to me. Whenever a task needs to be scheduled, that hook body will be executed as well as the Action0 wrapper. Providing standalone means code that captures the essence of your flows and uses only plain JDK types and RxJava so I can try it and see what could be wrong.
I have a blog post about writing a scheduler that copies thread-local data around.
pbagchi
@pbagchi
Feb 02 2018 22:03
Thanks. I am in talks to put it in public github. I'll go through your blog over the weekend and dig through more.