These are chat archives for ReactiveX/RxJava

27th
Aug 2018
Nicholas Bransby-Williams
@nbransby
Aug 27 2018 00:05
I think I've got it:
first.buffer(second)
.map { it.size - 1 }
.scan(0) { acc, current -> acc + current }
.filter { it == 0 }
骨来(PeterLi)
@pli2014
Aug 27 2018 03:25
Schedulers.io() is an unbounded thread pool, so when tasks block, the number of threads keeps growing without any limit.
@Test
public void threadCounter() throws Exception {
    // Start 1000 flows that each block an io() worker for one second.
    int j = 0;
    while (j++ < 1000) {
        Flowable
            .fromCallable(() -> {
                Thread.sleep(1000);
                return RandomUtils.nextInt();
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(c -> {
                // System.out.println("c:" + c);
            });
    }

    // IoScheduler is an RxJava-internal class; size() returns the pool's worker count.
    IoScheduler io = (IoScheduler) Schedulers.io();

    // Print the worker count once per second.
    int i = 0;
    while (i++ < 10000) {
        System.out.println("io size:" + io.size());
        Thread.sleep(1000);
    }
}
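The growth described above can be reproduced without RxJava. The following is a minimal sketch using only java.util.concurrent, on the assumption that a cached thread pool is a fair stand-in for the way Schedulers.io() creates a new worker whenever all existing ones are busy. The class name PoolGrowthDemo, the helper peakThreads, and the task count and sleep time are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class PoolGrowthDemo {

    // Submits `tasks` blocking jobs, waits for all of them, and returns the
    // largest number of threads the pool held at the same time.
    static int peakThreads(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(300); // stands in for a blocking call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int peak = pool.getLargestPoolSize();
        pool.shutdown();
        return peak;
    }

    public static void main(String[] args) {
        int tasks = 20;
        // Unbounded: a new thread is created whenever no idle thread is available.
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        // Bounded: at most 4 threads; further tasks wait in the queue.
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        System.out.println("cached peak: " + peakThreads(cached, tasks));
        System.out.println("fixed peak:  " + peakThreads(fixed, tasks));
    }
}
```

If a hard limit is wanted in RxJava itself, one option is to wrap a bounded executor with Schedulers.from(executor) and pass that to subscribeOn instead of Schedulers.io().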
David Karnok
@akarnokd
Aug 27 2018 07:43
@igorbzin_gitlab It is important you provide code that demonstrates your problem. The implication of viewModel.getPlace() changes the setup quite a bit. I assume this fires from the main thread and thus subscribeOn has no effect. Replace it with observeOn.
@nbransby How should that single emitted resulting item relate to the inputs?
Nicholas Bransby-Williams
@nbransby
Aug 27 2018 12:20
@akarnokd My use case is keeping track of open tabs, and the result should signal that all tabs have been closed. I have this, but I haven't actually tested it yet:
            //close the window when all tabs are closed
            tabDidOpen.buffer(tabDidClose)
                    .map { it.size - 1 }
                    .scan(0) { acc, current -> acc + current }
                    .filter { it == 0 }
                    .subscribe { closeWindow() }
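To check the arithmetic behind that chain, here is a rough model in plain Java, with no RxJava involved. It assumes the open and close events arrive as one ordered sequence, encoded as 'O' and 'C' characters. The class name TabCounterSketch, the method allClosedAt, and the string encoding are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class TabCounterSketch {

    // Events are 'O' (tab opened) or 'C' (tab closed), in arrival order.
    // Returns the indices of the close events after which no tab remains open.
    static List<Integer> allClosedAt(String events) {
        List<Integer> result = new ArrayList<>();
        int opensInBuffer = 0; // what buffer(tabDidClose) would have collected
        int openTabs = 0;      // the running sum that scan(0) keeps
        for (int i = 0; i < events.length(); i++) {
            char e = events.charAt(i);
            if (e == 'O') {
                opensInBuffer++;
            } else if (e == 'C') {
                // map { it.size - 1 } followed by the scan accumulation
                openTabs += opensInBuffer - 1;
                opensInBuffer = 0;
                // filter { it == 0 }
                if (openTabs == 0) {
                    result.add(i);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two tabs open, one closes, a third opens, then the last two close.
        System.out.println(allClosedAt("OOCOCC"));
    }
}
```

One difference from the operator chain: in RxJava 2, scan with a seed emits the seed itself as its first item, so filter { it == 0 } would also match that initial 0 at subscription time. The model above never produces that item, so the real chain would probably need something like skip(1) after scan to behave the same way.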