RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Guys -- complete beginner here, still hasn't got a grasp of some of the concepts with ReactiveX.
So I have the following functions, all synchronous: a()
, b()
, c()
, d()
and e()
. Those functions are to be called once. b()
and c()
depend on a()
, d()
depends on b()
and e()
depends on c()
. I'd like this to happen: a->[b->d||c->e]
, arrow and ||
being series and parallel, respectively. I've got this so far:
Observable.defer(() -> Observable.just(a()))
.map(v -> {
Observable.just(b(v))
.map(v2 -> d(v2));
Observable.just(c(v))
.map(v3 -> e(v3))
return (Void) null;
});
So this is what I'd want to happen when the outer observable is subscribed to: a()
is called, returning v
. This is then fed into b()
and c()
, being mapped to d()
and e()
as they return. If everything goes well, no exceptions are thrown and the subscriber receives null
. However, this is not what happens, and just
is called before the deferred a()
, if I'm not mistaken. Doing Observable.defer(() -> Observable.just(b()))
for b()
and c()
causes them never to be called (obviously), as they're never subscribed to.
Any tips? Thanks!
Observable.defer
takes a function that is called on subscription. You never subscribe to it, so the inner function is never called. Observable.just
takes a value, and yield that value when you subscribe. You never subscribe so it never yield a value, however, as you enter a()
as a parameter, that parameter is called on creating the Observable.just
, that is (i think), when defer
is called. Not 100% sure here, might be called on creating the lambda too. Next you write .map(v2 -> d(v2))
, but again, this code is never executed because you do not subscribe to the Observable. You just prepare two observables and then discard them.
@Dorus, sorry about that, should've been clearer. I do subscribe to the observable -- just didn't put that bit there. So this is the unabridged version, all in one go:
Observable
.defer(() -> Observable.just(a()))
.map(v -> {
Observable.just(b(v))
.map(v2 -> d(v2));
Observable.just(c(v))
.map(v3 -> e(v3))
return (Void) null;
})
.subscribeOn(Schedulers.newThread())
.compose(/* ... */)
.subscribe(result -> {},
e -> {});
The two observables are, as I thought of it, only apparently discarded because if b()
, c()
, d()
and e()
go well, no exceptions are thrown and the map
block reaches its end, returning null
. I do understand that if just
is called at any point that isn't when v
contains a proper value (that is, that the deferred call has been made), then this is all pointless and doesn't do what I expect it to. However, it seems a bit weird to me that the just
calls would be made before a()
returns: what argument is passed to b()
and c()
then (again, a()
wouldn't be 'ready' at this point)?
Observable.defer(() -> Observable.just(a()))
.flatMap(v-> Observable.zip(
Observable.defer(() -> Observable.just(b(v))).map( v1 -> d(v1)),
Observable.defer(() -> Observable.just(c(v))).map( v1 -> e(v1)),
(a,b) -> 0))
b()
and c()
in parallel. Not sure why, but if i do this it does:Observable.defer(() -> Observable.just(a()))
.flatMap(v-> Observable.merge(
Observable.just(v).observeOn(Schedulers.newThread())
.map(v1 -> b(v1)).map(v1 -> d(v1)),
Observable.just(v).observeOn(Schedulers.newThread())
.map(v1 -> c(v1)).map(v1 -> e(v1))))
.withLatestFrom(textSubject.toList(), (scanWineInfo, strings) -> {
int size = strings.size();
for (int i = 0; i < size; i++) {
String text = strings.get(i);
//check string
Log.i("Tesseract backlog", text);
}
return scanWineInfo;
})
flatMap