    Yannick Lecaillez
    @ylecaillez
    When response.cancel() is invoked I want to send a CANCEL request. But I certainly don't want to send a CANCEL request when the response has completed.
    I managed to solve that using .doOnCancel(), but I found the method name setCancelled() a bit misleading.
    David Karnok
    @akarnokd
    There is no such method setCancelled.
    Yannick Lecaillez
    @ylecaillez
    Sorry, it is on FlowableEmitter
    and it is named setCancellable
    David Karnok
    @akarnokd
    Disposable and Cancellable are logically equivalent. Cancellable.cancel() allows the implementor to throw a checked exception. They are there to allow resource cleanup when the emitter is terminated or cancelled.
    Why don't you want to send a CANCEL to whatever source when your generator decides to end the sequence?
    Yannick Lecaillez
    @ylecaillez
    I have a RequestState which can be shared among multiple requests (think of it as a transaction). This request state manages the cancellation of the whole transaction.
    If RequestState.isCancelled() returns true, subsequent requests sharing the same RequestState will signal onError(CancelException) on the response Flowable.
    I've used setCancellable() to plug the RxJava cancel() into this RequestState.
    Not sure this is clear. But my bug was that when the first request completed, RequestState.cancel() was invoked. The subsequent requests were always cancelled without "reason".
    FlowableEmitter.setCancellable() looks more like a Flowable.doOnTerminate() to me
    FlowableEmitter.doOnTerminate()
    David Karnok
    @akarnokd
    It acts like doFinally actually. It is on purpose because most use cases involving a resource would leak that resource when the generator terminates. It sounds like you used the wrong abstraction or didn't properly anticipate the sharing of that request state.
    Yannick Lecaillez
    @ylecaillez
    What do you mean by "used the wrong abstraction" ?
    RequestState is designed to be shared. Everything works fine if I bridge the RequestState using doOnCancel().
    Flowable.create(...).doOnCancel(requestState::cancel) worked, while Flowable.create(emitter -> emitter.setCancellable(requestState::cancel)) didn't.
    My point is just that setCancellable() is a misleading method name, given that the Cancellable is invoked not only when the Flowable is cancelled but also on onComplete() or onError().
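The difference described in the last few messages can be reproduced with a small sketch. This is not code from the conversation: it assumes RxJava 2.x (`io.reactivex`) on the classpath, and the class and counter names are made up, with the counters standing in for `requestState::cancel`.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the counters stand in for requestState::cancel.
class CancellableVsDoOnCancel {
    static final AtomicInteger cancellableCalls = new AtomicInteger();
    static final AtomicInteger doOnCancelCalls = new AtomicInteger();

    static void run() {
        Flowable.<String>create(emitter -> {
            // Registered on the emitter: runs on cancel AND on termination.
            emitter.setCancellable(cancellableCalls::incrementAndGet);
            emitter.onNext("response");
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER)
        // Registered as an operator: runs only when downstream cancels.
        .doOnCancel(doOnCancelCalls::incrementAndGet)
        .subscribe();
    }

    public static void main(String[] args) {
        run();
        System.out.println("setCancellable calls: " + cancellableCalls.get()); // 1
        System.out.println("doOnCancel calls: " + doOnCancelCalls.get());      // 0
    }
}
```

The sequence completes normally and nobody cancels it, yet the Cancellable set on the emitter still runs once, which is the behavior that cancelled the shared state after the first request.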
    David Karnok
    @akarnokd
    Why would you complete for some consumers in the create() while not for others?
    Yannick Lecaillez
    @ylecaillez
    Sorry, I don't understand your question. Each request sharing the RequestState has its own Flowable.create(),
    because each request produces a Flowable<Response>.
    Once a request signals onComplete(), a Flowable.concat() subscribes to the next one. If the RequestState is cancelled, the processing will throw and prevent subsequent requests from being subscribed. Otherwise, the request will produce its result, then onComplete(), then the Flowable.concat() will subscribe to the next one. And so on.
    David Karnok
    @akarnokd
    So is setDisposable. The behavior then is provided by the Emitter. I don't see any case for misleading naming, just misunderstanding. The main use case for setCancellable is to release resources created within the emitter callback which are scoped for the duration of the emission. If you have external resources that are shared, you'll need other means to release them considering how a solution would interact with it.
    Yannick Lecaillez
    @ylecaillez
    But why "Cancellable" then? Why not "setFinally" or "setFinalizer" or whatever? I mean, cancel() has a special meaning in Flowable. This is the thing I found misleading.
    But of course YMMV, this is just a remark from an RxJava user :)
    I should not have used setCancellable() for my use case, this is an error on my side. BUT I would probably not have used it in the first place if it had been named "setFinalizer" or something like that.
    Yannick Lecaillez
    @ylecaillez
    Note that the javadoc of FlowableEmitter.isCancelled() is also misleading: "Returns true if the downstream cancelled the sequence."
    It also returns true if the stream has terminated (onComplete/onError).
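A quick way to check this claim is to read `isCancelled()` from inside the emitter callback before and after the terminal signal. The sketch below is illustrative only: it assumes RxJava 2.x, and the class and method names are hypothetical.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;

// Hypothetical demo class: records isCancelled() before and after onComplete().
class IsCancelledAfterComplete {
    static boolean[] observe() {
        boolean[] seen = new boolean[2];
        Flowable.<Integer>create(emitter -> {
            emitter.onNext(1);
            seen[0] = emitter.isCancelled(); // still active, nobody cancelled
            emitter.onComplete();
            seen[1] = emitter.isCancelled(); // terminated, reported as cancelled
        }, BackpressureStrategy.BUFFER).subscribe();
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = observe();
        System.out.println("before onComplete: " + seen[0]); // false
        System.out.println("after onComplete: " + seen[1]);  // true
    }
}
```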
    David Karnok
    @akarnokd

    The governing rule here is rule 1.6 of the Reactive Streams specification:

    "If a Publisher signals either onError or onComplete on a Subscriber, that Subscriber’s Subscription MUST be considered cancelled.
    The intent of this rule is to make sure that a Subscription is treated the same no matter if it was cancelled, the Publisher signalled onError or onComplete."

    People coming from different backgrounds tend to get confused by different naming. There is often discussion how to name components so that it doesn't imply too much but doesn't go under the radar either. setCancellable follows the typical bean-setter naming indicating you set some object on this emitter.

    Yannick Lecaillez
    @ylecaillez
    Regarding 1.6: I agree, but there is a difference between considering the Subscription cancelled and invoking cancel() in onError() or onComplete().
    I agree that the naming makes sense, I just fell into a trap :)
    David Karnok
    @akarnokd
    I've posted ReactiveX/RxJava#5844 to clarify the properties of the Emitter interfaces.
    Yannick Lecaillez
    @ylecaillez
    That's absolutely perfect !
    Thank you so much :)
    Plomipu Dmitri
    @EstoLimon_twitter
    Is there a chat room here just for the basics of Java and the JVM?
    m0rgan
    @m0rgan_twitter
    Hi guys!
    (and girls)
    I'm new here and in the Rx(Java/Swift) world, and I have a very basic question :)
    I want to create a PublishSubject, but how can I, upon declaration, specify actions that should be done when an observer subscribes, or when the subject gets disposed?
    Thanks!
    Mostly the disposing part
    I tried this:
    BehaviorSubject<T> valueSubject = (BehaviorSubject<T>) BehaviorSubject.create(new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(ObservableEmitter<T> emitter) throws Exception {
            // do something here
        }
    });
    But I got an error: io.reactivex.internal.operators.observable.ObservableCreate cannot be cast to io.reactivex.subjects.BehaviorSubject
    m0rgan
    @m0rgan_twitter
    I have a feeling this isn't the right approach
    m0rgan
    @m0rgan_twitter
    I don't know how to respond to that
    David Karnok
    @akarnokd
    Based on your question, you should read a bit about RxJava and Subjects first.
    m0rgan
    @m0rgan_twitter
    That is what I have been doing and am currently doing, I just thought I could find some help here, my bad
    Thank you anyway
    David Karnok
    @akarnokd
    Subjects are self-contained components of RxJava and you can only create them via their own create method. create(ObservableOnSubscribe) is defined on Observable; it creates an Observable, a cold source, and has nothing to do with the subject type. Many operators in RxJava are named after what they do in everyday language, such as "actions done when an observer subscribes" -> doOnSubscribe, and "when the subscription gets disposed" -> doOnDispose. Subjects themselves are not disposed.
    Based on the misunderstanding your question implies, I wanted to encourage you to familiarize yourself with the concepts and components of RxJava first, so the answer, which invokes more concepts and components, can be understood better. For this, I suggest reading Learning RxJava.
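The approach described above can be sketched as follows. This is an illustration rather than code from the conversation: it assumes RxJava 2.x, and the class name, log messages, and values are hypothetical. The subject is created through its own factory method, and the hooks are attached as operators on the Observable handed to consumers.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: lifecycle hooks on top of a BehaviorSubject.
class SubjectLifecycleHooks {
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Subjects are created via their own factory methods, no cast needed.
        BehaviorSubject<String> subject = BehaviorSubject.createDefault("initial");

        // Expose the subject through operators that observe the lifecycle.
        Observable<String> values = subject
            .doOnSubscribe(upstream -> log.add("subscribed"))
            .doOnDispose(() -> log.add("disposed"));

        Disposable subscription = values.subscribe(v -> log.add("value: " + v));
        subject.onNext("next");
        subscription.dispose(); // disposes this subscription, not the subject
        return log;
    }

    public static void main(String[] args) {
        // Expected order: subscribed, value: initial, value: next, disposed
        run().forEach(System.out::println);
    }
}
```

The hooks fire per subscription, so each observer of `values` triggers them independently while the subject itself stays usable.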
    m0rgan
    @m0rgan_twitter
    Thank you David, that helped a lot
    Grigorii Tkachuk
    @tkgreg
    Hi guys! I have the following question. I'm trying to use RxJava because I've heard a lot that it makes it easy to handle the situation where your API puts your service layer under high pressure. I decided to try it, but it looks like I don't understand the main idea. So imagine I have the following method, exposed through a REST endpoint:
    @RequestMapping(method = RequestMethod.POST, value = "/pushData", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Single<String> pushData(@RequestBody String json)
    How can I reduce the pressure on the logic inside this method? Is this the right way of thinking, or should I slow it down at the level of Undertow/Jetty/Netty by applying some strategy there?
    David Karnok
    @akarnokd
    @tkgreg Single implies a single result, so backpressure is not really at play there. If you want to reduce the call frequency to pushData itself, there are ways to do it, such as using concatMap or flatMap with a concurrency limit, spreading out the source events over time, etc. You could also research Retrofit use cases similar to your problem.
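As a rough sketch of the "flatMap with concurrency limit" idea, assuming RxJava 2.x: the `pushData` below is a hypothetical stand-in that only sleeps, and it presumes the incoming payloads are already available as a single Flowable, which the original endpoint does not show.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: at most two pushData calls run at the same time.
class BoundedConcurrency {
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Stand-in for the real service call; it tracks how many calls overlap.
    static Single<String> pushData(String json) {
        return Single.fromCallable(() -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            Thread.sleep(20); // simulate slow work
            inFlight.decrementAndGet();
            return "ok:" + json;
        }).subscribeOn(Schedulers.io());
    }

    static List<String> run() {
        return Flowable.range(1, 10)
            .map(i -> "{\"id\":" + i + "}")
            // Second argument is maxConcurrency: only 2 inner sources at once.
            .flatMap(json -> pushData(json).toFlowable(), 2)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("results: " + run().size());
        System.out.println("max concurrent calls: " + maxInFlight.get());
    }
}
```

Replacing `flatMap(..., 2)` with `concatMap(...)` would run the calls strictly one at a time and preserve their order.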
    Grigorii Tkachuk
    @tkgreg
    @akarnokd what is Retrofit?
    public Single<String> pushJsonDataAsync(String json) {
        return Single.create(source -> {
            try {
                PushCrawlerDataRequest.Builder request = PushCrawlerDataRequest.newBuilder();
                JsonFormat.parser().merge(json, request);
                PushCrawlerDataResponse.Builder builder = pushData(request.build());
                source.onSuccess(JsonFormat.printer().print(builder));
            } catch (Exception ex) {
                source.onError(ex);
            }
        });
    }
    this is how I'm trying to use it right now