Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Alex Reisberg
    @a-reisberg
    I guess I'm trying to move from Rx 1 and haven't fully understood exactly what the difference is
    also my question was unclear. On a Subject you can do a onNext
    (which is essentially publishing to other Observers)
    and I wanted to hide the onNext
    I didn't know that Observables are in the process of being deprecated?
    Justin Tuchek
    @justintuchek
    I highly doubt Observable is on any path to being deprecated. It’s just different from Flowable, where it wouldn’t make sense to discard events (such of cursor movements).
    Serban Balamaci
    @balamaci
    I'd say not deprecated, but not much different than Flowable as I do not see reasons where I'd want to use Observables instead of Flowables(which have the benefit that you are aware of backpressure and what the backpressure strategy is and you won't be surprised at runtime by a MissingBackpressuException)
    Justin Tuchek
    @justintuchek
    From reading the Observable source code from 2.x.x it seems to be utilizing an unbounded amount for input events - and from the docs of 2.0 it seems like you are going to hit an OOM instead.
    Serban Balamaci
    @balamaci
    I think so, then it would be equivalent to a Flowable if we'd do a .toFlowable(BackpressureStrategy.BUFFER)
    which is also unbounded
    but at least you know upfront you might expect an OOM
    Justin Tuchek
    @justintuchek
    The @BackPressureSupport(…) and @SchedulerSupport(…) annotations are really helpful when going through the code :thumbsup:
    Serban Balamaci
    @balamaci
    true
    David Karnok
    @akarnokd
    @a-reisberg A Subject is already an Observable so you don't have to do anything. asObservable has been renamed to hide in 2.x and it adds an extra layer so casting back to Subject no longer works. Just define your code as Observable<T> someMethod() { return subject; ).
    Alex Reisberg
    @a-reisberg
    @akarnokd I did end up using hide. The reason is that I want to use onNext internally and only allow others to subscribe. Thanks a lot!
    AriMeidan
    @MeidanAri_twitter
    Hi, I have question.
    I Have this scenario - I want to make a remote call and get results, I want to do something with the data, and then notify to subscribers that this call has completed.
    I dont want that every subscriber will be able to make this remote call, So i want someone to request the call and other to subscribe to its completion.
    David Karnok
    @akarnokd
    @MeidanAri_twitter use .cache()
    Luciano
    @lJoublanc
    Got a ~10k line project , and just received MissingBackPressureException which isn't picked up by my unit tests. How do I debug this?
    Caused by: rx.exceptions.MissingBackpressureException
            at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:353)
            at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:319)
            at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
            at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
            at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
            at rx.internal.operators.OperatorSampleWithTime$SamplerSubscriber.emitIfNonEmpty(OperatorSampleWithTime.jav
            at rx.internal.operators.OperatorSampleWithTime$SamplerSubscriber.call(OperatorSampleWithTime.java:98)
    DavidMihola
    @DavidMihola
    Hi, in RxJava(2), is there something like Guava's Optional.fromNullable(T)- I would need this quite often to wrap null values in Maybe.empty() and all others in Maybe.just()...
    Luciano
    @lJoublanc
    In scala you can do Observable from Option(possiblyNullValue) as Option implements Iterable. Not sure about java though.
    Alex Reisberg
    @a-reisberg
    This is an rxandroid question but since there's no dedicated gitter channel. I got the following proguard error:
    Warning: org.jctools.maps.ConcurrentAutoTable$CAT: can't find referenced class sun.misc.Unsafe
    is it safe to issue a dontwarn for it?
    DavidMihola
    @DavidMihola
    @lJoublanc Thanks - however, I can't think of anything like this in Java/on Android.
    DavidMihola
    @DavidMihola
    Also, while I'm here: In RxJava1 I would occasionally use a Subject as the Subscriber to an Observable, i. e. "pipe" the output of the Observable into the Subject. I can't find a way to do this in RxJava2 - subscribe(Observer) does accept the Subject, but does not return a Disposable, and since Subject is not a DisposableObserver, I can't use subscribeWith either... Should I just wrap my Subject in a DisposableObserver before subscribing or is there a simpler way I am missing?
    Ronen
    @ronenhamias
    is there a way to call
    ```
    PublishSubject.<Message>create().toSerialized()
    that it will sync call ?
    will be executed on the same calling thread without buffering or context switching?
    so onNext it will be sync call
    incomingMessagesSubject.onNext(message);
    Artem Gilmudinov
    @guliash
    Hi
    How to specify Observable.using disposeAction scheduler?
    Artem Gilmudinov
    @guliash
    @xgrommx Can't find answer to my question in the article.
    Monkey
    @Even201314
            Observable.combineLatest(observable1, observable2,
                observable3, observable4, observable5,
                new Function5<>())

    When one or sevral observables return code = 404,I throw exception

     return Observable.error(
                        new RxApiException(tHttpResult.getCode(), tHttpResult.getMessage()));

    And I will get

    FATAL EXCEPTION : RxCachedThreadScheduler-3

    How to solve this problem?

    Dorus
    @Dorus
    @Even201314 If one of the sources pushes an error(), that sequence ends, and it makes no longer sense to push anything from the resulting combineLatest stream. Therefore it'll just forwards the error and completes the stream (error() itself is a completion message). Im not sure why it's showing "FATAL EXCEPTION", possibly you're not handeling errors correctly in yous subscribe?
    Ola Theander
    @olatheander
    Hi guys, I'm just starting to look into the reactive concept i.e. looking at RxJava, Project Reactor, Spring Framework 5 reactive stuff etc. and I'm a bit curious of the low-level stuff. I.e. if you have a reactive web service of some kind, what happens when you have a high load with respect to incoming requests? Having only a limited number of threads, i.e. not the conventional model of one thread per request, I realize that in theory if you do not block, you can service a lot of requests using a small number of threads but how does it work in practice? I mean at any given point in time the code/CPU can only do a limited amount of concurrent processing but what happens to the requests? Do they queue up somehow until serviced? What I'm looking for is an article or a paper explaining the reactive paradigm at this low level and would be grateful for any such recommendations.
    Connor Lanigan
    @connorlanigan
    I'm trying to create an Observable where I can dynamically push new values. For this, I'm using PublishSubject.create<T>() so far, and it works well. However, now I want to push a value upon each subscription. I found PublishSubject.create(Observable.OnSubscribe<T> f), which allows me to specify an OnSubscribe-function. But this one returns an Observable instead of a PublishSubject, so I can't use onNext anymore. How can I go about this?
    Daniel Feist
    @dfeist
    @olatheander for material regarding that i'd look at documentation/presentations on netty or the reactor pattern in general..
    you can't implement the reactor pattern with a reactive framework sure, but the question you are asking is more related to that pattern than any framework
    David Karnok
    @akarnokd
    @connorlanigan see BehaviorSubject
    Connor Lanigan
    @connorlanigan
    @akarnokd That's what I need, thank you!
    Ola Theander
    @olatheander
    @dfeist Thanks for the reply. It's true that it's general and not tied to a specific framework and I realize that I can dig through the source of any such framework to find out how it's implemented behind the scene but I figure that there might be a paper or similar discussing the implementation principles which is more easily digested. Most presentations and documentation seems emphasize that it's different from the one-thread-per-request model but very little about how it's actually works behind the scene (or I have just been looking at the wrong places :-)).
    Daniel Feist
    @dfeist
    @olatheander did you Google "reactor pattern paper"?
    Ola Theander
    @olatheander
    @dfeist Not used "reactor" which gave a nice first hit but other variants like "reactive". Many thanks (and feeling a bit stupid :-)).
    Daniel Feist
    @dfeist
    With http inbound the connections are typically/optionally accepted immediately in selector (depending on framework) , then read/write events go on event loop. If system is overloaded connections stay open (until timeout) but aren't handled.. So they queue up as open/unhandled requests whch of course dont use threads.
    Ashok Raju
    @rajuashok

    Hi there, I'm trying to figure out how to chain to observables together. I have an existing method: public static Observable<Data> getData(). In my other class I have this existing code:

    doSomeBackgroundWork()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<..>() { ... })

    I'd like to now chain the getData() call to this call. How would I do this? I tried This initially:

    doSomeBackgroundWork()
    .flatMap(s -> call() {
       mApi.getData()
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<..>() { ... })

    But this doesn't work, because the getData() code is actually executed on the main thread.

    I need it to execute on the background thread just like the doSomeBackgroundWork() method.
    Ashok Raju
    @rajuashok

    This ended up working for me:

    doSomeBackgroundWork()
    .flatMap(s -> call() {
       return Observable.just(Data.empty());
    }
    .concatWith(mApi.getData().onErrorReturn(...))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<..>() { ... })

    There has to be a cleaner way of doing this right?

    ViTORossonero
    @ViTORossonero

    @rajuashok what does call() do?

    If you ignore result of doSomeBackgroundWork() you can use Completable instead:

    Completable doSomeBackgroundWork() {
    ...
    }
    
    
    doSomeBackgroundWork()
    .andThen(mApi.getData()
                 .onErrorReturn(...))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<..>() { ... })