    Ben Christensen
    @benjchristensen

    @tensory that's not necessary since that would just add more scheduling overhead and not really save the CPUs. For computational work there is a finite amount of work the CPUs can do, so moving to another thread isn't going to change anything, just use more CPU to move the work.

    The reason that Schedulers.io() exists is to provide threads that you can block ... without blocking the event loops on Schedulers.computation(). The io() thread pool is unbounded, so keeps growing if you block threads and ask for more. The computation() pool is bounded to the number of CPUs, so you don't want to block them.

    So, you can always do blocking IO + computation on the io() threads. On the computation threads though, only do computation, never blocking IO.
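
The bounded versus unbounded distinction above can be illustrated with plain JDK executors. This is only an analogy under stated assumptions, not RxJava's own implementation: the class `PoolGrowthSketch` and its method names are hypothetical, the unbounded pool stands in for io(), and the fixed-size pool stands in for computation().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthSketch {

    /** Submits `tasks` blocking tasks and returns the largest number of threads the pool ever held. */
    static int largestPoolSize(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // stand-in for a blocking IO call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown(); // unblock every task
        pool.shutdown();     // queued tasks still run, then the threads exit
        return largest;
    }

    /** Unbounded pool (same settings as Executors.newCachedThreadPool): one new thread per blocked task. */
    static int unboundedPoolThreads(int tasks) {
        return largestPoolSize(new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()), tasks);
    }

    /** Bounded pool (same settings as Executors.newFixedThreadPool): extra tasks wait in the queue. */
    static int boundedPoolThreads(int size, int tasks) {
        return largestPoolSize(new ThreadPoolExecutor(
                size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()), tasks);
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("unbounded pool threads: " + unboundedPoolThreads(50));
        System.out.println("bounded pool threads:   " + boundedPoolThreads(cpus, 50));
    }
}
```

With 50 blocked tasks the unbounded pool grows to 50 threads, while the bounded pool never exceeds its configured size and the remaining tasks simply wait. That waiting is why blocking on a CPU-sized pool stalls everything else scheduled on it.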

    Ari Lacenski
    @tensory
    Okay, thank you, @benjchristensen. I was concerned about adding too much work on the io thread, but I didn't realize it was a thread pool.
    cavemansspa
    @cavemansspa

    i'm using subscribeOn(Schedulers.io()) and noticed in my application log file that the thread name printed is e.g.: INFO 10:35:09.530 [RxCachedThreadScheduler-2207]

    does that mean the rx thread cache has 2000 plus threads, or it's just that the thread ids are simply incrementing as they are created / released?

    Dorus
    @Dorus
    Schedulers.io() is made to create new threads so you don't lock up on IO-heavy tasks.
    :point_up: 7 July 2016 18:21
    cavemansspa
    @cavemansspa
    @Dorus understood, but it's not clear if these threads are being re-used.
    Dorus
    @Dorus

    mmm i scanned the code quickly and i think it should be caching the threads:

        /**
         * Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}
         * except using {@code threadFactory} for thread creation.
         */
        @Experimental
        public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
            if (threadFactory == null) throw new NullPointerException("threadFactory == null");
            return new CachedThreadScheduler(threadFactory);
        }

    https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/plugins/RxJavaSchedulersHook.java#L72-L77

    cavemansspa
    @cavemansspa
    okay -- so it's unbounded per @benjchristensen. but i assume once you're done with a burst of async requests, threads will be re-used.
    Dorus
    @Dorus
    i would think so yes, but i don't know the code either. If you use async requests i wouldn't expect it to use many threads at all.
    I ran a small sample and it gives RxIoScheduler-2 for the first thread.
    2207 is high
    Ran it 4 times and it uses RxIoScheduler-2 4 times.
    mm, now it does rack up the number
    cavemansspa
    @cavemansspa
    right, i was thinking the RxCachedScheduler might have an upper limit and queue up work. i can see where i can get a burst of 2000 requests. seems to handle fine, but wanting to understand the potential effects.
    Dorus
    @Dorus
            Action1<String> printMe =
                    new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println(Thread.currentThread().getName());
                }
            };
    
            Observable<String> ob = Observable.just("").delay(1, TimeUnit.SECONDS, Schedulers.io()).subscribeOn(Schedulers.io());
            ob.subscribe(printMe);
            ob.subscribe(printMe);
            ob.subscribe(printMe);
            ob.subscribe(printMe);
    -->
    RxIoScheduler-5
    RxIoScheduler-8
    RxIoScheduler-9
    RxIoScheduler-7
    David Karnok
    @akarnokd
    @cavemansspa Idle IO threads have a TTL of 60 seconds by default, after which they are shut down. Their thread name contains an ever-incrementing number. I'm guessing you don't have 2000+ threads hanging around in your app.
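
The point about names versus live threads can be reproduced with a JDK-only sketch. It assumes a hypothetical `ThreadNameSketch` class, a hypothetical `SketchIo-` name prefix, and a 20 ms keep-alive instead of 60 seconds so the effect shows up quickly. It is an analogy, not RxJava's scheduler code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadNameSketch {

    /**
     * Runs `rounds` tasks one at a time and lets the idle thread expire
     * between rounds. Returns the name of the thread used in each round.
     */
    static List<String> namesAcrossRounds(int rounds) {
        AtomicLong counter = new AtomicLong();
        // The counter only ever goes up; it is never reset when a thread dies.
        ThreadFactory factory = r -> new Thread(r, "SketchIo-" + counter.incrementAndGet());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 20L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), factory);
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        List<String> names = new ArrayList<>();
        try {
            for (int i = 0; i < rounds; i++) {
                names.add(pool.submit(whoAmI).get());
                // Wait until the idle thread has timed out and left the pool.
                while (pool.getPoolSize() > 0) {
                    Thread.sleep(5);
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(namesAcrossRounds(3));
    }
}
```

Each round gets a freshly numbered thread even though at most one thread is alive at any time, so a suffix like 2207 counts threads created so far rather than threads currently running.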
    cavemansspa
    @cavemansspa
    @akarnokd -- okay, good to know.
    cavemansspa
    @cavemansspa
    so Schedulers.io() is unbounded? can we potentially blow up some upper limit of threads or RxCachedScheduler pooling manages that?
    David Karnok
    @akarnokd
    Yes, unbounded.
    cavemansspa
    @cavemansspa
    does unbounded mean it internally handles the case where we request more threads than available on jvm / machine?
    David Karnok
    @akarnokd
    No, it crashes with OutOfMemoryError; there are no internal limits. Use Schedulers.from() with a custom ThreadPoolExecutor.
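
A bounded executor of the kind suggested above might look like the following sketch. The class `BoundedIoExecutorSketch`, the queue capacity of 100, and the choice of `CallerRunsPolicy` for saturation are all assumptions made for illustration. In RxJava 1.x the resulting executor would be wrapped with `Schedulers.from(executor)`; that call is left out here so the block runs on the JDK alone.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedIoExecutorSketch {

    /** At most `maxThreads` threads; idle threads expire after 60 seconds. */
    static ThreadPoolExecutor boundedExecutor(int maxThreads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                // Assumption: when the queue is full, run the task on the submitting thread.
                new ThreadPoolExecutor.CallerRunsPolicy());
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    /** Submits a burst of short tasks; returns {peak thread count, completed task count}. */
    static int[] runBurst(int maxThreads, int tasks) {
        ThreadPoolExecutor pool = boundedExecutor(maxThreads, 100);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(1); // stand-in for a short blocking call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.getLargestPoolSize(), completed.get()};
    }

    public static void main(String[] args) {
        int[] result = runBurst(8, 2000);
        System.out.println("peak threads: " + result[0] + ", completed: " + result[1]);
    }
}
```

A burst of 2000 tasks then completes with no more than 8 pool threads. The trade-off is that the submitting thread slows down once the queue fills, rather than the JVM running out of memory for new threads.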
    cavemansspa
    @cavemansspa
    ok, thanks for clarification. although nowadays, i believe number of threads on a 64 bit jvm is quite large, so prob won't hit an issue in my use-case. but good to know the alternative.
    Selina Li
    @ItsSelina
    Is there an operator to concatenate an A with an Observable<A>, to get an Observable<A>?
    Ben Christensen
    @benjchristensen
    streamOfA.concatWith(just(anA))
    Selina Li
    @ItsSelina
    Perfect, thanks
    r é m i 
    @remirobert
    Hi everyone, I am trying to do something with RxJava. I have a List of N Observables, which all emit the same kind of value. All of the Observables are async. I want to combine all of them and subscribe to a List of [Value].
    What can I use? Zip, or combine?
    I don't know how to use FuncN
    Behzodbek Qodirov
    @behzodbek
    Hi guys!
    I'm wondering about creating a pure function and I need your help!

    Let's say I have 2 kinds of classes and I have to write a mapper function.
    I need to map from

    public class PlayListEntity {
    //another vars
        private CategoryEntity category;
    }

    into

    public class PlayList  {
    //another vars
        private Category category;
    }
    I have a mapper class:
    @Singleton
    public class PlaylistEntityDataMapper implements Function<PlayList, PlayListEntity> {
        @Inject
        public PlaylistEntityDataMapper() {
        }
    
        @Override
        public PlayList map(PlayListEntity entity) {
            PlayList playList=new PlayList();
         // Convert here entity into playlist
            return playList;
        }
    }
    Behzodbek Qodirov
    @behzodbek
    The problem is: in order to map my PlayListEntity into PlayList I need a mapper function for Category, but a pure function should not depend on other objects.
    How do I write an independent mapper? Or how do I map structures like this with RxJava?
    Dorus
    @Dorus
    @remirobert Do they all return the same number of values? N values? You can indeed use zip: Observable.zip(Ob1, Ob2, Ob3, (v1, v2, v3) -> {...})
    It won't work if they emit different numbers of values.
    @behzodbek I can't read that. Use a 4-space indent or surround with
    ```Java
    [your code]
    ```
    (you can edit)
    Behzodbek Qodirov
    @behzodbek
    @Dorus Done
    Dorus
    @Dorus
    They can depend on a local object right?
    Would still be pure
    You can even do withLatestFrom(CatMapperObj)
    Behzodbek Qodirov
    @behzodbek
    I've a class with a inner field which is not premetive. Inner fields have to be converted too
    I need another converter for inner type...
    how to implement with RX?
    Dorus
    @Dorus
    You lost me at 'premetive'
    Behzodbek Qodirov
    @behzodbek
    @Dorus I mean primitive
    Behzodbek Qodirov
    @behzodbek
    Another problem here:
    What I need:
    1. App should fetch data from database and show that old data.
    2. Start Async request in order to fetch data from REST API
    3. Update DB
    4. Update UI
      (The database model and the REST API model are not the same class.)
    What is the proper way to implement this scenario with RxJava?
    Eugene Popovich
    @httpdispatch
    Hello everybody. Does anybody know whether it is possible to use retry logic without an exception being thrown? For example, when Retrofit returns a response with an error (isError() is true)? Or should I rethrow a wrapped exception?
    Eugene Popovich
    @httpdispatch
    Had to implement the following workaround:
    public static <T> Observable.Transformer<Result<T>, Result<T>> retryOnError(int maxRetryCount) {
        return observable -> observable
                .flatMap(response -> response.isError() ? Observable.error(response.error()) : Observable.just(response))
                .retry((count, throwable) -> {
                    Timber.e(throwable, "retryOnError");
                    Timber.d("retryOnError: count=%d; maxRetryCount=%d", count, maxRetryCount);
                    if (count > maxRetryCount) {
                        Timber.d("retryOnError: don't retry, max retry count reached");
                        return false;
                    } else {
                        Timber.d("retryOnError: retry");
                        return true;
                    }
                })
                .onErrorResumeNext(t -> Observable.just(Result.error(t)));
    }