    Ivan Schütz
    @i-schuetz
    ok, in the meantime I got an answer... ReactiveX/RxJava#5510
    Namit Gupta
    @guptanamit_twitter
    Hello folks, I am new to reactive programming. Can anyone suggest good online resources to get started on this topic?
    Jonathan Robson
    @robsonj
    Apologies if this is a stupid question. As a C# dev I'm used to being able to return a disposable when creating an Observable.Create(). What is the equivalent in Java, please?
    David Karnok
    @akarnokd
    @robsonj It depends on which RxJava version you are using. Generally, we stopped returning those because cancellation doesn't compose in a synchronous case.
    Jonathan Robson
    @robsonj
    @akarnokd Thanks. I think I got it. Found I was able to set the disposable (setDisposable()) on the emitter.
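A minimal sketch of the approach mentioned above, assuming RxJava 2 (`io.reactivex` packages): instead of returning a disposable from the create lambda, the cleanup is registered on the emitter. The class name, the `released` flag, and the emitted values are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import java.util.concurrent.atomic.AtomicBoolean;

public class CreateWithCleanup {
    // Hypothetical flag: flips to true when the registered cleanup runs.
    static final AtomicBoolean released = new AtomicBoolean(false);

    static Observable<Integer> source() {
        return Observable.create(emitter -> {
            // Register the cleanup on the emitter; nothing is returned from the lambda.
            emitter.setDisposable(Disposables.fromAction(() -> released.set(true)));
            emitter.onNext(1);
            emitter.onNext(2);
            // No onComplete() here, so the cleanup only runs once the consumer disposes.
        });
    }

    public static void main(String[] args) {
        Disposable d = source().subscribe(v -> System.out.println("got " + v));
        d.dispose(); // runs the cleanup registered via setDisposable()
        System.out.println("released = " + released.get());
    }
}
```

`emitter.setCancellable(...)` is the alternative when the cleanup is a plain action that may throw.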
    Yannick Lecaillez
    @ylecaillez
    Hello guys!
    While working on a custom operator, I was wondering about the correct behavior to implement when my operator deals with a Subscriber that throws an exception from onNext().
    Reactive Streams rule 2.13 (https://github.com/reactive-streams/reactive-streams-jvm#2.13) says that the caller "MUST raise this error condition in a fashion that is adequate for the runtime environment."
    Reading the RxJava code, it looks like onNext() invocations are never (I admit I didn't check all operators) surrounded by try/catch.
    So I wondered whether this is planned for a future RxJava release, or whether you had a reason not to do so?
    David Karnok
    @akarnokd
    There is no need to wrap onNext calls in try-catch blocks. In fact, it is discouraged in RxJava 2 because it can misdirect exceptions caused by bugs. If you have something that can throw inside your onNext, wrap that in a try-catch and don't let it escape. Btw, are you learning RxJava 2?
    A Function passed to an operator can throw, so it is wrapped and managed. onNext should not throw, as the specification mandates, and thus is not wrapped.
    Yannick Lecaillez
    @ylecaillez
    Yes, I'm on RxJava 2. So this Reactive Streams rule is not applied because it can misdirect exceptions from bugs?
    In my operator I wrap subscriber.onNext() in a try/catch and invoke RxJavaPlugins.onError(t) if onNext() threw an exception.
    Sure, onNext() should never throw, but as stated by rule 2.13 we should also report the error when this happens:
    "Calling onSubscribe, onNext, onError or onComplete MUST return normally [...] In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment."
    David Karnok
    @akarnokd
    The rule says onNext should return normally. If it throws, that's something the concrete implementation has to handle in some way. RxJava 2 doesn't capture such exceptions, which then bubble up on the current thread, eventually hitting an uncaught exception handler. RxJava 2 trusts the operator implementations, why don't you?
    Yannick Lecaillez
    @ylecaillez
    I trust operator implementations; I'm more concerned about 3rd-party/user subscribers, e.g.: flowable.map(..).filter(..).subscribe(t -> { throw new IllegalStateException(); });
    But as you said, this error will eventually hit some exception handler. This is what I missed.
    David Karnok
    @akarnokd
    Lambdas are not trusted, that subscribe call will end up in the global error handler anyway. Also there is the safeSubscribe() method that guards against the very nasty consumers.
    Yannick Lecaillez
    @ylecaillez
    Got it ! Thank you for your time.
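The exchange above can be tried out with a small sketch, assuming RxJava 2: a throwing lambda passed to `subscribe` does not crash the operators; the failure is routed to the global handler installed with `RxJavaPlugins.setErrorHandler`. The class and method names are hypothetical, and the handler is reset afterwards so the sketch leaves no global state behind.

```java
import io.reactivex.Flowable;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.concurrent.atomic.AtomicReference;

public class ThrowingConsumerDemo {
    // Returns whatever the global error handler received, or null if nothing arrived.
    static Throwable runWithThrowingConsumer() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        RxJavaPlugins.setErrorHandler(seen::set);
        try {
            Flowable.just(1, 2, 3)
                    .map(v -> v * 2)
                    .filter(v -> v > 2)
                    // The lambda consumer is untrusted: its exception is caught by the
                    // lambda wrapper, and since no onError callback was supplied it is
                    // forwarded to the global error handler.
                    .subscribe(v -> { throw new IllegalStateException("consumer failed on " + v); });
        } finally {
            RxJavaPlugins.reset(); // restore the default plugin hooks
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable t = runWithThrowingConsumer();
        System.out.println("handler saw: " + t);
        System.out.println("cause: " + t.getCause());
    }
}
```

Passing a second lambda, `subscribe(onNext, onError)`, would deliver the exception to that callback instead of the global handler.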
    Yusuf Celik
    @YusufCelik
    Is it a good design pattern to pass a callback into an argument of a function, so that class A can provide its own callback implementation to the imported class B whose function it utilises and passes its callback to?
    David Karnok
    @akarnokd
    @YusufCelik not sure how that relates to RxJava. We take functions to customize behavior in the flows.
    Stephen Berger
    @GettingNifty
    @YusufCelik that's called instantiation and yes, for instance in graphics programming you want to reuse the same piece of code rather than rewrite it
    So 100 trees are based off one tree model etc. One instance of memory
    Stephen Berger
    @GettingNifty
    That's what they were talking about and it uses instantiation but also references like : rather than pointers in other C Lang's
    brandhuge
    @brandhuge
    hi
    Pls I need help on self service android app
    jay vyas
    @jayunit100
    What's the simplest way to add backpressure to a .connect() call? Right now, I simply do a 'Thread.sleep()' if the last connect action took more than X ms. I'd like to do this based on rate, i.e. "if the rate of actions drops below X/sec, sleep". Because some actions are CPU hungry, if an action takes too long to run, it likely means it was using a large amount of CPU for a long time.
    David Karnok
    @akarnokd
    @jayunit100 connect is associated with making a cold sequence hot, and disconnection happens when the source completes or gets unsubscribed. What does your flow look like?
    Stephen Berger
    @GettingNifty
    @s-durovich Use a load bar or timer that may help u get your mind around it
    It's called multithreading
    mreddimasi
    @mreddimasi
    Hi, I'm new to RxJava and would like help from the community with a scenario. I have a Spring MVC setup; the RestController receives an object like "Person" with properties like FirstName, LastName, etc. The RestController needs to send back some information based on the object (Person) it receives and, in parallel, record in the DB that this particular query about the object (Person) was received. The return from the RestController does not have to wait for the DB entry to complete; these are 2 parallel operations. Is RxJava suitable for such a scenario? If yes, how could it be used?
    Rafat J. Al-Barouki
    @rjeeb
    Observable<Person> personObservable = Observable.just(new Person()).subscribeOn(Schedulers.io());
    personObservable.subscribe(person -> {
        // gets the data and return it by the controller
    });
    personObservable.subscribe(person -> {
        // add the object to the database
    });
    try to do this
    this will execute the two operations in two threads
    so they will be executed in parallel and with no waiting
    and if your method returns a value then you can do this
    Person person = new Person();
    Observable<Person> personObservable = Observable.just(person).subscribeOn(Schedulers.io());
    personObservable.subscribe(p -> {
        // add the object to the database
    });
    // gets the data and return it
    mreddimasi
    @mreddimasi
    @rafatbaroukii Thank you so much, the second approach seems to be fitting my needs will try that. Thanks again
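A hedged variation on the second approach, assuming RxJava 2: since the database write produces no value, it can be modelled as a `Completable` that runs on the io scheduler while the handler returns right away. The `auditLog` list stands in for the database, and `auditWrite` and `handle` are hypothetical names, not part of Spring or RxJava.

```java
import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class FireAndForgetAudit {
    // Hypothetical stand-in for the table that records incoming queries.
    static final List<String> auditLog = new CopyOnWriteArrayList<>();

    // The write is only described here; it runs on an io thread once subscribed.
    static Completable auditWrite(String firstName) {
        return Completable.fromAction(() -> auditLog.add("query for " + firstName))
                .subscribeOn(Schedulers.io());
    }

    // Hypothetical controller method: starts the write and returns without waiting for it.
    static String handle(String firstName) {
        auditWrite(firstName).subscribe(
                () -> { },                                                     // nothing to do on success
                error -> System.err.println("audit write failed: " + error)); // avoid a silent failure
        return "Hello, " + firstName;
    }
}
```

Supplying the error callback matters here: without it, a failed write would surface only through the global RxJava error handler.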
    Keith Hoopes
    @Pytry
    What are some good exercises or games you could play while coding to help you get into the reactive mindset and style of development, but without actually using RxJava?
    Rajan Maurya
    @therajanmaurya
    Hi, I am doing my GSoC project https://github.com/therajanmaurya/android-client-2.0 in which I am using RxJava 2 to handle REST API calls. Initially the user logs in and gets an access token. I don't know when this access token will expire, so I want logic that, whenever a request gets a 401 error, fires a request to the server to refresh the access token and then retries the current request. I don't want to check for the 401 response code in every REST API call; if any response code is 401, it should automatically make another request to refresh the access token and retry the current request. Please help. It is the last feature I need to implement, and most importantly, I really want to learn how to do this using RxJava.
    Rajan Maurya
    @therajanmaurya
    Can anyone give me an example where this type of logic is implemented, please?
    David Karnok
    @akarnokd
    @therajanmaurya You may have better luck on StackOverflow.
    Ronen
    @ronenhamias
    Is there an example somewhere of how to bridge between Rx and Netty?
    Jacob Kristhammar
    @jacobk
    Any suggestions on how to do the following without relying on errors for control flow?
            Single<RbmAccessToken> accessToken = accessTokens.get(agent);
    
            return Single.just(accessToken)
                .flatMap(t -> t == null
                        ? Single.error(new Throwable("no token"))
                        : t)
                .flatMap(t -> Instant.now().plusSeconds(60).isAfter(t.expiresAt())
                        ? Single.error(new Throwable("token expired"))
                        : Single.just(t))
                .onErrorResumeNext(t -> {
                    Single<RbmAccessToken> newAccessToken = getAccessToken(agent);
                    accessTokens.put(agent, newAccessToken);
                    return newAccessToken;
                });
    hehe, also, the above doesn't work :D
    Artem Rudometkin
    @perfectplayer
    @jacobk, also thought about it. Looks like there is some overRx. Maybe it's better to break up this logic into more pieces.
    Jacob Kristhammar
    @jacobk

    I had the following before:

    if (accessToken != null && accessToken.blockingGet().expiresAt().isAfter(Instant.now().plusSeconds(60))) {
        return accessToken;
    }
    
    Single<RbmAccessToken> newAccessToken = getAccessToken(agent);
    accessTokens.put(agent, newAccessToken);
    return newAccessToken;

    and was trying to get rid of the blocking get

    For completeness, here's the current version that actually works. I didn't know that null can't be used as a value.
    Single<RbmAccessToken> accessToken = accessTokens.get(agent);
    
    return Single.just(Optional.ofNullable(accessToken))
      .flatMap(t -> t.orElse(Single.error(new Throwable("no token"))))
      .flatMap(t -> t.isExpired()
          ? Single.error(new Throwable("token expired"))
          : Single.just(t))
      .onErrorResumeNext(t -> {
        Single<RbmAccessToken> newAccessToken = generateAccessToken(agent);
        accessTokens.put(agent, newAccessToken);
        return newAccessToken;
      });
    Jacob Kristhammar
    @jacobk
    I feel somewhat OK with using an error for the expired branch, but the initial null check feels really contrived
    Artem Rudometkin
    @perfectplayer
    From the docs: "Note that if you pass null to Just, it will return an Observable that emits null as an item. Do not make the mistake of assuming that this will return an empty Observable (one that emits no items at all). For that, you will need the Empty operator."
    Jacob Kristhammar
    @jacobk
    Doesn't seem to be true for RxJava 2; it errors out on null values
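One possible way to express the token lookup without errors as control flow and without null values, sketched under the assumption of RxJava 2: a missing or expired token becomes an empty `Maybe`, and `switchIfEmpty` supplies the refresh path. `Token` is a hypothetical stand-in for `RbmAccessToken`, and `generateAccessToken` here fakes the remote call; `refreshCount` exists only to make the behaviour observable.

```java
import io.reactivex.Maybe;
import io.reactivex.Single;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TokenCache {
    // Hypothetical token type standing in for RbmAccessToken.
    static final class Token {
        final String value;
        final Instant expiresAt;
        Token(String value, Instant expiresAt) { this.value = value; this.expiresAt = expiresAt; }
        // Treat tokens expiring within the next 60 seconds as already expired.
        boolean isExpired() { return Instant.now().plusSeconds(60).isAfter(expiresAt); }
    }

    private final Map<String, Single<Token>> accessTokens = new ConcurrentHashMap<>();
    int refreshCount = 0; // illustration only: counts simulated remote calls

    // Hypothetical stand-in for the remote call; cache() replays the result to later subscribers.
    Single<Token> generateAccessToken(String agent) {
        return Single.fromCallable(() -> {
            refreshCount++;
            return new Token(agent + "-" + refreshCount, Instant.now().plusSeconds(3600));
        }).cache();
    }

    Single<Token> get(String agent) {
        return Single.defer(() -> {
            Single<Token> cached = accessTokens.get(agent);
            // Missing entry or expired token both turn into "empty" rather than an error.
            Maybe<Token> usable = cached == null
                    ? Maybe.<Token>empty()
                    : cached.filter(t -> !t.isExpired());
            return usable.switchIfEmpty(Maybe.defer(() -> {
                Single<Token> fresh = generateAccessToken(agent);
                accessTokens.put(agent, fresh);
                return fresh.toMaybe();
            })).toSingle();
        });
    }
}
```

Calling `get("agent")` twice in a row should hit the simulated remote call only once, because the second lookup finds an unexpired cached token. The map update is not atomic with the lookup, so concurrent callers could still refresh twice.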