RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
public class App {
    private Observer<Observable<Integer>> ob;

    public App() {
        PublishSubject<Observable<Integer>> sub = PublishSubject.create();
        rx.Observable.merge(sub, 10).subscribe(System.out::println);
        ob = sub;
    }

    public void put(Observable<Integer> t) {
        ob.onNext(t);
    }

    public static void main(String[] args) {
        App app = new App();
        Observable.range(1, 100).subscribe(integer -> {
            app.put(Observable.just(integer));
        });
    }
}
public class App {
    private Observer<Observable<Integer>> ob;

    public App() {
        PublishSubject<Observable<Integer>> sub = PublishSubject.create();
        rx.Observable.merge(sub).buffer(10).subscribe(System.out::println);
        ob = sub;
    }

    public void put(Observable<Integer> t) {
        ob.onNext(t);
    }

    public static void main(String[] args) {
        App app = new App();
        Observable.range(1, 100).subscribe(integer -> {
            app.put(Observable.just(integer));
        });
    }
}
Hello, everyone.
Can someone please explain why the following two snippets of code behave differently?
Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber))
    .flatMap(
        b -> Observable.<Integer>create(subscriber -> {
            subscriber.onNext(1);
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.io()) // to imitate async request
    )
    .subscribe(System.out::println);
Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted))
    .flatMap(
        b -> Observable.<Integer>create(subscriber -> {
            subscriber.onNext(1);
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.io()) // to imitate async request
    )
    .subscribe(System.out::println);
When I run the first one, it doesn't print anything to the console, but the second one prints 1 as expected.
delaySubscription but for unsubscription? I'd like the subscriber to be effectively unsubscribed right away, but the observable itself should not be unsubscribed immediately (any data emitted in the meantime should simply be discarded). Use case: a GPS observable that takes time to acquire a fix initially and that is share()d. A delayed unsubscription would keep the GPS active for a few seconds in case another subscriber gets interested fast enough. I guess that could also be a variant of refCount() that waits for a delay before unsubscribing.
newObservable = observable.share().lift(new DelayedUnsubscription(5, TimeUnit.SECONDS)), and I then subscribe to newObservable)
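To make the delayed-unsubscription idea above concrete, here is a rough plain-Java sketch of a reference count with a grace period. It deliberately does not use RxJava and is not how refCount() or share() are implemented. The names GraceRefCount, acquire, release and demo, and the 200 ms delay, are all made up for illustration. "start" and "stop" stand in for switching the GPS on and off.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of RxJava: keeps a shared source running
// for a grace period after the last subscriber has gone away.
final class GraceRefCount {
    private final Runnable start;            // e.g. switch the GPS on
    private final Runnable stop;             // e.g. switch the GPS off
    private final long delayMillis;          // grace period before stopping
    private final ScheduledExecutorService timer;

    // All mutable state below is guarded by "this".
    private int subscribers;
    private boolean running;
    private long generation;                 // bumped on every acquire/schedule
    private ScheduledFuture<?> pendingStop;

    GraceRefCount(Runnable start, Runnable stop, long delayMillis,
                  ScheduledExecutorService timer) {
        this.start = start;
        this.stop = stop;
        this.delayMillis = delayMillis;
        this.timer = timer;
    }

    // A subscriber arrives: cancel any pending stop, start the source if needed.
    synchronized void acquire() {
        subscribers++;
        generation++;                        // invalidates an already-scheduled stop
        if (pendingStop != null) {
            pendingStop.cancel(false);
            pendingStop = null;
        }
        if (!running) {
            running = true;
            start.run();
        }
    }

    // A subscriber leaves: if it was the last one, schedule a delayed stop.
    synchronized void release() {
        if (subscribers == 0) {
            throw new IllegalStateException("release() without acquire()");
        }
        if (--subscribers == 0) {
            final long scheduledAt = ++generation;
            pendingStop = timer.schedule(
                    () -> stopIfStillIdle(scheduledAt), delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Runs on the timer thread; stops only if nobody came back in the meantime.
    private synchronized void stopIfStillIdle(long scheduledAt) {
        if (scheduledAt == generation && subscribers == 0 && running) {
            running = false;
            pendingStop = null;
            stop.run();
        }
    }

    // Small scenario: a second subscriber shows up inside the grace period.
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        GraceRefCount gps = new GraceRefCount(
                () -> events.add("start"), () -> events.add("stop"), 200, timer);
        try {
            gps.acquire();   // first subscriber: source starts
            gps.release();   // last subscriber leaves: stop scheduled, not executed
            gps.acquire();   // new subscriber within the grace period: stop cancelled
            gps.release();   // leaves again: a fresh stop is scheduled
            timer.shutdown();                           // still runs the pending delayed task
            timer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [start, stop]
    }
}
```

The source is started once and stopped once, even though the subscriber count dropped to zero twice. In an RxJava setting the same bookkeeping would presumably sit where refCount() connects and disconnects, but that wiring is not shown here.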
onNext will have on a PublishSubject after PublishSubject#onCompleted has been called?
onNext item be emitted, will it fail to be emitted silently, or will the call fail with an exception?