These are chat archives for ReactiveX/RxJava

19th
Nov 2018
Niranjan
@nnanda2016
Nov 19 2018 08:13
Hi…How to create an Observable from an iterator?
I didnt find any factory method to create one. Do I have to use emitter?
Niranjan
@nnanda2016
Nov 19 2018 08:28

Pardon my limited knowledge in RxJava; how can I achieve following? I am trying to implement something like below.

CustomIterator<String> iter = ... code to build custom iterator

Observable.<String>create(emitter -> {
        while(iter.hasNext()) {
            emitter.onNext(iter.next());
        }
    }, BackpressureMode.BUFFER)
    .map(id -> {
        Doc doc = fetchFromDB(id);
        convertDocToVO(doc);
    })
    .map(vo -> complicatedTask)
    .forEach(/* Do final processing */);
    ;

I need some help around complicatedTask. Current non-reactive implementation performs an action parallely (using FJP common pool) and for each VO document, it creates Future and drops that to a blocking queue. The consumer blocks on the Future sequentially and once available does some other processing. This model gives us two things:

  1. Parallel execution
  2. Blocking queue guarantees that a fixed no. of Futures are created.

Some clarifications:

  1. The method compicatedTask does all the processing in CPU; no I/O.
  2. Each VO has a collection property which needs some processing which is parallelized.
  3. The return type of compicatedTask is another type of DTO.

if I want to achieve similar in reactive, how should I do?

If i return Future from .map(vo -> complicatedTask), then it will lead to a lot of Futures in memory.
David Karnok
@akarnokd
Nov 19 2018 09:59
Observable.from((Iterable<String>)() -> iter).
Niranjan
@nnanda2016
Nov 19 2018 19:12
Thanks @akarnokd …can you provide some ideas for the second problem?
Ignacio Baca Moreno-Torres
@ibaca
Nov 19 2018 21:15
You really should read all the fromXxx methods :wink2:
Niranjan
@nnanda2016
Nov 19 2018 22:56
@ibaca you’re right…i totally miss this part…funny thing is, i am already doing it (converting an iterator to iterable)…but not sure why i missed that..it was my bad!