    Serban Balamaci
    @balamaci
    so the only backpressure action happens in onBackpressureDrop
    David Karnok
    @akarnokd
    Serban Balamaci
    @balamaci
    Hi @akarnokd, I know about the overloaded onBackpressureBuffer, I just wanted to validate that it makes no sense to chain onBackpressureXXX operators one after another
    and I also think it makes no sense to have any BackpressureStrategy other than MISSING in Flowable.create() if you plan to follow it with an onBackpressureXXX.
    Serban Balamaci
    @balamaci
    Incidentally, it seems reactor-core has no equivalent of RxJava's onBackpressureBuffer variant that takes an Action and an overflow strategy
    David Karnok
    @akarnokd
    They are a bit behind with appropriating features from RxJava 2, even though they now have two dedicated, funded people for that.
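A minimal sketch of the combination discussed in this exchange, assuming RxJava 2.x (`io.reactivex`) is on the classpath: `Flowable.create()` with `BackpressureStrategy.MISSING`, followed by a single `onBackpressureDrop` that is the only place where overflow is handled. The class and method names are illustrative and not part of the conversation.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

final class BackpressureDropSketch {
    /** Items rejected by onBackpressureDrop because downstream had no outstanding demand. */
    static final List<Integer> dropped = new ArrayList<>();

    /** Emits 1..5 synchronously without any backpressure handling of its own (MISSING). */
    static Flowable<Integer> source() {
        return Flowable.<Integer>create(emitter -> {
            for (int i = 1; i <= 5; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, BackpressureStrategy.MISSING)
        // The only backpressure decision in the chain: drop and record overflow.
        .onBackpressureDrop(dropped::add);
    }

    /** Subscribes with a fixed initial demand and returns what actually arrived. */
    static List<Integer> receivedWithDemand(long demand) {
        dropped.clear();
        TestSubscriber<Integer> subscriber = source().test(demand);
        return new ArrayList<>(subscriber.values());
    }
}
```

With a demand of 2, the first two items reach the subscriber and the remaining three end up in `dropped`. Choosing a strategy other than `MISSING` in `create()` would add a second, earlier backpressure stage in front of the operator.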
    huirong628
    @huirong628
    hello,everyone
    Wolfhard Prell
    @Cir0X

    Hey! Are there some operators with which I can achieve something described in the following?

    For example, I have an inheritance hierarchy like this: Condition is the parent, and ConditionA, ConditionB, ConditionC are children of Condition.
    Then I have a method which looks like this:

    public Observable<Condition> getCondition() {...}

    which emits Conditions indefinitely.

    The goal would be to chain an operator which caches the Conditions and, once one of each is available (ConditionA, ConditionB, ConditionC),
    filters them with the given predicate.
    Maybe something like this:

    getCondition()
        .cacheFilter<ConditionA, ConditionB, ConditionC>((a, b, c) -> a.value == 0 && b.value == 0 && c.value == 0)
            .do(somethingA())
        .cacheFilter<ConditionA, ConditionB, ConditionC>((a, b, c) -> a.value == 0 && b.value == 1 && c.value == 1)
            .do(somethingB())
        .cacheFilter<ConditionA, ConditionB, ConditionC>((a, b, c) -> a.value == 1 && b.value == 0 && c.value == 0)
            .do(somethingB())
        .subscribe();
    Ann
    @WeiWW
    Hello!
    Dorus
    @Dorus
    @Cir0X Instead of emitting all conditions on 1 stream, you should have 3 streams and combine these streams. Could be done with a number of operators, my gut feeling is combineLatest(conditionA, conditionB, conditionC) would work best. You also seem to be doing a.value everywhere, you could add in a .map(a -> a.value) so you can look at the value directly later.
    Next there are a few options to act on this stream. I wouldn't filter if you want to do all operations on the same stream, as filter takes out elements and you won't be able to act on them after that. Instead you can make a large do or subscribe block where you test all conditions one by one, or you can simply subscribe to the condition stream multiple times and filter out the correct elements. Personally, I would then do .filter((arr) -> !arr[0]).filter((arr) -> !arr[1]).filter((arr) -> !arr[2]) instead of !arr[0] && !arr[1] && !arr[2].
    If you subscribe multiple times, that means the source stream will also run multiple times. Depending on the source, you might want to multicast instead. You can do that with share().
    Another interesting solution would be to use .publish(selector), like this:
    getCondition().publish(_conditions -> Observable.merge(
        _conditions.filter(a -> a[0] == 0).filter(a -> a[1] == 0).filter(a -> a[2] == 0).do(() -> somethingA()),
        _conditions.filter(a -> a[0] == 0).filter(a -> a[1] == 1).filter(a -> a[2] == 1).do(() -> somethingB()),
        _conditions.filter(a -> a[0] == 0).filter(a -> a[1] == 1).filter(a -> a[2] == 0).do(() -> somethingC())
      )).subscribe();
    Mmm looking back i think the && style would be just as good :)
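The suggestion above can be sketched roughly as follows, assuming RxJava 2.x. The `Condition` classes are hypothetical stand-ins for the hierarchy in the question, the single mixed stream is split with `ofType` before `combineLatest`, and the predicates are the three from the original question. The action names are returned as strings purely so the result is easy to inspect.

```java
import io.reactivex.Observable;
import java.util.List;

final class ConditionCombineSketch {
    // Hypothetical stand-ins for the hierarchy described in the question.
    static class Condition {
        final int value;
        Condition(int value) { this.value = value; }
    }
    static final class ConditionA extends Condition { ConditionA(int v) { super(v); } }
    static final class ConditionB extends Condition { ConditionB(int v) { super(v); } }
    static final class ConditionC extends Condition { ConditionC(int v) { super(v); } }

    /** Maps the latest (a, b, c) triple to the name of the action it would trigger. */
    static Observable<String> actions(Observable<Condition> conditions) {
        // publish(selector) multicasts the source so it is subscribed to only once.
        return conditions.publish(shared -> {
            // Emits once one of each type has arrived, then on every later update.
            Observable<int[]> latest = Observable.combineLatest(
                shared.ofType(ConditionA.class),
                shared.ofType(ConditionB.class),
                shared.ofType(ConditionC.class),
                (a, b, c) -> new int[] {a.value, b.value, c.value});
            // Share the combined stream across the three independent filters.
            return latest.publish(l -> Observable.merge(
                l.filter(v -> v[0] == 0 && v[1] == 0 && v[2] == 0).map(v -> "somethingA"),
                l.filter(v -> v[0] == 0 && v[1] == 1 && v[2] == 1).map(v -> "somethingB"),
                l.filter(v -> v[0] == 1 && v[1] == 0 && v[2] == 0).map(v -> "somethingC")));
        });
    }

    /** Runs the pipeline over a small finite sample and collects the triggered actions. */
    static List<String> demo() {
        Observable<Condition> source = Observable.<Condition>just(
            new ConditionA(0), new ConditionB(0), new ConditionC(0), // (0,0,0)
            new ConditionB(1),                                       // (0,1,0), no match
            new ConditionC(1),                                       // (0,1,1)
            new ConditionA(1), new ConditionB(0),                    // no match
            new ConditionC(0));                                      // (1,0,0)
        return actions(source).toList().blockingGet();
    }
}
```

In real code the `map` calls would likely be `doOnNext` side effects. The sketch keeps the `&&` form of the predicates, in line with the closing remark above.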
    Alex Reisberg
    @a-reisberg
    Quick question: the rxjava 1.* has this .asObservable for subjects
    rxjava 2.* doesn't have that anymore
    so if I have a subject and I want to turn it to an observable, what's a good way to do it?
    Thanks!
    Serban Balamaci
    @balamaci
    you can make it a Flowable which is basically an Observable with a backpressure strategy by doing
                .toFlowable(BackpressureStrategy.MISSING)
    or whatever overflow strategy you prefer maybe .toFlowable(BackpressureStrategy.BUFFER)
    Alex Reisberg
    @a-reisberg
    Thanks!
    I just found hide
    can I use that too?
    Serban Balamaci
    @balamaci
    I don't know what hide does. I think it prevents RxJava from doing some optimizations like operator fusion, so it should not relate to what you want to do.
    Alex Reisberg
    @a-reisberg
    I see. All I really want to do is to hide the publish aspect of my Subject
    Serban Balamaci
    @balamaci
    I don't understand: Subject, Observable and Flowable are all some kind of publisher, as they allow subscribing to them. As I understand it, Observables are kind of deprecated in RxJava 2 in favor of Flowables, which force you to think about backpressure upfront (you need to specify a backpressure strategy when you create them), so why would you need an Observable? Anyway, it looks like a Flowable can simply be turned into an Observable by calling flowable.toObservable()
    Alex Reisberg
    @a-reisberg
    I guess I'm trying to move from Rx 1 and haven't fully understood exactly what the difference is
    also my question was unclear. On a Subject you can do a onNext
    (which is essentially publishing to other Observers)
    and I wanted to hide the onNext
    I didn't know that Observables are in the process of being deprecated?
    Justin Tuchek
    @justintuchek
    I highly doubt Observable is on any path to being deprecated. It’s just different from Flowable, where it wouldn’t make sense to discard events (such as cursor movements).
    Serban Balamaci
    @balamaci
    I'd say not deprecated, but not much different from Flowable, as I do not see cases where I'd want to use Observables instead of Flowables (which have the benefit that you are aware of backpressure and what the backpressure strategy is, and you won't be surprised at runtime by a MissingBackpressureException)
    Justin Tuchek
    @justintuchek
    From reading the Observable source code from 2.x.x it seems to be utilizing an unbounded amount for input events - and from the docs of 2.0 it seems like you are going to hit an OOM instead.
    Serban Balamaci
    @balamaci
    I think so, then it would be equivalent to a Flowable if we'd do a .toFlowable(BackpressureStrategy.BUFFER)
    which is also unbounded
    but at least you know upfront you might expect an OOM
    Justin Tuchek
    @justintuchek
    The @BackpressureSupport(…) and @SchedulerSupport(…) annotations are really helpful when going through the code :thumbsup:
    Serban Balamaci
    @balamaci
    true
    David Karnok
    @akarnokd
    @a-reisberg A Subject is already an Observable so you don't have to do anything. asObservable has been renamed to hide in 2.x and it adds an extra layer so casting back to Subject no longer works. Just define your code as Observable<T> someMethod() { return subject; }.
    Alex Reisberg
    @a-reisberg
    @akarnokd I did end up using hide. The reason is that I want to use onNext internally and only allow others to subscribe. Thanks a lot!
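A small sketch of the two options mentioned here, assuming RxJava 2.x: returning the subject typed as `Observable` versus returning `subject.hide()`. The `EventBusSketch` class and its method names are hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

final class EventBusSketch {
    // Kept private so only this class can call onNext.
    private final PublishSubject<String> subject = PublishSubject.create();

    /** Internal producer side. */
    void publish(String event) {
        subject.onNext(event);
    }

    /** Returns the subject itself, merely typed as Observable; a caller could cast it back. */
    Observable<String> eventsDirect() {
        return subject;
    }

    /** Returns a wrapper around the subject; the result is no longer a Subject instance. */
    Observable<String> events() {
        return subject.hide();
    }
}
```

Both variants deliver the same events to subscribers. The difference only matters when callers should be prevented from reaching `onNext` through a downcast.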
    AriMeidan
    @MeidanAri_twitter
    Hi, I have question.
    I have this scenario: I want to make a remote call and get results, do something with the data, and then notify subscribers that the call has completed.
    I don't want every subscriber to be able to trigger this remote call, so I want one party to request the call and the others to subscribe to its completion.
    David Karnok
    @akarnokd
    @MeidanAri_twitter use .cache()
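One way the `.cache()` suggestion might look, assuming RxJava 2.x. `fetchRemote` is a hypothetical stand-in for the remote call, and the counter exists only to show that the call runs once no matter how many subscribers attach.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

final class CachedCallSketch {
    /** Counts how often the (hypothetical) remote call was actually executed. */
    static final AtomicInteger remoteCalls = new AtomicInteger();

    /** Hypothetical stand-in for the remote call. */
    static String fetchRemote() {
        remoteCalls.incrementAndGet();
        return "payload";
    }

    /** Shared result: the first subscriber triggers the call, later ones get the replay. */
    static final Observable<String> result =
        Observable.fromCallable(CachedCallSketch::fetchRemote)
            .map(String::toUpperCase) // placeholder for "do something with the data"
            .cache();
}
```

Note that `cache()` also replays a terminal error to later subscribers, so a failed call is not retried by simply subscribing again.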
    Luciano
    @lJoublanc
    Got a ~10k line project, and just received a MissingBackpressureException which isn't picked up by my unit tests. How do I debug this?
    Caused by: rx.exceptions.MissingBackpressureException
            at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:353)
            at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:319)
            at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
            at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
            at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
            at rx.internal.operators.OperatorSampleWithTime$SamplerSubscriber.emitIfNonEmpty(OperatorSampleWithTime.jav
            at rx.internal.operators.OperatorSampleWithTime$SamplerSubscriber.call(OperatorSampleWithTime.java:98)
    DavidMihola
    @DavidMihola
    Hi, in RxJava(2), is there something like Guava's Optional.fromNullable(T)- I would need this quite often to wrap null values in Maybe.empty() and all others in Maybe.just()...
    Luciano
    @lJoublanc
    In scala you can do Observable from Option(possiblyNullValue) as Option implements Iterable. Not sure about java though.
    Alex Reisberg
    @a-reisberg
    This is an RxAndroid question, but since there's no dedicated Gitter channel I'm asking here. I got the following ProGuard warning:
    Warning: org.jctools.maps.ConcurrentAutoTable$CAT: can't find referenced class sun.misc.Unsafe
    is it safe to issue a dontwarn for it?
    DavidMihola
    @DavidMihola
    @lJoublanc Thanks - however, I can't think of anything like this in Java/on Android.
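For the null-wrapping question above, a small helper could look like the sketch below, assuming RxJava 2.x. `fromNullable` and `fromNullableLazy` are hypothetical helper names, not RxJava API. The second variant relies on `Maybe.fromCallable`, which completes without a value when the callable returns null.

```java
import io.reactivex.Maybe;
import java.util.concurrent.Callable;

final class MaybeNullableSketch {
    /** Hypothetical helper: null becomes Maybe.empty(), anything else Maybe.just(value). */
    static <T> Maybe<T> fromNullable(T value) {
        return value == null ? Maybe.<T>empty() : Maybe.just(value);
    }

    /** Lazy variant: the supplier runs per subscriber, and a null result means empty. */
    static <T> Maybe<T> fromNullableLazy(Callable<T> supplier) {
        return Maybe.fromCallable(supplier);
    }
}
```

The eager helper fits values that are already at hand. The lazy one suits lookups that should only run on subscription.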
    DavidMihola
    @DavidMihola
    Also, while I'm here: In RxJava1 I would occasionally use a Subject as the Subscriber to an Observable, i. e. "pipe" the output of the Observable into the Subject. I can't find a way to do this in RxJava2 - subscribe(Observer) does accept the Subject, but does not return a Disposable, and since Subject is not a DisposableObserver, I can't use subscribeWith either... Should I just wrap my Subject in a DisposableObserver before subscribing or is there a simpler way I am missing?
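One possible approach to the piping question, assuming RxJava 2.x, is the three-argument `subscribe` overload, which returns a `Disposable`, forwarding each signal to the subject through method references. The `pipe` helper name is hypothetical, and wrapping the subject in a `DisposableObserver` as proposed above should work as well.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.Subject;

final class PipeIntoSubjectSketch {
    /**
     * Forwards every signal of source into target and returns a handle
     * that stops the forwarding when disposed.
     */
    static <T> Disposable pipe(Observable<T> source, Subject<T> target) {
        return source.subscribe(target::onNext, target::onError, target::onComplete);
    }
}
```

Disposing the returned handle only cuts the link between source and subject. The subject itself stays usable and is not completed by the disposal.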
    Ronen
    @ronenhamias
    is there a way to call
    ```
    PublishSubject.<Message>create().toSerialized()