RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
onBackpressureBuffer
where you can specify the overflow behavior.Hey! Are there some operators with which I can achieve something described in the following?
For example I have a inheritance heriachy like: Condition as parent and ConditionA, ConditionB, ConditionC are children of Condition.
Than I have a Method which looks like this:
public Observable<Condition> getCondition() {...}
which returns indefinitely Condition's.
The goal would be a chaining an operator which caches the Condition's and when there is one available of each (ConditionA, ConditionB, ConditionC),
it should should filter them as described in the filter.
Maybe something like this:
getCondition()
.cacheFilter<ConditionA, ConditionB, ConditionC>((a, b, c) -> a.value == 0 && b.value == 0 && c.value == 0)
.do(somethingA())
.cacheFilter<ConditionA, ConditionB, ConditionC>((a, b, c) -> a.value == 0 && b.value == 1 && c.value == 1)
.do(somethingB())
.cacheFilter<ConditionA, ConditionB, ConditionC>((a, b, c) -> a.value == 1 && b.value == 0 && c.value == 0)
.do(somethingB())
.subscribe();
combineLatest(conditionA, conditionB, conditionC)
would work best. You also seem to be doing a.value
everywhere, you could add in a .map(a -> a.value)
so you can look at the value directly later.do
or subscribe
block where you test all conditions one by one, or you can simply subscribe to the condition stream multiple times, and filter out the correct elements. Myself i would then do .filter((arr) -> !arr[0]).filter((arr) -> !arr[1]).filter((arr) -> !arr[2])
instead of !arr[0] && !arr[1] && !arr[2]
.share()
..publish(selector)
, like this:getCondition().publish(_conditions -> Observable.merge(
_conditions.filter(a -> a[0] == 0).filter(a -> a[1] == 0).filter(a -> a[2] == 0).do(() -> sometihngA()),
_conditions.filter(a -> a[0] == 0).filter(a -> a[1] == 1).filter(a -> a[2] == 1).do(() -> sometihngB()),
_conditions.filter(a -> a[0] == 0).filter(a -> a[1] == 1).filter(a -> a[2] == 0).do(() -> sometihngC())
)).subscribe();
&&
style would be just as good :)
.toFlowable(BackpressureStrategy.MISSING)
hide
Subject
you can do a onNext
Observer
s)
onNext
MissingBackPressureException
which isn't picked up by my unit tests. How do I debug this?Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:353)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:319)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OperatorSampleWithTime$SamplerSubscriber.emitIfNonEmpty(OperatorSampleWithTime.jav
at rx.internal.operators.OperatorSampleWithTime$SamplerSubscriber.call(OperatorSampleWithTime.java:98)
Subject
as the Subscriber
to an Observable
, i. e. "pipe" the output of the Observable
into the Subject
. I can't find a way to do this in RxJava2 - subscribe(Observer)
does accept the Subject
, but does not return a Disposable
, and since Subject
is not a DisposableObserver
, I can't use subscribeWith
either... Should I just wrap my Subject
in a DisposableObserver
before subscribing or is there a simpler way I am missing?
PublishSubject.<Message>create().toSerialized()