RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
PublishSubject<Observable<T>>, you can just call PublishSubject.OnNext(t) inside put. Do first subscribe to the PublishSubject with PublishSubject.asObservable().Merge().Subscribe(...) (doesn't strictly need the asObservable if you subscribe locally). Just make sure you don't expose the Subject anywhere.
Observer.Create(), but that doesn't allow you to call merge anymore. So you kinda need
private Observer ob;
public constructor() {
    PublishSubject<Observable<T>> sub = PublishSubject.create();
    sub.merge().subscribe(...);
    ob = sub.asObserver();
}
public void put(Observable<T> t) {
    ob.onNext(t);
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import rx.Observable;
import rx.subjects.PublishSubject;

public class SubjectTest {
    private static final SubjectTest INSTANCE = new SubjectTest();
    private PublishSubject<Integer> subject;
    public SubjectTest() {
        subject = PublishSubject.create();
        subject.buffer(10).subscribe(System.out::println);
    }
    void put(Observable<Integer> obs) {
        // Called from several pool threads; PublishSubject does not serialize concurrent onNext calls.
        obs.subscribe(integer -> subject.onNext(integer));
    }
    public static void main(String[] args) {
        ExecutorService executor = Executors.newWorkStealingPool(10);
        Observable.range(1, 100).subscribe(integer -> {
            executor.execute(() -> INSTANCE.put(Observable.just(integer)));
        });
    }
}
Observable<Observable<Integer>> to Observable<Integer>. Mmm
merge(Observable<? extends Observable<? extends T>> source), but I'm not 100% sure how you call it.
http://reactivex.io/documentation/operators/merge.html
Instead of passing multiple Observables (up to nine) into merge, you could also pass in a List<> (or other Iterable) of Observables, an Array of Observables, or even an Observable that emits Observables, and merge will merge their output into the output of a single Observable:
If you pass in an Observable of Observables, you have the option of also passing in a value indicating to merge the maximum number of those Observables it should attempt to be subscribed to simultaneously. Once it reaches this maximum subscription count, it will refrain from subscribing to any other Observables emitted by the source Observable until such time as one of the already-subscribed-to Observables issues an onCompleted notification.
Javadoc: merge(Observable<Observable>)
Javadoc: merge(Observable<Observable>,int)
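To make the subscription cap in the quoted passage concrete, here is a rough plain-JDK model of that rule. It is only a sketch, not RxJava's implementation: the class BoundedMergeSketch and the helper mergeBounded are hypothetical names, each source is modeled as a finite list, and the round-robin interleaving of active sources is an assumption made so the output is deterministic (the real operator interleaves by arrival time).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class BoundedMergeSketch {

    // Hypothetical helper: merges the sources while keeping at most
    // maxConcurrent of them "subscribed" (active) at any one time.
    public static <T> List<T> mergeBounded(List<List<T>> sources, int maxConcurrent) {
        if (maxConcurrent < 1) {
            throw new IllegalArgumentException("maxConcurrent must be at least 1");
        }
        // Sources that have been emitted but not yet subscribed to.
        Deque<Iterator<T>> pending = new ArrayDeque<>();
        for (List<T> source : sources) {
            pending.add(source.iterator());
        }
        // Sources currently subscribed to, visited round-robin (an assumption).
        Deque<Iterator<T>> active = new ArrayDeque<>();
        while (active.size() < maxConcurrent && !pending.isEmpty()) {
            active.add(pending.poll());
        }
        List<T> merged = new ArrayList<>();
        while (!active.isEmpty()) {
            Iterator<T> current = active.poll();
            if (current.hasNext()) {
                merged.add(current.next());
                active.add(current); // still active, goes to the back of the queue
            } else if (!pending.isEmpty()) {
                // The source completed, so its slot is freed for the next pending one.
                active.add(pending.poll());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Integer>> sources = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(10, 20),
                Arrays.asList(100));
        System.out.println(mergeBounded(sources, 2)); // prints [1, 10, 2, 20, 3, 100]
        System.out.println(mergeBounded(sources, 1)); // prints [1, 2, 3, 10, 20, 100]
    }
}
```

With a cap of 2 the third source is not touched until one of the first two completes; with a cap of 1 the model degenerates into plain concatenation.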
import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;

public class App {
    private Observer<Observable<Integer>> ob;
    public App() {
        PublishSubject<Observable<Integer>> sub = PublishSubject.create();
        // Merge the emitted Observables, subscribing to at most 10 at a time.
        rx.Observable.merge(sub, 10).subscribe(System.out::println);
        ob = sub;
    }
    public void put(Observable<Integer> t) {
        ob.onNext(t);
    }
    public static void main(String[] args) {
        App app = new App();
        Observable.range(1, 100).subscribe(integer -> {
            app.put(Observable.just(integer));
        });
    }
}
import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;

public class App {
    private Observer<Observable<Integer>> ob;
    public App() {
        PublishSubject<Observable<Integer>> sub = PublishSubject.create();
        // Merge the emitted Observables, then group the merged items into lists of 10.
        rx.Observable.merge(sub).buffer(10).subscribe(System.out::println);
        ob = sub;
    }
    public void put(Observable<Integer> t) {
        ob.onNext(t);
    }
    public static void main(String[] args) {
        App app = new App();
        Observable.range(1, 100).subscribe(integer -> {
            app.put(Observable.just(integer));
        });
    }
}
Hello, everyone.
Can someone please explain why the following two snippets of code behave differently?
Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber))
    .flatMap(
        b -> Observable.<Integer>create(subscriber -> {
            subscriber.onNext(1);
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.io()) // to imitate async request
    )
    .subscribe(System.out::println);
Observable.<Boolean>create(subscriber -> Observable.just(true).subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted))
    .flatMap(
        b -> Observable.<Integer>create(subscriber -> {
            subscriber.onNext(1);
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.io()) // to imitate async request
    )
    .subscribe(System.out::println);
When I run the first one it doesn’t print anything to the console, but the second one prints 1 as expected.