These are chat archives for ReactiveX/RxJava

23rd
Sep 2015
Simon Baslé
@simonbasle
Sep 23 2015 14:00
hi guys, can someone shed some light on how flatMap(Func1<T, Observable<R>> f, int maxConcurrent) works?
specifically, is that a mean of propagating a backpressure request upstream?
say I have a bursty source, like Observable.from(aCollectionOfThousandsOfStrings)
would that flatMap signature with maxConcurrent of, say, 100 ensure that no more than 100 invocations of the map function are made at a time?
(effectively consuming 100 strings from the source, making the corresponding asynchronous calls, waiting for completion of at least one of them before makin a 101th call?)
Dorus
@Dorus
Sep 23 2015 14:32
As far as i know only one invocation is made at a time.
The maxConcurrent refers to the max number of parallel active sources. It merges 100 collections and only start at #101 once one of them yields onComplete().
Simon Baslé
@simonbasle
Sep 23 2015 15:45
ok looks like it's what I'm after, a way of preventing a large list in an Observable.from(...) (or range) to swamp my io layer with too many requests in an instant
Something like:
Observable.range(1, 1000)
    .map(id -> buildUrlToResource(id))
    //driver can only cope with around 150 asynchronous requests
    .flatMap(url -> driver.fireRequestAsync(url),
100)
    .doOnNext(response -> showNotification(response.content())
    .toBlocking().last(); //wait for last response
Dorus
@Dorus
Sep 23 2015 15:47
Yes, that'll work. It will only fire up 100 fireRequestAsync in parallel.
Simon Baslé
@simonbasle
Sep 23 2015 15:47
cool, thanks @Dorus
(I made a unit test in my project that demonstrates this behavior ;))