RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Flowable.range(2, 200_000_000)
.groupBy(e -> {
// First 100 000 000 in different group
if (e < 100_000_000) {
return e;
}
// Last 100 000 000 in the same group
return 1;
})
.subscribe(g -> {
if (g.getKey() == 1) {
g.subscribe(TestSubscriber.create(0));
} else {
g.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onNext(Integer t) {
if (t.intValue() % 100_000 == 0) {
System.out.println("Received: " + t);
}
Schedulers.computation().scheduleDirect(this::cancel);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
});
You have to be patient but when 100.000.000 is reached, the following 100.000.000 will be queued internally by groupBy() because of the double request() bug (?)var code
Code
I have a need to do something once a stream is subscribed to for the first time. I have written the following transformer to do it:
public static <T> Observable.Transformer<T, T> doOnFirstSubscribe(final Action0 func) {
return new Observable.Transformer<T, T>() {
boolean firstSubscription = true;
@Override
public Observable<T> call(Observable<T> input) {
return input.doOnSubscribe(() -> {
if (firstSubscription) {
func.call();
firstSubscription = false;
}
});
}
};
}
i've written tests for it and they pass however during usage in my application it appears to call func when the stream is resubscribed to.
does my transformer seem correct?
doOnComplete
, doOnSubscribe
and doFinally
never called?// val obs = createObservable()
val obs = createcompletable()
@JvmStatic fun main(args: Array<String>) {
this.obs.doOnSubscribe {
println("on subscribe!")
}
this.obs.doOnComplete {
println("complete!")
}
this.obs.doFinally{
println("dofinally!")
}
this.obs.subscribe {
println("subscribe!")
}
}
fun createcompletable(): Completable =
Completable.create{ emitter ->
println("calling oncomplete")
emitter.onComplete()
}
fun createObservable(): Observable<Void> =
// Observable.defer {
Observable.create<Void> { emitter ->
println("calling oncomplete")
emitter.onComplete()
}
// }
subscribe
block is called only when using Completable
, with Observable<Void>
not even that is called