    Yannick Lecaillez
    @ylecaillez
    oh yes, i missed that one. Thanks !
    Helder Sousa
    @htmsousa
    @ylecaillez or @akarnokd do you know where I can find an example of window(boundary) usage? even if it is driven by external events
    Incubator
    @incube8r
    Hello anyone around?
    David Karnok
    @akarnokd
    @htmsousa Why use window(boundary)? I'd pick a tool for my use case, not the other way around.
    @incube8r Just ask your question.
    Helder Sousa
    @htmsousa
    @akarnokd thank you for your feedback.. at the end of the day, I guess I was over-complicating and Flowable.distinctUntilChanged() will be enough for my use case.. that said, if the API provides a method Flowable.window(boundaryIndicatorSupplier) it would be nice to have a little example showing the usage of that approach
    Helder Sousa
    @htmsousa

    as an academic example, here is something I came up with:

        // source of data
        Flowable<Integer> source = Flowable.fromArray(1, 1, 2, 2, 2, 3, 5, 5, 4, 4).publish().autoConnect(2);
        // create a boundary flowable that will emit a new element when the sequence
        // goes from odd to even numbers (and vice versa)
        Flowable<Integer> boundary = source.distinctUntilChanged((p, n) -> p % 2 == n % 2);

        // create a window every time the boundary emits a new element (each window
        // will contain a sequence of elements that are odd or even numbers)
        source
            .window(boundary)
            .doOnEach(w -> System.out.println("Source - New Window"))
            .forEach(window -> window.subscribe(j -> System.out.println(String.format("Value %s fetched", j))));

    output

    Source - New Window
    Source - New Window
    Value 1 fetched
    Value 1 fetched
    Source - New Window
    Value 2 fetched
    Value 2 fetched
    Value 2 fetched
    Source - New Window
    Value 3 fetched
    Value 5 fetched
    Value 5 fetched
    Source - New Window
    Value 4 fetched
    Value 4 fetched
    Source - New Window
    David Karnok
    @akarnokd
    It depends on why you want an external signal to split the source into consecutive windows. There is an open issue about upgrading the wiki with examples. You are welcome to expand this page to match the style of this one.
    Martin Ottersland
    @SlyOtis
    Hey, this has probably been asked before, but I'm trying to understand the delay operator in RxJava. If I call it with the same scheduler that another observable is running on, does it block that observable from executing during the delay?
    David Karnok
    @akarnokd
    RxJava uses non-blocking delays, but the work after such delays may prevent other tasks on the same scheduler from running (e.g., on Schedulers.single()).
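The point above can be illustrated without RxJava itself. This is a minimal sketch that assumes a plain JDK single-threaded ScheduledExecutorService as a stand-in for a single-threaded scheduler such as Schedulers.single(); the class and task names are made up for the illustration. A pending delay does not occupy the thread, but a task that blocks the thread postpones everything queued behind it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SingleThreadDelayDemo {

    /** Returns the order in which three tasks finish on one scheduler thread. */
    public static List<String> run() {
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        // Assumption: a JDK executor stands in for a single-threaded RxJava scheduler.
        ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();
        try {
            // Due after 300 ms. While it waits, the thread stays free for other tasks.
            single.schedule(() -> { order.add("delayed"); done.countDown(); },
                    300, TimeUnit.MILLISECONDS);
            // Submitted later, yet it runs first: the pending delay blocks nothing.
            single.execute(() -> { order.add("immediate"); done.countDown(); });
            // Blocking work occupies the only thread for 600 ms, so the delayed
            // task, although due at 300 ms, has to wait until this one returns.
            single.execute(() -> {
                sleepQuietly(600);
                order.add("blocking");
                done.countDown();
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            single.shutdownNow();
        }
        return order;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [immediate, blocking, delayed]
    }
}
```

The delayed task finishes last only because the blocking task held the thread, not because the delay itself blocked anything.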
    selmangurgen
    @selmangurgen
    Hello guys
    I'm developing a reactive app with Spring WebFlux, using Redis as the database. I want to continuously query all the data in my collection, just like with MongoDB and the @Tailable annotation. Is there a way to do that in Redis too?
    Oguzhan Fehmi Kunter
    @ofkntr
    Hi, I am currently developing a reactive WebFlux application with Redis. Is it possible to use Spring reactive Redis pub/sub events?
    marc
    @marcules

    Hi, I'm trying to figure out whether my usage of RxJava (with Spring) is a) actually a use case for reactive and b) done right, because I have to jump through some (perceived?) hoops to get to the result I want, so I doubt that I'm doing it right.

    I have a request R, that is passed through some "fixers" F(R) that require a Single/Mono and return a Single/Mono of the same type, then this Mono/Single F(R) is passed into "validators" V(F(R)) that require a Single/Mono of type R and return a Single/Mono of type bool; when all V(F(R)) are returned and true I want to send them to some "performers" P(F(R)) that require the Single/Mono of type R and return a result and take the first of the results.

    I have a problem with that setup, because I have to "reuse" F(R) for both the validators and the performers, so I would either have to wait until all validators are done and then pass the original F(R) value to P(F(R)), or zip the validation result with F(R) and pass that into P(F(R)). Is the second approach idiomatic?

    Also, I have to "wait" for all F(R) to finish before passing them to the validators, unless I want to call the methods behind F(R) multiple times, once for each V(F(R)). Am I missing an "andThen" method or something that passes the value from one Mono to the next Mono? (Is there such a concept and I've missed it? mono.then(otherMono) has no connection between the values.)
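One way to read the question: the value produced by one step has to reach the next step, and the fixed request is needed again after validation. Here is a minimal sketch, assuming RxJava 2.x Single (package io.reactivex). The methods fix, validate and perform are hypothetical stand-ins for the fixers, validators and performers, and a single performer replaces "take the first of the results". flatMap hands the value of one Single to the function that builds the next one, and nesting the second flatMap inside the first lambda keeps the fixed request in scope, so no zip is needed.

```java
import io.reactivex.Single;
import java.util.Arrays;
import java.util.List;

public class FixValidatePerform {

    // Hypothetical fixer: normalizes the request.
    static Single<String> fix(String request) {
        return Single.just(request.trim());
    }

    // Hypothetical validators: each yields true when the fixed request passes.
    static List<Single<Boolean>> validate(String fixed) {
        return Arrays.asList(
                Single.just(!fixed.isEmpty()),
                Single.just(fixed.length() < 20));
    }

    // Hypothetical performer: produces the final result.
    static Single<String> perform(String fixed) {
        return Single.just("done:" + fixed);
    }

    static Single<String> handle(String request) {
        return fix(request)
                // flatMap passes the fixed value on; it runs once per subscription.
                .flatMap(fixed ->
                        Single.merge(validate(fixed))   // run all validators
                                .all(ok -> ok)          // true only if every one passed
                                .flatMap(valid -> valid
                                        ? perform(fixed) // 'fixed' is still in scope here
                                        : Single.<String>error(
                                                new IllegalArgumentException("validation failed"))));
    }

    public static void main(String[] args) {
        System.out.println(handle("  abc ").blockingGet()); // done:abc
    }
}
```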

    Helder Sousa
    @htmsousa
    I think you want to use something like a
    selmangurgen
    @selmangurgen
    Hi guys, I have a question. I have nested IO read operations in my reactive app, which uses the WebFlux stack, and I want to get the best performance out of thread management.
    I tried single(), elastic(), parallel() and the other schedulers, but I didn't see any difference in speed.
    I fetch the data hierarchically, which is why I have nested flatMaps. What do you suggest for such IO operations, with Scheduler methods or any other way, to get better performance?
    selmangurgen
    @selmangurgen
    }).log().subscribeOn(Schedulers.elastic());
    return stageMap.then(Mono.just(seasonDto));
    }).log().subscribeOn(Schedulers.elastic());
    return seasonMap.then(Mono.just(tournamentDto));
    }).log().subscribeOn(Schedulers.elastic());
    return tournamentDtoFlux.subscribeOn(Schedulers.single());
    such nested read by filling parent object's fields
    IamZeek
    @ziaur25
    hello
    Ari Meidan
    @AriMeidan
    Hi, i need some help.
    I need to take an array of integers, each of which represents the time to wait before calling a method.
    I've tried to create an Observable from the array, but I don't know how to delay each onNext() call.
    David Karnok
    @akarnokd
    @AriMeidan flatMap them onto a timer:
    Observable.fromArray(100, 200, 500, 1000)
    .flatMap(t -> Observable.timer(t, TimeUnit.MILLISECONDS).map(v -> t))
    .blockingSubscribe(v -> System.out.println("Signal T=" + v));
    Ari Meidan
    @AriMeidan
    @akarnokd What's the internal map for?
    Ari Meidan
    @AriMeidan
    @akarnokd you need to use this .flatMap(t -> Observable.timer(t, TimeUnit.MILLISECONDS).map(v -> t), 1) to limit the concurrency.
    David Karnok
    @akarnokd
    So you don't just get a bunch of zeros.
    If you want a delay between items then yes.
    Ari Meidan
    @AriMeidan
    its just mapping to itself. Why zeros?
    Ari Meidan
    @AriMeidan
    Oh, i get it. so i'll get the t that was passes.
    *passed
    David Karnok
    @akarnokd
    You can edit comments.
    Ari Meidan
    @AriMeidan
    I ignore it because i just need the trigger to call a method.
    David Karnok
    @akarnokd
    Yes, adapt it to your needs.
    Ari Meidan
    @AriMeidan

    Now I've changed it so that when onNext is called I emit the next value from the array. I need to show an infinite loop of screens, each shown for a different time.

        private Observable<Integer> screenTimeSteam() {
            return Observable.create(emitter -> {
                StreamFinishEventListener finishListener = new StreamFinishEventListener(){
                    int i = 0;
                    @Override
                    public void onFinished() {
                        int item = i % timers.length;
                        i++;
                        emitter.onNext(timers[item]);
                    }
                };
                setFinishListener(finishListener);
                finishListener.onFinished();
            });
        }//screenTimeSteam

    Is there an easier way?

    David Karnok
    @akarnokd
    How about doing the flatMap I suggested with a repeat after fromArray?
    Arsala Bangash
    @ArsalaBangash

    Hi! I need some help knowing which operators to use in the following scenario:

    I have a button that submits a code to the backend and also a button that would cancel the submission of the code. I already have a method that makes an API call to the backend and returns an Observable<Result<String>>, let's call this submitCode()

    I would like to do the following:

    submitButtonClickStream -> trigger submitCode() -> cancel submission if cancel stream onNexts -> subscribe to submitCode result and implement success and failure handlers
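The flow described above could be sketched with switchMap and takeUntil. This is only an illustration, assuming RxJava 2.x (package io.reactivex); the submissions helper and the subjects standing in for button clicks and backend calls are hypothetical, and a plain String replaces Result<String>. Each submit click starts a new call, and a signal on the cancel stream ends the call that is currently in flight.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class CancellableSubmit {

    // Hypothetical helper: one backend call per click, cancellable via cancelClicks.
    static <T> Observable<T> submissions(Observable<?> submitClicks,
                                         Observable<?> cancelClicks,
                                         Supplier<Observable<T>> submitCode) {
        return submitClicks.switchMap(click ->
                submitCode.get().takeUntil(cancelClicks));
    }

    /** Simulates two clicks; the first call is cancelled before it answers. */
    static List<String> demo() {
        PublishSubject<String> submit = PublishSubject.create();
        PublishSubject<String> cancel = PublishSubject.create();
        PublishSubject<String> firstCall = PublishSubject.create();
        PublishSubject<String> secondCall = PublishSubject.create();
        Iterator<Observable<String>> calls =
                Arrays.<Observable<String>>asList(firstCall, secondCall).iterator();

        List<String> results = new ArrayList<>();
        submissions(submit, cancel, calls::next)
                .subscribe(results::add, error -> results.add("failed: " + error));

        submit.onNext("click");
        cancel.onNext("cancel");          // ends the first call
        firstCall.onNext("late result");  // dropped, nobody listens anymore
        submit.onNext("click");
        secondCall.onNext("accepted");    // delivered to the success handler
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [accepted]
    }
}
```

The two lambdas passed to subscribe play the role of the success and failure handlers.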

    Ari Meidan
    @AriMeidan

    @akarnokd

    How about doing the flatMap I suggested with a repeat after fromArray?

    I get a memory allocation error.
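A possible explanation, not confirmed in the chat: Observable has no backpressure, so fromArray(...).repeat() re-emits the array without pause, and the operator after it has to subscribe to or queue a timer for every item. Here is a minimal sketch of one alternative, assuming RxJava 2.x (package io.reactivex). The method name screenTimes is hypothetical. It runs the timers one after another with concatMap and puts repeat() at the end, so the array is replayed only after the last timer has fired.

```java
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class RepeatingTimers {

    // Emits each duration after waiting that long, then starts over.
    static Observable<Integer> screenTimes(Integer... timers) {
        return Observable.fromArray(timers)
                // concatMap subscribes to the next timer only after the previous one fired
                .concatMap(t -> Observable.timer(t, TimeUnit.MILLISECONDS).map(v -> t))
                // resubscribe to the whole sequence once it completes
                .repeat();
    }

    public static void main(String[] args) {
        // take(5) ends the otherwise infinite loop for the demonstration
        System.out.println(screenTimes(10, 20, 30).take(5).toList().blockingGet());
        // prints [10, 20, 30, 10, 20]
    }
}
```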

    Niranjan
    @nnanda2016
    Hi… How do I create an Observable from an iterator?
    I didn't find any factory method to create one. Do I have to use an emitter?
    Niranjan
    @nnanda2016

    Pardon my limited knowledge of RxJava; how can I achieve the following? I am trying to implement something like the code below.

    CustomIterator<String> iter = ... code to build custom iterator
    
    Observable.<String>create(emitter -> {
            while (iter.hasNext()) {
                emitter.onNext(iter.next());
            }
            // signal the end of the sequence once the iterator is exhausted
            emitter.onCompleted();
        }, BackpressureMode.BUFFER)
        .map(id -> {
            Doc doc = fetchFromDB(id);
            return convertDocToVO(doc);
        })
        .map(vo -> complicatedTask(vo))
        .forEach(/* Do final processing */);

    I need some help with complicatedTask. The current non-reactive implementation performs the action in parallel (using the FJP common pool): for each VO document, it creates a Future and puts it on a blocking queue. The consumer blocks on the Futures sequentially and, once each is available, does some other processing. This model gives us two things:

    1. Parallel execution
    2. Blocking queue guarantees that a fixed no. of Futures are created.

    Some clarifications:

    1. The method complicatedTask does all the processing in CPU; no I/O.
    2. Each VO has a collection property which needs some processing which is parallelized.
    3. The return type of complicatedTask is another type of DTO.

    If I want to achieve something similar in a reactive way, how should I do it?

    If I return a Future from .map(vo -> complicatedTask), then it will lead to a lot of Futures in memory.
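The two requirements listed above (parallel execution, a fixed number of tasks in flight) could be sketched as follows. This assumes RxJava 2.x Flowable (package io.reactivex) rather than the 1.x API in the snippet. The complicatedTask body is a placeholder, and process and maxInFlight are hypothetical names. concatMapEager runs up to maxInFlight inner sources at once and still delivers results in source order, much like a consumer that blocks on queued Futures one by one.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;

public class BoundedParallelMap {

    // Placeholder for the CPU-bound work described above (hypothetical).
    static String complicatedTask(int vo) {
        return "dto-" + (vo * vo);
    }

    static List<String> process(List<Integer> vos, int maxInFlight) {
        return Flowable.fromIterable(vos)
                .concatMapEager(
                        // each task runs on the computation pool
                        vo -> Flowable.fromCallable(() -> complicatedTask(vo))
                                .subscribeOn(Schedulers.computation()),
                        maxInFlight, // at most this many tasks active at once
                        1)           // each inner source yields a single item
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList(1, 2, 3, 4), 2));
        // prints [dto-1, dto-4, dto-9, dto-16]
    }
}
```

If result order does not matter, flatMap with a maxConcurrency argument is the unordered counterpart.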
    David Karnok
    @akarnokd
    Observable.from((Iterable<String>)() -> iter).
    Niranjan
    @nnanda2016
    Thanks @akarnokd …can you provide some ideas for the second problem?
    Ignacio Baca Moreno-Torres
    @ibaca
    You really should read all the fromXxx methods :wink2:
    Niranjan
    @nnanda2016
    @ibaca you’re right…i totally miss this part…funny thing is, i am already doing it (converting an iterator to iterable)…but not sure why i missed that..it was my bad!
    Ignacio Baca Moreno-Torres
    @ibaca
    :smile: :+1:
    Vu Minh Hoang
    @vuminhhoang
    hi
    lukaszguz
    @lukaszguz

    Hi,
    I'm a bit confused about how doOnComplete works.

    Flowable<String> emitter = Flowable.just("aa", "ba", "ab", "bb", "ac")
                    .doOnComplete(() -> println(Thread.currentThread().getName() + " - end emit"));
    
            Flowable<GroupedFlowable<Character, String>> groupBy = emitter.groupBy(s -> s.charAt(0));
    
            groupBy.flatMap(characterStringGroupedFlowable -> bufferAndDelay(stats(characterStringGroupedFlowable)))
                    .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
                    .test()
                    .await();
    
    private Flowable<String> stats(GroupedFlowable<Character, String> groupedFlowable) {
            return groupedFlowable.compose(upstream -> {
                AtomicLong count = new AtomicLong(0);
                return upstream
                        .doOnNext(s -> {
                            count.incrementAndGet();
                            println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - [" + s + "] - count " + count.get());
                        })
                        .doOnComplete(() -> println(Thread.currentThread().getName() + " - group: " + groupedFlowable.getKey() + " - stats - complete - count " + count.get()));
            });
        }
    
        private Flowable<String> bufferAndDelay(Flowable<String> flowable) {
            return flowable.publish(stringFlowable ->
                    stringFlowable.buffer(1)
                            .concatMap(bufferedStrings ->
                                    Flowable.just(bufferedStrings)
                                            .doOnNext(x -> println(Thread.currentThread().getName() + " - before delay - " + x))
                                            .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation())
                                            .doOnNext(x -> println(Thread.currentThread().getName() + " - after delay - " + x))
                                            .flatMap(Flowable::fromIterable)
                            ));
        }

    Output:

    pool-1-thread-1 - group: a - stats - [aa] - count 1
    pool-1-thread-1 - before delay - [aa]
    pool-1-thread-1 - group: b - stats - [ba] - count 1
    pool-1-thread-1 - before delay - [ba]
    pool-1-thread-1 - group: a - stats - [ab] - count 2
    pool-1-thread-1 - group: b - stats - [bb] - count 2
    pool-1-thread-1 - end emit
    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    RxComputationThreadPool-1 - after delay - [aa]
    RxComputationThreadPool-2 - after delay - [ba]
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3
    RxComputationThreadPool-1 - before delay - [ab]
    RxComputationThreadPool-2 - before delay - [bb]
    RxComputationThreadPool-4 - after delay - [bb]
    RxComputationThreadPool-3 - after delay - [ab]
    RxComputationThreadPool-3 - before delay - [ac]
    RxComputationThreadPool-5 - after delay - [ac]

    Result is that

    pool-1-thread-1 - group: a - stats - complete - count 2
    pool-1-thread-1 - group: b - stats - complete - count 2
    ...
    RxComputationThreadPool-1 - group: a - stats - [ac] - count 3

    Why does stats - complete appear before stats - [ac] - count 3? Am I breaking the rule that says "emit events synchronously, not concurrently"?
    When I remove publish from the bufferAndDelay method, the order is correct.