These are chat archives for ReactiveX/RxJava

16th
Jul 2015
MightySeal
@MightySeal
Jul 16 2015 08:13
Hello everyone. Have anyone implemented something similar to producer-consumer pattern using rxjava?
I have tried using PublishSubject but it is not possible to limit background threads to 1. Is it possible?
Simon Baslé
@simonbasle
Jul 16 2015 09:29
@MightySeal how do you mean, limit background thread to 1?
from a Producer-Consumer point of view, keep in mind that if no one subscribes to a PublishSubject it will kind of throw away the items fed to it (since it doesn't cache anything, and you only receive emissions that come after you subscribe)
MightySeal
@MightySeal
Jul 16 2015 09:35
@simonbasle it's ok. Limit threads — i mean i want only one task being executed in another thread and other tasks will wait until current is done.
Moreover i would like to have possibility to add new tasks at any time i need.
Simon Baslé
@simonbasle
Jul 16 2015 09:41
do you use RxJava elsewhere? I'm not sure this approach fits well in the reactive model...
sounds like an Executors.newSingleThreadExecutor()? (backed by a single-thread "pool" and an unbounded queue for queueing up tasks)
in RxJava, Schedulers.computation() is close except its pool is sized to the number of CPUs
MightySeal
@MightySeal
Jul 16 2015 09:44
@simonbasle looks like. I'm new to rx and just trying to implement some tasks in this way. Thanks =)
Simon Baslé
@simonbasle
Jul 16 2015 09:46
mmh so if you want to do this in Rx, the submission of your tasks should be preceded by something like .observeOn(mySingleScheduler)
what kind of tasks are you dealing with?
where mySingleScheduler is prepared earlier as Schedulers.from(Executors.newSingleThreadExecutor())
(where mySingleScheduler is prepared earlier as Schedulers.from(Executors.newSingleThreadExecutor()))
(oups, sorry duplicate lines, lost the connection a bit so I wrote twice)
MightySeal
@MightySeal
Jul 16 2015 09:53
@simonbasle android file downloading. The whole idea is the following: i have a list of files, i start download and can add files to queue at any time. The files are not very large.
Simon Baslé
@simonbasle
Jul 16 2015 09:56
ah Android
so you probably want to do the actual downloading in the background and then notify somehow on the UI thread?
did you have a look at RxAndroid?
MightySeal
@MightySeal
Jul 16 2015 10:00
@simonbasle yes, already using it. But still can't figure out how to have only one task running simultaneously.
Simon Baslé
@simonbasle
Jul 16 2015 10:03
I'm not entirely sure (and maybe an Android dev here can confirm), but I think what you can do is something like
//with s being a PublishSubject<String> or <URL>, and download() returning a status somehow
final Scheduler singleScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
s
  .subscribeOn(singleScheduler)
  .map(s -> download(s))
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(status -> showStatusInUI(status));
(I'm using lambda syntax for conciseness and readability here)
MightySeal
@MightySeal
Jul 16 2015 10:04
Yep, i'm familiar with lambdas =)
Will try, thanks again
MightySeal
@MightySeal
Jul 16 2015 11:54
Well, looks like it's better to implement this feature as usual and use Observables for easier async behavoir handling.
Vasco Figueira
@vlfig
Jul 16 2015 13:02

Greetings all,

I was looking at ways to throttle my async requests. I'm familiar with the "zip with a timer" pattern but I wanted something more akin to "limit number of in-flight requests".

Any pointers?

The setup is a typical Observable[SomeArgs]flatMapped with the Observable.from(httpResponseFuture)(RxScala, but details shouldn't be important here)
Simon Baslé
@simonbasle
Jul 16 2015 13:09
@vlfig what do you want to do if the number of requests is above your limit?
drop some requests?
Vasco Figueira
@vlfig
Jul 16 2015 13:14
I want to pause consuming upstream when I've reached the limit
I'm using AnormCypher to issue statements to Neo4j. I'm drowning it and netty's complaining about "Too many open files in system". Essentially I need to somehow relate backpressure to some state that I maintain, I guess.
Simon Baslé
@simonbasle
Jul 16 2015 13:22
what is the source of the statements? is it dynamic, or do you know in advance how many statements you'll issue?
Vasco Figueira
@vlfig
Jul 16 2015 13:23
They come from a huge file. I'd prefer to assume dynamic.
Simon Baslé
@simonbasle
Jul 16 2015 13:29
ok so this should be possible to get a stream of these requests (eg. using Observable.from) in a way that support reactive pull backpressure
then you can have your Subscriber call requestwith a sane amount that Netty will be able to cope with?
Vasco Figueira
@vlfig
Jul 16 2015 13:30
Heh, exactly the page I was reading, at the exact place. :-)
Thanks, let me have a look, and a think.
Simon Baslé
@simonbasle
Jul 16 2015 13:30
haha :)