    pbagchi
    @pbagchi
    Standalone? Sorry, I don't understand. I have three services that I am reaching out to: two on the computation scheduler and one on IO.
    I would like them all scheduled independently and in parallel, not waiting on each other.
    rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
    rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
    rx.Observable<Greeter.GreeterReply> memberReplyObservable = memberCommand.toObservable().subscribeOn(Schedulers.io());
    
    Observable.zip(worldReplyObservable, teamReplyObservable, memberReplyObservable, new Func3<Greeter.GreeterReply, Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
                    @Override
                    public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply, Greeter.GreeterReply memberReply) {
    David Karnok
    @akarnokd
    Your "infinite loop" comment doesn't make sense to me. Whenever a task needs to be scheduled, that hook body will be executed as well as the Action0 wrapper. Providing a standalone example means providing code that captures the essence of your flows and uses only plain JDK types and RxJava, so I can try it and see what could be wrong.
    I have a blog post about writing a scheduler that copies thread-local data around.
    pbagchi
    @pbagchi
    Thanks. I am in talks to put it in public github. I'll go through your blog over the weekend and dig through more.
    4goodapp
    @4goodapp

    Hi,
    I saw this "After an Observer calls an Observable's subscribe method..." and was wondering how an Observer can call an Observable's subscribe method.

    Is that sentence correct?

    David Karnok
    @akarnokd
    @4goodapp Most Observers are not aware of the Observable they are going to get subscribed to via ObservableSource.subscribe(). I guess the sentence could be reworded. PR welcome.
    Ozzy Osborne
    @BarDweller
    Can anyone help with what I'm doing wrong here? I'm trying to have the items in the messages Observable pop out at random intervals. This is based on some code from Stack Overflow. If I grok it right, it creates one looping observable stream that uses switchMap to wait a random amount of time (via the call to randomTime(), which in my impl just returns 2), and then zipWith laces the two together, so each message has to wait for the next delay to pop. Except it doesn't work, and I don't understand why...
           Observable<String> delay = Observable.just("")
                                        .switchMap(dummy -> Observable.timer(randomTime(), TimeUnit.SECONDS))
                                        .map( a -> String.valueOf(a) )
                                        .repeat();
    
           Observable<String> messages = Observable.just("Test") //eventually lines from a file...
                                                   .repeat();
    
           messages.zipWith(delay, (d, msg) -> ""+d+" "+msg  ).subscribe( System.out::println );
    David Karnok
    @akarnokd
    How does it not work? Did you add Thread.sleep(10000) to see it produce events before the main quits?
    Ozzy Osborne
    @BarDweller
    The main doesn't quit at all, and nothing is emitted
    it shouldn't quit tho right? I'm zipping across 2 infinite streams
    David Karnok
    @akarnokd
    Is this RxJava 1 code? I've tested it and it works: it prints Test 0 a couple of times.
    Ozzy Osborne
    @BarDweller
    rxjava2
    I mean.. it's great to know it works for you.. you've no idea how nice it is to know my half brained understanding of this stuff is at least on the right tracks ;p
    David Karnok
    @akarnokd
    v2 Observable doesn't support backpressure, so the just().repeat() keeps emitting "Test" into the zip's buffer and never lets the delay execute. With Flowable, the messages will pause after 128 items, which allows the delay to start executing.
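The demand-driven pause described above can be sketched without RxJava, using the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams contract that Flowable follows. This is a simplified, single-threaded illustration and not RxJava's implementation: RepeatingPublisher and receivedFor are hypothetical names, the figure 128 is taken from the message above, and details such as reentrant request() calls and validation of the requested amount are left out.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

public class RequestBoundedSource {

    /** Hypothetical source: could emit "Test" forever, but emits only what was requested. */
    static final class RepeatingPublisher implements Flow.Publisher<String> {
        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean cancelled;

                @Override
                public void request(long n) {
                    // Emit exactly n items, then stop until more are requested.
                    for (long i = 0; i < n && !cancelled; i++) {
                        subscriber.onNext("Test");
                    }
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Subscribes with a fixed demand and returns how many items arrived. */
    static long receivedFor(long demand) {
        AtomicLong received = new AtomicLong();
        new RepeatingPublisher().subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(String item) { received.incrementAndGet(); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return received.get();
    }

    public static void main(String[] args) {
        // The "infinite" source stops after the requested amount instead of looping forever.
        System.out.println(receivedFor(128)); // prints 128
    }
}
```

A source without such a request channel has no signal telling it to stop, which is why an endless synchronous repeat can keep the subscribing thread busy before the other zipped source is ever subscribed.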
    Ozzy Osborne
    @BarDweller
    Ah.. ok.. I'm new to all this stuff =) so I should swap them all to be Flowables?
    David Karnok
    @akarnokd
    Yes, and switch the places of messages and delay, so that the timer can start first.
    Ozzy Osborne
    @BarDweller
    I shall have a bash, thank you =)
    David Karnok
    @akarnokd
    Also please read the wiki about the differences between the two major versions.
    Ozzy Osborne
    @BarDweller
    I've never used version 1 or version 2, at least never beyond very simple examples.
           Flowable<String> delay = Flowable.just("")
                                        .switchMap(dummy -> Flowable.timer(randomTime(), TimeUnit.SECONDS))
                                        .map( a -> String.valueOf(a) )
                                        .repeat();
    
           Flowable<String> messages = Flowable.just("Test") //eventually lines from a file...
                                                   .repeat();
    
           delay.zipWith(messages, (d, msg) -> ""+d+" "+msg  ).subscribe( System.out::println );
    So I tried that... and now it just exits immediately with no output =)
    David Karnok
    @akarnokd
    There are a lot of tutorials out there, mostly targeting v1. The wiki should help you turn them into v2 code.
    Did you put in Thread.sleep(10000)?
    Ozzy Osborne
    @BarDweller
    No.. why do I need it?
    David Karnok
    @akarnokd
    Standard Schedulers use daemon threads that won't prevent the JVM from exiting.
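The daemon-thread effect mentioned above can be reproduced with plain JDK classes. The following is a minimal sketch, not RxJava code: DaemonTimerDemo and runOnDaemonTimer are hypothetical names, and a ScheduledExecutorService with a daemon thread factory stands in for a standard Scheduler. It shows that a delayed task only gets to run if the calling thread waits for it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DaemonTimerDemo {

    /**
     * Schedules a task on a daemon thread and waits up to waitMillis for it.
     * Returns true if the task ran within that time.
     */
    static boolean runOnDaemonTimer(long delayMillis, long waitMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "demo-timer");
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        });
        CountDownLatch done = new CountDownLatch(1);
        timer.schedule(done::countDown, delayMillis, TimeUnit.MILLISECONDS);
        try {
            // Without this wait, main could return and the JVM would exit
            // before the delayed task ever fires.
            return done.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnDaemonTimer(100, 2000)); // prints true: main waited long enough
        System.out.println(runOnDaemonTimer(500, 0));    // prints false: main did not wait
    }
}
```

The latch plays the role that Thread.sleep or a blocking subscribe plays in the chat: something on the main thread has to outlast the scheduled work.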
    Ozzy Osborne
    @BarDweller
    It does work with the sleep.. so that's good to know.. =)
    ahh.. so I need to subscribe on the current thread somehow?
    David Karnok
    @akarnokd
    You need to prevent the main thread from returning prematurely by some means. If you had finite flows, blockingSubscribe could work, although the only way to stop the example flow is to kill the JVM.
    Ozzy Osborne
    @BarDweller
    yeah.. this is basically a quick app that reads lines from a file, and invokes a webservice with a line as a message, then delays randomly and repeats.. (ideally using the lines sequentially)
    so when it runs out of lines from the file, ideally I want it to start over again.. it's never supposed to exit, or rather, I'm fine exiting it by killing the process
    David Karnok
    @akarnokd
    In that case, use blockingSubscribe().
    Ozzy Osborne
    @BarDweller
    ah.. of course.. =) I find a lot of this stuff is at the mo 'try this, doesn't work, repeat until find one way that works'
    like.. I understand that Flowable.timer( delay, unit) produces an observable that returns 0L after the specified delay
    ahh.. and now I figure out why my next change didn't work either..
    David Karnok
    @akarnokd
    It might be worth reading an RxJava 2 book: https://www.packtpub.com/application-development/learning-rxjava
    Ozzy Osborne
    @BarDweller
    Indeed.. probably a sane suggestion.
    I'm playing with WebFlux, RxJava and Lagom all at the same time..
    not in the same app, thankfully
    but in parallel, trying to do the same thing in each
    Yannick Lecaillez
    @ylecaillez
    Hi guys !
    Just got a weird issue in my project. The source of it lies in BaseEmitter.complete(), which invokes serial.dispose().
    This has the side effect of invoking the callback registered with setCancelled(). This is quite surprising and looks like a bug to me?
    David Karnok
    @akarnokd
    @ylecaillez So you want your resource to leak?
    Yannick Lecaillez
    @ylecaillez
    Well, at least I would have expected setCancelled() to behave differently than setDisposable().
    When response.cancel() is invoked I want to send a CANCEL request. But surely I don't want to send a CANCEL request when the response is completed.
    I managed to solve that using .doOnCancel(), but I found the method name setCancelled() to be a bit misleading.
    David Karnok
    @akarnokd
    There is no such method setCancelled.
    Yannick Lecaillez
    @ylecaillez
    Sorry, it is on FlowableEmitter
    and it is named setCancellable
    David Karnok
    @akarnokd
    Disposable and Cancellable are logically equivalent. Cancellable.cancel() allows the implementor to throw a checked exception. They are there to allow resource cleanup when the emitter is terminated or cancelled.
    Why don't you want to send a CANCEL to whatever source when your generator decides to end the sequence?
    Yannick Lecaillez
    @ylecaillez
    I have a RequestState which can be shared among multiple requests (think of it as a transaction). This request state manages the cancellation of the whole transaction.
    If RequestState.isCancelled() returns true, subsequent requests sharing the same RequestState will signal onError(CancelException) on the response Flowable.
    I've used setCancellable() to plug the RxJava cancel() into this RequestState.
    Not sure this is clear. But my bug was that when the first request completed, RequestState.cancel() was invoked. The subsequent requests were always cancelled for no apparent reason.
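Since the cleanup callback runs on normal completion as well as on cancellation, one way to keep a shared state from being cancelled on completion is to guard the callback with a flag. This is only a sketch in plain Java and not what was done in the chat, where doOnCancel() was used: GuardedCleanup, markCompleted and cancelCalls are hypothetical names, a Runnable stands in for the callback registered on the emitter, and it assumes the flag is set just before the emitter signals completion.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedCleanupDemo {

    /** Hypothetical wrapper: runs the real cancel action only if the flow did not complete normally. */
    static final class GuardedCleanup implements Runnable {
        private final AtomicBoolean completed = new AtomicBoolean();
        private final Runnable cancelAction;

        GuardedCleanup(Runnable cancelAction) {
            this.cancelAction = cancelAction;
        }

        /** Call this just before signalling normal completion. */
        void markCompleted() {
            completed.set(true);
        }

        /** Stand-in for the cleanup callback, which is invoked on both completion and cancellation. */
        @Override
        public void run() {
            if (!completed.get()) {
                cancelAction.run();
            }
        }
    }

    /** Simulates one request and returns how many times the shared cancel action ran. */
    static int cancelCalls(boolean completesNormally) {
        AtomicInteger calls = new AtomicInteger();
        GuardedCleanup cleanup = new GuardedCleanup(calls::incrementAndGet);
        if (completesNormally) {
            cleanup.markCompleted();
        }
        cleanup.run(); // the cleanup callback fires in both cases
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(cancelCalls(true));  // prints 0: completion does not cancel the shared state
        System.out.println(cancelCalls(false)); // prints 1: a real cancellation still propagates
    }
}
```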