These are chat archives for ReactiveX/RxJava

2nd
Nov 2015
David Stemmer
@weefbellington
Nov 02 2015 05:15
@pakoito just spitballing, but have you considered something like this?
Observable<Data> signal1 = hotSignal.filter(data -> data.id == 1).startWith(emptyData);
Observable<Data> signal2 = hotSignal.filter(data -> data.id == 2).startWith(emptyData);
Observable<Data> signal3 = hotSignal.filter(data -> data.id == 3).startWith(emptyData);

Observable<String> latestData = Observable.combineLatest(
    signal1, signal2, signal3,
    (data1, data2, data3) -> mergeData(data1, data2, data3)
);

latestData.sample(1, TimeUnit.SECONDS).subscribe(mergedData -> System.out.println(mergedData));
David Stemmer
@weefbellington
Nov 02 2015 05:20
the idea being -- you filter your main signal into multiple, one for each ID, and then sample the latest values from each filtered signal at timed intervals
Vasily Styagov
@VasyaFromRussia
Nov 02 2015 08:37
Observable first = Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        subscriber.add(Subscriptions.create(() -> {
            log("first unsubscribe");
        }));
    }
}).doOnUnsubscribe(() -> log("first"));
Observable second = Observable.create(…).doOnUnsubscribe(() -> log("second"));
Observable result = first.mergeWith(second).doOnUnsubscribe(() -> log("result"));
Subscription subscription = result.subscribe(…);
//…
subscription.unsubscribe();
It logs only "result". Looks like unsubscription is not propagated to merge's child observables. So how do I handle unsubscription inside the first observable's Observable.OnSubscribe?
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Nov 02 2015 10:44
@weefbellington seems that your solution is suitable only for a fixed/known set of ids
David Stemmer
@weefbellington
Nov 02 2015 13:14
@artem-zinnatullin there's no reason you couldn't use groupBy instead of filter
David Stemmer
@weefbellington
Nov 02 2015 14:25
errr that last thing was not something I meant to press enter on
but now I can't delete it
disregard the above comment
@artem-zinnatullin you're right, but if the number of streams is finite and the IDs are known, combineLatest is a pretty natural approach
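A minimal sketch of the groupBy variant, for the archives — the Data class and sample values here are hypothetical, and RxJava 1.x is assumed. It only shows the grouping step (splitting one stream per id without knowing the ids up front), not the per-interval sampling:

```java
import java.util.List;
import rx.Observable;

public class GroupByDemo {
    // Hypothetical Data holder for illustration.
    static class Data {
        final int id;
        final String content;
        Data(int id, String content) { this.id = id; this.content = content; }
    }

    // groupBy splits the stream into one GroupedObservable per id;
    // flatMap merges the groups back, here just tagging each value with its key.
    static List<String> tagById(Observable<Data> source) {
        return source
            .groupBy(d -> d.id)
            .flatMap(group -> group.map(d -> group.getKey() + ":" + d.content))
            .toList()
            .toBlocking()
            .single();
    }

    public static void main(String[] args) {
        Observable<Data> hotSignal = Observable.just(
            new Data(1, "a"), new Data(2, "b"), new Data(1, "c"));
        System.out.println(tagById(hotSignal)); // [1:a, 2:b, 1:c]
    }
}
```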
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Nov 02 2015 15:58
yep, I agree, but usually I try to use more generic solutions, just in case of changes in the future :)
David Stemmer
@weefbellington
Nov 02 2015 16:29
I'm playing around with an alternate solution using collect
David Stemmer
@weefbellington
Nov 02 2015 18:19
@artem-zinnatullin @pakoito here is another solution that builds on the window/flatMap approach:
final Func0<HashMap<Integer, String>> initialState = () -> new HashMap<>();
final HashMap<Integer, String> output = new HashMap<>();

source.window(1, TimeUnit.SECONDS)
    .flatMap(observable -> observable.collect(
        initialState,
        (mostRecent, data) -> mostRecent.put(data.id, data.content))
    )
    .map(input -> {
        output.putAll(input);
        return output;
    })
    .subscribe(reduced -> System.out.println(reduced));
that final .map is kind of kludgy, I'm pondering a better way to produce the output dictionary
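For what it's worth, one way to drop the shared output map might be to fold the per-window maps together with scan instead, so no mutable state lives outside the chain — a sketch, assuming RxJava 1.x and the same hypothetical Data class as above:

```java
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import rx.Observable;

public class ScanMerge {
    // Hypothetical Data holder, as in the snippets above.
    static class Data {
        final int id;
        final String content;
        Data(int id, String content) { this.id = id; this.content = content; }
    }

    // Collect each 1-second window into its own map, then fold the window
    // maps together with scan; later windows overwrite earlier keys.
    static Observable<HashMap<Integer, String>> latestById(Observable<Data> source) {
        return source
            .window(1, TimeUnit.SECONDS)
            .flatMap(w -> w.collect(
                () -> new HashMap<Integer, String>(),
                (map, d) -> map.put(d.id, d.content)))
            .scan(new HashMap<Integer, String>(), (acc, windowMap) -> {
                HashMap<Integer, String> next = new HashMap<>(acc);
                next.putAll(windowMap);
                return next;
            })
            .skip(1); // drop the empty seed map
    }

    public static void main(String[] args) {
        Observable<Data> source = Observable.just(
            new Data(1, "a"), new Data(1, "b"), new Data(2, "c"));
        System.out.println(latestById(source).toBlocking().last());
    }
}
```

The copy-on-fold (`new HashMap<>(acc)`) keeps each emission immutable from the subscriber's point of view, at the cost of one map copy per window.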
Dorus
@Dorus
Nov 02 2015 22:36
Isn't aggregate the operator that lets you collect all elements and forward the result? Simply make a 1-second window and aggregate it into a map; since a map will only contain each key once, it will only keep the latest.
Dorus
@Dorus
Nov 02 2015 22:53
Oh I found it, the correct operator here is called Collect, not Reduce. Both are variants of aggregate.
Dorus
@Dorus
Nov 02 2015 22:59
nvm that's what you're doing
David Stemmer
@weefbellington
Nov 02 2015 23:59
Yeah, collect is very similar to scan/reduce
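A small sketch of the difference, assuming RxJava 1.x: reduce emits only the final fold, scan emits every intermediate value, and collect mutates a single container in place via an Action2:

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;

public class FoldDemo {
    // reduce: folds with a Func2 and emits only the final value.
    static int sum(Observable<Integer> nums) {
        return nums.reduce(0, (a, b) -> a + b).toBlocking().single();
    }

    // scan: the same fold, but every intermediate value (seed included) is emitted.
    static List<Integer> runningSum(Observable<Integer> nums) {
        return nums.scan(0, (a, b) -> a + b).toList().toBlocking().single();
    }

    // collect: like reduce, but mutates one container instead of returning new state.
    static ArrayList<Integer> toArrayList(Observable<Integer> nums) {
        return nums.collect(ArrayList<Integer>::new, ArrayList::add)
                   .toBlocking().single();
    }

    public static void main(String[] args) {
        Observable<Integer> nums = Observable.just(1, 2, 3);
        System.out.println(sum(nums));         // 6
        System.out.println(runningSum(nums));  // [0, 1, 3, 6]
        System.out.println(toArrayList(nums)); // [1, 2, 3]
    }
}
```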