These are chat archives for ReactiveX/RxJava

12th
Jul 2016
cavemansspa
@cavemansspa
Jul 12 2016 15:16

i'm using subscribeOn(Schedulers.io()) and noticed in my application log file that the thread name printed is e.g.: INFO 10:35:09.530 [RxCachedThreadScheduler-2207]

does that mean the rx thread cache has 2000 plus threads, or it's just that the thread ids are simply incrementing as they are created / released?

Dorus
@Dorus
Jul 12 2016 15:17
Schedulers.io() is made to create new threads so you don't lock up on IO-heavy tasks.
:point_up: Jul 7 2016 18:21
cavemansspa
@cavemansspa
Jul 12 2016 15:19
@Dorus understood, but it's not clear if these threads are being re-used.
Dorus
@Dorus
Jul 12 2016 15:25

mmm i scanned the code quickly and i think it should be caching the threads:

    /**
     * Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}
     * except using {@code threadFactory} for thread creation.
     */
    @Experimental
    public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
        if (threadFactory == null) throw new NullPointerException("threadFactory == null");
        return new CachedThreadScheduler(threadFactory);
    }

https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/plugins/RxJavaSchedulersHook.java#L72-L77

cavemansspa
@cavemansspa
Jul 12 2016 15:30
okay -- so it's unbounded per @benjchristensen. but i assume once you're done with a burst of async requests, threads will be re-used.
Dorus
@Dorus
Jul 12 2016 15:31
i would think so yes, but i don't know the code either. If you use async requests i wouldn't expect it to use many threads at all.
I ran a small sample and it gives RxIoScheduler-2 for the first thread.
2207 is high
Ran it 4 times and it uses RxIoScheduler-2 4 times.
mm, now it does rack up the number
cavemansspa
@cavemansspa
Jul 12 2016 15:35
right, i was thinking the RxCachedScheduler might have an upper limit and queue up work. i can see where i can get a burst of 2000 requests. seems to handle fine, but i want to understand the potential effects.
Dorus
@Dorus
Jul 12 2016 15:37
        Action1<String> printMe =
                new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(Thread.currentThread().getName());
            }
        };

        Observable<String> ob = Observable.just("").delay(1, TimeUnit.SECONDS, Schedulers.io()).subscribeOn(Schedulers.io());
        ob.subscribe(printMe);
        ob.subscribe(printMe);
        ob.subscribe(printMe);
        ob.subscribe(printMe);
-->
RxIoScheduler-5
RxIoScheduler-8
RxIoScheduler-9
RxIoScheduler-7
David Karnok
@akarnokd
Jul 12 2016 16:32
@cavemansspa Idle IO threads have a TTL of 60 seconds by default, after which they are shut down. Their thread name contains an ever-incrementing number. I'm guessing you don't have 2000+ threads hanging around in your app.
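
To illustrate the point about names versus live threads, here is a minimal sketch using only the JDK. It is not RxJava's CachedThreadScheduler: it assumes a plain cached ThreadPoolExecutor behaves similarly enough for the purpose, and the "DemoCached-" prefix, the class name and the 50 ms keep-alive (instead of 60 seconds) are made up for the demo.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadNameCounterDemo {

    /** Names threads with a counter that only ever goes up (hypothetical prefix). */
    static final class CountingFactory implements ThreadFactory {
        final AtomicLong created = new AtomicLong();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "DemoCached-" + created.incrementAndGet());
            t.setDaemon(true);
            return t;
        }
    }

    /**
     * Runs one task per round and lets the idle thread expire between rounds.
     * Returns {threads ever created, largest number of threads alive at once}.
     */
    static long[] run(int rounds, long keepAliveMillis) {
        CountingFactory factory = new CountingFactory();
        // Same shape as Executors.newCachedThreadPool(): no core threads,
        // no upper bound, idle threads die after the keep-alive time.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, keepAliveMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), factory);
        Runnable printName = () -> System.out.println(Thread.currentThread().getName());
        try {
            for (int i = 0; i < rounds; i++) {
                pool.submit(printName).get();
                // Wait until the idle worker has timed out and left the pool.
                while (pool.getPoolSize() > 0) {
                    Thread.sleep(10);
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new long[] { factory.created.get(), pool.getLargestPoolSize() };
    }

    public static void main(String[] args) {
        long[] r = run(3, 50);
        System.out.println("threads ever created = " + r[0]
                + ", most alive at once = " + r[1]);
    }
}
```

In this sketch every round gets a fresh thread with a higher number in its name, yet no more than one thread is alive at any time, so a name ending in 2207 by itself says little about how many threads currently exist.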
cavemansspa
@cavemansspa
Jul 12 2016 17:51
@akarnokd -- okay, good to know.
cavemansspa
@cavemansspa
Jul 12 2016 18:03
so Schedulers.io() is unbounded? can we potentially blow past some upper limit of threads, or does the RxCachedScheduler pooling manage that?
David Karnok
@akarnokd
Jul 12 2016 19:02
Yes, unbounded.
cavemansspa
@cavemansspa
Jul 12 2016 19:09
does unbounded mean it internally handles the case where we request more threads than available on jvm / machine?
David Karnok
@akarnokd
Jul 12 2016 19:24
No, it crashes with OutOfMemoryError; there are no internal limits. Use Schedulers.from() with a custom ThreadPoolExecutor.
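
A bounded executor along those lines might look like the sketch below. It uses only the JDK so it runs without RxJava; the class name, the limit of 8 threads and the unbounded work queue are assumptions for illustration, not a recommendation from the discussion. Wrapping the pool for RxJava 1.x would be the Schedulers.from(Executor) call shown in the comment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedIoExecutor {

    /** Builds a pool that never runs more than maxThreads; extra work waits in the queue. */
    static ThreadPoolExecutor create(int maxThreads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        // Let idle threads expire after 60 seconds, like the io() scheduler's workers.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    /**
     * Submits a burst of trivial tasks and waits for all of them.
     * Returns {largest pool size observed, tasks completed}.
     */
    static int[] runBurst(int tasks, int maxThreads) {
        ThreadPoolExecutor pool = create(maxThreads);
        // With RxJava 1.x on the classpath, the pool would be wrapped like this:
        //   Scheduler bounded = Schedulers.from(pool);
        //   observable.subscribeOn(bounded)
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                completed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] { pool.getLargestPoolSize(), completed.get() };
    }

    public static void main(String[] args) {
        int[] r = runBurst(2000, 8);
        System.out.println("largest pool size = " + r[0] + ", tasks completed = " + r[1]);
    }
}
```

The trade-off in this sketch is that a burst of 2000 requests queues up behind 8 threads instead of spawning up to 2000 of them; the queue itself is still unbounded, so memory use then grows with pending tasks, not with threads.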
cavemansspa
@cavemansspa
Jul 12 2016 19:27
ok, thanks for clarification. although nowadays, i believe number of threads on a 64 bit jvm is quite large, so prob won't hit an issue in my use-case. but good to know the alternative.