    Simon Baslé
    @simonbasle
    (and the toBlocking section is just here so that I can execute this in a standalone main or unit test)
    johnfoconnor
    @johnfoconnor
    @simonbasle thanks
    Gabriel Borges
    @borgesgabriel

    Guys -- complete beginner here, still haven't got a grasp of some of the ReactiveX concepts.
    So I have the following functions, all synchronous: a(), b(), c(), d() and e(). Those functions are to be called once. b() and c() depend on a(), d() depends on b() and e() depends on c(). I'd like this to happen: a->[b->d||c->e], arrow and || being series and parallel, respectively. I've got this so far:

    Observable.defer(() -> Observable.just(a()))
              .map(v -> {
                Observable.just(b(v))
                    .map(v2 -> d(v2));
                Observable.just(c(v))
                    .map(v3 -> e(v3));
                return (Void) null;
              });

    So this is what I'd want to happen when the outer observable is subscribed to: a() is called, returning v. This is then fed into b() and c(), being mapped to d() and e() as they return. If everything goes well, no exceptions are thrown and the subscriber receives null. However, this is not what happens, and just is called before the deferred a(), if I'm not mistaken. Doing Observable.defer(() -> Observable.just(b())) for b() and c() causes them never to be called (obviously), as they're never subscribed to.
    Any tips? Thanks!

    Dorus
    @Dorus
    So much is going wrong here that I'm not sure where to start. Others can probably help more, but I'll give you some advice.
    Observable.defer takes a function that is called on subscription. You never subscribe to it, so the inner function is never called. Observable.just takes a value and yields that value when you subscribe. You never subscribe, so it never yields a value. However, because you pass a() as an argument, a() is called when the Observable.just is created, which is (I think) when defer is called. I'm not 100% sure here; it might be called on creating the lambda too. Next you write .map(v2 -> d(v2)), but again, this code is never executed because you do not subscribe to the Observable. You just prepare two observables and then discard them.
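On the timing question above (when does a() actually run?), the following is a minimal plain-Java sketch, not RxJava itself. The `just` and `defer` helpers here are hypothetical stand-ins built on `java.util.function.Supplier`, and calling `get()` plays the role of subscribing. It shows that an argument expression such as `a()` is evaluated eagerly wherever `just(a())` is written, while wrapping that same expression in a lambda postpones it until the wrapper is invoked.

```java
import java.util.function.Supplier;

class DeferDemo {
    // Counts how many times a() has run so far.
    static int calls = 0;

    // Hypothetical stand-in for the synchronous a() from the question.
    static int a() {
        calls++;
        return 42;
    }

    // Stand-in for "just": the value has already been computed by the caller.
    static Supplier<Integer> just(int value) {
        return () -> value;
    }

    // Stand-in for "defer": the factory runs only when get() is called.
    static Supplier<Integer> defer(Supplier<Supplier<Integer>> factory) {
        return () -> factory.get().get();
    }

    public static void main(String[] args) {
        Supplier<Integer> eager = just(a());             // a() runs on this line
        Supplier<Integer> lazy = defer(() -> just(a())); // a() has not run again yet
        System.out.println(calls);                       // 1
        lazy.get();                                      // now the factory runs a()
        System.out.println(calls);                       // 2
    }
}
```

Read against the snippet in the question, this suggests that a() inside `Observable.defer(() -> Observable.just(a()))` runs when the outer observable is subscribed to, whereas `Observable.just(b(v))` inside the map lambda calls b(v) as soon as that line executes, whether or not anything subscribes to the result.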
    Gabriel Borges
    @borgesgabriel

    @Dorus, sorry about that, should've been clearer. I do subscribe to the observable -- just didn't put that bit there. So this is the unabridged version, all in one go:

    Observable
              .defer(() -> Observable.just(a()))
              .map(v -> {
                Observable.just(b(v))
                    .map(v2 -> d(v2));
                Observable.just(c(v))
                    .map(v3 -> e(v3));
                return (Void) null;
              })
              .subscribeOn(Schedulers.newThread())
              .compose(/* ... */)
              .subscribe(result -> {},
                         e -> {});

    The two observables are, as I thought of it, only apparently discarded because if b(), c(), d() and e() go well, no exceptions are thrown and the map block reaches its end, returning null. I do understand that if just is called at any point that isn't when v contains a proper value (that is, that the deferred call has been made), then this is all pointless and doesn't do what I expect it to. However, it seems a bit weird to me that the just calls would be made before a() returns: what argument is passed to b() and c() then (again, a() wouldn't be 'ready' at this point)?

    Dorus
    @Dorus
     v -> {
            Observable.just(b(v))
                .map(v2 -> d(v2));
            Observable.just(c(v))
                .map(v3 -> e(v3));
            return (Void) null;
          }
    That's the bit where you make observables but never subscribe to them.
    I think you meant to do something like flatMap or merge there.
    Or perhaps even zip,
    now that I think about it.
    Gabriel Borges
    @borgesgabriel
    ahh gotcha. I wasn't subscribing to them because what they returned didn't matter to me -- I was basically interested in whether they finished without errors. But failed to see that nothing would come out of the observables since no one was observing. So I should subscribe (perhaps even merging the observables beforehand), and just ignore the results, I believe. Makes sense?
    Dorus
    @Dorus
    If they finish with errors you still need them to yield onError() and merge/zip/concat them so the error is propagated forwards.
    David Stemmer
    @weefbellington
    depending on what you're doing, you may also want to consider the Operator interface
    but that is a little advanced
    Gabriel Borges
    @borgesgabriel
    will take a look into that, thanks, @weefbellington. @Dorus, thanks for your help!
    Dorus
    @Dorus
    Something like this
    Observable.defer(() -> Observable.just(a()))
            .flatMap(v-> Observable.zip(
                    Observable.defer(() -> Observable.just(b(v))).map( v1 -> d(v1)), 
                    Observable.defer(() -> Observable.just(c(v))).map( v1 -> e(v1)),
                    (a,b) -> 0))
    Dorus
    @Dorus
    Actually, I ran that and it doesn't run b() and c() in parallel. Not sure why, but if I do this, it does:
    Observable.defer(() -> Observable.just(a()))
            .flatMap(v-> Observable.merge(
                    Observable.just(v).observeOn(Schedulers.newThread())
                    .map(v1 -> b(v1)).map(v1 -> d(v1)), 
                    Observable.just(v).observeOn(Schedulers.newThread())
                    .map(v1 -> c(v1)).map(v1 -> e(v1))))
    Does anyone know why merge and zip won't run the inner observables in parallel?
    Dorus
    @Dorus
    I'm thinking Observable.just is to blame, since it executes immediately. Is there any less ugly way than observeOn(Schedulers.newThread()) to avoid that?
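For comparison, here is the a->[b->d||c->e] shape written with the JDK's `CompletableFuture` instead of RxJava. This is a swapped-in technique, not an answer in Rx terms. The bodies of a() to e() are made-up placeholders, and summing the two branch results is an arbitrary choice just to have something to return. The point it illustrates is that the two branches only overlap because each one is explicitly handed to an executor (here the common pool via `supplyAsync`); combining two synchronous sources does not by itself introduce any concurrency, which is presumably why the zip version above ran b() and c() one after the other.

```java
import java.util.concurrent.CompletableFuture;

class Pipeline {
    // Hypothetical placeholder bodies for the synchronous functions in the question.
    static int a() { return 1; }
    static int b(int v) { return v + 10; }
    static int c(int v) { return v + 100; }
    static int d(int v) { return v * 2; }
    static int e(int v) { return v * 3; }

    // Runs a(), then the chains b->d and c->e as two independent async tasks.
    // The returned future completes once both branches finish, or completes
    // exceptionally if any of the five calls throws.
    static CompletableFuture<Integer> run() {
        return CompletableFuture.supplyAsync(() -> a())
                .thenCompose(v -> {
                    CompletableFuture<Integer> left =
                            CompletableFuture.supplyAsync(() -> d(b(v)));
                    CompletableFuture<Integer> right =
                            CompletableFuture.supplyAsync(() -> e(c(v)));
                    // Arbitrary combiner: add the two branch results.
                    return left.thenCombine(right, Integer::sum);
                });
    }

    public static void main(String[] args) {
        // join() blocks until the whole graph has finished.
        System.out.println(run().join());
    }
}
```

In Rx terms the analogous move would be to give each inner branch its own scheduler, which is what the observeOn(Schedulers.newThread()) version above does.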
    fAns1k
    @fAns1k
    Hi all, how can I create a snapshot from a replay subject? Here's a snippet:
    .withLatestFrom(textSubject.toList(), (scanWineInfo, strings) -> {
        for (String text : strings) {
            // check string
            Log.i("Tesseract backlog", text);
        }
        return scanWineInfo;
    })
    textSubject is a replay subject, and it blocks the whole stream from moving on.
    Paco
    @pakoito
    Hi channel
    I need advice on how to tackle this problem
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Yo there
    Paco
    @pakoito
    I have a hot observable sending like 50 signals per second
    those signals are tied to an id, so i can get 10 for id 1, 5 for id 2, none for id 3, etc...
    I need to throttle them to send, once a second, the latest value for each of the ids
    is groupBy, then sample, the correct approach?
    AFAIK groupBy actually concatenates values, so it requires termination
    which my hot observable can't assure
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    signalsForIds
      .buffer(1, SECONDS)
      .flatMap(listOfSignals -> { ... merge signals for same ids and emit as values })
    Paco
    @pakoito
    so, imperative code
    there's a functional solution
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    sure
    Nothing stops you from writing it as functional solution inside of flatMap
    Paco
    @pakoito
    you're grouping by time first, sorting by type second, taking latest third
    with groupBy into sample you sort by type first, then group + latest in one operation
    yeah groupby doesn't require termination, it's just the marbles being silly
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    signalsForIds
      .buffer(1, SECONDS)
      .flatMap(listOfSignals -> Observable.from(listOfSignals).groupBy(signal -> signal.id).last())
    Paco
    @pakoito
    fair enough
    thanks
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    :+1:
    Oh no, it'll emit only the last item from the last group
    Paco
    @pakoito
    don't worry, change the buffer to a window
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    The problem is in flatMap
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    The problem is to apply last() correctly so that it'll emit the last item per group and not the last item of the whole stream
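The "last item per group" step for one buffered batch can also be written in plain Java, which may help pin down what the flatMap body is supposed to produce. This is only a sketch: the `Signal` class with `id` and `value` fields is a hypothetical shape for the signals discussed above, and `latestPerId` is an illustrative helper name, not an RxJava operator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LatestPerId {
    // Hypothetical signal shape: an id plus some payload.
    static final class Signal {
        final int id;
        final String value;

        Signal(int id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Given one batch (e.g. one second's worth of signals, in arrival order),
    // keeps only the most recent signal for each id.
    static List<Signal> latestPerId(List<Signal> batch) {
        // LinkedHashMap keeps ids in the order they were first seen.
        Map<Integer, Signal> latest = new LinkedHashMap<>();
        for (Signal s : batch) {
            latest.put(s.id, s); // a later signal overwrites an earlier one with the same id
        }
        return new ArrayList<>(latest.values());
    }
}
```

With a helper like this, the body of the flatMap over the buffered list would turn the returned list back into an observable, so each one-second batch emits at most one signal per id.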