Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Ozzy Osborne
    @BarDweller
    I shall have a bash, thank you =)
    David Karnok
    @akarnokd
    Also please read the wiki about the differences between the two major versions.
    Ozzy Osborne
    @BarDweller
    I've never used version 1 or version 2 at least never beyond very simple examples
     Flowable<String> delay = Flowable.just("")
                                        .switchMap(dummy -> Flowable.timer(randomTime(), TimeUnit.SECONDS))
                                        .map( a -> String.valueOf(a) )
                                        .repeat();
    
           Flowable<String> messages = Flowable.just("Test") //eventually lines from a file...
                                                   .repeat();
    
           delay.zipWith(messages, (d, msg) -> ""+d+" "+msg  ).subscribe( System.out::println );
    So I tried that... and now it just exits immediately with no output =)
    David Karnok
    @akarnokd
    There are a lot of tutorials out there, mostly targeting v1. The wiki should help you turning them into v2 code.
    Did you put in Thread.sleep(10000)?
    Ozzy Osborne
    @BarDweller
    No.. why do I need it?
    David Karnok
    @akarnokd
    Standard Schedulers use daemon threads that won't prevent the JVM from exiting.
    Ozzy Osborne
    @BarDweller
    It does work with the sleep.. so that's good to know.. =)
    ahh.. so I need to subscribe on the current thread somehow?
    David Karnok
    @akarnokd
    You need to prevent the main thread from returning prematurely by some means. If you'd have finite flows, blockingSubscribe could work, although the only way to stop the example flow is to kill the JVM.
    Ozzy Osborne
    @BarDweller
    yeah.. this is basically a quick app that reads lines from a file, and invokes a webservice with a line as a message, then delays randomly and repeats.. (ideally using the lines sequentially)
    so when it runs out of lines from the file, ideally I want it to start over again.. it's never supposed to exit, or rather, I'm fine exiting it by killing the process
    David Karnok
    @akarnokd
    In that case, use blockingSubscribe().
    Ozzy Osborne
    @BarDweller
    ah.. of course.. =) I find a lot of this stuff is at the mo 'try this, doesn't work, repeat until find one way that works'
    like.. I understand that Flowable.timer( delay, unit) produces an observable that returns 0L after the specified delay
    ahh.. and now I figure out why my next change didn't work either..
    David Karnok
    @akarnokd
    It might be worth reading an RxJava 2 book: https://www.packtpub.com/application-development/learning-rxjava
    Ozzy Osborne
    @BarDweller
    Indeed.. probably a sane suggestion.
    I'm playing with WebFlux, RxJava and Lagom all at the same time..
    not in the same app, thankfully
    but in parallel, trying to do the same thing in each
    Yannick Lecaillez
    @ylecaillez
    Hi guys !
    Just got a weird issues on my project. The source of it lies in BaseEmitter.complete() which invokes the serial.dipose().
    This has the side effect of invoking the callback registered with setCancelled(). This is quite surprising and looks like a bug to me ?
    David Karnok
    @akarnokd
    @ylecaillez So you want your resource to leak?
    Yannick Lecaillez
    @ylecaillez
    Well, at least i would have expected setCancelled() to behave differently then setDisposable())
    when response.cancel() is invoked i want to send a CANCEL request. But surely i don't want to send a CANCEL request when the response is completed.
    i manage to solve that using .doOnCancel(), but i found the method naming setCancelled() to be a bit misleading
    David Karnok
    @akarnokd
    There is no such method setCancelled.
    Yannick Lecaillez
    @ylecaillez
    Sorry, it is on FlowableEmitter
    and it is named setCancellable
    David Karnok
    @akarnokd
    Disposable and Cancellable are logically equivalent. Cancellable.cancel() allows the implementor to throw a checked exception. They are there to allow resource cleanup when the emitter is terminated or cancelled.
    Why don't want you to send a CANCEL to whatever source when your generator decides to end the sequence?
    Yannick Lecaillez
    @ylecaillez
    I have a RequestState which can be shared amongst multiple request (see it like a transaction). this request state manage the cancellation of the whole transaction.
    If RequestState.isCancelled() return true, subsequent request sharing the same RequestState will signal onError(CancelException) on the response Flowable.
    I've used setCancellable() to plug the RxJava cancel() to this RequestState.
    Not sure this is clear. But my bug was that on the first request completed, the RequestState.cancel() was invoked. The subsequent request were always cancelled without "reason".
    the FlowableEmitter.setCancellable() looks more like a Flowable.doOnTerminate() to me
    FloableEmitter.doOnTerminate()
    David Karnok
    @akarnokd
    It acts like doFinally actually. It is on purpose because most use cases involving a resource would leak that resource when the generator terminates. It sounds like you used the wrong abstraction or didn't properly anticipate the sharing of that request state.
    Yannick Lecaillez
    @ylecaillez
    What do you mean by "used the wrong abstraction" ?
    RequestStare are designed to be shared. Everything works fine if i bridge the RequestState using the doOnCancel()
    Flowable.create(...).doOnCancel(requestState::cancel) worked while Flowable.create(emitter -> emitter.setCancellable(requestState::cancel)) didn't
    My point is just that setCancellable() is a misleading method name given that it is invoked when flowable is cancel() but also when onComplete() or onError().
    David Karnok
    @akarnokd
    Why would you complete for some consumers in the create() while not for others?
    Yannick Lecaillez
    @ylecaillez
    Sorry i don't understand your question. Each requests shared by the RequestState have its own Flowable.create()
    because each requests is producing a Flowable<Response>
    once a request is onComplete(), a Flowable.concat(), subscribe to the next one. If the RequestSTate is cancelled, then the processing will throw and stop subsequent request to be subscribed. Otherwise, the request will produce result, then onComplete(), then the Flowable.concat() will subscribe to the next one. And so on
    David Karnok
    @akarnokd
    So is setDisposable. The behavior then is provided by the Emitter. I don't see any case for misleading naming, just misunderstanding. The main use case for setCancellable is to release resources created within the emitter callback which are scoped for the duration of the emission. If you have external resources that are shared, you'll need other means to release them considering how a solution would interact with it.
    Yannick Lecaillez
    @ylecaillez
    But why "Cancellable" then ? Why not "setFinally" or "setFinalizer" or whatever ? I mean cancel() has a special meaning in Flowable. This is the thing i found misleading
    But of course YMMV, this is just a remark from a RxJava user :)
    I should not have used setCancellable() for my use case, this is an error on my side. BUT i would probably not used it at the first place if it was named "setFinalizer" or sthg like that.
    Yannick Lecaillez
    @ylecaillez
    Note that the javadoc of FlowbableEmitter.isCancelled() is also misleading: Returns true if the downstream cancelled the sequence.
    It'll also returns true if the stream is terminated (onComplete/onError)