    selmangurgen
    @selmangurgen
    Hello guys
    I'm developing a reactive app with Spring WebFlux, using Redis as the database. I want to continuously query all the data in my collection, the way MongoDB allows with the @Tailable annotation. Is there a way to do this in Redis too?
    Oguzhan Fehmi Kunter
    @ofkntr
    Hi, I'm currently developing a reactive WebFlux application with Redis. Is it possible to use Spring's reactive Redis pub/sub events?
    marc
    @marcules

    Hi, I'm trying to figure out if my usage of RxJava (with Spring) is a) actually a use case for reactive and b) if I'm doing it right, because I have to jump through some (perceived?) hoops to get to the result I want, so I doubt that I'm doing it right.

    I have a request R, that is passed through some "fixers" F(R) that require a Single/Mono and return a Single/Mono of the same type, then this Mono/Single F(R) is passed into "validators" V(F(R)) that require a Single/Mono of type R and return a Single/Mono of type bool; when all V(F(R)) are returned and true I want to send them to some "performers" P(F(R)) that require the Single/Mono of type R and return a result and take the first of the results.

    I have a problem with that setup, because I have to "reuse" F(R) for both the validators and the performers, so I would either have to wait after all validators are done and then pass the original F(R) value to P(F(R)) or have to zip the validation result with F(R) and pass that into P(F(R)) < is the second approach idiomatic?

    Also I have to "wait" for all F(R) to finish before passing them to the validators, unless I want to call the methods behind F(R) multiple times, once for each V(F(R)). Am I missing an "andThen" method or something that passes the value from one Mono to the next? (Is there such a concept and I've missed it? Because mono.then(otherMono) has no connection between the values.)
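
    One way to picture the "pass the value on to the next step" shape asked about above is the sketch below. It uses the JDK's CompletableFuture as a stand-in so that it runs without extra dependencies: thenCompose plays the role that flatMap plays on a Single or Mono, and thenCombine the role of zipWith. Nesting the validation inside the thenCompose lambda keeps the fixed request in scope, so nothing has to be zipped along and the fixers run only once. The String request, the sample fixers and validators, and the single perform step (instead of several performers) are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class PipelineSketch {

    // Run the fixers one after another; each one receives the previous result.
    static CompletableFuture<String> fix(String request,
            List<Function<String, CompletableFuture<String>>> fixers) {
        CompletableFuture<String> current = CompletableFuture.completedFuture(request);
        for (Function<String, CompletableFuture<String>> fixer : fixers) {
            current = current.thenCompose(fixer);
        }
        return current;
    }

    // Start every validator on the same fixed value and AND the results together.
    static CompletableFuture<Boolean> validate(String fixed,
            List<Function<String, CompletableFuture<Boolean>>> validators) {
        CompletableFuture<Boolean> all = CompletableFuture.completedFuture(true);
        for (Function<String, CompletableFuture<Boolean>> validator : validators) {
            all = all.thenCombine(validator.apply(fixed), Boolean::logicalAnd);
        }
        return all;
    }

    // Hypothetical performer: only reached once validation has passed.
    static CompletableFuture<String> perform(String fixed) {
        return CompletableFuture.supplyAsync(() -> "performed:" + fixed);
    }

    static CompletableFuture<String> handle(String request) {
        // Sample fixers and validators, invented for this sketch.
        List<Function<String, CompletableFuture<String>>> fixers = Arrays.asList(
                s -> CompletableFuture.supplyAsync(s::trim),
                s -> CompletableFuture.supplyAsync(s::toUpperCase));
        List<Function<String, CompletableFuture<Boolean>>> validators = Arrays.asList(
                s -> CompletableFuture.supplyAsync(() -> !s.isEmpty()),
                s -> CompletableFuture.supplyAsync(() -> s.length() < 20));

        // "fixed" stays visible inside the nested lambda, so it can be reused
        // by both the validators and the performer without recomputing it.
        return fix(request, fixers).thenCompose(fixed ->
                validate(fixed, validators).thenCompose(ok -> {
                    if (!ok) {
                        throw new IllegalArgumentException("validation failed");
                    }
                    return perform(fixed);
                }));
    }

    public static void main(String[] args) {
        System.out.println(handle("  hello ").join()); // prints "performed:HELLO"
    }
}
```

    With RxJava or Reactor types the same nesting would presumably read fixed.flatMap(r -> validate(r).flatMap(ok -> perform(r))), with the failed-validation case mapped to an error signal.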

    Helder Sousa
    @htmsousa
    I think you want to use something like a
    selmangurgen
    @selmangurgen
    Hi guys, I have a question. I have nested I/O read operations in my reactive app, which uses the WebFlux stack, and I want to get the best performance out of thread management.
    I tried single(), elastic(), parallel() and other schedulers, but I didn't see any difference in speed.
    I fetch the data hierarchically, which is why I have nested flatMaps. What do you suggest for this kind of I/O operation, with Scheduler methods or any other way, to get better performance?
    selmangurgen
    @selmangurgen
    }).log().subscribeOn(Schedulers.elastic());
    return stageMap.then(Mono.just(seasonDto));
    }).log().subscribeOn(Schedulers.elastic());
    return seasonMap.then(Mono.just(tournamentDto));
    }).log().subscribeOn(Schedulers.elastic());
    return tournamentDtoFlux.subscribeOn(Schedulers.single());
    These are the nested reads, each filling in the parent object's fields.
    IamZeek
    @ziaur25
    hello
    Ari Meidan
    @AriMeidan
    Hi, I need some help.
    I need to take an array of integers, each of which represents the time to wait before calling a method.
    I've tried to create an Observable from the array, but I don't know how to delay each onNext() call.
    David Karnok
    @akarnokd
    @AriMeidan flatMap them onto a timer:
    Observable.fromArray(100, 200, 500, 1000)
    .flatMap(t -> Observable.timer(t, TimeUnit.MILLISECONDS).map(v -> t))
    .blockingSubscribe(v -> System.out.println("Signal T=" + v));
    Ari Meidan
    @AriMeidan
    @akarnokd What's the inner map for?
    Ari Meidan
    @AriMeidan
    @akarnokd you need to use .flatMap(t -> Observable.timer(t, TimeUnit.MILLISECONDS).map(v -> t), 1) to limit the concurrency.
    David Karnok
    @akarnokd
    So you don't just get a bunch of zeros.
    If you want a delay between items then yes.
    Ari Meidan
    @AriMeidan
    It's just mapping to itself. Why zeros?
    Ari Meidan
    @AriMeidan
    Oh, i get it. so i'll get the t that was passes.
    *passed
    David Karnok
    @akarnokd
    You can edit comments.
    Ari Meidan
    @AriMeidan
    I ignore it because i just need the trigger to call a method.
    David Karnok
    @akarnokd
    Yes, adapt it to your needs.
    Ari Meidan
    @AriMeidan

    Now I've changed it so that when onNext is called I emit the next value from the array. I need to show an infinite loop of screens, each shown for a different time.

        private Observable<Integer> screenTimeSteam() {
            return Observable.create(emitter -> {
                StreamFinishEventListener finishListener = new StreamFinishEventListener(){
                    int i = 0;
                    @Override
                    public void onFinished() {
                        int item = i % timers.length;
                        i++;
                        emitter.onNext(timers[item]);
                    }
                };
                setFinishListener(finishListener);
                finishListener.onFinished();
            });
        }//screenTimeSteam

    Is there an easier way?

    David Karnok
    @akarnokd
    How about doing the flatMap I suggested with a repeat after fromArray?
    Arsala Bangash
    @ArsalaBangash

    Hi! I need some help knowing which operators to use in the following scenario:

    I have a button that submits a code to the backend and also a button that would cancel the submission of the code. I already have a method that makes an API call to the backend and returns an Observable<Result<String>>, let's call this submitCode()

    I would like to do the following:

    submitButtonClickStream -> trigger submitCode() -> cancel submission if cancel stream onNexts -> subscribe to submitCode result and implement success and failure handlers

    Ari Meidan
    @AriMeidan

    @akarnokd

    How about doing the flatMap I suggested with a repeat after fromArray?

    I get a memory allocation error.

    Niranjan
    @nnanda2016
    Hi, how do I create an Observable from an iterator?
    I didn't find any factory method to create one. Do I have to use an emitter?
    Niranjan
    @nnanda2016

    Pardon my limited knowledge of RxJava; how can I achieve the following? I am trying to implement something like the code below.

    CustomIterator<String> iter = ... code to build custom iterator
    
    Observable.<String>create(emitter -> {
            while (iter.hasNext()) {
                emitter.onNext(iter.next());
            }
            emitter.onCompleted(); // signal the end of the iterator
        }, BackpressureMode.BUFFER)
        .map(id -> {
            Doc doc = fetchFromDB(id);
            return convertDocToVO(doc); // the lambda has to return the mapped value
        })
        .map(vo -> complicatedTask(vo))
        .forEach(/* Do final processing */);

    I need some help around complicatedTask. The current non-reactive implementation performs an action in parallel (using the FJP common pool): for each VO document, it creates a Future and drops it into a blocking queue. The consumer blocks on the Futures sequentially and, once one is available, does some other processing. This model gives us two things:

    1. Parallel execution
    2. Blocking queue guarantees that a fixed no. of Futures are created.

    Some clarifications:

    1. The method complicatedTask does all of its processing on the CPU; no I/O.
    2. Each VO has a collection property that needs some processing, which is parallelized.
    3. The return type of complicatedTask is another type of DTO.

    If I want to achieve something similar in a reactive style, how should I do it?

    If I return a Future from .map(vo -> complicatedTask), it will lead to a lot of Futures in memory.
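
    For reference, the non-reactive model described above (parallel workers, plus a bounded blocking queue of Futures that a consumer drains in order) can be sketched roughly as follows with plain JDK classes. This is only an illustration, not the original code: it uses a small fixed pool instead of the FJP common pool, and the maxInFlight parameter and the trivial complicatedTask body are placeholders. In RxJava a similar bound is usually expressed through the maxConcurrency argument of flatMap, or of concatMapEager when the original order has to be kept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BoundedFuturesSketch {

    // Placeholder for the CPU-only complicatedTask from the question.
    static int complicatedTask(String vo) {
        return vo.length();
    }

    static List<Integer> process(List<String> vos, int maxInFlight) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // The queue capacity bounds how many pending Futures can pile up.
        BlockingQueue<Future<Integer>> queue = new ArrayBlockingQueue<>(maxInFlight);
        List<Integer> results = new ArrayList<>();

        // Consumer: blocks on each Future in submission order.
        // Error handling is kept minimal for the sake of the sketch.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < vos.size(); i++) {
                    results.add(queue.take().get());
                }
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        });
        consumer.start();

        try {
            for (String vo : vos) {
                Callable<Integer> job = () -> complicatedTask(vo);
                // put() blocks while the queue is full, which throttles the producer.
                queue.put(pool.submit(job));
            }
            consumer.join(); // join() also makes the results visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("a", "bb", "ccc"), 2)); // prints [1, 2, 3]
    }
}
```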
    David Karnok
    @akarnokd
    Observable.from((Iterable<String>)() -> iter).
    Niranjan
    @nnanda2016
    Thanks @akarnokd …can you provide some ideas for the second problem?
    Ignacio Baca Moreno-Torres
    @ibaca
    You really should read all the fromXxx methods :wink2:
    Niranjan
    @nnanda2016
    @ibaca you're right, I totally missed this part. The funny thing is, I am already doing it (converting an iterator to an iterable), but I'm not sure why I missed that. It was my bad!
    Ignacio Baca Moreno-Torres
    @ibaca
    :smile: :+1:
    Vu Minh Hoang
    @vuminhhoang
    hi
    lukaszguz
    @lukaszguz

    Hi,
    I'm a bit confused about how doOnComplete works.

    Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
                    .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));
    
            Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));
    
            groupBy.flatMap(characterStringGroupedFlowable -> bufferAndDelay(stats(characterStringGroupedFlowable)))
                    .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
                    .test()
                    .await();
    
    private Flowable<String> stats(GroupedFlowable<Character, String> groupedFlowable) {
            return groupedFlowable.compose(upstream -> {
                AtomicLong count = new AtomicLong(0);
                return upstream
                        .doOnNext(s -> {
                            count.incrementAndGet();
                            println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - [" + s + "] - count " + count.get());
                        })
                        .doOnComplete(() -> println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - complete - count " + count.get()));
            });
        }
    
        private Flowable<String> bufferAndDelay(Flowable<String> flowable) {
            return flowable.publish(stringFlowable ->
                    stringFlowable.buffer(1)
                            .concatMap(bufferedStrings ->
                                    Flowable.just(bufferedStrings)
                                            .doOnNext(x -> println(Thread.currentThread().getName() + " - before delay - " + x))
                                            .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation())
                                            .doOnNext(x -> println(Thread.currentThread().getName() + " - after delay - " + x))
                                            .flatMap(Flowable::fromIterable)
                            ));
        }

    Output:

    pool-1-thread-1 - group: a - stats - [aa] - count 1
    pool-1-thread-1 - before delay - [aa]
    pool-1-thread-1 - group: b - stats - [ba] - count 1
    pool-1-thread-1 - before delay - [ba]
    pool-1-thread-1 - group: a - stats - [ab] - count 2
    pool-1-thread-1 - group: b - stats - [bb] - count 2
    pool-1-thread-1 - end emit
    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    RxComputationThreadPool-1 - after delay - [aa]
    RxComputationThreadPool-2 - after delay - [ba]
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
    RxComputationThreadPool-1 - before delay - [ab]
    RxComputationThreadPool-2 - before delay - [bb]
    RxComputationThreadPool-4 - after delay - [bb]
    RxComputationThreadPool-3 - after delay - [ab]
    RxComputationThreadPool-3 - before delay - [ac]
    RxComputationThreadPool-5 - after delay - [ac]

    Result is that

    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    ...
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3

    Why is stats - complete printed before stats - [ac] - count 3? Am I breaking the rule that says "emit events synchronously, not concurrently"?
    When I remove publish from the bufferAndDelay method, the order is correct.

    lukaszguz
    @lukaszguz

    Shorter version:

    Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
                    .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));
    
            Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));
    
            groupBy.flatMap(characterStringGroupedFlowable -> {
                return stats(characterStringGroupedFlowable).publish(stringFlowable ->
                        stringFlowable.concatMap(s ->
                                Flowable.just(s)
                                        .delay(100, TimeUnit.MILLISECONDS)));
            })
                    .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
                    .test()
                    .await();

    Output:

    pool-1-thread-1 - group: a - stats - [aa] - count 1
    pool-1-thread-1 - group: b - stats - [ba] - count 1
    pool-1-thread-1 - group: a - stats - [ab] - count 2
    pool-1-thread-1 - group: b - stats - [bb] - count 2
    pool-1-thread-1 - end emit
    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
    Dan O'Reilly
    @dano
    Once a disposable has been added to CompositeDisposable, am I correct in thinking that the CompositeDisposable will keep a reference to it until it's explicitly removed? Meaning, if the source of a given Disposable that has been added to the CompositeDisposable terminates, the Disposable can't get garbage collected as long as the CompositeDisposable keeps the reference to it?
    Dan O'Reilly
    @dano
    looks like subscribeAutoDispose from RxJava2Extensions will make sure the reference gets removed
    i think that's what i want
    lukaszguz
    @lukaszguz

    Is it a bug in rx2? Btw, I followed your advice and replaced this part:

    stringFlowable.concatMap(s ->
                                Flowable.just(s)
                                        .delay(100, TimeUnit.MILLISECONDS)));

    with

    stringFlowable.compose(FlowableTransformers.spanout(10, 10, TimeUnit.MILLISECONDS, scheduler))

    now it's working :)
    Thx :)

    Aditya Chaturvedi
    @apexkid
    Hello guys. Greetings. I need some help from the Rx community here. I am new to Rx and using RxJava to write an Android app. I want to chain the response from a GET api returning a Single to a POST api returning a Completable. I am not able to achieve that. I posted a SO question but no help till now. Please suggest how to do this. https://stackoverflow.com/questions/53633071/how-to-pass-output-of-a-single-to-a-completable-in-rxjava
    Vu Minh Hoang
    @vuminhhoang
    Did you resolve the problem?
    Aditya Chaturvedi
    @apexkid
    No
    Vu Minh Hoang
    @vuminhhoang
    I don't really understand what you want. Basically, you get the Single<String> and then want to send this String to api2 for the POST call?
    Is that right?