    MightySeal
    @MightySeal
    Well, looks like it's better to implement this feature as usual and use Observables for easier async behavior handling.
    Vasco Figueira
    @vlfig

    Greetings all,

    I was looking at ways to throttle my async requests. I'm familiar with the "zip with a timer" pattern but I wanted something more akin to "limit number of in-flight requests".

    Any pointers?

    The setup is a typical Observable[SomeArgs] flatMapped with Observable.from(httpResponseFuture) (RxScala, but details shouldn't be important here).
    Simon Baslé
    @simonbasle
    @vlfig what do you want to do if the number of requests is above your limit?
    drop some requests?
    Vasco Figueira
    @vlfig
    I want to pause consuming upstream when I've reached the limit
    I'm using AnormCypher to issue statements to Neo4j. I'm drowning it and netty's complaining about "Too many open files in system". Essentially I need to somehow relate backpressure to some state that I maintain, I guess.
    Simon Baslé
    @simonbasle
    what is the source of the statements? is it dynamic, or do you know in advance how many statements you'll issue?
    Vasco Figueira
    @vlfig
    They come from a huge file. I'd prefer to assume dynamic.
    Simon Baslé
    @simonbasle
    ok so it should be possible to get a stream of these requests (e.g. using Observable.from) in a way that supports reactive pull backpressure
    then you can have your Subscriber call request with a sane amount that Netty will be able to cope with?
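    The "Subscriber calls request with a sane amount" idea can be sketched without any Rx dependency. The example below is only an analog: it uses the JDK's java.util.concurrent.Flow API (Java 9+) rather than RxJava or RxScala, and fakeRequest, LimitingSubscriber and run are hypothetical names invented for illustration. The subscriber asks for maxInFlight items up front and asks for one more each time an async call finishes, so the number of outstanding calls should never exceed the limit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedInFlight {

    /** Hypothetical stand-in for an async HTTP call: completes after about 5 ms. */
    static CompletableFuture<Integer> fakeRequest(int n) {
        return CompletableFuture.supplyAsync(
                () -> n, CompletableFuture.delayedExecutor(5, TimeUnit.MILLISECONDS));
    }

    /** Keeps at most maxInFlight calls outstanding by controlling request(n). */
    static final class LimitingSubscriber implements Flow.Subscriber<Integer> {
        private final int maxInFlight;
        private final AtomicInteger inFlight = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        final CountDownLatch done;
        private volatile Flow.Subscription subscription;

        LimitingSubscriber(int maxInFlight, int expectedItems) {
            this.maxInFlight = maxInFlight;
            this.done = new CountDownLatch(expectedItems);
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(maxInFlight);          // initial credit equals the in-flight limit
        }

        @Override
        public void onNext(Integer item) {
            // Record the highest number of simultaneously outstanding calls.
            peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            fakeRequest(item).whenComplete((result, error) -> {
                inFlight.decrementAndGet();
                subscription.request(1);     // one call finished, so allow one more item
                done.countDown();
            });
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
        }
    }

    /** Pushes `total` items through the subscriber and returns the peak in-flight count. */
    static int run(int maxInFlight, int total) throws InterruptedException {
        LimitingSubscriber subscriber = new LimitingSubscriber(maxInFlight, total);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < total; i++) {
                publisher.submit(i);         // blocks if the subscriber's buffer fills up
            }
        }
        subscriber.done.await();             // wait until every async call has completed
        return subscriber.peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak in-flight: " + run(4, 20));
    }
}
```

    In RxJava 1.x the same shape would presumably live in a custom Subscriber that calls request(n) in onStart and request(1) as each response arrives; that mapping is an assumption, not something confirmed in this conversation.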
    Vasco Figueira
    @vlfig
    Heh, exactly the page I was reading, at the exact place. :-)
    Thanks, let me have a look, and a think.
    Simon Baslé
    @simonbasle
    haha :)
    Denis Stepanov
    @dstepanov
    Hi, how can I map the exception of an Observable but keep the result the same?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @dstepanov can you please provide more info? You've got an Observable that emits some kind of items, and in case of an exception you want to do what?
    David Stemmer
    @weefbellington
    hello! I have an RxJava specific query on the topic of Observable.Operators
    I have an Operator that needs to do some work in the background. Specifically, it needs to talk to a system service (the Android DrmManagerClient), set a listener on that service, wait for the service to do some work, and emit the output of that work
    David Stemmer
    @weefbellington
    for the most part this is pretty straightforward -- the Operator returns a Subscriber that sets up the listener in the onNext method. When the listener callback fires, it calls onNext on the subscriber that handles the output
    however, in this case the callback fires on a thread that is different than the original subscribeOn thread -- it's being handled inside a DrmManagerClient event handler thread
    so my question is: what is the right thing to do in this case? Should I return control to the thread created by the original subscribeOn scheduler (in this example, Schedulers.io)? If so, what is the best way to do that?
    David Stemmer
    @weefbellington
    (I can think of a bunch of naive ways to block a thread while waiting for a resource to become available, like Object#wait/Object#notify, but I was hoping there might be something more idiomatic to RxJava)
    Matt Langston
    @mattblang
    If I have a flatMap of Observables, is there any way to handle an exception then short-circuit the rest of the calls (not call them)?
    Denis Stepanov
    @dstepanov
    I would like to map the exception to another exception; let's say I know there is an exception that wraps the original one. It would be nice to have observable.mapError(throwable -> {logic})
    Robert Winkler
    @RobWin
    @dstepanov Do you know http://javaslang.com/ ?
    There is a Try monad. Maybe you can use it for your requirement
    https://github.com/aol/cyclops is another library which can be used with RxJava
    @mattblang If you need a real circuit-breaker, you could use https://github.com/RobWin/circuitbreaker-java8
    but I assume you mean something different with short-circuit.
    Denis Stepanov
    @dstepanov
    I just want to keep the original error in the Observable, not the wrapper like ExecutionException etc.
    Matt Langston
    @mattblang

    @RobWin Maybe I should use different wording than short-circuit. Basically, I just want to end a series of calls in a flatMap if an Exception happens. See this SO ticket, I detailed my issue with a sample.

    http://stackoverflow.com/questions/31440271/short-circuit-a-flatmap-if-an-exception-occurs

    Dries De Smet
    @Busata
    hi, if I have an Observable.from(listWithIntegers).flatMap(new Func1<Integer, SomeOtherClass>() { ... CODE ... }), and I want the code in flatMap to be executed in parallel since the ...CODE... blocks, why can't I just add a subscribeOn(scheduler) on the flatMap? it works if the code is Observable.just(integer).map(blocking code).subscribeOn, but not in the flatMap itself
    Dries De Smet
    @Busata
    https://gist.github.com/Busata/ccd000ff6426a9cdb4d3 eg, this, is it possible to let the flatmap run in parallel? (with that code it doesn't)
    David Stemmer
    @weefbellington
    @dstepanov are you using RxJava or another ReactiveX library? If you're using RxJava, maybe you could try (.onErrorResumeNext)
    @dstepanov the input to the .onErrorResumeNext would be a function that unwraps the Exception and returns a new Observable, which you would create with Observable.error(unwrappedException)
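    The unwrapping idea (keep the result, replace a wrapper exception with its cause) can be illustrated with plain JDK types. This is an analog only: it uses CompletableFuture instead of an RxJava Observable, and mapError, unwrap and failureOf are hypothetical helper names borrowed from the wish above, not part of any library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class UnwrapError {

    /** Returns the cause of a wrapper exception, or the throwable itself otherwise. */
    static Throwable unwrap(Throwable error) {
        boolean isWrapper = error instanceof CompletionException
                || error instanceof ExecutionException;
        return (isWrapper && error.getCause() != null) ? error.getCause() : error;
    }

    /** Mirrors the source: same value on success, unwrapped cause on failure. */
    static <T> CompletableFuture<T> mapError(CompletableFuture<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else {
                result.completeExceptionally(unwrap(error));
            }
        });
        return result;
    }

    /** Demo helper: the exception a future failed with, or null if it succeeded. */
    static Throwable failureOf(CompletableFuture<?> future) {
        return future.handle((value, error) -> error).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failing = CompletableFuture
                .<String>supplyAsync(() -> { throw new IllegalStateException("boom"); })
                .thenApply(String::trim);   // the dependent stage sees a wrapper exception
        System.out.println(failureOf(failing).getClass().getSimpleName());
        System.out.println(failureOf(mapError(failing)).getClass().getSimpleName());
    }
}
```

    The structure matches the suggestion above: the success path passes through untouched, and only the error path is rewritten.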
    David Stemmer
    @weefbellington
    @Busata I could be wrong but the way I read this is that when .subscribe is called it will execute the flatmap code on a computation worker thread, and that worker thread will block each time Thread.sleep is called
    in other words, the observable code will be running on a worker thread, but the code in flatmap will be serialized, not parallelized
    AFAIK there used to be a .parallel operator in RxJava but they removed it because its behavior was confusing
    what I might try (and I'm not sure if this is the right approach) is using Schedulers.trampoline to schedule some work in the flatmap function, and then return thread execution to the original Schedulers.io thread
    David Stemmer
    @weefbellington
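    .flatMap(new Func1<Integer, Observable<Hotel>>() {
        @Override
        public Observable<Hotel> call(Integer coordinate) {
            Schedulers.trampoline().createWorker().schedule(new Action0() {
                @Override
                public void call() {
                    // ...your code here...
                }
            });
            // placeholder only: flatMap still needs an Observable<Hotel> returned here
            return Observable.empty();
        }
    })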
    it's similar to the problem I posted above
    Pavel Meledin
    @btbvoy
    How do I build an Rx* application that is ready:
    1. To scale across multiple machines? How do I manage parallel execution of requests inside a cluster of machines? How do I split request processing between nodes in the cluster?
    2. To report the resources and bandwidth used by a single node? I'd like some kind of flat progress bar that indicates that 78 of 100% is used, that a certain node can handle approximately 20-70 extra requests, and that after that we need to add an extra node to the cluster.
    David Stemmer
    @weefbellington
    @Busata that might be tricky because you need to return an Observable though...you could also try:
    .flatMap(new Func1<Integer, Observable<Hotel>>() {
        @Override
        public Observable<Hotel> call(Integer coordinate) {
            // each inner Observable gets its own subscribeOn, so the blocking calls can overlap
            return Observable.just(coordinate)
                .map(new Func1<Integer, Hotel>() {
                    @Override
                    public Hotel call(Integer coordinate) {
                        // ...your code here...
                    }
                })
                .subscribeOn(Schedulers.io());
        }
    })
    inlining this crap is so ugly, ugh
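    The underlying idea, one independently scheduled unit of work per item instead of one scheduler for the whole chain, can be shown with the JDK alone. This is a rough analog using CompletableFuture and an ExecutorService, not RxJava; blockingLookup and lookupAll are hypothetical names, and the sleep stands in for whatever blocking call the real code makes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ParallelLookups {

    /** Hypothetical stand-in for the blocking call made for each item. */
    static String blockingLookup(int coordinate) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "hotel-" + coordinate;
    }

    /** Starts one async task per item, then collects the results in input order. */
    static List<String> lookupAll(List<Integer> coordinates, ExecutorService io) {
        List<CompletableFuture<String>> pending = coordinates.stream()
                .map(c -> CompletableFuture.supplyAsync(() -> blockingLookup(c), io))
                .collect(Collectors.toList());   // every task is submitted before any join
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newFixedThreadPool(4);
        try {
            System.out.println(lookupAll(List.of(1, 2, 3, 4), io));
        } finally {
            io.shutdown();
        }
    }
}
```

    One difference worth keeping in mind: this sketch returns results in input order because it joins the futures in order, whereas flatMap emits inner results as they arrive and may interleave them.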
    Denis Stepanov
    @dstepanov
    @weefbellington Thanks! that actually works!
    David Stemmer
    @weefbellington
    @dstepanov which one lol
    Denis Stepanov
    @dstepanov
    .onErrorResumeNext(e -> Observable.error(e.getCause())) ;)
    David Stemmer
    @weefbellington
    spoiled Java 8 people with your lambdas