These are chat archives for ReactiveX/RxJava

18th
May 2017
Heikki Vesalainen
@hvesalai
May 18 2017 13:31
Hi all! I'm new to RxJava
I'm looking for something like Flowable.reduceWith, but one that works with Singles instead of immediate values. That is, where reduceWith on a Flowable<T> takes a BiFunction<R,T,R> and returns Single<R>, I would like to give it a BiFunction<R,T,Single<R>> and get back a Single<R>
Any ideas whether this can be done with the currently available primitives, or do I need to implement it myself?
Heikki Vesalainen
@hvesalai
May 18 2017 14:02
maybe something involving scan and concatMap?
David Karnok
@akarnokd
May 18 2017 15:00
If you don't write a custom operator, you need out-of-band storage for R (and therefore have to defer the whole stream if more than one Subscriber is expected):
AtomicReference<R> accumulator = new AtomicReference<R>(initialR);
source.concatMap(t ->
    function.apply(accumulator.get(), t)
        .doOnSuccess(r -> accumulator.set(r))
        .toFlowable()
)
.ignoreElements()
.andThen(Flowable.fromCallable(() -> accumulator.get()))
;
Forgot that there is no ignoreElement in Single.
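The same sequential fold can be sketched with only the JDK, using CompletableFuture as a stand-in for Single (an assumption made so the example runs without RxJava). The names AsyncFoldSketch and foldAsync are hypothetical. Here each step's result is threaded into the next step through thenCompose, so no external AtomicReference is needed. It walks an in-memory Iterable rather than a backpressured Flowable, so treat it as an illustration of the idea only:

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical helper, not part of RxJava.
final class AsyncFoldSketch {

    // Folds the items left to right. Each step is started only after the
    // previous step's future has completed, like concatMap in the chat.
    static <T, R> CompletableFuture<R> foldAsync(
            Iterable<T> items,
            R initial,
            BiFunction<R, T, CompletableFuture<R>> step) {
        CompletableFuture<R> acc = CompletableFuture.completedFuture(initial);
        for (T item : items) {
            acc = acc.thenCompose(r -> step.apply(r, item));
        }
        return acc;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> sum = foldAsync(
                Arrays.asList(1, 2, 3, 4),
                0,
                (r, t) -> CompletableFuture.supplyAsync(() -> r + t));
        System.out.println(sum.join()); // prints 10
    }
}
```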
Heikki Vesalainen
@hvesalai
May 18 2017 15:27
Ok, so I have to have the external variable
Heikki Vesalainen
@hvesalai
May 18 2017 15:46
What about the other way round, if I want to generate asynchronously, i.e. like generate, but where my generator function doesn't call onNext synchronously
I tried doing it with generate, but am getting some IllegalStateExceptions
David Karnok
@akarnokd
May 18 2017 16:37
I don't understand what you want.
Heikki Vesalainen
@hvesalai
May 18 2017 16:47

I want a function with the signature

static Flowable<T> generate(T initialState, Function<T, Maybe<T>> generator)

Where the value of the Maybe<T> is fed to generator to get the next value

David Karnok
@akarnokd
May 18 2017 16:49
There is an expand operator that works with Flowables.
Heikki Vesalainen
@hvesalai
May 18 2017 16:49
and furthermore I would like it to be a cold stream, i.e. only generate values on request
Heikki Vesalainen
@hvesalai
May 18 2017 17:08
So like this?
Flowable.just(initialState).compose(FlowableTransformers.expand(s -> generator.apply(s).toFlowable())).skip(1)
David Karnok
@akarnokd
May 18 2017 17:09
Yes.
Heikki Vesalainen
@hvesalai
May 18 2017 17:10
with the .skip(1) because I don't want to emit the initialState
David Karnok
@akarnokd
May 18 2017 17:10
It's up to you what you do with the elements after compose.
Heikki Vesalainen
@hvesalai
May 18 2017 17:11
so how does that actually work?
What is compose
David Karnok
@akarnokd
May 18 2017 17:15
For such a single-element source, you'd want to use ExpandStrategy.BREADTH_FIRST, as it doesn't have to keep a stack of open sequences.
compose lets you apply operators without leaving the fluent style of specifying operators. It takes a transformer function that is called with the stream assembled so far and should return a new stream with added operators/behavior.
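To make the compose description concrete, here is a minimal sketch on a toy pipeline type. Pipe, ComposeSketch and doubledAsText are hypothetical names invented for this illustration and are not RxJava classes. The point is only that compose hands the pipeline built so far to a transformer function and continues the fluent chain with whatever that function returns:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniature pipeline type, standing in for Flowable.
final class Pipe<T> {
    private final List<T> items;

    Pipe(List<T> items) {
        this.items = new ArrayList<>(items);
    }

    <R> Pipe<R> map(Function<? super T, ? extends R> f) {
        List<R> out = new ArrayList<>();
        for (T t : items) {
            out.add(f.apply(t));
        }
        return new Pipe<R>(out);
    }

    Pipe<T> skip(int n) {
        return new Pipe<T>(items.subList(Math.min(n, items.size()), items.size()));
    }

    // Calls the transformer with the pipeline assembled so far and
    // returns the pipeline the transformer builds on top of it.
    <R> Pipe<R> compose(Function<? super Pipe<T>, ? extends Pipe<R>> transformer) {
        return transformer.apply(this);
    }

    List<T> toList() {
        return new ArrayList<>(items);
    }
}

final class ComposeSketch {
    // A reusable transformer is just a function from pipeline to pipeline.
    static Function<Pipe<Integer>, Pipe<String>> doubledAsText() {
        return p -> p.map(x -> x * 2).map(x -> "v" + x);
    }

    public static void main(String[] args) {
        List<String> out = new Pipe<Integer>(Arrays.asList(1, 2, 3))
                .compose(doubledAsText())
                .skip(1)
                .toList();
        System.out.println(out); // prints [v4, v6]
    }
}
```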
Heikki Vesalainen
@hvesalai
May 18 2017 17:18
Ok, and so then FlowableTransformers.expand emits the values of the original stream and for each value recursively calls the expand function to create more values until there is nothing to expand
David Karnok
@akarnokd
May 18 2017 17:19
yes
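The recursive expansion confirmed above can be modelled in plain Java. This is a sketch of the breadth-first idea only, not the operator's implementation: it is synchronous, works on Lists instead of Flowables, and the names ExpandSketch and expandBreadthFirst are hypothetical. A queue of pending values replaces the stack of open sequences that a depth-first traversal would need:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of breadth-first expansion, not an RxJava class.
final class ExpandSketch {

    // Emits each seed, then the expansions of every emitted value,
    // level by level, until the expander returns nothing more.
    static <T> List<T> expandBreadthFirst(List<T> seeds, Function<T, List<T>> expander) {
        List<T> emitted = new ArrayList<>();
        Deque<T> pending = new ArrayDeque<>(seeds);
        while (!pending.isEmpty()) {
            T value = pending.removeFirst();
            emitted.add(value);
            pending.addAll(expander.apply(value));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // A small tree: 1 -> (2, 3), 2 -> (4), 3 -> (5).
        Map<Integer, List<Integer>> children = new HashMap<>();
        children.put(1, Arrays.asList(2, 3));
        children.put(2, Arrays.asList(4));
        children.put(3, Arrays.asList(5));

        List<Integer> out = expandBreadthFirst(
                Collections.singletonList(1),
                n -> children.getOrDefault(n, Collections.<Integer>emptyList()));
        System.out.println(out); // prints [1, 2, 3, 4, 5]
    }
}
```

A depth-first traversal of the same tree would visit 4 before 3.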
Heikki Vesalainen
@hvesalai
May 18 2017 17:21
And it does not create values before they are requested?
This is your code outside the RxJava library. Is there any intention to have this be included in the core library?
David Karnok
@akarnokd
May 18 2017 17:27
It honors backpressure so it follows the demand from your downstream.
The Extensions library is the holding place for uncommon, rare or otherwise peculiar operators that are currently unlikely to be included in RxJava itself.
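What "following the demand" means for the generate signature asked about earlier can be shown with a pull-based analogue. This sketch makes two assumptions: Optional stands in for Maybe, and a plain Iterator stands in for the Flowable, so it is synchronous and is not how expand is implemented. LazyGenerateSketch is a hypothetical name. The generator runs only when the consumer asks for the next element, and the initial state itself is never emitted:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical pull-based analogue, not an RxJava class.
final class LazyGenerateSketch {

    // Feeds each produced value back into the generator to get the next one.
    // An empty Optional ends the sequence.
    static <T> Iterator<T> generate(T initialState, Function<T, Optional<T>> generator) {
        return new Iterator<T>() {
            private T state = initialState;
            private Optional<T> upcoming = Optional.empty();
            private boolean computed = false;

            @Override
            public boolean hasNext() {
                if (!computed) {
                    upcoming = generator.apply(state); // runs only on demand
                    computed = true;
                }
                return upcoming.isPresent();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                state = upcoming.get();
                computed = false;
                return state;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Iterator<Integer> it = generate(0, n -> {
            calls.incrementAndGet();
            return n < 3 ? Optional.of(n + 1) : Optional.<Integer>empty();
        });
        System.out.println(calls.get()); // prints 0: nothing generated before the first request
        System.out.println(it.next());   // prints 1
        System.out.println(it.next());   // prints 2
        System.out.println(calls.get()); // prints 2: one generator call per requested value
    }
}
```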
Heikki Vesalainen
@hvesalai
May 18 2017 17:28
Uh... I thought my scenario was very normal.