These are chat archives for ReactiveX/RxJava

16th
Mar 2016
Samuel Tardieu
@samueltardieu
Mar 16 2016 13:00
Any idea on how to do elegantly what I describe in https://github.com/cgeo/cgeo/issues/5533#issuecomment-197284324 with RxJava?
We have a list of geocaches that we want to refresh (in a geocaching application), and checking whether they have static maps stored requires a disk access. But we want to favor refreshing caches without static maps, because the Google Maps API quota is quite low and we want to give those caches a chance. And we want to do all that with minimum delay, and limit disk accesses if the user cancels the operation. With Akka streams, it would be very easy to build a graph as described with the Graph DSL. With RxJava, it looks less straightforward.
Edoardo Vacchi
@evacchi
Mar 16 2016 13:23
Hi all. Problem: I have a (remote) data source represented as an iterator that I want to share between different computations. Each of these computations needs to consume the data at its own pace, dropping on backpressure; in other words, the iterator should be consumed at maximum speed (the iterator itself blocks synchronously when no data is available). This is what I've come up with: https://gist.github.com/evacchi/cf75d4199a8d6af285f6
I expected to see 1) and 2) produce different output; instead they seem to be running round-robin at the speed of the slower computation...
(RxJava 2.x branch)
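The effect being asked for — one fast producer, several consumers that each drop independently when they fall behind — can be sketched in plain Java without RxJava, using one bounded buffer per consumer. All names here are illustrative, not from the gist:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Sketch: multicast with per-consumer drop-on-backpressure.
 * Each consumer gets its own bounded queue; when that queue is full,
 * new items are dropped for that consumer only, so the producer
 * (the shared iterator) never slows down for a slow consumer.
 */
class DroppingMulticast<T> {
    private final List<BlockingQueue<T>> buffers = new ArrayList<>();

    /** Register a consumer with its own buffer capacity. */
    BlockingQueue<T> subscribe(int capacity) {
        BlockingQueue<T> q = new ArrayBlockingQueue<>(capacity);
        buffers.add(q);
        return q;
    }

    /** Push one item to every consumer, dropping where the buffer is full. */
    void publish(T item) {
        for (BlockingQueue<T> q : buffers) {
            q.offer(item); // offer() returns false (drops) when the queue is full
        }
    }
}
```

A fast consumer with a large buffer sees everything; a slow consumer with a small buffer keeps only what fits and skips the rest, which is roughly the `onBackpressureDrop` behaviour per subscriber rather than the round-robin lockstep observed in the gist.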
dwursteisen
@dwursteisen
Mar 16 2016 14:18

@samueltardieu

Observable<GeoCache> obs = geoCaches.share();
Observable<List<GeoCache>> withMap = obs.filter(c -> c.withStaticMap()).buffer(5);
Observable<List<GeoCache>> withoutMap = obs.filter(c -> !c.withStaticMap()).buffer(5);
Observable<List<GeoCache>> result = withMap.mergeWith(withoutMap);

Something like this ?

Samuel Tardieu
@samueltardieu
Mar 16 2016 14:37
@dwursteisen Will mergeWith prefer withMap over withoutMap?
Also, I can't filter as easily, because the with/without-static-maps information is not in the geocache object. It requires a disk access, so that would be: geocaches -> (geoCache, hasStaticMaps?) -> cache -> filter…
(first enrich the geocache with the static maps information so it is done only once)
But the important thing is how backpressure and input preference are handled by mergeWith. In Akka streams, there is an explicit mergePreferred which favours one input over the others.
dwursteisen
@dwursteisen
Mar 16 2016 16:24
@samueltardieu : mergeWith will subscribe sequentially, I think, so it will first emit items from withMap.
As for the backpressure part, I don't know.
Samuel Tardieu
@samueltardieu
Mar 16 2016 16:26
@dwursteisen Yes I know, but it won't do what I want: I want the first source to be preferred if elements are available, and the other one to be used if not. The idea is to have one of the sources be a priority source. Backpressure is important here, as we have to wait for the subscriber to request elements before checking whether the priority source has some to give.
For example, in my case, the subscriber does network requests, and only a few of them can be done in parallel. As soon as a slot is available because a previous request has terminated, then the subscriber will request a new element from upstream to start a new request. That's when I want to use the priority source if it has an element ready, or the other one otherwise.
dwursteisen
@dwursteisen
Mar 16 2016 16:30
hum; ok.
if elements are available: when do you know they're available? Have you checked the switch operators?
Samuel Tardieu
@samueltardieu
Mar 16 2016 16:40
I should not have to know. Look at the definition of mergePreferred in Akka streams: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stages-overview.html#mergepreferred
When downstream (the subscriber) requests an element, this request is forwarded to the various sources (observables). If several sources have an element ready, then the one from the preferred source is sent to the subscriber (and another one is queried for later). Elements from the non-preferred sources are only sent when no elements from the preferred source are available and the subscriber requests some.
It's not easy to explain when we are at the boundary of two systems with different vocabularies :-) We have source/observable, flow/operator, sink/subscriber
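The mergePreferred semantics described above can be sketched in plain Java (no RxJava), with two queues standing in for the two sources; names and structure are illustrative only:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Minimal sketch of "merge preferred": each downstream request is served
 * from the preferred queue when it has an element ready, otherwise from
 * the secondary queue. Nothing is handed out without a request, which is
 * the backpressure-following part of the requirement.
 */
class PreferredMerge<T> {
    private final Queue<T> preferred = new ArrayDeque<>();
    private final Queue<T> secondary = new ArrayDeque<>();

    void offerPreferred(T t) { preferred.add(t); }
    void offerSecondary(T t) { secondary.add(t); }

    /** Called when the subscriber requests one element. */
    T request() {
        if (!preferred.isEmpty()) return preferred.poll();
        return secondary.poll(); // may be null if both queues are empty
    }
}
```

In RxJava this would presumably take the shape of a custom operator holding two internal queues and draining them preferred-first on each downstream request; the sketch only captures the selection rule, not subscription or scheduling.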
dwursteisen
@dwursteisen
Mar 16 2016 16:47
ok. I see the problem. But I don't know (yet) how to write it using RxJava.
hum. Your (preferred) Observable should, in fact, be the first to emit an item?
preferred.mergeWith(other) will (de facto) emit items from preferred first, as it is subscribed first. That should be pretty close to your description? (but the merge from Akka can't be done out of the box with Rx, I think, as Rx doesn't do random subscription order)
Samuel Tardieu
@samueltardieu
Mar 16 2016 16:51
I can't know. I go through all the geocaches, and each geocache puts itself into one observable or the other depending on whether or not it has a static map. I don't know where the first geocaches will fit. And since this is a huge geocache collection, I don't want to go too fast in producing the results, as determining whether or not a geocache has a static map requires disk access and consumes battery. So if the user cancels the operation in the middle, I don't want to have wasted too many disk accesses and too much CPU. That's why I want to follow the subscriber's demand.
Yes, but preferred might take a long time to generate in full if many geocaches do not fit the criterion. So the network downloader (the subscriber) will have to wait, while it could in the meantime download some non-preferred geocaches that we have encountered while going through the list.
For example, let's imagine that you have 1000 caches numbered 1 to 1000, with only the first and the last fulfilling the criterion. With a regular concat, I'll have to go through the whole list: 1 will get downloaded, 2 to 999 will be put in the non-preferred observable, we will eventually get to 1000 and download it, then we will download 2 to 999. What I would like instead is: download 1, set aside 2 to 600 (say), then the subscriber has a network slot available so it gets 2 because there is no preferred geocache at this time, scanning continues from 601 to 1000, then 1000 is handed to the subscriber, then 3 to 999.
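The demand-driven part of the example above can be sketched in plain Java, assuming a pull model where scanning only advances on a subscriber request (the concurrent "keep scanning while a download is in flight" aspect is deliberately left out, and all names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.function.Predicate;

/**
 * Sketch: geocaches are classified only when the subscriber requests one,
 * and a preferred item already set aside always wins over a non-preferred
 * one. If the user cancels, no more of the source is scanned, so disk
 * accesses stay proportional to demand.
 */
class DemandDrivenScanner<T> {
    private final Iterator<T> source;
    private final Predicate<T> isPreferred; // e.g. "has no static map yet"
    private final Queue<T> preferred = new ArrayDeque<>();
    private final Queue<T> secondary = new ArrayDeque<>();

    DemandDrivenScanner(Iterator<T> source, Predicate<T> isPreferred) {
        this.source = source;
        this.isPreferred = isPreferred;
    }

    /**
     * One downstream request: classify at most one new item from the
     * source, then hand out a preferred item if any, else a non-preferred
     * one (null when everything is exhausted).
     */
    T request() {
        if (preferred.isEmpty() && source.hasNext()) {
            T next = source.next();
            (isPreferred.test(next) ? preferred : secondary).add(next);
        }
        return !preferred.isEmpty() ? preferred.poll() : secondary.poll();
    }
}
```

With this pull-only rule, a run of non-preferred caches is handed out immediately when no preferred cache is ready, instead of waiting for the whole scan to finish as a plain concat would.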
Samuel Tardieu
@samueltardieu
Mar 16 2016 16:58
On the other hand, if geocaches 1 to 998 meet the criterion and 999 and 1000 don't, we will hand 1 to the network downloader, continue to scan 2 to 6, then stop because the downloader has no slot available and backpressures. When it requests another geocache to download, it will be handed 2, we will scan 7 and pause again, etc. So we follow the demand and don't do useless work analyzing the geocaches; if the user cancels, we won't have thrashed the disk retrieving useless information.
dwursteisen
@dwursteisen
Mar 16 2016 17:01
Do you have a test which reproduces the issue? I don't have the solution, but it's an interesting use case.
Samuel Tardieu
@samueltardieu
Mar 16 2016 17:02
Not yet, but I'll have one soon if I write such an operator :-)
dwursteisen
@dwursteisen
Mar 16 2016 17:02
(and the test may help to fully understand the issue)
:D
Samuel Tardieu
@samueltardieu
Mar 16 2016 19:04
@dwursteisen I'm out now, but you can look at the test cases at https://github.com/cgeo/cgeo/blob/master/tests/src/cgeo/geocaching/utils/RxUtilsTest.java#L117