RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Hi, for the question I had above, I am experimenting with RxJavaHooks. Can somebody point me to what the Runnable is inside the call() method?
RxJavaHooks.setOnIOScheduler(new Func1<Scheduler, Scheduler>() {
    @Override
    public Scheduler call(Scheduler scheduler) {
        //context.attach();
        return scheduler;
    }
});
rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());
Hystrix is using RxJava version 1.2.0.
@akarnokd - So, I did this:
rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
Context context = Context.current();
RxJavaHooks.setOnScheduleAction(new Func1<Action0, Action0>() {
    @Override
    public Action0 call(Action0 action0) {
        context.attach();
        logger.info("traceId in gRPC context = {} ************", ContextKeys.TRACE_ID_CTX_KEY.get());
        Runnable r = new Runnable() {
            @Override
            public void run() {
                context.attach();
                logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
            }
        };
        r.run();
        return action0;
    }
});
rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());
This goes into an endless loop because return action0; eventually calls action0 again. What am I doing wrong?
I have three Observables: two on the computation scheduler and one on IO. As far as I understand, setOnScheduleAction will get called for all three, which isn't a problem per se. But I am seeing RxComputationScheduler threads that have the gRPC context and one RxIOScheduler thread that still doesn't have the context.
Runnable r = new Runnable() {
    @Override
    public void run() {
        context.attach();
        logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
        action0.run(); // <--------------------------------------------------------------------------------------------
    }
};
return r; // <------------------------------------------------------------------------------
@akarnokd - Sorry for bugging you, but Action0 does not implement Runnable, so I can't return r or call action0.run().
RxJavaHooks.setOnScheduleAction(new Func1<Action0, Action0>() {
    @Override
    public Action0 call(Action0 action0) {
        context.attach();
        logger.info("traceId in gRPC context = {} ************", ContextKeys.TRACE_ID_CTX_KEY.get());
        Runnable r = new Runnable() {
            @Override
            public void run() {
                context.attach();
                logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
                action0.run();
            }
        };
        return r;
    }
});
I can do action0.call() but that would be an infinite loop.
rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());
Observable.zip(worldReplyObservable, teamReplyObservable, memberReplyObservable, new Func3<Greeter.GreeterReply, Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
@Override
public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply, Greeter.GreeterReply memberReply) {
Action0 wrapper. Providing standalone means code that captures the essence of your flows and uses only plain JDK types and RxJava so I can try it and see what could be wrong.
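For reference, the Action0 wrapper idea can be sketched with plain JDK types only. This is a minimal illustration, not the RxJava or gRPC API: the local Action0 interface stands in for rx.functions.Action0, the TRACE_ID ThreadLocal stands in for the gRPC Context, and contextPropagatingHook and traceIdSeenOnWorker are hypothetical names. It assumes the hook is applied once when an action is scheduled, so calling the original action from inside the wrapper does not recurse.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class ScheduleHookSketch {
    /** Local stand-in for rx.functions.Action0 (assumption, not the RxJava type). */
    interface Action0 { void call(); }

    /** Stand-in for the gRPC Context: a trace id bound to the current thread. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Captures the caller's trace id and returns a hook that wraps each scheduled action. */
    static Function<Action0, Action0> contextPropagatingHook() {
        final String captured = TRACE_ID.get(); // read on the calling thread
        return original -> () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);             // "attach" on the worker thread
            try {
                original.call();                // run the real action exactly once
            } finally {
                TRACE_ID.set(previous);         // "detach" so the pooled thread stays clean
            }
        };
    }

    /** Runs a task on another thread through the hook and reports the trace id it saw. */
    static String traceIdSeenOnWorker(String traceId) {
        TRACE_ID.set(traceId);
        Function<Action0, Action0> hook = contextPropagatingHook();
        final String[] seen = new String[1];
        Action0 task = () -> { seen[0] = TRACE_ID.get(); };
        Action0 decorated = hook.apply(task);   // applied once, at scheduling time
        Runnable runnable = decorated::call;    // adapt Action0 to Runnable for the executor
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(runnable).get();      // wait so the result is visible here
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            worker.shutdown();
            TRACE_ID.remove();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(traceIdSeenOnWorker("trace-42")); // prints "trace-42"
    }
}
```

With the real libraries, the equivalent would presumably be a hook that returns a new Action0 whose call() attaches the captured Context, invokes action0.call(), and detaches in a finally block, rather than returning action0 unchanged.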
Hi,
I saw this "After an Observer calls an Observable's subscribe method..." and was wondering how an Observer can call an Observable's subscribe method.
Is that sentence correct?
Observable<String> delay = Observable.just("")
    .switchMap(dummy -> Observable.timer(randomTime(), TimeUnit.SECONDS))
    .map(a -> String.valueOf(a))
    .repeat();
Observable<String> messages = Observable.just("Test") // eventually lines from a file...
    .repeat();
messages.zipWith(delay, (d, msg) -> "" + d + " " + msg).subscribe(System.out::println);
Flowable<String> delay = Flowable.just("")
    .switchMap(dummy -> Flowable.timer(randomTime(), TimeUnit.SECONDS))
    .map(a -> String.valueOf(a))
    .repeat();
Flowable<String> messages = Flowable.just("Test") // eventually lines from a file...
    .repeat();
delay.zipWith(messages, (d, msg) -> "" + d + " " + msg).subscribe(System.out::println);