These are chat archives for ReactiveX/RxJava

Nov 2018
Nov 19 2018 08:13
Hi…How to create an Observable from an iterator?
I didnt find any factory method to create one. Do I have to use emitter?
Nov 19 2018 08:28

Pardon my limited knowledge in RxJava; how can I achieve following? I am trying to implement something like below.

CustomIterator<String> iter = ... code to build custom iterator

Observable.<String>create(emitter -> {
        while(iter.hasNext()) {
    }, BackpressureMode.BUFFER)
    .map(id -> {
        Doc doc = fetchFromDB(id);
    .map(vo -> complicatedTask)
    .forEach(/* Do final processing */);

I need some help around complicatedTask. Current non-reactive implementation performs an action parallely (using FJP common pool) and for each VO document, it creates Future and drops that to a blocking queue. The consumer blocks on the Future sequentially and once available does some other processing. This model gives us two things:

  1. Parallel execution
  2. Blocking queue guarantees that a fixed no. of Futures are created.

Some clarifications:

  1. The method compicatedTask does all the processing in CPU; no I/O.
  2. Each VO has a collection property which needs some processing which is parallelized.
  3. The return type of compicatedTask is another type of DTO.

if I want to achieve similar in reactive, how should I do?

If i return Future from .map(vo -> complicatedTask), then it will lead to a lot of Futures in memory.
David Karnok
Nov 19 2018 09:59
Observable.from((Iterable<String>)() -> iter).
Nov 19 2018 19:12
Thanks @akarnokd …can you provide some ideas for the second problem?
Ignacio Baca Moreno-Torres
Nov 19 2018 21:15
You really should read all the fromXxx methods :wink2:
Nov 19 2018 22:56
@ibaca you’re right…i totally miss this part…funny thing is, i am already doing it (converting an iterator to iterable)…but not sure why i missed was my bad!