These are chat archives for ReactiveX/RxJava

9th
May 2017
Vladimir Baryshnikov
@vovkab
May 09 2017 00:10
Tried rxjava2, seems like completely ignores calls to request(1).
Vladimir Baryshnikov
@vovkab
May 09 2017 00:22
ah, you have to remove super.onStart();, but it still not working correctly:
doOnRequest, after flatmap n: 128
doOnRequest, before flatmap n: 128
Maybe I'm doing something wrong, but even with .rebatchRequests(1) it will emit whole range 1..10 for no reason, even no one requested it :(
Chris Hatton
@chris-hatton
May 09 2017 04:32
Hi, I'm struggling to understand why I cannot make an Observer subscribe to an Observable and unsubscribe it again, without 'extra' code.
Sound basic, sound ridiculous? I hope so! Consider this function which closes the gap, but feels like it should be part of Rx already:
    fun <T> Observable<T>.disposableSubscribe(observer:Observer<T>) : Disposable {
        return this.subscribe(observer::onNext,observer::onError,observer::onComplete)
    }
(Its Kotlin, RxJava 2.0)
Put another way... why does observable.subscribe(observer) not return a disposable?
Vladimir Baryshnikov
@vovkab
May 09 2017 05:55
@chris-hatton try subscribeWith()
Hiếu Slash
@hieutrtr
May 09 2017 10:00
hi there
im new in rx
could you guys tell me how RX compares with spark. Any use cases for both ?
Vladimir Baryshnikov
@vovkab
May 09 2017 16:34
Any ideas about what is wrong with backpressure and control flow? Why both rx1 and rx2 doesn't follow request(n)?
Vladimir Baryshnikov
@vovkab
May 09 2017 18:46
To ensure order of items, I tried to use concatMap() but for some reasons it prefetches 2 items by default and completely ignores request(n) :(
Jeremy Lyman
@maarek
May 09 2017 19:34
I have a queue that I am wanting to wrap in a Flowable. Items are added to the queue and emitted one at a time. The part that I'm trying to figure out is if I can emit only until an awk occurs. In my current impl, the awk is the removal of the currently emitted item from the queue. I don't want the end subscriber to handle the removal of the item from the queue but they should process the emitted item. Once their onNext is finished can something upstream handle the queue.remove() and the observable emit a new item.
Does that make sense? Is it possible to have something like a consumer/subscriber that other subscribers are attached to and knows when the downstream subscriber is finished before doing more work?
Vladimir Baryshnikov
@vovkab
May 09 2017 20:09
@maarek I'm trying to solve something like this too. I need to handle items strictly one by one. I thought I could use backpressure and reactive pull, but it doesn't work the way I would expect.
Jeremy Lyman
@maarek
May 09 2017 20:24
That's what I was playing with as well.
But I don't want a ResourceSubscriber to know about the disk cache that I'm working with.
I'd like a doAfterNext that occured after each item is nabbed by the subscriber, not during emission.
Jeremy Lyman
@maarek
May 09 2017 20:42
I suppose in my case, a new item doesn't emit until some action occurs. So I don't need to manage the request(1) mechanism. I just need to know when a subscriber is for sure finished with its onNext callback.
Vladimir Baryshnikov
@vovkab
May 09 2017 20:53
I think this is where request(1) should work, basically saying I'm ready to handle more items. And it works up to some degree, except some operators overproduce and don't follow request(n).
And then we would be able to do something like Observable.from(queue), and handle items one by one, control flow...
David Karnok
@akarnokd
May 09 2017 21:52
@vovkab 1.x is sometimes odd, you have to call request(0) in the constructor to suppress the default Long.MAX_VALUE that may get requested before onStart even runs.
request(n) is a negotiation between subsequent operators and there is no guarantee that any request amount reaches the top producer unchanged. Many operators have their own batching and requestig patterns: even if you request 2 items, observeOn will request 128 and flatMap may go Long.MAX_VALUE
Vladimir Baryshnikov
@vovkab
May 09 2017 21:55
is there a way to enforce it? observables could be very expensive, and we don't want 128 if we only ever need is 1
David Karnok
@akarnokd
May 09 2017 21:57
Many operators can be parametrized on buffer size that often translates to prefetch amounts, observeOn and flatMap can be limited to 1 but you should probably rethink the design of your flows.
Vladimir Baryshnikov
@vovkab
May 09 2017 21:59
Requirements is pretty simple - consume items on demand. I think the issue here is that many operators over produce for no reason, even when control flow explicitly requested.
David Karnok
@akarnokd
May 09 2017 22:04
RxJava was designed for streaming, that means prefetching some data so in case of a backpressure situation, the pipeline is kept busy as much as possible. Sounds like you want some ping-pong like request-response processing over a longer and modulated chain.
Vladimir Baryshnikov
@vovkab
May 09 2017 22:46
I understand, but how you suppose to control flow if most operators doesn't follow it?
Maybe we need something like maxRequestN?
Jeremy Lyman
@maarek
May 09 2017 23:38
@akarnokd I think I see your answer to my question on StackOverflow. :P This wasn't my SO question but it's sort of what I was looking for. http://stackoverflow.com/questions/27197915/make-sure-item-is-delivered-to-onnext-listener-in-rxjava
Some mechanism on to know whether a Consumer had actually consumed an event