These are chat archives for ReactiveX/RxJava

27th
Feb 2017
Abbi
@wasabbi
Feb 27 2017 12:23
)
Alex Krause
@alex0ptr
Feb 27 2017 20:59

Hi, we are having trouble getting our heads around a backpressure problem. We want to pull files in parallel from a slow remote location and then analyze them in a CPU-bound step, which should also run in parallel. Unfortunately, the analysis step is slower than fetching the files, so our disk runs out of space. Do we therefore need to adjust backpressure settings to slow down the producer of file downloads?
Our code looks something like this:

Flowable.fromIterable(fileListing)
  .flatMap(file ->
    Flowable.fromCallable(() -> getFileFromRemoteLocation(file))
      .subscribeOn(Schedulers.io())
  )
  // backpressure needed here? e.g. after 10 files stop getting more
  .flatMap(localFile ->
    Flowable.fromCallable(() -> analyzeFile(localFile))
      .subscribeOn(Schedulers.computation())
  )

How do we communicate appropriate backpressure in this case?

David Karnok
@akarnokd
Feb 27 2017 21:28
@alex0ptr You can adjust the concurrency level of flatMap:
Flowable.fromIterable(files)
.flatMap(file -> Flowable.fromCallable(() -> getFileFromRemoteLocation(file))
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .map(f -> analyzeFile(f))
, 10)
...
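
Editor's note: with a maxConcurrency argument, flatMap subscribes to at most that many inner sources at a time, so in the combined version a new download starts only after an earlier file has been fully analyzed. The plain-JDK sketch below is an analogy for that effect, not RxJava's implementation. A Semaphore stands in for the concurrency limit, and the class BoundedPipeline with its methods download, analyze and run are hypothetical stand-ins for getFileFromRemoteLocation and analyzeFile.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPipeline {
    // Files that have been downloaded but not yet fully analyzed.
    static final AtomicInteger inFlight = new AtomicInteger();
    // Highest value inFlight reached during the last run.
    static final AtomicInteger peak = new AtomicInteger();

    // Hypothetical stand-in for getFileFromRemoteLocation.
    static String download(String name) {
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        return "local-" + name;
    }

    // Hypothetical stand-in for analyzeFile; deliberately slower than download.
    static int analyze(String localFile) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        inFlight.decrementAndGet();
        return localFile.length();
    }

    // Downloads and analyzes every file, keeping at most maxConcurrency files in flight.
    static List<Integer> run(List<String> files, int maxConcurrency) {
        inFlight.set(0);
        peak.set(0);
        Semaphore slots = new Semaphore(maxConcurrency);
        ExecutorService io = Executors.newCachedThreadPool();
        ExecutorService cpu = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        try {
            List<CompletableFuture<Integer>> pending = new ArrayList<>();
            for (String f : files) {
                // The producer blocks here until an earlier file is fully analyzed.
                slots.acquireUninterruptibly();
                pending.add(CompletableFuture
                        .supplyAsync(() -> download(f), io)
                        .thenApplyAsync(BoundedPipeline::analyze, cpu)
                        // Free the slot whether the analysis succeeded or failed.
                        .whenComplete((result, error) -> slots.release()));
            }
            List<Integer> results = new ArrayList<>();
            for (CompletableFuture<Integer> p : pending) {
                results.add(p.join());
            }
            return results;
        } finally {
            io.shutdown();
            cpu.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            files.add("file" + i);
        }
        List<Integer> results = run(files, 10);
        System.out.println(results.size() + " files analyzed, peak in flight: " + peak.get());
    }
}
```

Because a slot is held from the start of a download until the end of its analysis, the number of files on disk at once cannot exceed the limit, however slow the analysis is.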
Alex Krause
@alex0ptr
Feb 27 2017 21:31
@akarnokd That is surprising. Is there no way I can tell the first flatMap that it should not produce too many elements for the second?
David Karnok
@akarnokd
Feb 27 2017 21:33
You have to set the concurrency level of the second flatMap as well, but you can just combine the two into a single one.
Alex Krause
@alex0ptr
Feb 27 2017 21:34
Yeah, however they are in two different classes for separation of concerns.
So if I set both flatMaps to just 10 it would work too? @akarnokd
David Karnok
@akarnokd
Feb 27 2017 21:36
Yes.
Alex Krause
@alex0ptr
Feb 27 2017 21:36
Awesome. Thanks @akarnokd