Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Ronen
    @ronenhamias
    and consumers subscribe on the subject
     @Nonnull
      @Override
      public final Observable<Message> listen() {
        checkState(!stopped, "Transport is stopped");
        return incomingMessagesSubject.onBackpressureBuffer(DEFAULT_BUFFER_LIMIT).asObservable();
      }
    currently using onBackpressureBuffer
    David Karnok
    @akarnokd
    I see. I don't know enough about Netty but RxNetty does seem to work without backpressure problems. Otherwise, you have to tie the reading of a channel to the request amount of the downstream by building a custom operator and not using PublishSubject.
    Ronen
    @ronenhamias
    i will look deeper into RxNetty
    Stanislav Shakirov
    @punksta

    Hello! I need to make request after complete of changing ui.
    pseudo-code

    someUiEvents()
    .debounce()
    .filter { it is EditComplete }
    .flatMap { doRequest }
    .doOnNext { updateUi }

    Can I cancel(unsubscribe) from doRequest if got new event?

    I can create subscription inside flatMap and cancel it every time I got emitting. But can it be solved ussing operators only?
    Dorus
    @Dorus
    @punksta yeah you can use switch for that.
    Ronen
    @ronenhamias
    does anyone knows good implementation for RX java with RabbitMQ?
    Renan Ferrari
    @renanferrari

    Hey guys!

    I'm trying to figure out how to wrap listeners that always calls its callbacks from a specific thread into an Observable that conforms to the Scheduler defined by .subscribeOn(). I have a detailed StackOverflow question here: http://stackoverflow.com/q/40853783/518179

    I've been trying to figure this one out for a couple of weeks now. Any help would be highly appreciated.

    Thank you!

    Dorus
    @Dorus
    @renanferrari try observeOn, or pass a scheduler to one of your operators.
    Renan Ferrari
    @renanferrari
    @Dorus That does not solve the problem.
    Dorus
    @Dorus
    @renanferrari Sorry i didnt read that close enough, will see if i can find some answers tomorrow (doesnt look like anyone else is around, weird normally you get more replies here). One question, do you want to change Firebase's SDK to use a different thread, or do you want to change Rx to make downstream operators use a different thread?
    Dorus
    @Dorus
    @renanferrari I looked into it a bit more, and it seems it's 100% pointless to use subscribeOn as firebase handles calling stuff in the background itself. However if you need to process data returned from firebase elsewhere (on another thread), you need to schedule it yourself. Use .observeOn for that.
    Can you explain why observeOn does not solve your problem?
    Renan Ferrari
    @renanferrari
    @Dorus You're right, .observeOn() does give me the ability to process the data returned by Firebase on another thread. That's what I'm doing already, but it just isn't the point of the question. The point is: when I pass a Scheduler through .subscribeOn() I expect the upstream to conform to that Scheduler's thread but that doesn't happen when the Observable has an internal listener that is being triggered from a callback on a different thread. When that happens, I lose the .subscribeOn() guarantee.
    The severity of this issue may not seem obvious at first, but what if that Observable was part of a library? What's the best practice there? Should the library enforce its clients to always call an .observeOn() after any call to that method? Should the library call an .observeOn() itself and call it a "default Scheduler"? In any of these cases the .subscribeOn() is just useless, and that doesn't seem right to me.
    @Dorus And thank you for the answer! I have updated my question to clarify that.
    Dorus
    @Dorus
    @renanferrari First of all, subscribeOn, somehow (and especially on RxJava) is a overused operator that rarely does what's expected. And indeed, as you have experienced, it doesn't do what you expected.
    In short, subscribeOn will register the events on the thread specified in subscribeOn, but that does not mean the callback from those events will happen on the same thread that was used ot register them. A proper library might indeed do so, but more often libraries wil use their own internal threads, or in case of Firebase, use the main thread apparently. This is a problem with Firebase, not Rx. If you implement something like this in Rx, it would be smart to use the trampoline sceduler (i.e. same thread scheduler) to prevent these problems.
    observeOn is merely a hotfix that will ensure the main thread isnt tied up long while you process an event.

    Oh and to come back to your question

    Should the library enforce its clients to always call an .observeOn() after any call to that method?

    No, instead it would be best practice for clients to not do any long computations on the event dispatcher. If you dont do anything long running, it's way more efficient to simply borrow the already active thread, as context switching is expensive on itself. If you do need to do long running computations, it's always a smart idea to schedule these in the background, preferably in parallelle.

    Renan Ferrari
    @renanferrari
    @Dorus I'm aware this is not a problem with RxJava, I just want to know how to make RxJava work, properly, with this kind of listener.
    Dorus
    @Dorus
    Oh and in Rx it's simple to do that: source.flatMap(e => longRunning(e), computationScheduler)
    David Karnok
    @akarnokd
    subscribeOn is provided as the means to move the subscription side effects to another thread. Many sources, such as most listener APIs don't have subscription side effects and subscribeOn is completely ineffective. When these sources emit, that can happen from any thread. UI-like sources emit from the main thread, dedicated sources emit from their own background thread. In order to process these the required place, you should use observeOn. In addition, you may want to run your own single-threaded scheduler via Schedulers.from(Executors.newSingleThreadedExecutor()) which guarantees observing on it from multiple stages will end up on the same thread in a serialized manner.
    Dorus
    @Dorus
    ^^ on the RxJs gitter people are always impressed with my knowledge, but in here @akarnokd words things so much better.
    David Karnok
    @akarnokd
    @Dorus how does that work in RxJS? Whenever a source signals, it queues that value and schedules a task to drain that particular queue on the given scheduler, right?
    Dorus
    @Dorus
    @akarnokd Well the Js people care surprisingly little about threading :)
    They always tell me Js is single-threaded.
    I'm a bit of a weirdo there too, because i know a lot about Rx so i'm good help there, but my Js knowledge is limited as i'm more of a Java/C# programmer in origine.
    David Karnok
    @akarnokd
    There is a logical evolution in the Rx ecosystem that works the other direction as well. For example, if you know the fusion-enabled RxJava 2 deep enough, you can leave off "features" and end up with Rx.NET or RxJS.
    Renan Ferrari
    @renanferrari
    @akarnokd Great explanation of .subscribeOn()! Thanks for that. I'm just not so sure if I understood your suggested solution with .observeOn() and also what you meant by "guarantees observing from multiple stages".
    David Karnok
    @akarnokd
    When you have observeOn() multiple times in a sequence because you route computation back and forth. Schedulers.computation() will hand out a Scheduler that provides separate thread (from a fixed set) to each use of observeOn and it is very unlikely two observeOn will run on the same thread.
    Serban Balamaci
    @balamaci
    Hi guys, just to confirm my understanding of the backpressure operators:
        Flowable<Integer> flowable = subject
                    .toFlowable(BackpressureStrategy.MISSING)
                    .onBackpressureBuffer(5)
                    .onBackpressureDrop(val -> log.info("Dropped {}", val));
            flowable = flowable.observeOn(Schedulers.io(), false, 3);
            subscribeWithSlowSubscriber(flowable);
    this is not working in a way like buffer 5 elements on overflow and then start dropping events
    because backpressureDrop subscribes to the previous backpressureBuffer operator and requests Long.MAX_VALUE from it, so backpressureBuffer always sees that the downstream is ready for all it has, so it never overflows
    Serban Balamaci
    @balamaci
    so the only backpressure action happens in onBackpressureDrop
    David Karnok
    @akarnokd
    Serban Balamaci
    @balamaci
    Hi @akarnokd , I know about the overloaded onBackpressureBuffer, I just wanted to validate that it makes no sense to chain onBackpresureXXX one after another
    and I also think it makes no sense to have any BackpressureStrategy other than MISSING in Flowable.create() if you plan to follow it with an onBackpressureXXX.
    Serban Balamaci
    @balamaci
    Incidently it seem in reactor-core there is no equivalent of the rxjava's variant of onBackpressure with and Action and an overflowStrategy
    David Karnok
    @akarnokd
    They are a bit behind with appropriating features from RxJava 2, even so they have now two dedicated and financed persons for that.
    huirong628
    @huirong628
    hello,everyone
    Wolfhard Prell
    @Cir0X

    Hey! Are there some operators with which I can achieve something described in the following?

    For example I have a inheritance heriachy like: Condition as parent and ConditionA, ConditionB, ConditionC are children of Condition.
    Than I have a Method which looks like this:

    public Observable<Condition> getCondition() {...}

    which returns indefinitely Condition's.

    The goal would be a chaining an operator which caches the Condition's and when there is one available of each (ConditionA, ConditionB, ConditionC),
    it should should filter them as described in the filter.
    Maybe something like this:

    getCondition()
        .cacheFilter<ConditionA, ConditionB, ConditionC>((a, b, c) -> a.value == 0 && b.value == 0 && c.value == 0)
            .do(somethingA())
        .cacheFilter<ConditionA, ConditionB, ConditionC>((a, b, c) -> a.value == 0 && b.value == 1 && c.value == 1)
            .do(somethingB())
        .cacheFilter<ConditionA, ConditionB, ConditionC>((a, b, c) -> a.value == 1 && b.value == 0 && c.value == 0)
            .do(somethingB())
        .subscribe();
    Ann
    @WeiWW
    Hello!
    Dorus
    @Dorus
    @Cir0X Instead of emitting all conditions on 1 stream, you should have 3 streams and combine these streams. Could be done with a number of operators, my gut feeling is combineLatest(conditionA, conditionB, conditionC) would work best. You also seem to be doing a.value everywhere, you could add in a .map(a -> a.value) so you can look at the value directly later.
    Next there are a few options to act on this stream. I wouldn't filter if you want to do all operations on the same stream, as filter takes out elements and you wont be able to act on them after that. Instead you can make a large do or subscribe block where you test all conditions one by one, or you can simply subscribe to the condition stream multiple times, and filter out the correct elements. Myself i would then do .filter((arr) -> !arr[0]).filter((arr) -> !arr[1]).filter((arr) -> !arr[2]) instead of !arr[0] && !arr[1] && !arr[2].
    If you subscribe multiple times, that means the source stream will also run multiple times. Depending on the source, you might want to multicast instead. You can do that with share().
    Another interesting solution would be to use .publish(selector), like this:
    getCondition().publish(_conditions -> Observable.merge(
        _conditions.filter(a -> a[0] == 0).filter(a -> a[1] == 0).filter(a -> a[2] == 0).do(() -> sometihngA()),
        _conditions.filter(a -> a[0] == 0).filter(a -> a[1] == 1).filter(a -> a[2] == 1).do(() -> sometihngB()),
        _conditions.filter(a -> a[0] == 0).filter(a -> a[1] == 1).filter(a -> a[2] == 0).do(() -> sometihngC())
      )).subscribe();
    Mmm looking back i think the && style would be just as good :)
    Alex Reisberg
    @a-reisberg
    Quick question: the rxjava 1.* has this .asObservable for subjects
    rxjava 2.* doesn't have that anymore
    Thanks!
    so if I have a subject and I want to turn it to an observable, what's a good way to do it?