    Ignacio Baca Moreno-Torres
    @ibaca
    You really should read all the fromXxx methods :wink2:
    Niranjan
    @nnanda2016
    @ibaca you're right, I totally missed this part. Funny thing is, I am already doing it (converting an iterator to an iterable), but I'm not sure why I missed that. My bad!
    Ignacio Baca Moreno-Torres
    @ibaca
    :smile: :+1:
    Vu Minh Hoang
    @vuminhhoang
    hi
    lukaszguz
    @lukaszguz

    Hi,
    I'm a bit confused about how doOnComplete works.

    Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
            .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));

    Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));

    groupBy.flatMap(characterStringGroupedFlowable -> bufferAndDelay(stats(characterStringGroupedFlowable)))
            .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
            .test()
            .await();

    private Flowable<String> stats(GroupedFlowable<Character, String> groupedFlowable) {
        return groupedFlowable.compose(upstream -> {
            AtomicLong count = new AtomicLong(0);
            return upstream
                    .doOnNext(s -> {
                        count.incrementAndGet();
                        println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - [" + s + "] - count " + count.get());
                    })
                    .doOnComplete(() -> println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - complete - count " + count.get()));
        });
    }

    private Flowable<String> bufferAndDelay(Flowable<String> flowable) {
        return flowable.publish(stringFlowable ->
                stringFlowable.buffer(1)
                        .concatMap(bufferedStrings ->
                                Flowable.just(bufferedStrings)
                                        .doOnNext(x -> println(Thread.currentThread().getName() + " - before delay - " + x))
                                        .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation())
                                        .doOnNext(x -> println(Thread.currentThread().getName() + " - after delay - " + x))
                                        .flatMap(Flowable::fromIterable)
                        ));
    }

    Output:

    pool-1-thread-1 - group: a - stats - [aa] - count 1
    pool-1-thread-1 - before delay - [aa]
    pool-1-thread-1 - group: b - stats - [ba] - count 1
    pool-1-thread-1 - before delay - [ba]
    pool-1-thread-1 - group: a - stats - [ab] - count 2
    pool-1-thread-1 - group: b - stats - [bb] - count 2
    pool-1-thread-1 - end emit
    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    RxComputationThreadPool-1 - after delay - [aa]
    RxComputationThreadPool-2 - after delay - [ba]
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
    RxComputationThreadPool-1 - before delay - [ab]
    RxComputationThreadPool-2 - before delay - [bb]
    RxComputationThreadPool-4 - after delay - [bb]
    RxComputationThreadPool-3 - after delay - [ab]
    RxComputationThreadPool-3 - before delay - [ac]
    RxComputationThreadPool-5 - after delay - [ac]

    The result is that

    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    ...
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3

    Why is stats - complete printed before stats - [ac] - count 3? Am I breaking the rule that says "emit event synchronously, not concurrently"?
    When I remove publish from the bufferAndDelay method, the order is correct.

    lukaszguz
    @lukaszguz

    Shorter version:

    Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
            .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));

    Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));

    groupBy.flatMap(characterStringGroupedFlowable -> {
                return stats(characterStringGroupedFlowable).publish(stringFlowable ->
                        stringFlowable.concatMap(s ->
                                Flowable.just(s)
                                        .delay(100, TimeUnit.MILLISECONDS)));
            })
            .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
            .test()
            .await();

    Output:

    pool-1-thread-1 - group: a - stats - [aa] - count 1
    pool-1-thread-1 - group: b - stats - [ba] - count 1
    pool-1-thread-1 - group: a - stats - [ab] - count 2
    pool-1-thread-1 - group: b - stats - [bb] - count 2
    pool-1-thread-1 - end emit
    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
    Dan O'Reilly
    @dano
    Once a disposable has been added to CompositeDisposable, am I correct in thinking that the CompositeDisposable will keep a reference to it until it's explicitly removed? Meaning, if the source of a given Disposable that has been added to the CompositeDisposable terminates, the Disposable can't get garbage collected as long as the CompositeDisposable keeps the reference to it?
    Dan O'Reilly
    @dano
    looks like subscribeAutoDispose from RxJava2Extensions will make sure the reference gets removed
    i think that's what i want
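
A minimal sketch of the retention behaviour asked about above, assuming RxJava 2.x on the classpath. The class and method names are hypothetical, chosen only for illustration. It adds the Disposable of an already-terminated source to a CompositeDisposable and reads `size()` before and after `delete()`, which removes the entry without disposing it.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;

public class CompositeRetention {

    // Returns the container size right after add() and right after delete().
    static int[] sizesBeforeAndAfterDelete() {
        CompositeDisposable composite = new CompositeDisposable();

        // Observable.just emits and completes synchronously inside subscribe(),
        // so the source has already terminated when the Disposable is returned.
        Disposable d = Observable.just(1).subscribe(v -> { });

        composite.add(d);
        int before = composite.size();  // the entry is still held by the container

        composite.delete(d);            // removes the entry without disposing it
        int after = composite.size();

        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] sizes = sizesBeforeAndAfterDelete();
        System.out.println("after add: " + sizes[0] + ", after delete: " + sizes[1]);
    }
}
```

If this holds for your version, long-lived containers need an explicit `delete`, `remove`, or `clear` for finished subscriptions, or a helper that removes the entry on termination.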
    lukaszguz
    @lukaszguz

    Is it a bug in Rx2? By the way, I followed your advice and replaced this part:

    stringFlowable.concatMap(s ->
                                Flowable.just(s)
                                        .delay(100, TimeUnit.MILLISECONDS)));

    with

    stringFlowable.compose(FlowableTransformers.spanout(10, 10, TimeUnit.MILLISECONDS, scheduler))

    now it's working :)
    Thx :)

    Aditya Chaturvedi
    @apexkid
    Hello guys. Greetings. I need some help from the Rx community here. I am new to Rx and am using RxJava to write an Android app. I want to chain the response from a GET API returning a Single to a POST API returning a Completable. I am not able to achieve that. I posted an SO question but have had no help so far. Please suggest how to do this. https://stackoverflow.com/questions/53633071/how-to-pass-output-of-a-single-to-a-completable-in-rxjava
    Vu Minh Hoang
    @vuminhhoang
    Did you resolve the problem?
    Aditya Chaturvedi
    @apexkid
    No
    Vu Minh Hoang
    @vuminhhoang
    I don't really know what you want. Basically, you get the Single<String> and then want to send this String to api2 for the POST API?
    Is that right?
    Aditya Chaturvedi
    @apexkid
    Correct
    Vu Minh Hoang
    @vuminhhoang
    Okay
    First, you have to get a String:
    api.getToken().blockingGet()
    It will return the String for you.
    Next,
    Aditya Chaturvedi
    @apexkid
    I can do single.subscribe( r -> completableAPi(r).subscribe ... ) but the code will be messy. I want to chain the operations the way I described in the SO question.
    Vu Minh Hoang
    @vuminhhoang
    pass the value to the POST API
    Aditya Chaturvedi
    @apexkid
    This is Android, so I cannot do blocking calls.
    Vu Minh Hoang
    @vuminhhoang
    You can
    api2.saveToken(token = {api1.getToken().blockingGet()})
    Aditya Chaturvedi
    @apexkid
    ok let me try this
    Vu Minh Hoang
    @vuminhhoang
    Use a lambda in this case
    Aditya Chaturvedi
    @apexkid
    Not working. The subscription itself is not getting triggered.
    completableAPI(singleAPI.blockingGet()).subscribeOn(..).observeOn(..).subscribe(...)
    Vu Minh Hoang
    @vuminhhoang
    wait
    Aditya Chaturvedi
    @apexkid
    When I do singleAPI.map(result -> completableAPI(result)), it returns a Single<Completable>. My use case is that I just want to return a Completable that I can subscribe to.
    If I do Completable.fromSingle(...Single<Completable> here...), then the inner Completable call is ignored.
    And if I do singleAPI.flatMapCompletable(result -> completableAPI(result)), I get an object of type Completable, but the inner call happens on the main thread even if I do subscribeOn(Schedulers.io()).
    There are too many things that I am confused about here.
    hayaticakici
    @hayaticakici
    test
    Vu Minh Hoang
    @vuminhhoang
    still there ?
    I just tried
    it worked !
    Aditya Chaturvedi
    @apexkid
    @vuminhhoang Can you share a code snippet as a gist?
    Ignacio Baca Moreno-Torres
    @ibaca
    OMG hehe, please try not to block, and never ever subscribe inside a subscription :astonished:
    As you said, you can chain the calls, for example using flatMap
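
The chaining suggested here can be sketched as follows, assuming RxJava 2.x. `getToken`, `saveToken`, `fetchAndSave` and the two `AtomicReference` fields are hypothetical stand-ins for the GET and POST calls from the question, not the real API. The sketch assumes both calls defer their work until subscription, so a single `subscribeOn(Schedulers.io())` on the outer chain moves both off the calling thread.

```java
import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.atomic.AtomicReference;

public class ChainSingleToCompletable {

    // Records what the fake POST received and on which thread it ran.
    static final AtomicReference<String> SAVED = new AtomicReference<>();
    static final AtomicReference<String> SAVED_ON = new AtomicReference<>();

    // Hypothetical GET call: the work is deferred until subscription.
    static Single<String> getToken() {
        return Single.fromCallable(() -> "token-123");
    }

    // Hypothetical POST call: the work is also deferred until subscription.
    static Completable saveToken(String token) {
        return Completable.fromAction(() -> {
            SAVED.set(token);
            SAVED_ON.set(Thread.currentThread().getName());
        });
    }

    // flatMapCompletable flattens what map() would leave as Single<Completable>
    // into a plain Completable; subscribeOn applies to the whole chain.
    static Completable fetchAndSave() {
        return getToken()
                .flatMapCompletable(ChainSingleToCompletable::saveToken)
                .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {
        // blockingAwait is used only so this demo waits before exiting;
        // an Android caller would subscribe instead.
        fetchAndSave().blockingAwait();
        System.out.println("saved " + SAVED.get() + " on " + SAVED_ON.get());
    }
}
```

If the real POST method does its work eagerly when called, or the GET source emits on its own thread, the threading can differ from this sketch.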
    Aditya Chaturvedi
    @apexkid
    @ibaca Sounds weird to me as well. What's the point of Rx if we have to block? :|
    That's true, but I am not able to translate the Single<Completable> to a Completable
    Ignacio Baca Moreno-Torres
    @ibaca
    What is the Single and what is the Completable?
    In the app domain (e.g. a user GET and an archive POST)
    Aditya Chaturvedi
    @apexkid
    @ibaca Would it be possible to take a look at the Stack Overflow link I shared? I have written it up in detail there. You will get the entire context of the problem
    Ignacio Baca Moreno-Torres
    @ibaca
    Oh hehe OK
    Aditya Chaturvedi
    @apexkid
    Thanks! :)
    Ignacio Baca Moreno-Torres
    @ibaca
    uhm but you have already solved the chain problem