    Dorus
    @Dorus
    oh you found one?
    Thomas Wilgenbus
    @regnarock
    oh no. Just looking at the link you gave me
    Dorus
    @Dorus
    yeah i'm trying to use something similar to that to get what you need. But dinner is ready, so i gotta go now. Might get back to it later, but you might be able to puzzle it out yourself :)
    Thomas Wilgenbus
    @regnarock
    Yes I think so too. You have been a great help, thank you very much :D. And eat well
    Dorus
    @Dorus
    @regnarock I'm not complaining, but i just realized you changed your specs. You first said B only emitted 1 item.
    Still, some trick to query both A and B at the same time would be faster.
    I'm thinking, could it be as easy as a .Replay on A?
    Ivru
    @Ivru
    Hi there. I'm trying a few things with Rx (Android implementation) and I've got a few questions. Let's say I want to use Rx to fetch some data from 2 API calls (callA and callB). callB depends on callA. Also, all data from both calls must be stored in a DB. After a few tries, I've come to a solution with 2 observables: one for callA that queries the API and then stores the data using a map transformation, and a second one for callB (that also stores the data using a map). To be sure that callA is done before calling callB, I'm using a concatWith on callA.
    My question is: is there something more elegant? I was thinking about having one observable per API call (let's say call1 and call2) and one per storage process (store1 and store2). Thing is, I didn't manage to do so since there isn't any implementation of a "Then" operator. Or am I missing the point with Rx?
    Dmitriy Tarasevich
    @darussian
    @Ivru have you tried using Observable#flatMap on callA? That's what I usually do if I have a call that's dependent on another.
          Observable<Integer> intObservable = Observable.just("Test")
                    .flatMap(new Func1<String, Observable<Integer>>() {
                        @Override
                        public Observable<Integer> call(String s) {
                            return Observable.just(s.length());
                        }
                    });
    Ben Christensen
    @benjchristensen
    FlatMap is the 'then' that supports follow-up work that is asynchronous. Map is the 'then' for follow-up work that is just synchronous computation (transformation, no IO).
    flatMap is the most important operator to learn and use for composition.
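The map/flatMap distinction described above can be sketched without RxJava on the classpath. This is only an analogy using `java.util.concurrent.CompletableFuture`, where `thenApply` plays the role of map and `thenCompose` plays the role of flatMap for single-valued results. The names `callA`, `callB`, `store` and the in-memory `db` list are hypothetical stand-ins, not anything from the original question's code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class DependentCallsSketch {
    // Hypothetical stand-ins for the two API calls; callB needs callA's result.
    static CompletableFuture<String> callA() {
        return CompletableFuture.supplyAsync(() -> " sector-1 ");
    }

    static CompletableFuture<String> callB(String sector) {
        return CompletableFuture.supplyAsync(() -> "employees-of-" + sector);
    }

    // Hypothetical asynchronous "DB" write; completes with the stored value.
    static CompletableFuture<String> store(List<String> db, String value) {
        return CompletableFuture.supplyAsync(() -> {
            db.add(value);
            return value;
        });
    }

    static List<String> run() {
        List<String> db = new CopyOnWriteArrayList<>();
        callA()
            .thenApply(String::trim)                  // synchronous transformation: the role of map
            .thenCompose(a -> store(db, a))           // asynchronous follow-up: the role of flatMap
            .thenCompose(DependentCallsSketch::callB) // starts only after callA's data is stored
            .thenCompose(b -> store(db, b))
            .join();                                  // blocking here only keeps the sketch short
        return db;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [sector-1, employees-of-sector-1]
    }
}
```

In RxJava 1.x the same shape would be a chain of `flatMap` calls on the Observable for callA, with `map` reserved for the pure transformation step.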
    Ivru
    @Ivru
    Gonna try the flat one then. Thank you @darussian @benjchristensen
    Ivru
    @Ivru
    Nope :)
    Ivru
    @Ivru
    I've changed this a bit. But I've still got a problem: my observer gets notified by the sectorsInsert Observable instead of the zip. Only the first 3 observables are being executed (sectorsFetch, sectorsInsert and employeesFetch). Any idea what I am doing wrong? https://gist.github.com/Ivru/55c41a95edba448828f6a70e361e575d
    Ivru
    @Ivru
    My bad. The Zip wasn't the right option here
    No, actually I don't get it. My zip shouldn't be called unless both sectorsInsert and employeesInsert are done, right?
    Ivru
    @Ivru
    "...whenever each of the Observables has emitted a new item"
    pradeepkusingh
    @pradeepkusingh
    @benjchristensen: I am new to reactive programming and have just started exploring it. Can you please suggest a few examples and docs that I can use to get familiar with RxJava?
    pradeepsingh1234
    @pradeepsingh1234
    thanks @benjchristensen
    Ari Lacenski
    @tensory
    If you're using the Schedulers.io scheduler to observeOn, but you want to do some computation on the emitted values before writing back to an io location, should you observeOn a different thread?
    Ben Christensen
    @benjchristensen

    @tensory that's not necessary since that would just add more scheduling overhead and not really save the CPUs. For computational work there is a finite amount of work the CPUs can do, so moving to another thread isn't going to change anything, just use more CPU to move the work.

    The reason that Schedulers.io() exists is to provide threads that you can block ... without blocking the event loops on Schedulers.computation(). The io() thread pool is unbounded, so keeps growing if you block threads and ask for more. The computation() pool is bounded to the number of CPUs, so you don't want to block them.

    So, you can always do blocking IO + computation on the io() threads. On the computation threads though, only do computation, never blocking IO.

    Ari Lacenski
    @tensory
    Okay, thank you, @benjchristensen. I was concerned about adding too much work on the io thread, but I didn't realize it was a thread pool.
    cavemansspa
    @cavemansspa

    i'm using subscribeOn(Schedulers.io()) and noticed in my application log file that the thread name printed is e.g.: INFO 10:35:09.530 [RxCachedThreadScheduler-2207]

    does that mean the rx thread cache has 2000 plus threads, or it's just that the thread ids are simply incrementing as they are created / released?

    Dorus
    @Dorus
    Schedulers.io() is made to create new threads so you don't lock up on io-heavy tasks.
    :point_up: 7 July 2016 18:21
    cavemansspa
    @cavemansspa
    @Dorus understood, but it's not clear if these threads are being re-used.
    Dorus
    @Dorus

    mmm i scanned the code quickly and i think it should be caching the threads:

        /**
         * Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}
         * except using {@code threadFactory} for thread creation.
         */
        @Experimental
        public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
            if (threadFactory == null) throw new NullPointerException("threadFactory == null");
            return new CachedThreadScheduler(threadFactory);
        }

    https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/plugins/RxJavaSchedulersHook.java#L72-L77

    cavemansspa
    @cavemansspa
    okay -- so it's unbounded per @benjchristensen. but i assume once you're done with a burst of async requests, threads will be re-used.
    Dorus
    @Dorus
    i would think so yes, but i don't know the code either. If you use async requests i wouldn't expect it to use many threads at all.
    I ran a small sample and it gives RxIoScheduler-2 for the first thread.
    2207 is high
    Ran it 4 times and it uses RxIoScheduler-2 4 times.
    mm, now it does rack up the number
    cavemansspa
    @cavemansspa
    right, i was thinking the RxCachedScheduler might have an upper limit and queue up work. i can see where i can get a burst of 2000 requests. seems to handle fine, but wanting to understand the potential effects.
    Dorus
    @Dorus
            Action1<String> printMe =
                    new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println(Thread.currentThread().getName());
                }
            };
    
            Observable<String> ob = Observable.just("").delay(1, TimeUnit.SECONDS, Schedulers.io()).subscribeOn(Schedulers.io());
            ob.subscribe(printMe);
            ob.subscribe(printMe);
            ob.subscribe(printMe);
            ob.subscribe(printMe);
    -->
    RxIoScheduler-5
    RxIoScheduler-8
    RxIoScheduler-9
    RxIoScheduler-7
    David Karnok
    @akarnokd
    @cavemansspa Idle IO threads have a TTL of 60 seconds by default, after which they are shut down. Their thread name contains an ever-incrementing number. I'm guessing you don't have 2000+ threads hanging around in your app.
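Why a high number in a thread name need not mean many live threads can be sketched with a plain JDK cached pool. This is an analogy, not RxJava's actual CachedThreadScheduler code: the `SketchCachedThread-` prefix, the 50 ms keep-alive (standing in for the 60 second TTL) and the 300 ms pause are all assumptions chosen so the sketch finishes quickly.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadNameCounterSketch {
    /**
     * Runs `rounds` tasks one after another, pausing long enough between them
     * for the idle worker to expire. Returns {threadsCreated, largestPoolSize}.
     */
    static int[] run(int rounds) {
        AtomicInteger counter = new AtomicInteger();
        // Every new thread gets the next number, whether or not older threads still exist.
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "SketchCachedThread-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        // Unbounded cached pool whose idle threads expire after 50 ms.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 50, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), factory);
        try {
            for (int i = 0; i < rounds; i++) {
                pool.submit(() -> System.out.println(Thread.currentThread().getName())).get();
                Thread.sleep(300); // longer than the keep-alive, so the worker is gone by the next round
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        int[] result = {counter.get(), pool.getLargestPoolSize()};
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run(3);
        System.out.println("threads created: " + r[0] + ", most alive at once: " + r[1]);
    }
}
```

The name counter keeps climbing with each round while the pool never holds more than one thread at a time, which matches the guess that a suffix like 2207 counts threads ever created rather than threads currently alive.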
    cavemansspa
    @cavemansspa
    @akarnokd -- okay, good to know.
    cavemansspa
    @cavemansspa
    so Schedulers.io() is unbounded? can we potentially blow up some upper limit of threads or RxCachedScheduler pooling manages that?
    David Karnok
    @akarnokd
    Yes, unbounded.
    cavemansspa
    @cavemansspa
    does unbounded mean it internally handles the case where we request more threads than available on jvm / machine?
    David Karnok
    @akarnokd
    No, it crashes with OutOfMemoryError; there are no internal limits. Use Schedulers.from() with a custom ThreadPoolExecutor.
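A minimal sketch of the bounded alternative, assuming a fixed cap of threads with an unbounded work queue is acceptable for the use case. Only the JDK executor is built and exercised here; in RxJava 1.x it would then be wrapped with `Schedulers.from(executor)`. The class name, the cap of 4 and the 10 ms simulated blocking call are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolSketch {
    /** Submits `tasks` short blocking jobs and returns the highest number seen running at once. */
    static int peakConcurrency(int maxThreads, int tasks) {
        // At most maxThreads workers; extra work waits in the queue instead of spawning threads.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                maxThreads, maxThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        executor.allowCoreThreadTimeOut(true); // let idle workers expire, like a cached pool
        // With RxJava 1.x on the classpath: Scheduler bounded = Schedulers.from(executor);

        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(10); // simulated blocking IO
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent tasks: " + peakConcurrency(4, 40));
    }
}
```

The trade-off is that a burst larger than the cap queues up and waits rather than failing on thread creation, so latency grows under load; the queue itself is still unbounded in this sketch.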
    cavemansspa
    @cavemansspa
    ok, thanks for clarification. although nowadays, i believe number of threads on a 64 bit jvm is quite large, so prob won't hit an issue in my use-case. but good to know the alternative.
    Selina Li
    @ItsSelina
    Is there an operator to concatenate an A with an Observable<A>, to get an Observable<A>?
    Ben Christensen
    @benjchristensen
    streamOfA.concatWith(just(anA))
    Selina Li
    @ItsSelina
    Perfect, thanks
    r é m i 
    @remirobert
    Hi everyone, I am trying to do something with RxJava. I have a List of N Observables, which all return the same kind of value. All of the Observables are async. I want to combine all of them and subscribe to a List of [Value].