RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
newObservable = observable.share().lift(new DelayedUnsubscription(5, TimeUnit.SECONDS)), and I then subscribe to newObservable)
[...] onNext will have on a PublishSubject after PublishSubject#onCompleted has been called?
[...] onNext item be emitted, will it fail to be emitted silently, or will the call fail with an exception?
onNext [...] The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
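The quoted documentation describes what an Observable promises to its observers, not what happens when a caller keeps pushing items into a subject. As a rough model of the "dropped silently" outcome, here is a plain-Java sketch built around a hypothetical MiniSubject class (it is not RxJava's PublishSubject). As far as I know PublishSubject behaves the same way and ignores onNext after a terminal event without throwing, but that is an assumption worth checking against the RxJava version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a publish-style subject; NOT the RxJava class.
class MiniSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private boolean terminated = false;

    void subscribe(Consumer<T> observer) {
        if (!terminated) {
            observers.add(observer);
        }
    }

    void onNext(T item) {
        if (terminated) {
            return; // dropped silently: no delivery, no exception
        }
        for (Consumer<T> o : observers) {
            o.accept(item);
        }
    }

    void onCompleted() {
        terminated = true;
        observers.clear();
    }

    // Small demo: returns the items the observer actually received.
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        MiniSubject<Integer> subject = new MiniSubject<>();
        subject.subscribe(received::add);
        subject.onNext(1);
        subject.onCompleted();
        subject.onNext(2); // sent after the terminal event
        return received;
    }
}
```

Calling MiniSubject.demo() returns a list holding only the first item; the second onNext neither reaches the observer nor raises an exception.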
Hi, I have a flow that I can't work out with the provided operators, or maybe I'm just missing something important.
I have a sequence A which produces a sequence B. Later I combine them with the combineLatest operator and get two notifications for each A update, but I want only one, with the latest values.
Short illustration:
Observable<Long> o1 = Observable.interval(1, TimeUnit.SECONDS).map(x -> x + 1);
Observable<Long> o2 = o1.map(v -> -v);
Observable.combineLatest(o1, o2, (v1, v2) -> "o1: " + v1 + ", o2: " + v2)
.subscribe(System.out::println);
It produces output like this:
o1: 1, o2: -1
o1: 1, o2: -2
o1: 2, o2: -2
o1: 3, o2: -2
o1: 3, o2: -3
o1: 3, o2: -4
o1: 4, o2: -4
o1: 4, o2: -5
o1: 5, o2: -5
OK, I missed one detail: B can be merged with C.
Observable<Long> a = Observable.interval(1, TimeUnit.SECONDS).map(x -> x + 1);
Observable<Long> b = a.map(v -> -v);
Observable<Long> c = Observable.interval(500, TimeUnit.MILLISECONDS).map(x -> x + 1000);
Observable.zip(a, Observable.merge(b, c), (v1, v2) -> "o1: " + v1 + ", o2: " + v2).subscribe(System.out::println);
Then zip does the wrong job too.
[...] combineLatestRight. Does that exist in RxJava?
PublishSubject<Integer> seq = PublishSubject.create();
// timesCalled is assumed to be an int field declared elsewhere;
// a plain local variable could not be modified inside the lambda.
Observable<Integer> seq2 = seq.map(x -> {
    timesCalled += 1;
    return x * 2;
});
seq2.subscribe(e -> System.out.println("Hello 1: " + e));
seq2.subscribe(e -> System.out.println("Hello 2: " + e));
seq.onNext(1);
System.out.println("Times map called: " + timesCalled);
This code shows that map is called twice in this case (is that an error or not?), and each subscription gets a different object, whereas I expected the same one.
publish creates a ConnectableObservable, which waits for connect before emitting anything. That doesn't sound like it is really aimed at the current problem, right? Or am I getting it wrong? Or do I need publish just to create a new observable that keeps the needed objects?
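To make the publish/connect question concrete, here is a plain-Java sketch under simplifying assumptions. ShareSketch, Hot, Published, and mapCalls are hypothetical names, not RxJava types. It models two cases: subscribing twice to a mapped sequence runs the mapping function once per subscriber, while a publish-style wrapper holds a single upstream subscription, created only when connect() is called, and fans each mapped item out to every observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified model of the scenario above; not RxJava code.
class ShareSketch {
    interface Source<T> { void subscribe(Consumer<T> observer); }

    // Hot source: pushes each item to whoever is subscribed right now.
    static class Hot<T> implements Source<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();
        public void subscribe(Consumer<T> observer) { observers.add(observer); }
        void onNext(T item) { for (Consumer<T> o : observers) o.accept(item); }
    }

    // map: each subscriber gets its own upstream subscription,
    // so f runs once per subscriber for every item.
    static <T, R> Source<R> map(Source<T> upstream, Function<T, R> f) {
        return observer -> upstream.subscribe(x -> observer.accept(f.apply(x)));
    }

    // publish-style wrapper: one upstream subscription, made only in connect().
    static class Published<T> implements Source<T> {
        private final Source<T> upstream;
        private final List<Consumer<T>> observers = new ArrayList<>();
        Published(Source<T> upstream) { this.upstream = upstream; }
        public void subscribe(Consumer<T> observer) { observers.add(observer); }
        void connect() {
            upstream.subscribe(x -> { for (Consumer<T> o : observers) o.accept(x); });
        }
    }

    // Returns how many times the mapping function ran for one emitted item.
    static int mapCalls(boolean shared) {
        int[] calls = {0};
        Hot<Integer> seq = new Hot<>();
        Source<Integer> seq2 = map(seq, x -> { calls[0]++; return x * 2; });
        if (shared) {
            Published<Integer> published = new Published<>(seq2);
            published.subscribe(e -> {});
            published.subscribe(e -> {});
            published.connect(); // nothing flows before this call
        } else {
            seq2.subscribe(e -> {});
            seq2.subscribe(e -> {});
        }
        seq.onNext(1);
        return calls[0];
    }
}
```

In this model mapCalls(false) counts two invocations and mapCalls(true) counts one. In RxJava terms the shared case corresponds roughly to seq.map(...).publish() followed by connect(), though the mapping is only an approximation.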
map it again by the way