These are chat archives for ReactiveX/RxJava

14th
Sep 2017
Stas Shusha
@journeyman
Sep 14 2017 16:45
Hi, could anyone please help me understand why this test fails? In Rx.NET it works perfectly fine and seems logical :) And I couldn't find an explanation in the docs
    @Test
    fun test() {
        val connectable = Observable.just(1,2,3).replay(1)
        val testSubscriber = TestSubscriber.create<Int>()

        connectable.connect()
        connectable.subscribe(testSubscriber)

        testSubscriber.assertReceivedOnNext(listOf(3))
    }
the actual result is all 3 items: [1,2,3]
David Karnok
@akarnokd
Sep 14 2017 17:14
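Because of backpressure. replay() won't request anything from its source unless there is a consumer for it, so connect() alone emits nothing; the items only start flowing once your subscriber arrives, and it then sees all of them.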
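To make the explanation above observable, here is a minimal sketch, assuming RxJava 1.x (1.0.14 or later) on the classpath. The function name `observeReplayDemand` and the `emitted` list are made up for illustration; `doOnNext` is only used as a probe to record what the source actually emits.

```kotlin
import rx.Observable
import rx.observers.TestSubscriber

// Hypothetical helper: returns what the source had emitted right after connect(),
// and what a late subscriber ended up receiving.
fun observeReplayDemand(): Pair<List<Int>, List<Int>> {
    val emitted = mutableListOf<Int>()
    val connectable = Observable.just(1, 2, 3)
        .doOnNext { emitted.add(it) }   // probe: records every item the source emits
        .replay(1)

    connectable.connect()
    val emittedAfterConnect = emitted.toList()   // snapshot taken before any consumer exists

    val testSubscriber = TestSubscriber.create<Int>()
    connectable.subscribe(testSubscriber)        // first consumer: replay() can now request

    return Pair(emittedAfterConnect, testSubscriber.onNextEvents.toList())
}

fun main() {
    val (afterConnect, received) = observeReplayDemand()
    println("emitted after connect(): $afterConnect")
    println("received by late subscriber: $received")
}
```

If the explanation holds, the first list should be empty and the second should contain all three items, which matches the failing test.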
Stas Shusha
@journeyman
Sep 14 2017 17:15
Found an answer: it's related to the backpressure changes in RxJava 1.0.14
@akarnokd oh, thanks)
David Karnok
@akarnokd
Sep 14 2017 17:24
You are welcome, twice I guess ;)
Stas Shusha
@journeyman
Sep 14 2017 17:49
sure)
though you haven't proposed any workarounds
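    fun <T> Observable<T>.replayForced() : ConnectableObservable<T> {
        val upstream = this.replay()
        // The no-op subscriber gives replay() a consumer, so it requests items once connected.
        val sub = upstream.subscribe()
        // Note: doOnUnsubscribe returns a new Observable. Its result is discarded here,
        // so this hook never runs and sub is never unsubscribed.
        upstream.doOnUnsubscribe { sub.unsubscribe() }
        return upstream
    }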
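The workaround above calls replay() without a buffer size, so a late subscriber would still see every item. As a sketch of how it could be applied to the original test, here is a hypothetical overload that takes the buffer size (the `bufferSize` parameter and the `lastItemSeenByLateSubscriber` function are assumptions for illustration, again against RxJava 1.x). It drops the doOnUnsubscribe line, so the no-op subscription simply stays attached.

```kotlin
import rx.Observable
import rx.observables.ConnectableObservable
import rx.observers.TestSubscriber

// Hypothetical variant of replayForced() with a bounded replay buffer.
fun <T> Observable<T>.replayForced(bufferSize: Int): ConnectableObservable<T> {
    val upstream = this.replay(bufferSize)
    upstream.subscribe()   // no-op consumer with unbounded demand; never unsubscribed here
    return upstream
}

fun lastItemSeenByLateSubscriber(): List<Int> {
    val connectable = Observable.just(1, 2, 3).replayForced(1)
    connectable.connect()   // the no-op consumer's demand lets the source run at connect time

    val testSubscriber = TestSubscriber.create<Int>()
    connectable.subscribe(testSubscriber)   // arrives late, gets only the buffered tail
    return testSubscriber.onNextEvents.toList()
}

fun main() {
    println(lastItemSeenByLateSubscriber())
}
```

One caveat with this approach: the argument-less subscribe() installs no error handler, so in RxJava 1.x an error from the source would surface as an OnErrorNotImplementedException.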