These are chat archives for ReactiveX/RxJava

30th
Oct 2015
Gabriel Borges
@borgesgabriel
Oct 30 2015 17:46

Guys -- complete beginner here, still haven't got a grasp of some of the ReactiveX concepts.
So I have the following functions, all synchronous: a(), b(), c(), d() and e(). Those functions are to be called once. b() and c() depend on a(), d() depends on b() and e() depends on c(). I'd like this to happen: a->[b->d||c->e], arrow and || being series and parallel, respectively. I've got this so far:

Observable.defer(() -> Observable.just(a()))
          .map(v -> {
            Observable.just(b(v))
                .map(v2 -> d(v2));
            Observable.just(c(v))
                .map(v3 -> e(v3));
            return (Void) null;
          });

So this is what I'd want to happen when the outer observable is subscribed to: a() is called, returning v. This is then fed into b() and c(), being mapped to d() and e() as they return. If everything goes well, no exceptions are thrown and the subscriber receives null. However, this is not what happens, and just is called before the deferred a(), if I'm not mistaken. Doing Observable.defer(() -> Observable.just(b())) for b() and c() causes them never to be called (obviously), as they're never subscribed to.
Any tips? Thanks!

Dorus
@Dorus
Oct 30 2015 19:17
There's so much going wrong here, I'm not sure where to start. Others can probably help more, but I'll give you some advice.
Observable.defer takes a function that is called on subscription. You never subscribe to it, so the inner function is never called. Observable.just takes a value and yields that value when you subscribe. You never subscribe, so it never yields a value. However, because you pass a() as an argument, a() is evaluated when the Observable.just is created, which is (I think) when defer is called. I'm not 100% sure here; it might be called when the lambda is created too. Next you write .map(v2 -> d(v2)), but again, this code is never executed because you do not subscribe to the Observable. You just prepare two observables and then discard them.
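On the question of when a() actually runs: Java evaluates a method's arguments before making the call, so in Observable.just(a()) the call to a() happens wherever that expression itself is evaluated. Because the expression sits inside the lambda handed to defer, that should be when the lambda runs (on subscription), not when defer is called or when the lambda is created. The plain-Java sketch below illustrates the same evaluation rule without RxJava; eager, deferred, log and the body of a() are hypothetical stand-ins, not RxJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java illustration (no RxJava): when is an argument expression evaluated?
class EagerVsDeferred {
    // Records the order in which things happen.
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for the a() function from the question.
    static int a() {
        log.add("a() called");
        return 42;
    }

    // Same shape as just(value): the caller has already computed the value.
    static Supplier<Integer> eager(int value) {
        return () -> value;
    }

    // Same shape as defer(factory): the factory runs only when asked.
    static Supplier<Integer> deferred(Supplier<Integer> factory) {
        return () -> factory.get();
    }

    static List<String> run() {
        log.clear();
        Supplier<Integer> e = eager(a());           // a() runs here, before eager() is entered
        log.add("eager wrapper built");
        Supplier<Integer> d = deferred(() -> a());  // a() does not run here
        log.add("deferred wrapper built");
        e.get();                                    // no call to a(): the value is already stored
        log.add("eager value read");
        d.get();                                    // a() runs now
        log.add("deferred value read");
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch "a() called" is logged once before "eager wrapper built" and once more only after "eager value read", which is the difference between passing a value and passing a function that produces the value.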
Gabriel Borges
@borgesgabriel
Oct 30 2015 19:33

@Dorus, sorry about that, should've been clearer. I do subscribe to the observable -- just didn't put that bit there. So this is the unabridged version, all in one go:

Observable
          .defer(() -> Observable.just(a()))
          .map(v -> {
            Observable.just(b(v))
                .map(v2 -> d(v2));
            Observable.just(c(v))
                .map(v3 -> e(v3));
            return (Void) null;
          })
          .subscribeOn(Schedulers.newThread())
          .compose(/* ... */)
          .subscribe(result -> {},
                     e -> {});

The two observables are, as I thought of it, only apparently discarded because if b(), c(), d() and e() go well, no exceptions are thrown and the map block reaches its end, returning null. I do understand that if just is called at any point that isn't when v contains a proper value (that is, that the deferred call has been made), then this is all pointless and doesn't do what I expect it to. However, it seems a bit weird to me that the just calls would be made before a() returns: what argument is passed to b() and c() then (again, a() wouldn't be 'ready' at this point)?

Dorus
@Dorus
Oct 30 2015 19:34
 v -> {
        Observable.just(b(v))
            .map(v2 -> d(v2));
        Observable.just(c(v))
            .map(v3 -> e(v3));
        return (Void) null;
      }
That's the bit where you make observables but never subscribe to them.
I think you meant to do something like flatMap or merge there.
Or perhaps even zip, now that I think about it.
Gabriel Borges
@borgesgabriel
Oct 30 2015 19:39
Ahh, gotcha. I wasn't subscribing to them because what they returned didn't matter to me -- I was basically interested in whether they finished without errors. But I failed to see that nothing would come out of the observables since no one was observing. So I should subscribe (perhaps even merging the observables beforehand) and just ignore the results, I believe. Does that make sense?
Dorus
@Dorus
Oct 30 2015 19:40
If they finish with errors, you still need them to signal onError(), and you need to merge/zip/concat them so the error is propagated downstream.
David Stemmer
@weefbellington
Oct 30 2015 19:41
depending on what you're doing, you may also want to consider the Operator interface
but that is a little advanced
Gabriel Borges
@borgesgabriel
Oct 30 2015 19:42
will take a look into that, thanks, @weefbellington. @Dorus, thanks for your help!
Dorus
@Dorus
Oct 30 2015 19:53
Something like this:
Observable.defer(() -> Observable.just(a()))
        .flatMap(v -> Observable.zip(
                Observable.defer(() -> Observable.just(b(v))).map(v1 -> d(v1)),
                Observable.defer(() -> Observable.just(c(v))).map(v1 -> e(v1)),
                (a, b) -> 0))
Dorus
@Dorus
Oct 30 2015 20:17
Actually, I ran that and it doesn't run b() and c() in parallel. Not sure why, but if I do this, it does:
Observable.defer(() -> Observable.just(a()))
        .flatMap(v-> Observable.merge(
                Observable.just(v).observeOn(Schedulers.newThread())
                .map(v1 -> b(v1)).map(v1 -> d(v1)), 
                Observable.just(v).observeOn(Schedulers.newThread())
                .map(v1 -> c(v1)).map(v1 -> e(v1))))
Does anyone know why merge and zip won't run the inner observables in parallel?
Dorus
@Dorus
Oct 30 2015 20:40
I'm thinking Observable.just is to blame, since it executes immediately. Is there a less ugly way than observeOn(Schedulers.newThread()) to avoid that?
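On why zip and merge did not run the branches in parallel: as far as I know, RxJava operators do not introduce threads on their own. defer(() -> just(b(v))) calls b(v) on whichever thread subscribes, and a synchronous inner observable finishes before the next one is subscribed to, so without a scheduler everything stays sequential. Putting subscribeOn(...) with a scheduler on each inner observable is the usual way to move each branch onto its own thread. For comparison, here is the a -> [b -> d || c -> e] shape sketched with the JDK's CompletableFuture rather than RxJava; the bodies of a() to e() are made-up stand-ins that build a string so the call order is visible.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only sketch of the a -> [b -> d || c -> e] dependency graph (not RxJava).
class DependencyGraphSketch {
    // Hypothetical stand-ins for the synchronous functions in the question.
    static String a() { return "a"; }
    static String b(String v) { return v + ">b"; }
    static String c(String v) { return v + ">c"; }
    static String d(String v) { return v + ">d"; }
    static String e(String v) { return v + ">e"; }

    static String[] run() {
        // a() is called once; both branches reuse its result.
        CompletableFuture<String> first =
                CompletableFuture.supplyAsync(DependencyGraphSketch::a);

        // thenApplyAsync submits each branch as its own task, so b and c
        // may run on different threads; d runs after b, and e after c.
        CompletableFuture<String> left = first
                .thenApplyAsync(DependencyGraphSketch::b)
                .thenApply(DependencyGraphSketch::d);
        CompletableFuture<String> right = first
                .thenApplyAsync(DependencyGraphSketch::c)
                .thenApply(DependencyGraphSketch::e);

        // Wait for both branches. If any step throws, join() rethrows the
        // failure wrapped in an unchecked CompletionException.
        return left.thenCombine(right, (l, r) -> new String[] { l, r }).join();
    }

    public static void main(String[] args) {
        String[] results = run();
        System.out.println(results[0]);
        System.out.println(results[1]);
    }
}
```

The point of the comparison is that each branch is explicitly handed to an executor; in the RxJava versions above, nothing asks for another thread until a scheduler is named.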