These are chat archives for ReactiveX/RxJava

31st
Dec 2015
Artem Kholodnyi
@defHLT
Dec 31 2015 11:37
hi #rxjava. I'm a little lost here. How do I implement an observable that supports backpressure?
Javier Domingo Cansino
@txomon
Dec 31 2015 11:51
@mlatu you mean observer?
just specify the backpressure operators in the subscription pipeline
like myObservable.onBackpressureDrop().subscribe(mySubscriber)
Artem Kholodnyi
@defHLT
Dec 31 2015 12:00
@txomon Thanks, but I'm aware of onBackpressureDrop and onBackpressureBuffer. I'm creating an observable that calls subscriber.onNext() too often, and I get a MissingBackpressureException. I'd like to make my observable honor backpressure and not call subscriber.onNext() until more items are requested
Javier Domingo Cansino
@txomon
Dec 31 2015 13:12
@mlatu maybe you are looking for retry() ?
Dorus
@Dorus
Dec 31 2015 13:12
I think he wants to create his own observable with backpressure support.
Not something I'm familiar with either, but I could swear there were tutorials on that subject. Can't find them, however.
Artem Kholodnyi
@defHLT
Dec 31 2015 15:31
@Dorus is right. can't find those tutorials either
Artem Kholodnyi
@defHLT
Dec 31 2015 15:40
I'm looking at the source of OnSubscribeRange, and that seems to be what I need
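
For readers following along, the idea of a source that emits only what has been requested can be sketched as below. This is a minimal sketch and not the RxJava source: it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in so that it runs without any dependency, and the names RangePublisher, RangeSubscription and CollectingSubscriber are hypothetical. In RxJava 1.x the analogous hook is a Producer installed with subscriber.setProducer(...), whose request(n) plays the role of Subscription.request(n) here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical names; a stand-in for the idea, not RxJava's OnSubscribeRange.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int start;
    private final int count;

    RangePublisher(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // Nothing is emitted here; emission happens only inside request(n).
        subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count));
    }

    private static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> downstream;
        private final int end;
        private int next;
        private final AtomicLong requested = new AtomicLong(); // outstanding demand
        private volatile boolean cancelled;

        RangeSubscription(Flow.Subscriber<? super Integer> downstream, int start, int end) {
            this.downstream = downstream;
            this.next = start;
            this.end = end;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                cancelled = true;
                downstream.onError(new IllegalArgumentException("request must be positive"));
                return;
            }
            // Add to the demand, capping at Long.MAX_VALUE on overflow.
            long previous = requested.getAndUpdate(r -> r + n < 0 ? Long.MAX_VALUE : r + n);
            // Only the caller that raised demand from zero drains; reentrant or
            // concurrent callers just add demand for the running drain loop.
            if (previous == 0) {
                drain();
            }
        }

        private void drain() {
            long demand = requested.get();
            long emitted = 0;
            for (;;) {
                while (emitted != demand && next != end) {
                    if (cancelled) return;
                    downstream.onNext(next++);
                    emitted++;
                }
                if (cancelled) return;
                if (next == end) {
                    cancelled = true;
                    downstream.onComplete();
                    return;
                }
                // Demand is used up: subtract what was emitted and stop
                // unless more was requested in the meantime.
                demand = requested.addAndGet(-emitted);
                if (demand == 0) return;
                emitted = 0;
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }
}

// Test helper that requests nothing on its own; the caller pulls explicitly.
final class CollectingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed;
    Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(Integer item) { received.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { completed = true; }
}
```

Subscribing a CollectingSubscriber to new RangePublisher(1, 5) delivers nothing until subscription.request(2) is called, which delivers 1 and 2. A later request(10) delivers the remaining three items and then completes.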
Rudi Grinberg
@rgrinberg
Dec 31 2015 23:28
Suppose I have the following timeout scenario. I want to ping some external source every x seconds and wait for a response for n seconds. If the source fails to respond, I'd like to restart the connection and this pinging process (possibly with backoff)
Would Rx help me with this?
Dorus
@Dorus
Dec 31 2015 23:28
source.timeout().retry() ?
Rudi Grinberg
@rgrinberg
Dec 31 2015 23:28
I mean, I could possibly use a PublishSubject for every ping sent
@Dorus so source would be an observable of ping responses?
Dorus
@Dorus
Dec 31 2015 23:29
source would connect to the external source yes.
Rudi Grinberg
@rgrinberg
Dec 31 2015 23:30
and I guess I'd use the overload of timeout(10, TimeUnit.SECONDS, <observable that receives pings>)?
is that about right?
Dorus
@Dorus
Dec 31 2015 23:34
Ah, that overload is one option, but it only allows you to resubscribe once.
If you use retry() you have more freedom
Also, the observable that makes the connection is the same one that receives the pings and puts them in onNext, right?
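
The timeout-then-retry behaviour discussed here, with the backoff mentioned earlier, can be sketched in plain Java. This is a dependency-free analogue of source.timeout(...).retry() and not RxJava code: PingRetry, callWithRetry and all of its parameters are hypothetical names, and the doubling backoff is an assumption, since the chat does not specify a policy.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: run an attempt with a per-attempt timeout and retry
// with doubling backoff until it succeeds or maxAttempts is reached.
final class PingRetry {
    static <T> T callWithRetry(Callable<T> attempt, long timeoutMillis,
                               int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        // A cached pool, so a stuck attempt cannot block the next one.
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            long backoff = initialBackoffMillis;
            for (int i = 1; ; i++) {
                Future<T> future = executor.submit(attempt);
                try {
                    // The "timeout" part: wait a bounded time for the response.
                    return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException | ExecutionException e) {
                    future.cancel(true); // interrupt an attempt that is still running
                    if (i == maxAttempts) {
                        throw new IllegalStateException("gave up after " + i + " attempts", e);
                    }
                }
                // The "retry with backoff" part: pause, then start a fresh attempt.
                Thread.sleep(backoff);
                backoff *= 2;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while retrying", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Each attempt here corresponds to one subscription to the source: a timeout or an error ends the attempt, and the retry starts a new one from scratch, which is why the connection setup needs to live inside the attempt itself.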
Rudi Grinberg
@rgrinberg
Dec 31 2015 23:37
I guess I could make it so. That part of my code doesn't use Rx yet, and it's painful :(
Dorus
@Dorus
Dec 31 2015 23:39
oh, that complicates things a little.
Rudi Grinberg
@rgrinberg
Dec 31 2015 23:39
a little? trying to cheer me up I see? :D
Dorus
@Dorus
Dec 31 2015 23:43
Well, it's probably not all that hard to wrap your current code into an observable, depending a little on what it looks like. Once you've done that correctly, the rest is easy.
If you give me a general idea of how it's called, I can probably cook something up as an example