RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Flowable.range(2, 200_000_000)
.groupBy(e -> {
// First 100 000 000 in different group
if (e < 100_000_000) {
return e;
}
// Last 100 000 000 in the same group
return 1;
})
.subscribe(g -> {
if (g.getKey() == 1) {
g.subscribe(TestSubscriber.create(0));
} else {
g.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onNext(Integer t) {
if (t.intValue() % 100_000 == 0) {
System.out.println("Received: " + t);
}
Schedulers.computation().scheduleDirect(this::cancel);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
});
You have to be patient but when 100.000.000 is reached, the following 100.000.000 will be queued internally by groupBy() because of the double request() bug (?)var code
Code
I have a need to do something once a stream is subscribed to for the first time. I have written the following transformer to do it:
public static <T> Observable.Transformer<T, T> doOnFirstSubscribe(final Action0 func) {
return new Observable.Transformer<T, T>() {
boolean firstSubscription = true;
@Override
public Observable<T> call(Observable<T> input) {
return input.doOnSubscribe(() -> {
if (firstSubscription) {
func.call();
firstSubscription = false;
}
});
}
};
}
i've written tests for it and they pass however during usage in my application it appears to call func when the stream is resubscribed to.
does my transformer seem correct?
doOnComplete
, doOnSubscribe
and doFinally
never called?// val obs = createObservable()
val obs = createcompletable()
@JvmStatic fun main(args: Array<String>) {
this.obs.doOnSubscribe {
println("on subscribe!")
}
this.obs.doOnComplete {
println("complete!")
}
this.obs.doFinally{
println("dofinally!")
}
this.obs.subscribe {
println("subscribe!")
}
}
fun createcompletable(): Completable =
Completable.create{ emitter ->
println("calling oncomplete")
emitter.onComplete()
}
fun createObservable(): Observable<Void> =
// Observable.defer {
Observable.create<Void> { emitter ->
println("calling oncomplete")
emitter.onComplete()
}
// }
subscribe
block is called only when using Completable
, with Observable<Void>
not even that is called
onNext
calls into try-catches. In fact, it is discouraged in RxJava 2 because it can misdirect exceptions from bugs. If you have something that can throw inside your onNext
, wrap that into try-catch and don't let it escape. Btw, are you learning RxJava 2?
Function
can throw so it is wrapped and managed. onNext
should not throw as the specification mandates thus is not wrapped.