Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    MightySeal
    @MightySeal
    I have tried using PublishSubject but it is not possible to limit background threads to 1. Is it possible?
    Simon Baslé
    @simonbasle
    @MightySeal how do you mean, limit background thread to 1?
    from a Producer-Consumer point of view, keep in mind that if no one subscribes to a PublishSubject it will kind of throw away the items fed to it (since it doesn't cache anything, and you only receive emissions that come after you subscribe)
    MightySeal
    @MightySeal
    @simonbasle it's ok. Limit threads — i mean i want only one task being executed in another thread and other tasks will wait until current is done.
    Moreover i would like to have possibility to add new tasks at any time i need.
    Simon Baslé
    @simonbasle
    do you use RxJava elsewhere? I'm not sure this approach fits well in the reactive model...
    sounds like an Executors.newSingleThreadExecutor()? (backed by a single-thread "pool" and an unbounded queue for queueing up tasks)
    in RxJava, Schedulers.computation() is close except its pool is sized to the number of CPUs
    MightySeal
    @MightySeal
    @simonbasle looks like. I'm new to rx and just trying to implement some tasks in this way. Thanks =)
    Simon Baslé
    @simonbasle
    mmh so if you want to do this in Rx, the submission of your tasks should be preceded by something like .observeOn(mySingleScheduler)
    what kind of tasks are you dealing with?
    where mySingleScheduler is prepared earlier as Schedulers.from(Executors.newSingleThreadExecutor())
    (where mySingleScheduler is prepared earlier as Schedulers.from(Executors.newSingleThreadExecutor()))
    (oups, sorry duplicate lines, lost the connection a bit so I wrote twice)
    MightySeal
    @MightySeal
    @simonbasle android file downloading. The whole idea is the following: i have a list of files, i start download and can add files to queue at any time. The files are not very large.
    Simon Baslé
    @simonbasle
    ah Android
    so you probably want to do the actual downloading in the background and then notify somehow on the UI thread?
    did you have a look at RxAndroid?
    MightySeal
    @MightySeal
    @simonbasle yes, already using it. But still can't figure out how to have only one task running simultaneously.
    Simon Baslé
    @simonbasle
    I'm not entirely sure (and maybe an Android dev here can confirm), but I think what you can do is something like
    //with s being a PublishSubject<String> or <URL>, and download() returning a status somehow
    final Scheduler singleScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
    s
      .subscribeOn(singleScheduler)
      .map(s -> download(s))
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(status -> showStatusInUI(status));
    (I'm using lambda syntax for conciseness and readability here)
    MightySeal
    @MightySeal
    Yep, i'm familiar with lambdas =)
    Will try, thanks again
    MightySeal
    @MightySeal
    Well, looks like it's better to implement this feature as usual and use Observables for easier async behavoir handling.
    Vasco Figueira
    @vlfig

    Greetings all,

    I was looking at ways to throttle my async requests. I'm familiar with the "zip with a timer" pattern but I wanted something more akin to "limit number of in-flight requests".

    Any pointers?

    The setup is a typical Observable[SomeArgs]flatMapped with the Observable.from(httpResponseFuture)(RxScala, but details shouldn't be important here)
    Simon Baslé
    @simonbasle
    @vlfig what do you want to do if the number of requests is above your limit?
    drop some requests?
    Vasco Figueira
    @vlfig
    I want to pause consuming upstream when I've reached the limit
    I'm using AnormCypher to issue statements to Neo4j. I'm drowning it and netty's complaining about "Too many open files in system". Essentially I need to somehow relate backpressure to some state that I maintain, I guess.
    Simon Baslé
    @simonbasle
    what is the source of the statements? is it dynamic, or do you know in advance how many statements you'll issue?
    Vasco Figueira
    @vlfig
    They come from a huge file. I'd prefer to assume dynamic.
    Simon Baslé
    @simonbasle
    ok so this should be possible to get a stream of these requests (eg. using Observable.from) in a way that support reactive pull backpressure
    then you can have your Subscriber call requestwith a sane amount that Netty will be able to cope with?
    Vasco Figueira
    @vlfig
    Heh, exactly the page I was reading, at the exact place. :-)
    Thanks, let me have a look, and a think.
    Simon Baslé
    @simonbasle
    haha :)
    Denis Stepanov
    @dstepanov
    Hi, how can I map the exception of Observable but result keep the same?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @dstepanov can you please provide more info, you got Observable that emits some kind of items and you in case of exception you want to do what?
    David Stemmer
    @weefbellington
    hello! I have an RxJava specific query on the topic of Observable.Operators
    I have an Operator that needs to do some work in the background. Specifically, it needs to talk to a system service (the Android DrmManagerClient), set a listener on that service, wait for the service to do some work, and emit the output of that work
    David Stemmer
    @weefbellington
    for the most part this is pretty straightforward -- the Operator returns a Subscriberthat sets up the listener in the onNext method. When the listener callback fires, it calls onNext on the subscriber that handles the output
    however, in this case the callback fires on a thread that is different than the original subscribeOnthread -- it's being handled inside a DrmManagerClient event handler thread
    so my question is: what is the right thing to do in this case? Should I return control to the thread created by the original subscribeOn scheduler (in this example, Schedulers.io)? If so, what is the best way to do that?
    David Stemmer
    @weefbellington
    (I can think a bunch of naiive ways to block a thread while waiting for a resource to become available, like Object#wait/Object#notify, but I was hoping there might be something more idiomatic to RxJava)
    Matt Langston
    @mattblang
    If I have a flatMap of Observables, is there any way to handle an exception then short-circuit the rest of the calls (not call them)?
    Denis Stepanov
    @dstepanov
    I would like to map the exception to other exception, let's say I know there is an exception that wraps the original one. Would be nice to have observable.mapError(throwable -> {logic})
    Robert Winkler
    @RobWin
    @dstepanov Do you know http://javaslang.com/ ?