    Gabriel Borges
    @borgesgabriel
    ahh gotcha. I wasn't subscribing to them because what they returned didn't matter to me -- I was basically interested in whether they finished without errors. But failed to see that nothing would come out of the observables since no one was observing. So I should subscribe (perhaps even merging the observables beforehand), and just ignore the results, I believe. Makes sense?
    Dorus
    @Dorus
    If they finish with errors you still need them to yield onError() and merge/zip/concat them so the error is propagated forwards.
    David Stemmer
    @weefbellington
    depending on what you're doing, you may also want to consider the Operator interface
    but that is a little advanced
    Gabriel Borges
    @borgesgabriel
    will take a look into that, thanks, @weefbellington. @Dorus, thanks for your help!
    Dorus
    @Dorus
    Something like this
    Observable.defer(() -> Observable.just(a()))
            .flatMap(v-> Observable.zip(
                    Observable.defer(() -> Observable.just(b(v))).map( v1 -> d(v1)), 
                    Observable.defer(() -> Observable.just(c(v))).map( v1 -> e(v1)),
                    (a,b) -> 0))
    Dorus
    @Dorus
    Actually I ran that and it doesn't run b() and c() in parallel. Not sure why, but if I do this, it does:
    Observable.defer(() -> Observable.just(a()))
            .flatMap(v-> Observable.merge(
                    Observable.just(v).observeOn(Schedulers.newThread())
                    .map(v1 -> b(v1)).map(v1 -> d(v1)), 
                    Observable.just(v).observeOn(Schedulers.newThread())
                    .map(v1 -> c(v1)).map(v1 -> e(v1))))
    Anyone know why merge and zip won't run the inner observables in parallel?
    Dorus
    @Dorus
    I'm thinking Observable.just is to blame, it executes immediately. Any less ugly way than observeOn(Schedulers.newThread()) to avoid that?
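For readers following the shape Dorus is after (run a(), then b and c side by side, then d and e on their results), here is a minimal plain-Java sketch of the same dependency graph using `CompletableFuture` rather than RxJava. It is a swapped-in technique, not the RxJava answer to the question above. The class name `ParallelSketch` and all method bodies are hypothetical stand-ins, since the chat never shows what a() through e() do.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the a(), b(), c(), d(), e() calls in the chat.
class ParallelSketch {
    static int a() { return 1; }
    static int b(int v) { return v + 1; }
    static int c(int v) { return v + 2; }
    static int d(int v) { return v * 10; }
    static int e(int v) { return v * 100; }

    // Runs a() first, then starts the b->d and c->e chains as two
    // independent async tasks, and finally combines both results.
    static int run() {
        CompletableFuture<Integer> first =
                CompletableFuture.supplyAsync(ParallelSketch::a);
        CompletableFuture<Integer> left =
                first.thenApplyAsync(ParallelSketch::b).thenApply(ParallelSketch::d);
        CompletableFuture<Integer> right =
                first.thenApplyAsync(ParallelSketch::c).thenApply(ParallelSketch::e);
        // join() rethrows a failure from either branch as a CompletionException,
        // so an error in b, c, d or e is not silently dropped.
        return left.thenCombine(right, Integer::sum).join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 320
    }
}
```

The two branches are submitted separately, so they may run concurrently when the executor has more than one thread available. Nothing here forces them to.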
    fAns1k
    @fAns1k
    Hi all, how do I create a snapshot from a replay subject? Here's a snippet:
    .withLatestFrom(textSubject.toList(), (scanWineInfo, strings) -> {
                        int size = strings.size();
                        for (int i = 0; i < size; i++) {
                            String text = strings.get(i);
    
                            //check string
                            Log.i("Tesseract backlog", text);
                        }
    
                        return scanWineInfo;
                    })
    textSubject is a replay subject, and it blocks the whole stream from moving on
    Paco
    @pakoito
    Hi channel
    I need advice on how to tackle this problem
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Yo there
    Paco
    @pakoito
    I have a hot observable sending like 50 signals per second
    those signals are tied to an id, so i can get 10 for id 1, 5 for id 2, none for id 3, etc...
    I need to throttle them to send, once a second, the latest value for each of the ids
    is groupBy, then sample, the correct approach?
    AFAIK groupBy actually concatenates values, so it requires termination
    which my hot observable can't assure
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    signalsForIds
      .buffer(1, SECONDS)
      .flatMap(listOfSignals -> { ... merge signals for same ids and emit as values })
    Paco
    @pakoito
    so, imperative code
    there's a functional solution
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    sure
    Nothing stops you from writing it as a functional solution inside flatMap
    Paco
    @pakoito
    you're grouping by time first, sorting by type second, taking latest third
    with groupBy into sample you sort by type first, then group + latest in one operation
    yeah groupby doesn't require termination, it's just the marbles being silly
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    signalsForIds
      .buffer(1, SECONDS)
      .flatMap(listOfSignals -> Observable.from(listOfSignals).groupBy(signal -> signal.id).last())
    Paco
    @pakoito
    fair enough
    thanks
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    :+1:
    Oh, no, it'll emit only the last item from the last group
    Paco
    @pakoito
    don't worry, change the buffer to a window
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    The problem is in flatMap
    Paco
    @pakoito
    This message was deleted
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    The problem is to apply last() correctly so it'll emit the last item per group and not the last item of the whole stream
    Paco
    @pakoito
    This message was deleted
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Again, it will emit the last item of the last group :)
    Paco
    @pakoito
    signalsForIds
      .groupBy(signal -> signal.id)
      .flatMap(obsSignals-> obsSignals.window(1, SECONDS).last())
    I'll need to check on monday, np
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Looks like overhead, because it uses a window per group rather than windowing the whole stream and then dividing it into groups
    Paco
    @pakoito
    it's more expensive, isn't it?
    okay
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    yep, seems so. Anyway, your problem is solvable, but it requires some non-trivial combination of operators :smile: Write unit tests and experiment with the implementation!
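The step Artem leaves as "merge signals for same ids and emit as values" can be sketched in plain Java, independent of RxJava. This assumes one batch is the list of signals collected in a one-second buffer, in arrival order. The `Signal` class and its `value` field are hypothetical, since the chat only says that each signal carries an id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LatestPerId {
    // Hypothetical signal type; only the id is mentioned in the chat.
    static final class Signal {
        final int id;
        final String value;
        Signal(int id, String value) { this.id = id; this.value = value; }
    }

    // Given the signals from one time window, in arrival order,
    // keep only the most recent signal for each id.
    static List<Signal> latestPerId(List<Signal> batch) {
        Map<Integer, Signal> latest = new LinkedHashMap<>();
        for (Signal s : batch) {
            latest.put(s.id, s); // a later signal overwrites an earlier one with the same id
        }
        // Result order follows the first appearance of each id in the batch.
        return new ArrayList<>(latest.values());
    }
}
```

A function like this could be called from inside the flatMap lambda on each buffered list. An id that received no signal in the window simply does not appear in the result.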
    David Stemmer
    @weefbellington
    @pakoito just spitballing, but have you considered something like this?
    Observable<Data> signal1 = hotSignal.filter(data -> data.id == 1).startWith(emptyData);
    Observable<Data> signal2 = hotSignal.filter(data -> data.id == 2).startWith(emptyData);
    Observable<Data> signal3 = hotSignal.filter(data -> data.id == 3).startWith(emptyData);
    
    Observable<String> latestData = Observable.combineLatest(
        signal1, signal2, signal3,
        (data1, data2, data3) -> mergeData(data1, data2, data3)
    );
    
    latestData.sample(1, TimeUnit.SECONDS).subscribe(mergedData -> System.out.println(mergedData));
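The idea behind David's suggestion, to hold the latest value per id and read the combined state once per second, can also be modelled without RxJava. The sketch below is a plain-Java analogy under stated assumptions: `LatestSnapshot`, `onSignal` and `sample` are hypothetical names, values are plain strings, and the empty string stands in for `emptyData`. The timer is left out, so `sample()` is what a once-per-second tick would call.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

class LatestSnapshot {
    private final Map<Integer, String> latest = new ConcurrentHashMap<>();

    // Seed every known id with a placeholder, similar in spirit to
    // startWith(emptyData) in the chat snippet.
    LatestSnapshot(int... ids) {
        for (int id : ids) {
            latest.put(id, "");
        }
    }

    // Called for every incoming signal; only the newest value per id is kept.
    void onSignal(int id, String value) {
        latest.put(id, value);
    }

    // Returns a copy of the current latest values, sorted by id.
    Map<Integer, String> sample() {
        return new TreeMap<>(latest);
    }
}
```

Unlike the per-window approach, every seeded id shows up in each sample, including ids that received nothing since the previous tick.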