Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Yo there
    Paco
    @pakoito
    I have a hot observable sending like 50 signals per second
    those signals are tied to an id, so i can get 10 for id 1, 5 for id 2, none for id 3, etc...
    I need to throttle them to send, once a second, the latest value for each of the ids
    is groupBy, then sample, the correct approach?
    AFAIK groupBy actually concatenates values, so it requires termination
    which my hot observable can't assure
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    signalsForIds
      .buffer(1, SECONDS)
      .flatMap(listOfSignals -> { ... merge signals for same ids and emit as values })
    Paco
    @pakoito
    so, imperative code
    there's a functional solution
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    sure
    Nothing stops you from writing it as functional solution inside of flatMap
    Paco
    @pakoito
    you're grouping by time first, sorting by type second, taking latest third
    with groupBy into sample you sort by type first, then group + latest in one operation
    yeah groupby doesn't require termination, it's just the marbles being silly
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    signalsForIds
      .buffer(1, SECONDS)
      .flatMap(listOfSignals -> Observable.from(listOfSignals).groupBy(signal -> signal.id).last())
    Paco
    @pakoito
    fair enough
    thanks
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    :+1:
    Oh shi, no, it'll emit only last item from the last group
    Paco
    @pakoito
    don't worry, change the buffer to a window
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    problem in flatMap
    Paco
    @pakoito
    This message was deleted
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    The problem is to apply last() correctly so it 'll emit last item per group and not last item of the whole stream
    Paco
    @pakoito
    This message was deleted
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Again will emit last item of the last group :)
    Paco
    @pakoito
    signalsForIds
      .groupBy(signal -> signal.id)
      .flatMap(obsSignals-> obsSignals.window(1, SECONDS).last())
    I'll need to check on monday, np
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Looks like an overhead because of using window per each group rather than window whole stream and then divide it to groups
    Paco
    @pakoito
    it's more expensive, isn't it?
    okay
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    yep, seems so. Anyway your problem is solvable, but requires some non-trivial combination of operators :smile: Write unit tests and experiment with implementation!
    David Stemmer
    @weefbellington
    @pakoito just spitballing, but have you considered something like this?
    Observable<Data> signal1 = hotSignal.filter(data -> data.id == 1).startWith(emptyData);
    Observable<Data> signal2 = hotSignal.filter(data -> data.id == 2).startWith(emptyData);
    Observable<Data> signal3 = hotSignal.filter(data -> data.id == 3).startWith(emptyData);
    
    Observable<String> latestData = Observable.combineLatest(
        signal1, signal2, signal3,
        (data1, data2, data3) -> mergeData(data1, data2, data3)
    );
    
    latestData.sample(1, TimeUnit.SECONDS).subscribe(mergedData -> System.out.println(mergedData));
    David Stemmer
    @weefbellington
    the idea being -- you filter your main signal into multiple, one for each ID, and then sample the latest values from each filtered signal at timed intervals
    Vasily Styagov
    @VasyaFromRussia
    Observable first = Observable.create(new Observable.OnSubscribe<Object>() {
                        @Override
                        public void call(Subscriber<? super Object> subscriber) {
                            subscriber.add(Subscriptions.create(() -> {
                                log(“first unsubscribe”);         
                            }));
                        }
                    }).doOnUnsubscribe(() -> log(“first”));
       Observable second = Observable.create(…).doOnUnsubscribe(() -> log(“second”));
       Observable result = first.mergeWith(second).doOnUnsubscribe(() -> loge(“result”));
       Subscription subscription = result.subscribe(…);
       //…
       subscription.unsubscribe();
    It logs only “result”. Looks like unsubscription is not propagated to merge’s child observables. So how to handle unsubscription inside of first observable’s Observable.OnSubscribe?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @weefbellington seems that your solution is suitable only for fixed/known variations of ids
    David Stemmer
    @weefbellington
    @artem-zinnatullin there's no reason to couldn't use groupBy instead of filtee
    David Stemmer
    @weefbellington
    errr that last thing was not something I meant to press enter on
    but now I can't delete it
    disregard the above comment
    @artem-zinnatullin you're right, but if the number of streams is finite and the IDs are known, combineLatest is a pretty natural approach
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    yep, I agree, but usually I try to use more generic solutions just in case of changes in future :)
    David Stemmer
    @weefbellington
    I'm playing around with an alternate solution using collect
    David Stemmer
    @weefbellington
    @artem-zinnatullin @pakoito here is another solution that builds on the window/flatMap approach:
    final Func0<HashMap<Integer, String>> initialState = () -> new HashMap<>();
    final HashMap<Integer, String> output = new HashMap<>();
    
    source.window(1, TimeUnit.SECONDS)
       .flatMap(observable -> observable.collect(
          initialState,
          (mostRecent, data) -> mostRecent.put(data.id, data.content))
       )
    .map(input -> {
       output.putAll(input);
       return output;
    }).subscribe(reduced -> System.out.println(output));
    that final .map is kind of kludgy, I'm pondering a better way to produce the output dictionary