    Paco
    @pakoito
    it's more expensive, isn't it?
    okay
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    yep, seems so. Anyway your problem is solvable, but requires some non-trivial combination of operators :smile: Write unit tests and experiment with implementation!
    David Stemmer
    @weefbellington
    @pakoito just spitballing, but have you considered something like this?
    Observable<Data> signal1 = hotSignal.filter(data -> data.id == 1).startWith(emptyData);
    Observable<Data> signal2 = hotSignal.filter(data -> data.id == 2).startWith(emptyData);
    Observable<Data> signal3 = hotSignal.filter(data -> data.id == 3).startWith(emptyData);
    
    Observable<String> latestData = Observable.combineLatest(
        signal1, signal2, signal3,
        (data1, data2, data3) -> mergeData(data1, data2, data3)
    );
    
    latestData.sample(1, TimeUnit.SECONDS).subscribe(mergedData -> System.out.println(mergedData));
    David Stemmer
    @weefbellington
    the idea being -- you filter your main signal into multiple, one for each ID, and then sample the latest values from each filtered signal at timed intervals
    Vasily Styagov
    @VasyaFromRussia
    Observable first = Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            subscriber.add(Subscriptions.create(() -> {
                log("first unsubscribe");
            }));
        }
    }).doOnUnsubscribe(() -> log("first"));
    Observable second = Observable.create(…).doOnUnsubscribe(() -> log("second"));
    Observable result = first.mergeWith(second).doOnUnsubscribe(() -> log("result"));
    Subscription subscription = result.subscribe(…);
    // …
    subscription.unsubscribe();
    It logs only "result". Looks like unsubscription is not propagated to merge's child observables. So how do I handle unsubscription inside the first observable's Observable.OnSubscribe?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @weefbellington seems that your solution is suitable only for fixed/known variations of ids
    David Stemmer
    @weefbellington
    @artem-zinnatullin there's no reason you couldn't use groupBy instead of filter
    David Stemmer
    @weefbellington
    errr that last thing was not something I meant to press enter on
    but now I can't delete it
    disregard the above comment
    @artem-zinnatullin you're right, but if the number of streams is finite and the IDs are known, combineLatest is a pretty natural approach
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    yep, I agree, but usually I try to use more generic solutions in case of future changes :)
    David Stemmer
    @weefbellington
    I'm playing around with an alternate solution using collect
    David Stemmer
    @weefbellington
    @artem-zinnatullin @pakoito here is another solution that builds on the window/flatMap approach:
    final Func0<HashMap<Integer, String>> initialState = () -> new HashMap<>();
    final HashMap<Integer, String> output = new HashMap<>();
    
    source.window(1, TimeUnit.SECONDS)
        .flatMap(observable -> observable.collect(
            initialState,
            (mostRecent, data) -> mostRecent.put(data.id, data.content)))
        .map(input -> {
            output.putAll(input);
            return output;
        })
        .subscribe(reduced -> System.out.println(output));
    that final .map is kind of kludgy, I'm pondering a better way to produce the output dictionary
    Dorus
    @Dorus
    Isn't aggregate the operator that lets you collect all elements and forward the result? Simply make a 1-second window and aggregate it into a map; since a map contains each key only once, it will only keep the latest value.
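    The latest-wins behavior Dorus relies on here is plain HashMap semantics, independent of Rx: a second put under the same key overwrites the first. A minimal stdlib sketch (the class and method names are invented for illustration):

    ```java
    import java.util.HashMap;
    import java.util.Map;

    public class LatestWinsDemo {
        // Repeated puts under one key overwrite: only the newest value survives.
        static String latest() {
            Map<Integer, String> byId = new HashMap<>();
            byId.put(1, "first");
            byId.put(1, "second");
            byId.put(1, "third");
            return byId.get(1);
        }

        public static void main(String[] args) {
            System.out.println(latest()); // prints "third"
        }
    }
    ```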
    Dorus
    @Dorus
    Oh, I found it, the correct operator here is called collect, not reduce. Both are variants of aggregate.
    Dorus
    @Dorus
    nvm that's what you're doing
    David Stemmer
    @weefbellington
    Yeah, collect is very similar to scan/reduce
    Dorus
    @Dorus
    Also I'm not sure what's going on in the final map. You seem to add back all previous data (in case some IDs were unused last second). Is that truly by the spec? If so, I would think there is no need to use window; just scan the original source instead of collect, and sample the result.
    Also reduced == output, so you can just use reduced in subscribe.
    David Stemmer
    @weefbellington
    @Dorus doesn't scan/reduce only emit a single value when the original source completes? In this case the source observable is hot, it may never complete. Maybe I'm misunderstanding what you're suggesting.
    It doesn't look like reduced is the same as output to me, either. The output of each window is a Map. The final step is merging the map emitted by the current window with the map from the previous window.
    Dorus
    @Dorus
    @weefbellington scan emits all intermediate values too. Not sure if that bodes well if you are emitting the same mutable data type each time.
    The map ends with return output;. If I understand that correctly, that means that in .subscribe(reduced -> ...), reduced is the output variable returned by map.
    David Stemmer
    @weefbellington
    @Dorus aha! So that is the difference between scan and reduce! I had thought they were aliases for the same operation. In that case, yes, scan with the values stored in a map is the way to go!
    @Dorus thanks for pointing that out!
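    For anyone following along, the distinction the two are converging on can be sketched over a plain List, outside Rx (scan and reduce below are local helpers written for illustration, not the RxJava operators themselves): scan emits every intermediate accumulation, while reduce emits only the final one.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BinaryOperator;

    public class ScanVsReduceDemo {
        // scan: emits the accumulator after EVERY element.
        static <T> List<T> scan(List<T> input, T seed, BinaryOperator<T> acc) {
            List<T> out = new ArrayList<>();
            T state = seed;
            for (T item : input) {
                state = acc.apply(state, item);
                out.add(state); // intermediate value emitted here
            }
            return out;
        }

        // reduce: keeps only the FINAL accumulation, emitted once at the end.
        static <T> T reduce(List<T> input, T seed, BinaryOperator<T> acc) {
            T state = seed;
            for (T item : input) {
                state = acc.apply(state, item);
            }
            return state;
        }

        public static void main(String[] args) {
            List<Integer> nums = List.of(1, 2, 3);
            System.out.println(scan(nums, 0, Integer::sum));   // [1, 3, 6]
            System.out.println(reduce(nums, 0, Integer::sum)); // 6
        }
    }
    ```

    This is why scan suits a hot source that never completes: there is always a next intermediate value to emit, whereas reduce would wait forever for completion.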
    johnfoconnor
    @johnfoconnor
    is there a way to force an observable to emit its result(s) "naturally" as a return value instead of as a callback within subscribe?
    result = Observable.Range(1, 3).Sum().subscribeAndReturn()
    print(result)  // 6
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin

    no natural way, because that's the model of work: an Observable pushes data to you.

    You can block until it finishes via toBlocking

    johnfoconnor
    @johnfoconnor
    sure, it just seems more intuitive in certain cases to avoid the callback mess. especially for something like myArray = Observable.Range(1, 3).toArray()
    obviously it would be toBlocking under the covers
    Dorus
    @Dorus
    tbh if you do not need any of the async properties of rx, rx is probably not the right tool for the job.
    There are ways to use it of course, but it's never "natural".
    David Stemmer
    @weefbellington

    @Dorus updated the previous gist to use scan, thanks for the input:

    source.scan(new HashMap<>(), (output, data) -> {
        output.put(data.id, data.content);
        return output;
    }).sample(1, TimeUnit.SECONDS).subscribe(output -> System.out.println(output));

    https://gist.github.com/weefbellington/952414cb7076976bfd49

    @pakoito @artem-zinnatullin ^ in case you are still interested in this puzzle :)
    Dorus
    @Dorus
    @weefbellington The only thing to keep in mind is that output.put has side effects, and you are emitting the same collection every time. Since sample allows the previous operators to run concurrently, output might receive new put calls during the onNext call in subscribe. Might be a good idea to use ConcurrentHashMap or clone the collection somewhere.
    Your previous example only updated the collection once every second, so this was less problematic.
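    Dorus's warning can be seen even single-threaded with the stdlib: a HashMap iterator fails fast with ConcurrentModificationException on structural modification, while ConcurrentHashMap's weakly consistent iterator tolerates it. A small sketch (the class and method names are invented for illustration):

    ```java
    import java.util.ConcurrentModificationException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class MapMutationDemo {
        // Returns true if putting into the map while iterating its keys throws.
        static boolean throwsOnMutationDuringIteration(Map<Integer, String> map) {
            map.put(1, "a");
            map.put(2, "b");
            try {
                for (Integer key : map.keySet()) {
                    map.put(999, "x"); // structural modification on first pass
                }
                return false; // weakly consistent iterator: no exception
            } catch (ConcurrentModificationException e) {
                return true;  // fail-fast iterator detected the modification
            }
        }

        public static void main(String[] args) {
            System.out.println(throwsOnMutationDuringIteration(new HashMap<>()));           // true
            System.out.println(throwsOnMutationDuringIteration(new ConcurrentHashMap<>())); // false
        }
    }
    ```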
    Paco
    @pakoito
    @weefbellington Hey, many thanks for the snippet. Sorry I have been out of the chat, I need to get a desktop client.
    I went through the second example and it seems to fit the bill. I can also add an extra signal to have an invalidation policy for data that's not fresh :)
    like every 5 seconds add a signal that vacates those positions whose timestamped value is older than 5 seconds
    David Stemmer
    @weefbellington
    @Dorus good point. I might be a little worried about the efficiency of cloning a new HashMap with each emission. I know that languages like Haskell that don't have mutable data structures are really good at creating a lot of them at once and then reclaiming the memory, but I don't know how Java would handle it. So I'd probably go with the ConcurrentHashMap.
    Paco
    @pakoito
    Java is very inefficient when cloning maps, and there are no memory-sharing libs out there. It is possible within the JVM tho, as Clojure and other languages have them.
    Even worse on Dalvik/ART tho.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Wait, what do you mean by "very inefficient"? Collections just store references to the objects, not the objects themselves; copy == more references
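    Artem's point, sketched with the stdlib: HashMap's copy constructor duplicates the table of references, not the value objects, so a copy is shallow and both maps see the same values (the class and method names below are invented for illustration):

    ```java
    import java.util.HashMap;
    import java.util.Map;

    public class ShallowCopyDemo {
        // A copied map shares its value objects with the original (shallow copy).
        static boolean copyShareValues() {
            Map<String, StringBuilder> original = new HashMap<>();
            original.put("k", new StringBuilder("v"));

            Map<String, StringBuilder> copy = new HashMap<>(original); // copies references only

            return original.get("k") == copy.get("k"); // same object identity
        }

        public static void main(String[] args) {
            System.out.println(copyShareValues()); // prints "true"
        }
    }
    ```

    Note this also means a mutation made through one map's value is visible through the other, which is exactly why deep cloning (or persistent data structures, as in Clojure) comes up when isolation is needed.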
    Paco
    @pakoito
    I would double-check that, or else the shared memory concept would be pointless.
    me or you