    slisaasquatch
    @slisaasquatch
    To execute in parallel, since your method is blocking, you'll probably have to do something like this:
    Flux.range(0, 10)
    .parallel(4)
    .runOn(scheduler)
    .map(x -> expensiveGeneration())
    .sequential();
    If expensiveGeneration is not blocking, i.e. it returns a Publisher, then you can do this:
    Flux.range(0, 10)
    .flatMap(x -> expensiveGeneration(), 4);
    Andrey
    @404-

    in the process of evaluating RxJava, i've run into strange behavior while attempting recursion. first, i noticed that RxJava is not stacksafe (please correct me if i'm wrong). second, while attempting to get around the stackoverflow issues, i've gotten into a situation where a very simple program seemingly hangs. here's the bit of code (in Kotlin) i'm working with:

    package demos
    
    import io.reactivex.rxjava3.core.Flowable
    import io.reactivex.rxjava3.core.Single
    import io.reactivex.rxjava3.schedulers.Schedulers
    import java.util.concurrent.TimeUnit.SECONDS
    
    fun main() {
        fun incr(n: Int): Single<Int> = Single.just(n + 1)
    
        fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
            if (n < max)
                incr(n).observeOn(Schedulers.single()).toFlowable().concatMap { next -> numbers(next, max) }
            else
                Flowable.empty()
        )
    
        numbers(1, 1_000_000).sample(5, SECONDS).blockingForEach(::println)
    }

    without the call to .observeOn(Schedulers.single()), the stack explodes... with it, the first few sampled numbers get printed and then... just... nothing, e.g.:

    14737
    19765
    23398
    23802

    would really appreciate any help/advice.

    Aman Bansal
    @iamanbansal
    java.lang.NoSuchMethodError: No static method trampoline()
    Getting this error in android unit testing
    Mike Morden
    @mikemorden_gitlab
    Hello all. I would like to ask a question about RxJava. I am using Retrofit to fetch a bunch of data from the backend. The data is paginated: it keeps fetching until the backend returns null as the next page id. How can I collect all the received responses into an array? Can someone give me an example?
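    A rough sketch of one way to follow the next-page id until it is null and collect everything into a single list. To keep it runnable without Retrofit or RxJava, CompletableFuture from the JDK stands in for Single here. Page, fetchAll and the fetchPage function are hypothetical names for illustration, not Retrofit or RxJava API. With RxJava the same recursive shape would presumably use Single.flatMap where this uses thenCompose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch only. Page and fetchPage are hypothetical stand-ins for a Retrofit
// response type and service call; CompletableFuture stands in for Single.
final class PagedFetch {

    // One page of results plus the id of the next page (null on the last page).
    static final class Page {
        final List<String> items;
        final String nextPageId;

        Page(List<String> items, String nextPageId) {
            this.items = items;
            this.nextPageId = nextPageId;
        }
    }

    // Fetches the page with the given id, appends its items to acc, and keeps
    // following nextPageId until the backend reports null.
    static CompletableFuture<List<String>> fetchAll(
            Function<String, CompletableFuture<Page>> fetchPage,
            String pageId,
            List<String> acc) {
        return fetchPage.apply(pageId).thenCompose(page -> {
            acc.addAll(page.items);
            if (page.nextPageId == null) {
                return CompletableFuture.completedFuture(acc);
            }
            return fetchAll(fetchPage, page.nextPageId, acc);
        });
    }

    public static void main(String[] args) {
        // Fake backend with three pages, keyed by page id.
        Map<String, Page> backend = new HashMap<>();
        backend.put("p1", new Page(Arrays.asList("a", "b"), "p2"));
        backend.put("p2", new Page(Arrays.asList("c"), "p3"));
        backend.put("p3", new Page(Arrays.asList("d", "e"), null));

        List<String> all = fetchAll(
                id -> CompletableFuture.completedFuture(backend.get(id)),
                "p1",
                new ArrayList<>()).join();
        System.out.println(all); // prints [a, b, c, d, e]
    }
}
```

    The accumulator list is passed down the recursion, so the final future completes with all items in page order. With already-completed futures, as in the demo, each page nests one more call on the stack, so a very long chain of pages may need a different approach.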
    Davide Pugliese
    @Deviad
    Hello, does anyone know how you can Mono-fy a block of code as you can do with Promise?
    Ex.
    new Promise((resolve, reject)=>{someFunctionCall(); resolve(); })
    How is this accomplished with RxJava?
    Basically I need to send a done signal after a sync method has been executed.
    Hmm, thinking about it, I could just send back a Mono.empty();
    Let’s try.
    Davide Pugliese
    @Deviad
    Why is this wrong:
    public String save(UserRequestDTO requestDTO) {
        UserInfoEntity userInfoEntity = mapper.mapToUserInfoEntity(requestDTO);
        return repository.save(userInfoEntity)
            .flatMap(x -> getUser(requestDTO.getUsername()).flatMap(y -> Mono.just(getUrl(y.getUsername()))))
            .block();
    }
    ?
    Anas Altair
    @anastr
    @Deviad you need to use something like fromCallable(), take a look at this link:
    ggehlot
    @ggehlot
    Has anyone been able to use HystrixObservableCommand with rxjava3?
    Sergey Busygin
    @sbusygin
    Hello
    Where can I find a junior Java job?
    Mark Raynsford
    @io7m
    'ello. i realize that the order in which subscribers to an observable will be called is not defined, but i'm guessing it has a somewhat deterministic order based on the internal data structures used. i'm curious: is there an easy way to randomize the order in which subscribers are called for a given event? i want to be sure that my application is not dependent on subscription delivery order
    consider it similar to the way that some hashmap implementations randomize iterators so that programs don't become dependent on the order of elements when iterating
    Anas Altair
    @anastr
    @io7m you can use sorted((x,y) -> Integer.compare(random.nextInt(2), random.nextInt(2)))
    Mark Raynsford
    @io7m
    @anastr: i think you misinterpreted my question. Observable.sorted() will cause the items emitted by the observable to be emitted in a sorted order (with a custom comparator in your case to randomize the order). what i was referring to wasn't about changing the order of events, but of randomizing the order in which subscribers are called for each emitted event
    for example, if you take a look at BehaviorSubject, the onNext method iterates over the array of subscribers and calls onNext on each of them. that array is, as far as i can tell, the list of observers in subscription order. that means that my program's behaviour could implicitly become dependent on subscribers having subscribed in a particular order, and therefore reacting to emitted events in a given order. if that array was randomized, it'd be much harder to get into that situation
    Mark Raynsford
    @io7m
    some implementations of HashMap on the jvm randomize the entry set before passing it to an iterator, so that programs can't accidentally become dependent on the iteration order of maps (because it's random every time)
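    A minimal sketch of the idea, in plain JDK Java rather than RxJava: a subject-like dispatcher that shuffles a copy of its subscriber list before delivering each event. ShufflingDispatcher and its methods are hypothetical names for illustration. This is not an RxJava feature, just a stand-in that shows the behaviour being asked about.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical helper, not part of RxJava: delivers each event to its
// subscribers in a freshly shuffled order.
final class ShufflingDispatcher<T> {
    private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();
    private final Random random;

    ShufflingDispatcher(Random random) {
        this.random = random;
    }

    void subscribe(Consumer<? super T> subscriber) {
        subscribers.add(subscriber);
    }

    void onNext(T item) {
        // Shuffle a copy so the subscription-order list itself stays untouched.
        List<Consumer<? super T>> order = new ArrayList<>(subscribers);
        Collections.shuffle(order, random);
        for (Consumer<? super T> subscriber : order) {
            subscriber.accept(item);
        }
    }

    public static void main(String[] args) {
        ShufflingDispatcher<String> dispatcher = new ShufflingDispatcher<>(new Random());
        dispatcher.subscribe(e -> System.out.println("first subscriber got " + e));
        dispatcher.subscribe(e -> System.out.println("second subscriber got " + e));
        dispatcher.subscribe(e -> System.out.println("third subscriber got " + e));

        // The delivery order may differ from one event to the next.
        dispatcher.onNext("event-1");
        dispatcher.onNext("event-2");
    }
}
```

    Passing a seeded Random instead makes a failing delivery order reproducible in tests.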
    Anas Altair
    @anastr
    Well, fair enough @io7m, you can add a random delay to each observer or run them in parallel...
    h4d
    @fairchild88
    hello
    Piojagcodes
    @piojagcodes
    same here, what is the fastest way to get a junior java job? even as a remote developer.. i finished 1 course filled with basics
    Sergey Busygin
    @sbusygin
    Hello
    Can anybody help me find a junior Java job?
    Pietro Galassi
    @pietrogalassi
    Hi all. What happens if I call an API exposed with Flux (so non-blocking) and, while it performs the action, the pod of the microservice behind this API crashes?
    Will I lose the response?
    How do I recover?
    bage
    @allfuof
    good morning
    Yaz
    @yazalulloa_twitter
    Hi, I posted a question on Stack Overflow. If anyone has an idea other than the solution I came up with, it would be great: https://stackoverflow.com/questions/62963794/rxjava-valve-use-case
    mahyar_shariati
    @MahyarShariati_twitter
    Hi guys
    i have a specific question for you developers
    on a scale of 1 to 5 (low to high) how do you rate your "teamWork" skill?
    DB
    @dineshbhagat

    Hi All,
    I am using io.reactivex.rxjava2:rxjava:2.2.19 with Java 11 (ref: ReactiveX/RxJava#6701). When I looked at a thread dump, I found the following:

    "RxCachedThreadScheduler-512" #1715 daemon prio=5 os_prio=0 cpu=14871.41ms elapsed=3810.70s tid=0x00007fdfc8051800 nid=0x5b37 waiting on condition  [0x00007fdefa1f6000]
       java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@11.0.6/Native Method)
        - parking to wait for  <0x000000068f374f98> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(java.base@11.0.6/LockSupport.java:194)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.6/AbstractQueuedSynchronizer.java:2081)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.6/ScheduledThreadPoolExecutor.java:1170)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.6/ScheduledThreadPoolExecutor.java:899)
        at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.6/ThreadPoolExecutor.java:1054)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.6/ThreadPoolExecutor.java:1114)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.6/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@11.0.6/Thread.java:834)

    The reason I checked the stack trace is that around 1k threads are in the waiting state. It happens only in some cases and is difficult to reproduce.
    Please suggest if I am missing anything.
    P.S. I upgraded from RxJava 1 to RxJava 2 and replaced Observables with Flowables using the buffer backpressure strategy.

    Chris Eldon
    @eldon97
    hi, i have a simple question about reactivex. the website reactivex.io calls reactivex an API. why is it called an API? i think it doesn't quite fit the concept of an application programming interface..
    Mark Elston
    @melston
    @eldon97, from the Oxford dictionary an API is defined as "a set of functions and procedures allowing the creation of applications that access the features or data of an operating system, application, or other service." I think RX falls under the "other service" category.
    Volnei Munhoz
    @volnei_gitlab

    Hi, I have the following situation and need some help.

    I have a method that checks whether a payment exists, and if it does, I need to run some processing that returns a Single. But this does not work, since Maybe.flatMap requires a MaybeSource. Is there another pattern for this?

    In the end I need to return the payment (and the flow needs to be executed as well).

    public Maybe<Payment>  process(String id) { 
       return getPayment(id).flatMap(payment -> {
           // do something that returns a Single source
       });
    }
    Mark Elston
    @melston
    @volnei_gitlab , I'm not understanding your question (or maybe the context). What does getPayment return and what is the lambda supposed to be doing?
    Volnei Munhoz
    @volnei_gitlab

    Hi @melston, thanks for your reply. I will try to explain...

    I have a database of payment entities that represent payment requests. When I call process(String id), it looks up a Payment in the database. The payment might not be found, so getPayment(id) returns a Maybe, right?

    If the payment exists in the database, I need to process it on a payment gateway. These actions are network calls that register the payment with the gateway. After all that, I need the process method to return the current (maybe) payment.

    All network calls on the gateway return a Single<?> response.

    Volnei Munhoz
    @volnei_gitlab

    Here is more elaborate code to explain it better. I am trying to chain some invocations that depend on a first invocation (getPayment).

    public Maybe<Payment> getPayment(String id) {
         // lookup a payment in database and return it as a maybe
    }
    
    public Single<?> registerPayment(Payment payment) {
        // register this payment on a payment gateway.
    }
    
    // This method contains an error because flatMap only accepts a MaybeSource and registerPayment returns a Single.
    public Maybe<Payment>  process(String id) { 
       return getPayment(id).flatMap(payment -> {
           return registerPayment(payment);
       });
    }

    Remember that in the end I need to return the Maybe<Payment>.

    Dan O'Reilly
    @dano
    @volnei_gitlab Can you just use flatMapSingleElement instead of flatMap?
    Volnei Munhoz
    @volnei_gitlab
    @dano, yes, I can, and flatMapSingleElement solves part of the problem. But how do I return the getPayment(id) result from the process method, since flatMapSingleElement chains the call and returns the registerPayment response?
    Dan O'Reilly
    @dano
    .flatMapSingleElement(payment -> registerPayment(payment)
        .map(ign -> payment)
    );
    Volnei Munhoz
    @volnei_gitlab
    Thanks @dano, that is exactly what I need!! =D
    Jaroslav Sagan
    @SaganJaro_twitter

    Hi all, I would like to ask for advice about the retryWhen() operator. Here is some example code:

    fun testRetry() {
        val publisher = PublishSubject.create<ChangedEntity>()
    
        val onNext: (ChangedEntity) -> Unit = { println("onNext") }
        val onError: (Throwable) -> Unit = { println("onError") }
    
        publisher
            .map {
                if (Random().nextInt(10) % 3 == 0) {
                    println("Throwing error")
                    throw RuntimeException(TimeoutException())
                }
                else {
                    println(it)
                    it
                }
            }
            .retryWhen { errors ->
                errors.flatMap {
                    if (it.cause is TimeoutException) {
                        println(">>>>>>>>>>>>>>>>> TimeoutException, retry")
                        Observable.just(null)
                    } else {
                        Observable.error<Throwable>(it)
                    }
                }
            }
            .subscribe(onNext, onError)
    
        publisher.onNext(createChangedEntity())
        publisher.onNext(createChangedEntity())
        publisher.onNext(createChangedEntity())
        publisher.onNext(createChangedEntity())
        publisher.onNext(createChangedEntity())
        publisher.onNext(createChangedEntity())
        publisher.onNext(createChangedEntity())
        publisher.onNext(createChangedEntity())
    }

    Basically, the map() operator can contain code that can throw an exception (it calls another service), and I would like to always retry it for certain exceptions, but I can't figure out how to use retryWhen() for this. In this example the line with the TimeoutException is printed, but then execution stops; there is no re-subscription.

    Any idea what I'm doing wrong?

    We are using RxJava 2.2.2.
    Jaroslav Sagan
    @SaganJaro_twitter
    I have managed to get the desired functionality with .retry { t: Throwable -> t.cause is TimeoutException } but I still do not understand why retryWhen() is not working.
    Fiorella Zampetti
    @fzampetti_gitlab
    Hi there, we are conducting a survey about why refactoring-related contributions to software projects get rejected.
    If you have any related experience, we highly appreciate your input!
    Thanks in advance and the survey is available at https://usi.eu.qualtrics.com/jfe/form/SV_cO6Ayah0D6q4eSF