RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Ah gotcha! So the main queue is only responsible for scalars from the following line:
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
}
Each inner will have its own queue and not use the main. @akarnokd ?
flatDir, I can't seem to use the Java 9 part.
Hi,
I was wondering if the community could help me parallelize the 3 calls shown in the code below.
Observable<CheckoutDetailsDTO> o = Observable.zip(
orderIntegrationService.getRxOrderByOrderId(orderId),
shippingIntegrationService.getRxAddressListForUser(),
paymentIntegrationService.getRxPaymentInfoForUser(),
(order, addresslist, paymntinfo) -> new CheckoutDetailsDTO(order, addresslist, paymntinfo));
The calls to order, shipping and payment are made one after another in the code above. Is there any way to parallelize them, given that the 3 calls are independent of each other?
orderIntegrationService.getRxOrderByOrderId(orderId).subscribeOn(Schedulers.io()),
shippingIntegrationService.getRxAddressListForUser().subscribeOn(Schedulers.io()),
paymentIntegrationService.getRxPaymentInfoForUser().subscribeOn(Schedulers.io()),
BehaviorSubject that emits strings every time a barcode is scanned. I then want to map this to a product in my Room database. I also want to know if it failed to map, so I cannot use Observable, as 'empty' values are filtered out, so I changed the signature to use Maybe<T>. Is there anything special I need to do to flatMap from my Observable to the Maybe?
Hi, I am working on a project using Hystrix and gRPC. I have a HystrixObservableCommand class - MemberCommand and I am subscribing using Schedulers.io()
memberCommand.toObservable().subscribeOn(Schedulers.io());
In the construct method, I log the gRPC context but it returns null:
protected Observable<Greeter.GreeterReply> construct() {
logger.info("0 -- In MemberCommand, traceId from gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
Any ideas how I can propagate the gRPC context to RxIoScheduler threads?
0 -- In MemberCommand, traceId from gRPC context = null
Hi, for the question I had above, I am experimenting with RxJavaHooks. Can somebody point me to what the Runnable is inside the call() method?
RxJavaHooks.setOnIOScheduler(new Func1<Scheduler, Scheduler>() {
@Override
public Scheduler call(Scheduler scheduler) {
//context.attach();
return scheduler;
}
});
rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());
Hystrix is using RxJava 1.2.0.
@akarnokd - So, I did this
rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
Context context = Context.current();
RxJavaHooks.setOnScheduleAction(new Func1<Action0, Action0>() {
@Override
public Action0 call(Action0 action0) {
context.attach();
logger.info("traceId in gRPC context = {} ************", ContextKeys.TRACE_ID_CTX_KEY.get());
Runnable r = new Runnable() {
@Override
public void run() {
context.attach();
logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
}
};
r.run();
return action0;
}
});
rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());
It goes into an endless loop because return action0; eventually calls action0 again. What am I doing wrong?
I have three Observables: two on the computation scheduler and one on IO. As far as I understand, setOnScheduleAction will get called for all three, which isn't a problem per se. But I am seeing RxComputationScheduler threads that have the gRPC context and one RxIOScheduler thread that still doesn't have the context.
Runnable r = new Runnable() {
@Override
public void run() {
context.attach();
logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
action0.run(); // <--------------------------------------------------------------------------------------------
}
};
return r; // <------------------------------------------------------------------------------
@akarnokd - Sorry for bugging you, but Action0 does not implement Runnable, so I can't return r or call action0.run():
RxJavaHooks.setOnScheduleAction(new Func1<Action0, Action0>() {
@Override
public Action0 call(Action0 action0) {
context.attach();
logger.info("traceId in gRPC context = {} ************", ContextKeys.TRACE_ID_CTX_KEY.get());
Runnable r = new Runnable() {
@Override
public void run() {
context.attach();
logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
action0.run();
}
};
return r;
}
});
I can do action0.call() but that would be an infinite loop.
rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());
Observable.zip(worldReplyObservable, teamReplyObservable, memberReplyObservable, new Func3<Greeter.GreeterReply, Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
@Override
public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply, Greeter.GreeterReply memberReply) {
Action0 wrapper. Providing standalone code means code that captures the essence of your flows and uses only plain JDK types and RxJava, so I can try it and see what could be wrong.
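For reference, here is a minimal standalone sketch of the Action0 wrapper idea, using plain JDK types only. The names are assumptions made for illustration: the nested Action0 interface is a local stand-in for rx.functions.Action0, TRACE_ID is a ThreadLocal standing in for the gRPC Context key, and propagateTraceId plays the role of the function that would be passed to RxJavaHooks.setOnScheduleAction. The point it tries to show is that the hook should capture the context on the scheduling thread and return a new Action0 whose call() installs that context, invokes the original action once, and then restores the previous value. Calling the original action from inside the wrapper runs the task; it does not go back through the hook.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextPropagationSketch {

    /** Local stand-in for rx.functions.Action0 (assumption: only call() is needed here). */
    interface Action0 {
        void call();
    }

    /** Stand-in for the gRPC Context: a plain thread-local trace id (hypothetical). */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /**
     * Plays the role of the schedule-action hook: it runs on the thread that
     * schedules the work and returns the action that the worker thread will run.
     */
    static Action0 propagateTraceId(final Action0 original) {
        final String captured = TRACE_ID.get(); // read on the scheduling thread
        return new Action0() {
            @Override
            public void call() {
                String previous = TRACE_ID.get();
                TRACE_ID.set(captured);          // "attach" on the worker thread
                try {
                    original.call();             // run the wrapped task exactly once
                } finally {
                    TRACE_ID.set(previous);      // "detach": restore what was there before
                }
            }
        };
    }

    /** Runs a task on a worker thread and reports the trace id that task observed. */
    static String traceIdSeenOnWorker(boolean useHook) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("trace-42");            // context exists on the caller only
            final String[] seen = new String[1];
            Action0 task = () -> seen[0] = TRACE_ID.get();
            Action0 scheduled = useHook ? propagateTraceId(task) : task;
            Runnable runnable = scheduled::call;
            worker.submit(runnable).get();       // wait until the worker has run it
            return seen[0];
        } finally {
            TRACE_ID.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("without hook: " + traceIdSeenOnWorker(false)); // null
        System.out.println("with hook:    " + traceIdSeenOnWorker(true));  // trace-42
    }
}
```

With RxJava 1.x and gRPC on the classpath, the same shape would presumably use Context.current() in place of TRACE_ID.get(), and context.attach() with context.detach(previous) in place of the set and restore steps; that part is not exercised by this sketch.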
Hi,
I saw this "After an Observer calls an Observable's subscribe method..." and was wondering how an Observer can call an Observable's subscribe method.
Is that sentence correct?
Observable<String> delay = Observable.just("")
.switchMap(dummy -> Observable.timer(randomTime(), TimeUnit.SECONDS))
.map( a -> String.valueOf(a) )
.repeat();
Observable<String> messages = Observable.just("Test") //eventually lines from a file...
.repeat();
messages.zipWith(delay, (msg, d) -> msg + " " + d).subscribe(System.out::println);