    Ari Meidan
    @AriMeidan
    Oh, i get it. so i'll get the t that was passes.
    *passed
    David Karnok
    @akarnokd
    You can edit comments.
    Ari Meidan
    @AriMeidan
    I ignore it because i just need the trigger to call a method.
    David Karnok
    @akarnokd
    Yes, adapt it to your needs.
    Ari Meidan
    @AriMeidan

    Now I've changed it so that when onNext is called I emit the next value from the array. I need to show an infinite loop of screens, each shown for a different length of time.

        private Observable<Integer> screenTimeSteam() {
            return Observable.create(emitter -> {
                StreamFinishEventListener finishListener = new StreamFinishEventListener(){
                    int i = 0;
                    @Override
                    public void onFinished() {
                        int item = i % timers.length;
                        i++;
                        emitter.onNext(timers[item]);
                    }
                };
                setFinishListener(finishListener);
                finishListener.onFinished();
            });
        }//screenTimeSteam

    Is there an easier way?

    David Karnok
    @akarnokd
    How about doing the flatMap I suggested with a repeat after fromArray?
    Arsala Bangash
    @ArsalaBangash

    Hi! I need some help knowing which operators to use in the following scenario:

    I have a button that submits a code to the backend and also a button that would cancel the submission of the code. I already have a method that makes an API call to the backend and returns an Observable<Result<String>>, let's call this submitCode()

    I would like to do the following:

    submitButtonClickStream -> trigger submitCode() -> cancel submission if cancel stream onNexts -> subscribe to submitCode result and implement success and failure handlers

    Ari Meidan
    @AriMeidan

    @akarnokd

    How about doing the flatMap I suggested with a repeat after fromArray?

    I get a memory allocation error.

    Niranjan
    @nnanda2016
    Hi… How do I create an Observable from an iterator?
    I didn't find any factory method to create one. Do I have to use an emitter?
    Niranjan
    @nnanda2016

    Pardon my limited knowledge of RxJava; how can I achieve the following? I am trying to implement something like the code below.

    CustomIterator<String> iter = ... code to build custom iterator
    
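    Observable.<String>create(emitter -> {
            while(iter.hasNext()) {
                emitter.onNext(iter.next());
            }
            emitter.onCompleted(); // signal the end so downstream operators can finish
        }, BackpressureMode.BUFFER)
        .map(id -> {
            Doc doc = fetchFromDB(id);
            return convertDocToVO(doc); // a block lambda must return the mapped value
        })
        .map(vo -> complicatedTask(vo))
        .forEach(/* Do final processing */);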

    I need some help with complicatedTask. The current non-reactive implementation performs an action in parallel (using the FJP common pool): for each VO document, it creates a Future and puts it on a blocking queue. The consumer blocks on each Future sequentially and, once it is available, does some other processing. This model gives us two things:

    1. Parallel execution
    2. The blocking queue guarantees that only a fixed number of Futures exist at a time.

    Some clarifications:

    1. The method complicatedTask does all its processing on the CPU; no I/O.
    2. Each VO has a collection property which needs some processing, and that processing is parallelized.
    3. The return type of complicatedTask is another type of DTO.

    If I want to achieve something similar reactively, how should I do it?

    If I return a Future from the complicatedTask map step, it will lead to a lot of Futures in memory.
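For reference, the non-reactive model described in this message can be sketched with JDK classes only. This is a rough illustration, not the poster's actual code: `BoundedFuturePipeline`, `process`, the `capacity` parameter, and the string-length stand-in for `complicatedTask` are all hypothetical. A producer thread submits CPU work to the common pool and puts each Future on a bounded queue; the consumer takes the Futures in order and blocks on each one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class BoundedFuturePipeline {

    // Hypothetical stand-in for complicatedTask: pure CPU work, no I/O.
    static int complicatedTask(String vo) {
        return vo.length();
    }

    // Processes every item in parallel while keeping the number of
    // outstanding Futures close to `capacity`.
    static List<Integer> process(List<String> vos, int capacity) {
        BlockingQueue<Future<Integer>> pending = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (String vo : vos) {
                    Future<Integer> future = CompletableFuture.supplyAsync(
                            () -> complicatedTask(vo), ForkJoinPool.commonPool());
                    // put() blocks while the queue is full, so at most
                    // capacity queued Futures plus this one exist at a time.
                    pending.put(future);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> results = new ArrayList<>();
        try {
            // The consumer blocks on each Future in submission order.
            for (int i = 0; i < vos.size(); i++) {
                results.add(pending.take().get());
            }
            producer.join();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("a", "bb", "ccc", "dddd"), 2));
        // prints [1, 2, 3, 4]
    }
}
```

The queue capacity is what bounds memory here; a reactive version would need some equivalent limit on how many tasks are in flight at once.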
    David Karnok
    @akarnokd
    Observable.from((Iterable<String>)() -> iter).
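The cast in the line above works because `Iterable` has a single abstract method, `iterator()`, so a lambda that returns an existing iterator is a valid `Iterable`. A minimal JDK-only sketch of that adapter follows; the class name `IteratorAsIterable` and the helpers `once` and `drain` are hypothetical names for illustration. Note that the resulting `Iterable` is single-use: a second pass sees the same, already exhausted iterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorAsIterable {

    // Wraps an existing Iterator in an Iterable. Every call to iterator()
    // hands back the same instance, so the result can be traversed only once.
    static <T> Iterable<T> once(Iterator<T> iterator) {
        return () -> iterator;
    }

    // Collects whatever the Iterable still has to offer into a list.
    static <T> List<T> drain(Iterable<T> source) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        Iterator<String> iter = Arrays.asList("id-1", "id-2", "id-3").iterator();
        Iterable<String> ids = once(iter);

        System.out.println(drain(ids)); // prints [id-1, id-2, id-3]
        System.out.println(drain(ids)); // prints [] because the iterator is exhausted
    }
}
```

An Observable built from such an adapter would presumably have the same limitation and should be subscribed to only once.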
    Niranjan
    @nnanda2016
    Thanks @akarnokd …can you provide some ideas for the second problem?
    Ignacio Baca Moreno-Torres
    @ibaca
    You really should read all the fromXxx methods :wink2:
    Niranjan
    @nnanda2016
    @ibaca you’re right… I totally missed this part… funny thing is, I am already doing it (converting an iterator to an iterable)… but not sure why I missed that. It was my bad!
    Ignacio Baca Moreno-Torres
    @ibaca
    :smile: :+1:
    Vu Minh Hoang
    @vuminhhoang
    hi
    lukaszguz
    @lukaszguz

    Hi,
    I'm a bit confused about how doOnComplete works.

    Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
            .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));
    
    Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));
    
    groupBy.flatMap(characterStringGroupedFlowable -> bufferAndDelay(stats(characterStringGroupedFlowable)))
            .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
            .test()
            .await();
    
    private Flowable<String> stats(GroupedFlowable<Character, String> groupedFlowable) {
        return groupedFlowable.compose(upstream -> {
            AtomicLong count = new AtomicLong(0);
            return upstream
                    .doOnNext(s -> {
                        count.incrementAndGet();
                        println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - [" + s + "] - count " + count.get());
                    })
                    .doOnComplete(() -> println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - complete - count " + count.get()));
        });
    }
    
    private Flowable<String> bufferAndDelay(Flowable<String> flowable) {
        return flowable.publish(stringFlowable ->
                stringFlowable.buffer(1)
                        .concatMap(bufferedStrings ->
                                Flowable.just(bufferedStrings)
                                        .doOnNext(x -> println(Thread.currentThread().getName() + " - before delay - " + x))
                                        .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation())
                                        .doOnNext(x -> println(Thread.currentThread().getName() + " - after delay - " + x))
                                        .flatMap(Flowable::fromIterable)
                        ));
    }

    Output:

    pool-1-thread-1 - group: a - stats - [aa] - count 1
    pool-1-thread-1 - before delay - [aa]
    pool-1-thread-1 - group: b - stats - [ba] - count 1
    pool-1-thread-1 - before delay - [ba]
    pool-1-thread-1 - group: a - stats - [ab] - count 2
    pool-1-thread-1 - group: b - stats - [bb] - count 2
    pool-1-thread-1 - end emit
    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    RxComputationThreadPool-1 - after delay - [aa]
    RxComputationThreadPool-2 - after delay - [ba]
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
    RxComputationThreadPool-1 - before delay - [ab]
    RxComputationThreadPool-2 - before delay - [bb]
    RxComputationThreadPool-4 - after delay - [bb]
    RxComputationThreadPool-3 - after delay - [ab]
    RxComputationThreadPool-3 - before delay - [ac]
    RxComputationThreadPool-5 - after delay - [ac]

    Result is that

    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    ...
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3

    Why does stats - complete come before stats - [ac] - count 3? Am I breaking the rule that says "emit events synchronously, not concurrently"?
    When I remove publish from the bufferAndDelay method, the order is correct.

    lukaszguz
    @lukaszguz

    Shorter version:

    Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
            .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));
    
    Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));
    
    groupBy.flatMap(characterStringGroupedFlowable -> {
                return stats(characterStringGroupedFlowable).publish(stringFlowable ->
                        stringFlowable.concatMap(s ->
                                Flowable.just(s)
                                        .delay(100, TimeUnit.MILLISECONDS)));
            })
            .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
            .test()
            .await();

    Output:

    pool-1-thread-1 - group: a - stats - [aa] - count 1
    pool-1-thread-1 - group: b - stats - [ba] - count 1
    pool-1-thread-1 - group: a - stats - [ab] - count 2
    pool-1-thread-1 - group: b - stats - [bb] - count 2
    pool-1-thread-1 - end emit
    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
    Dan O'Reilly
    @dano
    Once a disposable has been added to CompositeDisposable, am I correct in thinking that the CompositeDisposable will keep a reference to it until it's explicitly removed? Meaning, if the source of a given Disposable that has been added to the CompositeDisposable terminates, the Disposable can't get garbage collected as long as the CompositeDisposable keeps the reference to it?
    Dan O'Reilly
    @dano
    looks like subscribeAutoDispose from RxJava2Extensions will make sure the reference gets removed
    i think that's what i want
    lukaszguz
    @lukaszguz

    Is it a bug in RxJava 2? By the way, I followed your advice and replaced this part:

    stringFlowable.concatMap(s ->
                                Flowable.just(s)
                                        .delay(100, TimeUnit.MILLISECONDS)));

    with

    stringFlowable.compose(FlowableTransformers.spanout(10, 10, TimeUnit.MILLISECONDS, scheduler))

    now it's working :)
    Thx :)

    Aditya Chaturvedi
    @apexkid
    Hello guys. Greetings. I need some help from the Rx community here. I am new to Rx and using RxJava to write an Android app. I want to chain the response from a GET api returning a Single to a POST api returning a Completable. I am not able to achieve that. I posted a SO question but no help till now. Please suggest how to do this. https://stackoverflow.com/questions/53633071/how-to-pass-output-of-a-single-to-a-completable-in-rxjava
    Vu Minh Hoang
    @vuminhhoang
    Did you resolve the problem?
    Aditya Chaturvedi
    @apexkid
    No
    Vu Minh Hoang
    @vuminhhoang
    I don't really know what you want. Basically, you get the Single<String> and then want to send this String to api2 for the POST API?
    Is that right?
    Aditya Chaturvedi
    @apexkid
    Correct
    Vu Minh Hoang
    @vuminhhoang
    Okay
    Firstly, you have to get a String
    api.getToken().blockingGet()
    It will return the String for you
    Next,
    Aditya Chaturvedi
    @apexkid
    I can do single.subscribe( r -> completableAPi(r).subscribe ... ) but the code will be messy. I want to chain operations the way i mentioned in the SO
    Vu Minh Hoang
    @vuminhhoang
    pass the value to post api
    Aditya Chaturvedi
    @apexkid
    This is android so cannot do blocking calls.
    Vu Minh Hoang
    @vuminhhoang
    You can
    api2.saveToken(token = {api1.getToken().blockingGet()})
    Aditya Chaturvedi
    @apexkid
    ok let me try this
    Vu Minh Hoang
    @vuminhhoang
    Using lambda in this case
    Aditya Chaturvedi
    @apexkid
    Not working. The subscription itself is not getting triggered.
    completableAPI(singleAPI.blockingGet()).subscribeOn(..).observeOn(..).subscribe(...)
    Vu Minh Hoang
    @vuminhhoang
    wait
    Aditya Chaturvedi
    @apexkid
    When I do singleAPI.map(result -> completableAPI(result)) it returns a Single<Completable>. My use case is that I just want to return a Completable that I can subscribe to.
    If I do Completable.fromSingle(...single<completable> here...) then the Completable call is ignored.
    And if I do singleAPI.flatMapCompletable(result -> completableAPI(result)), I am able to get an object of type Completable, but the inner call happens on the main thread even if I do subscribeOn(Schedulers.io()).
    Too many things that i am confused with here.
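The difference between the nested `Single<Completable>` and the flat `Completable` mentioned above has a close JDK analogue, shown here with `CompletableFuture` rather than RxJava: `thenApply` behaves like `map` and nests the second call's result type, while `thenCompose` behaves like `flatMap` and flattens it. This is only an analogy for the types. `ChainGetThenPost`, `getToken`, `saveToken`, and the `saved` holder are hypothetical stand-ins for the two API calls, and unlike a Completable, a CompletableFuture starts eagerly, so the threading question from the message is not covered.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ChainGetThenPost {

    // Records what the hypothetical POST call received.
    static final AtomicReference<String> saved = new AtomicReference<>();

    // Hypothetical GET call: asynchronously produces a token.
    static CompletableFuture<String> getToken() {
        return CompletableFuture.supplyAsync(() -> "token-123");
    }

    // Hypothetical POST call: completes without a value.
    static CompletableFuture<Void> saveToken(String token) {
        return CompletableFuture.runAsync(() -> saved.set(token));
    }

    // map-style chaining: the result type is nested, one future inside another.
    static CompletableFuture<CompletableFuture<Void>> nested() {
        return getToken().thenApply(ChainGetThenPost::saveToken);
    }

    // flatMap-style chaining: the result is a single flat future that
    // completes only after the POST has finished.
    static CompletableFuture<Void> chained() {
        return getToken().thenCompose(ChainGetThenPost::saveToken);
    }

    public static void main(String[] args) {
        chained().join(); // waits for GET, then POST
        System.out.println(saved.get()); // prints token-123
    }
}
```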
    hayaticakici
    @hayaticakici
    test