These are chat archives for ReactiveX/RxJava

28th
Jun 2016
Włodzimierz Rożkow
@rozhok
Jun 28 2016 07:51
hey guys, need a quick advice about rx.
background: we have microservices-based application, where some of services make calls to each other. we need to do this in parallel/async manner. values returned from queries are heterogeneous so they couldn't be zipped properly afaik
does rx will help me here or I may stick with ol' good CompletableFuture?
Thomas Wilgenbus
@regnarock
Jun 28 2016 09:21

Hi there! Question about a combination of operators to solve a specific need of mine:
Prelude: I have two async Observables A and B. A emits 1+ elements and B always 1. I want to combine the two such as the ouput of A: [A1, A2, A3] and B: [B1] -> [A1B1, A2B1, A3B1].

my solution:

A.toList().zipWith(B, (list_of_A, B1) -> Lists.transform(list_of_A, An -> combine(An, B1)))

it does work but I am unsure if this is the right way to do it?
(note: I used Lists.tranform from guava for readability purpose)

Dorus
@Dorus
Jun 28 2016 09:22
@regnarock Flatmap?
B.flatMap(() -> A, (b, a) -> {...})
mmm, i'm looking up the names, i think it's flatMapIterable actually
Thomas Wilgenbus
@regnarock
Jun 28 2016 09:26
I don't understand how flatMapIterable would help me there. A is already an Observable<An>. Here, my solution was to turn A into Observable<List<An>> to then zip it with B.
Dorus
@Dorus
Jun 28 2016 09:27
nah i was wrong. You need to use the collectoinSelector
i tought only flatMapIterable had that, but both do.
Thomas Wilgenbus
@regnarock
Jun 28 2016 09:27
Ok. Idk that, lookin at it! Coming back later, thx :)
bah cant share urls with brackets ;(
Thomas Wilgenbus
@regnarock
Jun 28 2016 09:30
Second link worked like a charm :)
Dorus
@Dorus
Jun 28 2016 09:31
yeah, works if i post just the url :)
Thomas Wilgenbus
@regnarock
Jun 28 2016 13:36
Thanks @Dorus! It worked like a charm
Thomas Wilgenbus
@regnarock
Jun 28 2016 15:42
Hm.. After further tests, it seems this version of flatMap is not yet (or not entirely) the solution: the second Observable gets subscribed for each object of the first. Resulting in many network calls
Dorus
@Dorus
Jun 28 2016 15:42
ooh right
What about combineLatest?
you just have to make sure the first call yields quicker
Thomas Wilgenbus
@regnarock
Jun 28 2016 15:43
That was my first try at this problem!
Exactly. and I can't
Dorus
@Dorus
Jun 28 2016 15:44
So you need to buffer the items from A untill B yields.
Something like window could work.
lets see if there is an easier solution first
Thomas Wilgenbus
@regnarock
Jun 28 2016 15:46
I actually wrote a custom operator to buffer the items, but after having troubles managing different Threads in my Rx stream, I thought I should look for an easier solution (the problem does sound quite trivial)
Dorus
@Dorus
Jun 28 2016 15:48
I'm instantly thinking about this one again:
Would pausableBuffered help?
Thomas Wilgenbus
@regnarock
Jun 28 2016 15:49
I'm looking at it, thx :). brb
Dorus
@Dorus
Jun 28 2016 15:49
Mmm the docs lead to onBackpressureBuffer
oh you found one?
Thomas Wilgenbus
@regnarock
Jun 28 2016 15:51
oh no. Just looking at the link you gave me
Dorus
@Dorus
Jun 28 2016 15:56
yeah i'm trying to use something similar to that to get what you need. But dinner is ready, so i gotta go now. Might get back to it later, but you might eb able to puzzle it out yourself :)
Thomas Wilgenbus
@regnarock
Jun 28 2016 15:57
Yes I think so too. You have been of a great help, thank you very much :D. And eat well
Dorus
@Dorus
Jun 28 2016 17:01
@regnarock I'm not complaining, but i just realized you changed your specs. You first said B only emited 1 item.
Still, some trick to query both A and B at the same time is still faster.
I'm thinking, could it be as easy a .Replay on A?