    Gordeev Boris
    @gordeevbr
    That's why I asked if someone could give it a look.
    Gordeev Boris
    @gordeevbr
    Oh wait, I'm totally wrong.
    Yeah, that won't work it seems.
    I have read through this again.
    Sorry for wasting your time.
    Sascha Held
    @faken
    Hey folks, I'm currently totally lost on an RxJava / functional-programming issue. I'm still new to all of this. I've documented my issue in this Stack Overflow question: https://stackoverflow.com/questions/48341012/rxjava-conditional-execution-of-observables-null-handling
    It would be super awesome if somebody could have a look.
    mreddimasi
    @mreddimasi

    Hi,
    Was wondering if the community could help me on how I could parallelize the 3 calls shown in the code below?

    Observable<CheckoutDetailsDTO> o = Observable.zip(
                orderIntegrationService.getRxOrderByOrderId(orderId),
                shippingIntegrationService.getRxAddressListForUser(),
                paymentIntegrationService.getRxPaymentInfoForUser(),
                (order, addresslist, paymntinfo) ->  new CheckoutDetailsDTO(order, addresslist, paymntinfo));

    The calls to order, shipping and payment are made one after another in the code above. Is there any way to parallelize them, as the 3 calls are independent of each other?

    mreddimasi
    @mreddimasi
    I've tried the following to parallelize them, but I still see that they are called one after another:
    Observable<CheckoutDetailsDTO> o = Observable.zip(
                orderIntegrationService.getRxOrderByOrderId(orderId).subscribeOn(Schedulers.io()),
                shippingIntegrationService.getRxAddressListForUser().subscribeOn(Schedulers.io()),
                paymentIntegrationService.getRxPaymentInfoForUser().subscribeOn(Schedulers.io()),
                (order, addresslist, paymntinfo) -> new CheckoutDetailsDTO(order, addresslist, paymntinfo));
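One possible cause, assuming the service methods do their blocking work before they return the Observable (the chat does not show their bodies): subscribeOn only moves work that happens at subscription time, so anything done eagerly while building the Observable still runs on the caller's thread, one call after another. The sketch below shows the same eager-versus-deferred distinction using JDK types only. CompletableFuture stands in for Observable, and slowCall, call and distinctThreads are hypothetical names. In RxJava the deferred form would typically be built with Observable.fromCallable or Observable.defer before applying subscribeOn.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EagerVsDeferred {
    // Hypothetical stand-in for a blocking remote call; records the executing thread.
    static String slowCall(String name, Set<String> seenThreads) {
        seenThreads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name;
    }

    static CompletableFuture<String> call(String name, Set<String> seen,
                                          boolean deferred, ExecutorService pool) {
        if (deferred) {
            // Deferred: the blocking work is handed to the pool and runs there.
            return CompletableFuture.supplyAsync(() -> slowCall(name, seen), pool);
        }
        // Eager: the blocking work runs on the caller's thread before the future exists.
        return CompletableFuture.completedFuture(slowCall(name, seen));
    }

    // Runs three independent calls, combines them, and reports how many threads did the work.
    static int distinctThreads(boolean deferred) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<String> order = call("order", seen, deferred, pool);
            CompletableFuture<String> address = call("address", seen, deferred, pool);
            CompletableFuture<String> payment = call("payment", seen, deferred, pool);
            order.thenCombine(address, (o, a) -> o + "," + a)
                 .thenCombine(payment, (oa, p) -> oa + "," + p)
                 .join();
            return seen.size();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("eager: " + distinctThreads(false) + " thread(s)");    // eager: 1 thread(s)
        System.out.println("deferred: " + distinctThreads(true) + " thread(s)"); // deferred: 3 thread(s)
    }
}
```

If the three service methods already defer their work, this is not the cause and the sequential behaviour has to come from somewhere else, for example a shared single-threaded client.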
    Troy Patrick
    @teasp00n
    I have a BehaviorSubject that emits strings every time a barcode is scanned, and I then want to map each string to a product in my Room database. I also want to know if the mapping failed, so I cannot use Observable, as 'empty' values are filtered out; I changed the signature to use Maybe<T> instead. Is there anything special I need to do to flatMap from my Observable to the Maybe?
    pbagchi
    @pbagchi

    Hi, I am working on a project using Hystrix and gRPC. I have a HystrixObservableCommand class - MemberCommand and I am subscribing using Schedulers.io()

    memberCommand.toObservable().subscribeOn(Schedulers.io());

    In the construct method, I log the gRPC context but it returns null -

    protected Observable<Greeter.GreeterReply> construct() {
     logger.info("0 -- In MemberCommand, traceId from gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());

    Any ideas how I can propagate the gRPC context to the RxIoScheduler threads?

    For the above I get,
    0 -- In MemberCommand, traceId from gRPC context = null
    Rafael Guillen
    @rguillens
    @teasp00n I recommend wrapping the value in something like Optional, so you can emit Optional.empty() when the Room database mapping fails.
    Rafael Guillen
    @rguillens
    @mreddimasi Read about the mergeDelayError operator and give it a try.
    pbagchi
    @pbagchi
    @rguillens - thanks.
    pbagchi
    @pbagchi

    Hi, for the question I had above, I am experimenting with RxJavaHooks. Can somebody point me to what the Runnable is inside the call() method?

                RxJavaHooks.setOnIOScheduler(new Func1<Scheduler, Scheduler>() {
                    @Override
                    public Scheduler call(Scheduler scheduler) {
                        //context.attach();
                        return scheduler;
                    }
                });
    
                rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());

    The version of RxJava that Hystrix uses is 1.2.0.

    David Karnok
    @akarnokd
    You can hook into the task scheduling via setOnScheduleAction. Get the current context, create a new Runnable which restores that context and calls the original Runnable.
    pbagchi
    @pbagchi

    @akarnokd - So, I did this

    rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
                rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
                Context context = Context.current();
                RxJavaHooks.setOnScheduleAction(new Func1<Action0, Action0>() {
                    @Override
                    public Action0 call(Action0 action0) {
                        context.attach();
                        logger.info("traceId in gRPC context = {} ************", ContextKeys.TRACE_ID_CTX_KEY.get());
                        Runnable r = new Runnable() {
                            @Override
                            public void run() {
                                context.attach();
                                logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
                            }
                        };
                        r.run();
                        return action0;
                    }
                });
                rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());

    This goes into an endless loop because return action0; eventually calls action0 again. What am I doing wrong?
    I have three Observables: two on the computation scheduler and one on IO. As far as I understand, setOnScheduleAction will get called for all three, which isn't a problem per se. But I am seeing RxComputationScheduler threads that have the gRPC context and one RxIOScheduler thread that still doesn't have it.

    David Karnok
    @akarnokd
    Don't call r.run() and return r.
    Runnable r = new Runnable() {
        @Override
        public void run() {
            context.attach();
            logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
    
            action0.run();  // <---
        }
    };
    return r;  // <---
    pbagchi
    @pbagchi

    @akarnokd - Sorry for bugging you, but Action0 does not implement Runnable, so I can't return r or call action0.run().

    RxJavaHooks.setOnScheduleAction(new Func1<Action0, Action0>() {
                    @Override
                    public Action0 call(Action0 action0) {
                        context.attach();
                        logger.info("traceId in gRPC context = {} ************", ContextKeys.TRACE_ID_CTX_KEY.get());
                        Runnable r = new Runnable() {
                            @Override
                            public void run() {
                                context.attach();
                                logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
                                action0.run();
                            }
                        };
                        return r;
                    }
                });

    I can do action0.call() but that would be an infinite loop.

    David Karnok
    @akarnokd
    I meant Action0:
    Action0 r = new Action0() {
        @Override
        public void call() {
            context.attach();
            logger.info("**************** traceId in gRPC context = {}", ContextKeys.TRACE_ID_CTX_KEY.get());
    
            action0.call();
         }
    };
    return r;
    pbagchi
    @pbagchi
    @akarnokd - action0.call() is starting an infinite loop and taking it out is just making it hang and not complete.
    David Karnok
    @akarnokd
    Can you provide something standalone? How many items do you have? It is possible each item will be scheduled separately.
    pbagchi
    @pbagchi
    Standalone, sorry I don't understand? I have three services that I am reaching out to. Two on computation scheduler and one on IO.
    I would like them all scheduled independently and in parallel not waiting on each other.
    rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
    rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
    rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());
    
    Observable.zip(worldReplyObservable, teamReplyObservable, memberReplyObservable, new Func3<Greeter.GreeterReply, Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
                    @Override
                    public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply, Greeter.GreeterReply memberReply) {
    David Karnok
    @akarnokd
    Your "infinite loop" comment doesn't make sense to me. Whenever a task needs to be scheduled, that hook body will be executed as well as the Action0 wrapper. Providing standalone means code that captures the essence of your flows and uses only plain JDK types and RxJava so I can try it and see what could be wrong.
    I have a blog post about writing a scheduler that copies thread-local data around.
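A minimal JDK-only sketch of the wrapper idea discussed above: capture the context when the task is scheduled, then return a new task that restores it on the worker thread and calls the original task exactly once. The ThreadLocal named TRACE_ID is a hypothetical stand-in for the gRPC context, and wrap and traceSeenByWorker are illustrative names, not RxJava or gRPC API. With RxJavaHooks.setOnScheduleAction the wrapper would have the same shape, with Action0.call() in place of Runnable.run().

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagation {
    // Hypothetical stand-in for a thread-bound context such as a trace id.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Called on the scheduling thread: captures the caller's value now and
    // returns a task that restores it on whichever thread eventually runs it.
    static Runnable wrap(Runnable original) {
        final String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                original.run();          // run the original task exactly once
            } finally {
                TRACE_ID.set(previous);  // do not leak the value into pooled threads
            }
        };
    }

    // Sets a trace id on the calling thread and reports what a pool thread sees.
    static String traceSeenByWorker(boolean wrapped) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("trace-42");
            String[] seen = new String[1];
            Runnable task = () -> seen[0] = TRACE_ID.get();
            pool.submit(wrapped ? wrap(task) : task).get();
            return seen[0];
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(traceSeenByWorker(false)); // prints null
        System.out.println(traceSeenByWorker(true));  // prints trace-42
    }
}
```

Note that the capture happens inside wrap, at scheduling time, rather than once when the hook is installed, so each scheduled task carries the context of the thread that scheduled it.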
    pbagchi
    @pbagchi
    Thanks. I am in talks to put it in public github. I'll go through your blog over the weekend and dig through more.
    4goodapp
    @4goodapp

    Hi,
    I saw this "After an Observer calls an Observable's subscribe method..." and was wondering how an Observer can call an Observable's subscribe method.

    Is that sentence correct?

    David Karnok
    @akarnokd
    @4goodapp Most Observers are not aware of the Observable they are going to be subscribed to via ObservableSource.subscribe(). I guess the sentence could be reworded. PR welcome.
    Ozzy Osborne
    @BarDweller
    Can anyone help with what I'm doing wrong here? I'm trying to have the messages in the messages Observable pop out at random intervals. This is based on some code from Stack Overflow. If I grok it right, it creates one looping Observable stream whose switchMap makes it wait a random amount of time (via the call to randomTime(), which in my implementation just returns 2), and then zipWith laces the two together, meaning each message has to wait for the next delay to pop. Except it doesn't work, and I don't understand why.
           Observable<String> delay = Observable.just("")
                                        .switchMap(dummy -> Observable.timer(randomTime(), TimeUnit.SECONDS))
                                        .map( a -> String.valueOf(a) )
                                        .repeat();
    
           Observable<String> messages = Observable.just("Test") //eventually lines from a file...
                                                   .repeat();
    
           messages.zipWith(delay, (msg, d) -> msg + " " + d).subscribe(System.out::println);
    David Karnok
    @akarnokd
    How does it not work? Did you add Thread.sleep(10000) to see it produce events before the main quits?
    Ozzy Osborne
    @BarDweller
    The main doesn't quit at all, and nothing is emitted
    it shouldn't quit tho right? I'm zipping across 2 infinite streams
    David Karnok
    @akarnokd
    Is this RxJava 1 code? I've tested it and it works; it prints Test 0 a couple of times.
    Ozzy Osborne
    @BarDweller
    rxjava2
    I mean.. it's great to know it works for you.. you've no idea how nice it is to know my half brained understanding of this stuff is at least on the right tracks ;p
    David Karnok
    @akarnokd
    The v2 Observable doesn't support backpressure, so just(...).repeat() keeps emitting "Test" into zip's buffer and never lets the delay execute. With Flowable, the messages pause after 128 items, which allows the delay to start executing.
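A toy model of the explanation above, not RxJava's implementation: a synchronous, endless source only hands control back to its caller when the demand it was given runs out. With a demand of 128 it returns, so the second source (the timer) gets a chance to start. With effectively unbounded demand it keeps filling the buffer. All names here are hypothetical, and CAP exists only so the unbounded case terminates in the demo.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class PrefetchToyModel {
    // Demo-only guard so the "unbounded demand" case terminates.
    static final int CAP = 100_000;

    // Synchronous, endless source: pushes "Test" until the demand is met.
    // Returns true if it handed control back because the demand ran out.
    static boolean emitRepeated(Deque<String> buffer, long demand) {
        long emitted = 0;
        while (emitted < demand) {
            if (buffer.size() >= CAP) {
                return false; // a real unbounded source would keep going here
            }
            buffer.add("Test");
            emitted++;
        }
        return true;
    }

    // zip-like order: the first source is subscribed first, and the second
    // one (the timer) only starts if the first call returns.
    static String subscribeBoth(long demand) {
        Deque<String> buffer = new ArrayDeque<>();
        boolean returned = emitRepeated(buffer, demand);
        return (returned ? "timer started" : "timer never started")
                + ", buffered=" + buffer.size();
    }

    public static void main(String[] args) {
        System.out.println(subscribeBoth(128));            // timer started, buffered=128
        System.out.println(subscribeBoth(Long.MAX_VALUE)); // timer never started, buffered=100000
    }
}
```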
    Ozzy Osborne
    @BarDweller
    Ah.. ok.. I'm new to all this stuff =) so I should swap them all to be Flowables?
    David Karnok
    @akarnokd
    Yes, and swap the places of messages and delay so it can start the timer first.
    Ozzy Osborne
    @BarDweller
    I shall have a bash, thank you =)
    David Karnok
    @akarnokd
    Also please read the wiki about the differences between the two major versions.
    Ozzy Osborne
    @BarDweller
    I've never used version 1 or version 2, at least never beyond very simple examples.
           Flowable<String> delay = Flowable.just("")
                                        .switchMap(dummy -> Flowable.timer(randomTime(), TimeUnit.SECONDS))
                                        .map( a -> String.valueOf(a) )
                                        .repeat();
    
           Flowable<String> messages = Flowable.just("Test") //eventually lines from a file...
                                                   .repeat();
    
           delay.zipWith(messages, (d, msg) -> ""+d+" "+msg  ).subscribe( System.out::println );
    So I tried that... and now it just exits immediately with no output =)
    David Karnok
    @akarnokd
    There are a lot of tutorials out there, mostly targeting v1. The wiki should help you turn them into v2 code.
    Did you put in Thread.sleep(10000)?
    Ozzy Osborne
    @BarDweller
    No.. why do I need it?
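Some background on the Thread.sleep question: RxJava's timer operators run by default on the computation scheduler, whose worker threads are daemon threads, and subscribe() returns immediately. Once main returns, nothing keeps the JVM alive, so it exits before the first timer fires. The JDK-only sketch below reproduces the effect with a daemon timer thread; firedWithin is a hypothetical helper, and the wait stands in for Thread.sleep(10000). In RxJava 2 a blocking call such as blockingSubscribe is another way to keep the main thread waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DaemonTimer {
    // Schedules a task on a daemon thread and waits up to waitMillis for it.
    // Returns true if the task ran within that time.
    static boolean firedWithin(long delayMillis, long waitMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "demo-timer");
            t.setDaemon(true); // a daemon thread does not keep the JVM alive
            return t;
        });
        try {
            CountDownLatch fired = new CountDownLatch(1);
            timer.schedule(fired::countDown, delayMillis, TimeUnit.MILLISECONDS);
            // The calling thread has to wait here, otherwise it moves on
            // (and in a real main method, exits) before the timer fires.
            return fired.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firedWithin(200, 0));    // false: nobody waited for the timer
        System.out.println(firedWithin(200, 2000)); // true: the caller waited long enough
    }
}
```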