RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Hello, everyone.
Can someone please explain why the following two snippets of code behave differently?
Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber))
    .flatMap(
        b -> Observable.<Integer>create(subscriber -> {
            subscriber.onNext(1);
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.io()) // to imitate async request
    )
    .subscribe(System.out::println);

Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted))
    .flatMap(
        b -> Observable.<Integer>create(subscriber -> {
            subscriber.onNext(1);
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.io()) // to imitate async request
    )
    .subscribe(System.out::println);
When I run the first one, it doesn’t print anything to the console, but the second one prints 1 as expected.
delaySubscription but for unsubscription? I'd like the subscriber to be effectively unsubscribed, but the observable should not be unsubscribed immediately (and any data sent in the meantime should be discarded). Use case: a GPS observable that takes time to acquire a fix initially and that is share()d. A delayed unsubscription would keep the GPS active for a few seconds in case another subscriber gets interested fast enough. I guess that could also be a variant of refCount() that waits for a delay before unsubscribing.
newObservable = observable.share().lift(new DelayedUnsubscription(5, TimeUnit.SECONDS)), and I then subscribe to newObservable)
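To make the "refCount() with a delay" idea concrete, here is a minimal sketch in plain java.util.concurrent, not an RxJava operator. `DelayedRefCount`, `acquire`, `release` and the demo class are hypothetical names made up for illustration; `connect`/`disconnect` stand in for starting and stopping the GPS. The source is only disconnected if nobody re-subscribed during the grace period.

```java
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of RxJava): reference counting with a grace period.
final class DelayedRefCount {
    private final Runnable connect;     // e.g. start the GPS
    private final Runnable disconnect;  // e.g. stop the GPS
    private final long delay;
    private final TimeUnit unit;
    private final ScheduledExecutorService timer;
    private int subscribers;            // guarded by "this"
    private boolean connected;          // guarded by "this"
    private long generation;            // bumped on every acquire/idle transition

    DelayedRefCount(Runnable connect, Runnable disconnect,
                    long delay, TimeUnit unit, ScheduledExecutorService timer) {
        this.connect = connect;
        this.disconnect = disconnect;
        this.delay = delay;
        this.unit = unit;
        this.timer = timer;
    }

    synchronized void acquire() {
        subscribers++;
        generation++;                   // invalidates any pending delayed disconnect
        if (!connected) {
            connected = true;
            connect.run();
        }
    }

    synchronized void release() {
        if (subscribers == 0) {
            throw new IllegalStateException("release() without acquire()");
        }
        if (--subscribers == 0) {
            final long token = ++generation;
            // Do not disconnect now; check again once the grace period is over.
            timer.schedule(() -> disconnectIfStillIdle(token), delay, unit);
        }
    }

    private synchronized void disconnectIfStillIdle(long token) {
        if (token == generation && subscribers == 0 && connected) {
            connected = false;
            disconnect.run();
        }
    }
}

final class DelayedRefCountDemo {
    // Returns {connect calls, disconnect calls while re-subscribed, disconnect calls at the end}.
    static int[] run() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger connects = new AtomicInteger();
        AtomicInteger disconnects = new AtomicInteger();
        DelayedRefCount gps = new DelayedRefCount(
                connects::incrementAndGet, disconnects::incrementAndGet,
                200, TimeUnit.MILLISECONDS, timer);
        try {
            gps.acquire();                  // first subscriber: connects
            gps.release();                  // last subscriber leaves: disconnect only scheduled
            gps.acquire();                  // new subscriber arrives inside the grace period
            Thread.sleep(600);              // the stale timer fires and is ignored
            int whileSubscribed = disconnects.get();
            gps.release();                  // nobody comes back this time
            Thread.sleep(600);
            return new int[] {connects.get(), whileSubscribed, disconnects.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

In an Rx setting, acquire/release would presumably be driven by subscribe/unsubscribe of the downstream subscribers, with the actual connection to the shared source held until the delayed check fires.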
onNext will have on a PublishSubject after PublishSubject#onCompleted has been called?
onNext item be emitted, will it fail to be emitted silently, or will the call fail with an exception?
onNext [...] The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
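As far as I know, RxJava 1.x PublishSubject simply ignores onNext once it has terminated (no emission, no exception), but please verify that against the version you use. The sketch below is only a model of that contract: `TinySubject` is a hypothetical hand-rolled class, not RxJava's PublishSubject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a subject; NOT RxJava's PublishSubject.
final class TinySubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private boolean terminated;

    void subscribe(Consumer<T> onNext) {
        if (!terminated) {
            observers.add(onNext);
        }
    }

    void onNext(T value) {
        if (terminated) {
            return; // after the terminal event: dropped silently, no exception
        }
        for (Consumer<T> observer : new ArrayList<>(observers)) {
            observer.accept(value);
        }
    }

    void onCompleted() {
        terminated = true;
        observers.clear();
    }
}

final class TinySubjectDemo {
    // Returns the values the single observer actually received.
    static List<Integer> run() {
        TinySubject<Integer> subject = new TinySubject<>();
        List<Integer> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        subject.onNext(1);
        subject.onCompleted();
        subject.onNext(2); // called after completion
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```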
Hi, I have a flow that I can't solve with the provided operators, or maybe I'm just missing something important.
I have sequence A which produces sequence B. Later I combine them with the combineLatest operator and get two notifications on each update of A. But I want only one, the latest.
Short illustration:
Observable<Long> o1 = Observable.interval(1, TimeUnit.SECONDS).map(x -> x + 1);
Observable<Long> o2 = o1.map(v -> -v);
Observable.combineLatest(o1, o2, (v1, v2) -> "o1: " + v1 + ", o2: " + v2)
.subscribe(System.out::println);
It produces output like this:
o1: 1, o2: -1
o1: 1, o2: -2
o1: 2, o2: -2
o1: 3, o2: -2
o1: 3, o2: -3
o1: 3, o2: -4
o1: 4, o2: -4
o1: 4, o2: -5
o1: 5, o2: -5
OK, I missed one detail: B can be merged with C.
Observable<Long> a = Observable.interval(1, TimeUnit.SECONDS).map(x -> x + 1);
Observable<Long> b = a.map(v -> -v);
Observable<Long> c = Observable.interval(500, TimeUnit.MILLISECONDS).map(x -> x + 1000);
Observable.zip(a, Observable.merge(b, c), (v1, v2) -> "o1: " + v1 + ", o2: " + v2).subscribe(System.out::println);
Then zip does the wrong job too.
combineLatestRight. Does that exist in RxJava?
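What is being asked for (emit only when A emits, paired with whatever the other side produced last) looks like what Rx usually calls withLatestFrom. I believe RxJava 1.x has an operator of that name, but check your version. Here is a minimal single-threaded sketch of the semantics in plain Java; `WithLatest`, `onPrimary` and `onOther` are hypothetical names, and the events are fed by hand in a chosen order rather than coming from real Observables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of "combine on primary only"; not an RxJava class and not thread-safe.
final class WithLatest<A, B, R> {
    private final BiFunction<A, B, R> combiner;
    private final List<R> output = new ArrayList<>();
    private B latest;
    private boolean hasLatest;

    WithLatest(BiFunction<A, B, R> combiner) {
        this.combiner = combiner;
    }

    // The "other" side (B merged with C) only updates the remembered value.
    void onOther(B value) {
        latest = value;
        hasLatest = true;
    }

    // The primary side (A) is the only one that triggers an emission.
    void onPrimary(A value) {
        if (hasLatest) {
            output.add(combiner.apply(value, latest));
        }
    }

    List<R> output() {
        return output;
    }
}

final class WithLatestDemo {
    static List<String> run() {
        WithLatest<Long, Long, String> w =
                new WithLatest<>((v1, v2) -> "o1: " + v1 + ", o2: " + v2);
        w.onOther(-1L);    // value from b
        w.onPrimary(1L);   // a emits: one notification
        w.onOther(1000L);  // value from c: remembered, nothing emitted
        w.onOther(-2L);    // value from b: replaces the remembered value
        w.onPrimary(2L);   // a emits: one notification with the latest "other"
        return w.output();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Note that when B is derived from A, whether B's new value is already stored at the moment A triggers depends on the order in which the two are delivered; the sketch sidesteps that by ordering the calls explicitly.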