RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
http://reactivex.io/documentation/operators/merge.html
Instead of passing multiple Observables (up to nine) into merge, you could also pass in a List<> (or other Iterable) of Observables, an Array of Observables, or even an Observable that emits Observables, and merge will merge their output into the output of a single Observable:
If you pass in an Observable of Observables, you have the option of also passing in a value indicating to merge the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
Javadoc: merge(Observable<Observable>)
Javadoc: merge(Observable<Observable>,int)
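import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;

public class App {
    private Observer<Observable<Integer>> ob;

    public App() {
        PublishSubject<Observable<Integer>> sub = PublishSubject.create();
        // Merge the inner Observables, subscribing to at most 10 of them at a time.
        Observable.merge(sub, 10).subscribe(System.out::println);
        // Expose the subject as an Observer so that put() can push into it.
        ob = sub;
    }

    public void put(Observable<Integer> t) {
        ob.onNext(t);
    }

    public static void main(String[] args) {
        App app = new App();
        Observable.range(1, 100).subscribe(integer -> {
            app.put(Observable.just(integer));
        });
    }
}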
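
import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;

public class App {
    private Observer<Observable<Integer>> ob;

    public App() {
        PublishSubject<Observable<Integer>> sub = PublishSubject.create();
        // Merge all inner Observables, then emit the merged items in lists of 10.
        Observable.merge(sub).buffer(10).subscribe(System.out::println);
        // Expose the subject as an Observer so that put() can push into it.
        ob = sub;
    }

    public void put(Observable<Integer> t) {
        ob.onNext(t);
    }

    public static void main(String[] args) {
        App app = new App();
        Observable.range(1, 100).subscribe(integer -> {
            app.put(Observable.just(integer));
        });
    }
}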
Hello, everyone.
Can someone please explain why the following two snippets of code behave differently?
Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber))
    .flatMap(
        b -> Observable.<Integer>create(subscriber -> {
            subscriber.onNext(1);
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.io()) // to imitate async request
    )
    .subscribe(System.out::println);

Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted))
    .flatMap(
        b -> Observable.<Integer>create(subscriber -> {
            subscriber.onNext(1);
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.io()) // to imitate async request
    )
    .subscribe(System.out::println);
When I run the first one, it doesn’t print anything to the console, but the second one prints 1 as expected.
delaySubscription but for unsubscription? I'd like the subscriber to be effectively unsubscribed, while the observable itself is not unsubscribed immediately (and any data sent in the meantime is simply discarded). Use case: a GPS observable that takes time to acquire an initial fix and that is share()d. A delayed unsubscription would keep the GPS active for a few seconds in case another subscriber gets interested fast enough. I guess that could also be a variant of refCount() that waits for a delay before unsubscribing.
newObservable = observable.share().lift(new DelayedUnsubscription(5, TimeUnit.SECONDS)), and I then subscribe to newObservable)
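
The "refCount() with a delay" idea from the GPS question can be sketched without assuming any particular RxJava operator. The following is a minimal illustration written against plain java.util.concurrent only; the class name DelayedRefCount and its acquire()/release() methods are hypothetical, and start/stop stand in for whatever switches the GPS on and off. It is a sketch of the bookkeeping, not an RxJava Operator that could be passed to lift().

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of RxJava: keeps a shared source running for a
// grace period after the last subscriber has left.
final class DelayedRefCount {
    private final Runnable start;   // assumed hook, e.g. switch the GPS on
    private final Runnable stop;    // assumed hook, e.g. switch the GPS off
    private final long delay;
    private final TimeUnit unit;
    private final ScheduledExecutorService timer;

    private int subscribers = 0;
    private boolean running = false;
    private long generation = 0;    // bumped on every change; invalidates older pending stops

    DelayedRefCount(Runnable start, Runnable stop, long delay, TimeUnit unit,
                    ScheduledExecutorService timer) {
        this.start = start;
        this.stop = stop;
        this.delay = delay;
        this.unit = unit;
        this.timer = timer;
    }

    // Called when a subscriber arrives.
    synchronized void acquire() {
        subscribers++;
        generation++;               // any delayed stop scheduled earlier becomes a no-op
        if (!running) {
            running = true;
            start.run();
        }
    }

    // Called when a subscriber leaves; the source is stopped only after the delay.
    synchronized void release() {
        if (subscribers == 0) {
            throw new IllegalStateException("release() without matching acquire()");
        }
        subscribers--;
        if (subscribers == 0) {
            final long scheduled = ++generation;
            timer.schedule(() -> stopIfStillIdle(scheduled), delay, unit);
        }
    }

    // Runs on the timer thread; stops only if nobody subscribed in the meantime.
    private synchronized void stopIfStillIdle(long scheduled) {
        if (running && generation == scheduled) {
            running = false;
            stop.run();
        }
    }

    synchronized boolean isRunning() {
        return running;
    }
}
```

With a 5-second delay, a subscriber that calls release() followed two seconds later by another that calls acquire() would reuse the already running source: stop is not run in between, and start is not run a second time.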