These are chat archives for ReactiveX/RxJava

1st
Feb 2018
Rafael Guillen
@rguillens
Feb 01 2018 14:47
@teasp00n I recommend you wrapping the scanned String with something like Optional, this way you can send Optional.empty() emissions when room database map fails.
Rafael Guillen
@rguillens
Feb 01 2018 14:59
@mreddimasi Read about and try with mergeDelayError operator.
pbagchi
@pbagchi
Feb 01 2018 15:02
@rguillens - thanks.
pbagchi
@pbagchi
Feb 01 2018 18:12

Hi, for the question I had above, I am trying around with RxJavaHooks. Can somebody point me to what the Runnable is inside the call() method?

            RxJavaHooks.setOnIOScheduler(new Func1<Scheduler, Scheduler>() {
                @Override
                public Scheduler call(Scheduler scheduler) {
                    //context.attach();
                    return scheduler;
                }
            });

            rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());

The version of RxJava Hystrix is using is 1.2.0.

David Karnok
@akarnokd
Feb 01 2018 19:30
You can hook into the task scheduling via setOnScheduleAction. Get the current context, create a new Runnable which restores that context and calls the original Runnable.
pbagchi
@pbagchi
Feb 01 2018 20:21

@akarnokd - So, I did this

rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
            rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
            Context context = Context.current();
            RxJavaHooks.setOnScheduleAction(new Func1<Action0, Action0>() {
                @Override
                public Action0 call(Action0 action0) {
                    context.attach();
                    logger.info("traceId in gRPC context = {} ************", ContextKeys.TRACE_ID_CTX_KEY.get());
                    Runnable r = new Runnable() {
                        @Override
                        public void run() {
                            context.attach();
                            logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
                        }
                    };
                    r.run();
                    return action0;
                }
            });
            rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());

Going into an endless loop because return action0; is eventually calling action0 again. What am I doing wrong?
I have three Observables - two on computation scheduler and one on IO. As far as I understand setOnScheduleAction will get called for all three which isn't a problem per se. But I am seeing RxComputationScheduler threads which have the gRPC context but then one RxIOScheduler thread which still doesn't have the context.

David Karnok
@akarnokd
Feb 01 2018 21:30
Don't call r.run() and return r.
Runnable r = new Runnable() {
    @Override
    public void run() {
        context.attach();
        logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());

        action0.run();  // <--------------------------------------------------------------------------------------------
     }
};
return r;  // <------------------------------------------------------------------------------
pbagchi
@pbagchi
Feb 01 2018 22:09

@akarnokd - Sorry, for bugging, but Action0 does not implement Runnable so I can't return r or do action0.run()

RxJavaHooks.setOnScheduleAction(new Func1<Action0, Action0>() {
                @Override
                public Action0 call(Action0 action0) {
                    context.attach();
                    logger.info("traceId in gRPC context = {} ************", ContextKeys.TRACE_ID_CTX_KEY.get());
                    Runnable r = new Runnable() {
                        @Override
                        public void run() {
                            context.attach();
                            logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
                            action0.run();
                        }
                    };
                    return r;
                }
            });

I can do action0.call() but that would be an infinite loop.

David Karnok
@akarnokd
Feb 01 2018 23:21
I meant Action0:
Action0 r = new Action0() {
    @Override
    public void call() {
        context.attach();
        logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());

        action0.call();
     }
};
return r;