These are chat archives for ReactiveX/RxJava

28th Aug 2018
Ignacio Baca Moreno-Torres
@ibaca
Aug 28 2018 18:30
@nbransby if you have a stream of “plus” and “minus” events, you can do something like this…
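import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
// Read stdin on its own thread; share() lets both filters use a single reader.
val lines = Observable.create<String> { e ->
    while (!e.isDisposed) e.onNext(readLine() ?: break) // stop at end of input instead of throwing
    e.onComplete()
}.subscribeOn(Schedulers.newThread()).share()
val plus = lines.filter { "+" == it }.map { { acc: Int -> acc + 1 } }
val minus = lines.filter { "-" == it }.map { { acc: Int -> acc - 1 } }
val calc = Observable.mergeArray<(Int) -> Int>(plus, minus).scan(0) { acc, fn -> fn(acc) }
calc.doOnNext(::println).blockingLast()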
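Here plus and minus are both of type Observable<(Int) -> Int>: each event is mapped to a function that updates the accumulator, and scan applies those functions to the running total in arrival order, starting from 0.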
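To see the "stream of update functions" idea without stdin or threads, here is a minimal sketch in plain Kotlin (no RxJava). It is only an illustration of the pattern, not the snippet above: runningTotals is a hypothetical helper name, a fixed list stands in for the input lines, and the standard library's runningFold (Kotlin 1.4+) plays the role that scan plays in the Rx version. Like scan with a seed, it emits the initial value first.

```kotlin
// Hypothetical helper: turns "+" / "-" lines into running totals.
// Lines that match neither symbol are dropped, like events that pass neither filter.
fun runningTotals(lines: List<String>): List<Int> {
    // Each recognised symbol maps to a function that updates the accumulator.
    val steps: Map<String, (Int) -> Int> = mapOf(
        "+" to { acc: Int -> acc + 1 },
        "-" to { acc: Int -> acc - 1 }
    )
    return lines
        .mapNotNull { steps[it] }                 // List<(Int) -> Int>
        .runningFold(0) { acc, fn -> fn(acc) }    // apply each update in order, seed first
}

fun main() {
    // Fixed input standing in for lines typed on stdin.
    println(runningTotals(listOf("+", "+", "x", "-", "+"))) // [0, 1, 2, 1, 2]
}
```

Keeping the events as functions means a new operation (say, a reset) only needs one more entry that maps to its own update function; the fold itself stays unchanged.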