    Rudi Grinberg
    @rgrinberg
    it was that i’m an idiot
    .asObservable()
    how do i do .distinct() but only for consecutive elements?
    Dorus
    @Dorus
    Naah, programming is always about feeling like an idiot once you figure out what was wrong.
    distinctUntilChanged() ?
    Rudi Grinberg
    @rgrinberg
    thanks!
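
For reference, a rough sketch of what distinctUntilChanged() does to a sequence, written against a plain List instead of an Observable so it runs without RxJava on the classpath. The class and helper names are made up for illustration; only the behaviour (drop an element when it equals the one right before it) mirrors the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class DistinctUntilChangedSketch {
    // Hypothetical helper: keeps an element only if it differs from its predecessor.
    static <T> List<T> distinctUntilChanged(List<T> input) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T item : input) {
            if (first || !Objects.equals(previous, item)) {
                out.add(item);
            }
            previous = item;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [1, 2, 1, 3]; a plain distinct() would drop the later 1 as well
        System.out.println(distinctUntilChanged(Arrays.asList(1, 1, 2, 2, 1, 3, 3)));
    }
}
```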
    johnfoconnor
    @johnfoconnor
    I'm trying to figure out how to represent the following nested for loops in Rx constructs. I'm having trouble getting something elegant. What am I missing? Here's the pseudocode: http://pastebin.com/raw.php?i=G0k9ZR4N
    Simon Baslé
    @simonbasle
    @johnfoconnor use flatMap and Observable.range
    I can give you an example in Java
    private static int getLevelFor(String s) {
        return s.length();
    }

    public void forFor() {
        List<String> uids = Arrays.asList("A", "BB", "CCCCCC");

        Observable.from(uids)
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(final String s) {
                        return Observable.range(1, getLevelFor(s))
                                .map(new Func1<Integer, String>() {
                                    @Override
                                    public String call(Integer integer) {
                                        return s + "," + integer;
                                    }
                                });
                    }
                })
                .toBlocking()
                .forEach(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        System.out.println(o);
                    }
                });
    }
    outputs:
    A,1
    BB,1
    BB,2
    CCCCCC,1
    CCCCCC,2
    CCCCCC,3
    CCCCCC,4
    CCCCCC,5
    CCCCCC,6
    (of course String would have to be replaced by a pair or your actual domain object)
    (and the toBlocking section is just there so that I can execute this in a standalone main or unit test)
    johnfoconnor
    @johnfoconnor
    @simonbasle thanks
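
For comparison, here is a guess at the imperative shape that the flatMap/range example stands in for. The pastebin pseudocode is not reproduced in the chat, so the loop structure below is an assumption inferred from Simon's example and its output; getLevelFor is the same stand-in he used, and the class name is made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NestedLoopSketch {
    // Same stand-in as in the Rx example: the "level" is just the string length.
    static int getLevelFor(String s) {
        return s.length();
    }

    static List<String> pairs(List<String> uids) {
        List<String> out = new ArrayList<>();
        for (String uid : uids) {                          // outer loop ~ Observable.from(uids) + flatMap
            for (int i = 1; i <= getLevelFor(uid); i++) {  // inner loop ~ Observable.range(1, n)
                out.add(uid + "," + i);                    // loop body  ~ the inner map(...)
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints the same nine lines as the Rx version, A,1 through CCCCCC,6.
        for (String line : pairs(Arrays.asList("A", "BB", "CCCCCC"))) {
            System.out.println(line);
        }
    }
}
```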
    Gabriel Borges
    @borgesgabriel

    Guys -- complete beginner here, still haven't got a grasp of some of the ReactiveX concepts.
    So I have the following functions, all synchronous: a(), b(), c(), d() and e(). Those functions are to be called once. b() and c() depend on a(), d() depends on b() and e() depends on c(). I'd like this to happen: a->[b->d||c->e], with the arrow and || meaning series and parallel, respectively. I've got this so far:

    Observable.defer(() -> Observable.just(a()))
              .map(v -> {
                Observable.just(b(v))
                    .map(v2 -> d(v2));
                Observable.just(c(v))
                    .map(v3 -> e(v3));
                return (Void) null;
              });

    So this is what I'd want to happen when the outer observable is subscribed to: a() is called, returning v. This is then fed into b() and c(), being mapped to d() and e() as they return. If everything goes well, no exceptions are thrown and the subscriber receives null. However, this is not what happens, and just is called before the deferred a(), if I'm not mistaken. Doing Observable.defer(() -> Observable.just(b())) for b() and c() causes them never to be called (obviously), as they're never subscribed to.
    Any tips? Thanks!

    Dorus
    @Dorus
    So much is going wrong here, I'm not sure where to start. Others can probably help more, but I'll give you some advice.
    Observable.defer takes a function that is called on subscription. You never subscribe to it, so the inner function is never called. Observable.just takes a value and yields that value when you subscribe. You never subscribe, so it never yields a value. However, since you pass a() as an argument, a() is called when the Observable.just is created, that is (I think) when defer is called. Not 100% sure here, it might be called on creating the lambda too. Next you write .map(v2 -> d(v2)), but again, this code is never executed because you do not subscribe to the Observable. You just prepare two observables and then discard them.
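
On the question of when a() actually runs: Java evaluates a method's arguments before the call, so in just(a()) the call to a() happens at the point where that expression is evaluated, while a lambda body only runs when the lambda is invoked. The sketch below shows that ordering with java.util.function.Supplier as a stand-in for an Observable; the just and defer helpers here are hypothetical toys that only mimic the timing, not the RxJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class EagerVsDeferredSketch {
    static final List<String> log = new ArrayList<>();

    static int a() {
        log.add("a() called");
        return 42;
    }

    // Toy stand-in for Observable.just: it receives a value that is already computed.
    static Supplier<Integer> just(int value) {
        return () -> value;
    }

    // Toy stand-in for Observable.defer: the factory runs only when get() is called.
    static <T> Supplier<T> defer(Supplier<Supplier<T>> factory) {
        return () -> factory.get().get();
    }

    static List<String> run() {
        log.clear();
        Supplier<Integer> eager = just(a());             // a() runs right here
        log.add("eager built");
        Supplier<Integer> lazy = defer(() -> just(a())); // nothing runs yet
        log.add("lazy built");
        lazy.get();                                      // a() runs now, like on subscribe
        log.add("lazy consumed");
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

The log comes out as "a() called", "eager built", "lazy built", "a() called", "lazy consumed": the eager variant calls a() while it is being built, the deferred one only when it is consumed.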
    Gabriel Borges
    @borgesgabriel

    @Dorus, sorry about that, should've been clearer. I do subscribe to the observable -- just didn't put that bit there. So this is the unabridged version, all in one go:

    Observable
              .defer(() -> Observable.just(a()))
              .map(v -> {
                Observable.just(b(v))
                    .map(v2 -> d(v2));
                Observable.just(c(v))
                    .map(v3 -> e(v3));
                return (Void) null;
              })
              .subscribeOn(Schedulers.newThread())
              .compose(/* ... */)
              .subscribe(result -> {},
                         e -> {});

    The two observables are, as I thought of it, only apparently discarded because if b(), c(), d() and e() go well, no exceptions are thrown and the map block reaches its end, returning null. I do understand that if just is called at any point that isn't when v contains a proper value (that is, that the deferred call has been made), then this is all pointless and doesn't do what I expect it to. However, it seems a bit weird to me that the just calls would be made before a() returns: what argument is passed to b() and c() then (again, a() wouldn't be 'ready' at this point)?

    Dorus
    @Dorus
     v -> {
            Observable.just(b(v))
                .map(v2 -> d(v2));
            Observable.just(c(v))
                .map(v3 -> e(v3));
            return (Void) null;
          }
    That's the bit where you make observables but never subscribe to them
    I think you meant to do something like flatMap or merge there.
    or perhaps even zip
    now that I think about it
    Gabriel Borges
    @borgesgabriel
    ahh gotcha. I wasn't subscribing to them because what they returned didn't matter to me -- I was basically interested in whether they finished without errors. But failed to see that nothing would come out of the observables since no one was observing. So I should subscribe (perhaps even merging the observables beforehand), and just ignore the results, I believe. Makes sense?
    Dorus
    @Dorus
    If they finish with errors you still need them to yield onError() and merge/zip/concat them so the error is propagated forwards.
    David Stemmer
    @weefbellington
    depending on what you're doing, you may also want to consider the Operator interface
    but that is a little advanced
    Gabriel Borges
    @borgesgabriel
    will take a look into that, thanks, @weefbellington. @Dorus, thanks for your help!
    Dorus
    @Dorus
    Something like this
    Observable.defer(() -> Observable.just(a()))
            .flatMap(v-> Observable.zip(
                    Observable.defer(() -> Observable.just(b(v))).map( v1 -> d(v1)), 
                    Observable.defer(() -> Observable.just(c(v))).map( v1 -> e(v1)),
                    (a,b) -> 0))
    Dorus
    @Dorus
    Actually, I ran that and it doesn't run b() and c() in parallel. Not sure why, but if I do this it does:
    Observable.defer(() -> Observable.just(a()))
            .flatMap(v-> Observable.merge(
                    Observable.just(v).observeOn(Schedulers.newThread())
                    .map(v1 -> b(v1)).map(v1 -> d(v1)), 
                    Observable.just(v).observeOn(Schedulers.newThread())
                    .map(v1 -> c(v1)).map(v1 -> e(v1))))
    Anyone know why merge and zip won't run the inner observables in parallel?
    Dorus
    @Dorus
    I'm thinking Observable.just is to blame, it executes immediately. Any less ugly way than observeOn(Schedulers.newThread()) to avoid that?
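
As far as I know, Rx operators such as merge and zip do not introduce threads on their own; they subscribe to their sources on the calling thread unless a scheduler is involved, which would explain the sequential run. To make the intended a->[b->d||c->e] shape concrete without depending on RxJava, here is a sketch of the same dependency graph using the JDK's CompletableFuture. This is a different tool, not the Rx solution from the chat, and the bodies of a() through e() are invented placeholders.

```java
import java.util.concurrent.CompletableFuture;

final class DiamondSketch {
    // Placeholder implementations; the real a() ... e() are not shown in the chat.
    static int a() { return 1; }
    static int b(int v) { return v + 10; }
    static int c(int v) { return v + 20; }
    static int d(int v) { return v * 2; }
    static int e(int v) { return v * 3; }

    static CompletableFuture<int[]> run() {
        CompletableFuture<Integer> fa = CompletableFuture.supplyAsync(() -> a());
        // Both branches depend on a(); thenApplyAsync hands each branch to the
        // default executor, so they may run on different threads.
        CompletableFuture<Integer> left = fa.thenApplyAsync(v -> b(v)).thenApply(v -> d(v));
        CompletableFuture<Integer> right = fa.thenApplyAsync(v -> c(v)).thenApply(v -> e(v));
        // Completes when both branches are done; an exception in either one
        // makes the combined future complete exceptionally.
        return left.thenCombine(right, (x, y) -> new int[] {x, y});
    }

    public static void main(String[] args) {
        int[] result = run().join();
        System.out.println(result[0] + " " + result[1]); // prints 22 63
    }
}
```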
    fAns1k
    @fAns1k
    Hi all, how do I create a snapshot from a replay subject? Here's a snippet:
    .withLatestFrom(textSubject.toList(), (scanWineInfo, strings) -> {
                        int size = strings.size();
                        for (int i = 0; i < size; i++) {
                            String text = strings.get(i);
    
                            //check string
                            Log.i("Tesseract backlog", text);
                        }
    
                        return scanWineInfo;
                    })
    textSubject is a replay subject, and it blocks the whole stream from moving on
    Paco
    @pakoito
    Hi channel
    I need advice on how to tackle this problem
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Yo there
    Paco
    @pakoito
    I have a hot observable sending like 50 signals per second
    those signals are tied to an id, so i can get 10 for id 1, 5 for id 2, none for id 3, etc...
    I need to throttle them to send, once a second, the latest value for each of the ids
    is groupBy, then sample, the correct approach?
    AFAIK groupBy actually concatenates values, so it requires termination
    which my hot observable can't assure
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    signalsForIds
      .buffer(1, SECONDS)
      .flatMap(listOfSignals -> { ... merge signals for same ids and emit as values })
    Paco
    @pakoito
    so, imperative code
    there's a functional solution
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    sure
    Nothing stops you from writing it as a functional solution inside of flatMap
    Paco
    @pakoito
    you're grouping by time first, sorting by type second, taking latest third
    with groupBy into sample you sort by type first, then group + latest in one operation
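
To make the buffer-based suggestion a bit more concrete, here is one way the "keep the latest value per id" step for a single one-second window might look. It is plain Java over a List so it runs without RxJava; the Signal type, its fields, and the helper name are assumptions, since the chat does not say what a signal carries or what goes inside the flatMap.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LatestPerIdSketch {
    // Hypothetical signal type: an id plus some payload.
    static final class Signal {
        final int id;
        final String value;

        Signal(int id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Reduces one buffered window to the last value seen for each id.
    static Map<Integer, String> latestPerId(List<Signal> window) {
        Map<Integer, String> latest = new LinkedHashMap<>();
        for (Signal s : window) {
            latest.put(s.id, s.value); // later signals overwrite earlier ones
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Signal> window = Arrays.asList(
                new Signal(1, "a"), new Signal(2, "b"), new Signal(1, "c"));
        System.out.println(latestPerId(window)); // prints {1=c, 2=b}
    }
}
```

Ids with no signal in a window simply do not appear in the map, which matches the "none for id 3" case from the question.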