    it seems like in RxJava1 the subscription to the original value in line 284 never returns an exception, only the one in 266, whereas in RxJava2 it fails for 284 immediately
    book.write(key, wrongValue).test().assertError(ClassCastException.class)
    that just works
    Paco
    @pakoito
    which is weird, because write is subscribed with the correct type, and it's only the forwarding that triggers the exception
    RxJava 2 propagates from here into write, RxJava 1 does it into the updates subject: https://github.com/pakoito/RxPaper/blob/master/library/src/main/java/com/pacoworks/rxpaper/RxPaperBook.java#L151
    and moving the subject call to onNext to a doAfterTerminate doesn't fix it
    oh well, behaviour changes then
    Paco
    @pakoito
    public <T> Completable write(final String key, final T value) {
            return Completable.fromAction(new Action() {
                @Override
                public void run() {
                    book.write(key, value);
                }
            }).andThen(Completable.fromAction(new Action() {
                @Override
                public void run() throws Exception {
                    try {
                        updates.onNext(Pair.create(key, value));
                    } catch (ClassCastException t) {
                        updates.onError(t);
                    }
                }
            })).subscribeOn(scheduler);
        }
    that reproduces the old behaviour
    Igor Trncic
    @igortrncic
    Hi, I'm curious how you organize your code. What are the criteria for creating classes that provide getObservable-style methods? Or are there any open source projects that handle this in a nice way?
    David Karnok
    @akarnokd
    @pakoito What you experienced sounds like a resource management issue. In 2.x, the Disposable.dispose is not called from downstream on an onComplete() unlike in 1.x where the SafeSubscriber always calls unsubscribe. The standard way of dealing with resources along a stream is using() and performing final action via doFinally.
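A minimal sketch of the using() plus doFinally pattern mentioned above, assuming RxJava 2.x (io.reactivex). FakeResource and its read method are hypothetical stand-ins for whatever resource the stream holds; they are not part of RxPaper or RxJava.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;

final class ResourceCleanupSketch {
    /** Hypothetical resource; stands in for a file handle, socket, or similar. */
    static final class FakeResource {
        private final List<String> log;
        FakeResource(List<String> log) { this.log = log; log.add("open"); }
        String read(int n) { return "value " + n; }
        void close() { log.add("close"); }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flowable.using(
                () -> new FakeResource(log),                                    // acquire one resource per subscriber
                resource -> Flowable.just(resource.read(1), resource.read(2)),  // stream built on the resource
                FakeResource::close)                                            // release on termination or dispose
            .doFinally(() -> log.add("finally"))                                // final action, runs once per subscription
            .subscribe(log::add);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the design is that cleanup no longer depends on the downstream calling dispose after onComplete, which is the 1.x behaviour that 2.x dropped.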
    Vojtech Polivka
    @vojtapol
    hi guys

    I need to convert some callbacks to ReactiveX elements (Observable, Completable, Subject), but I am not sure what to pick. The problem is that the callback provides no value so it should be Completable but it is triggered multiple times so it should be Observable. What to choose? the normal use is this:

    socket.on("connect", args -> doSomething())

    where I know that args is always an empty array.

    Vojtech Polivka
    @vojtapol
    would making Observable<java.lang.Void> be a good idea in this case?
    David Karnok
    @akarnokd
    A version-safe choice would be Observable<Object>, where you ignore the value in the processing.
    Vojtech Polivka
    @vojtapol
    what do you mean by version-safe?
    David Karnok
    @akarnokd
    Void requires null values which are forbidden in 2.x; you'd save a lot of headache with Object such as (Integer)1
    Vojtech Polivka
    @vojtapol
    oh.. snap!

    so that's really the best way to deal with this?

    that's too bad.. I think there should be another top-level class (such as Subject, Completable) that would solve this more elegantly
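A minimal sketch of the Observable<Object> suggestion from this exchange, assuming RxJava 2.x. FakeSocket is a hypothetical stand-in for the socket in the question (it is not the real socket client API), and SIGNAL is just an arbitrary non-null placeholder that subscribers ignore.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.ArrayList;
import java.util.List;

final class SignalObservableSketch {
    /** Hypothetical callback source; replace with the real socket wiring. */
    static final class FakeSocket {
        private final List<Runnable> listeners = new ArrayList<>();
        void on(Runnable listener) { listeners.add(listener); }
        void off(Runnable listener) { listeners.remove(listener); }
        void fire() { for (Runnable l : new ArrayList<>(listeners)) l.run(); }
    }

    // RxJava 2 forbids null items, so a shared non-null marker is emitted instead.
    static final Object SIGNAL = new Object();

    static Observable<Object> connects(FakeSocket socket) {
        return Observable.create(emitter -> {
            Runnable listener = () -> emitter.onNext(SIGNAL);
            socket.on(listener);
            // Unregister the callback when the subscriber disposes.
            emitter.setCancellable(() -> socket.off(listener));
        });
    }

    static int countAfterThreeFires() {
        FakeSocket socket = new FakeSocket();
        List<Object> seen = new ArrayList<>();
        Disposable subscription = connects(socket).subscribe(seen::add);
        socket.fire();
        socket.fire();
        subscription.dispose(); // removes the listener
        socket.fire();          // no longer observed
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(countAfterThreeFires());
    }
}
```

The stream never calls onComplete, which matches a callback that can fire any number of times.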

    Vojtech Polivka
    @vojtapol
    hi
    is there a subscriber class without "onCompleted()"? I am dealing with infinite streams here
    David Karnok
    @akarnokd
    Just implement onCompleted() as an empty method.
    Exerosis
    @Exerosis
    This might be stupid, but how can I make an Observable that adds new subs to a list so that if it's currently processing it doesn't try to reprocess after the original process is over.
    Timur Isachenko
    @isatimur
    What are the differences between Flowable and Observable?
    Abbi
    @wasabbi
    )
    Alex Krause
    @alex0ptr

    Hi, we are having trouble getting our heads around a backpressure problem. We want to pull files in parallel from a slow remote location and then analyze them in a computation-bound step, which should also run in parallel. Unfortunately the analysis step is slower than fetching the files, which leads to our disk running out of space. So do we need to adjust backpressure settings to slow down the producer of file downloads?
    Our code looks something like this:

    Flowable.fromIterable(fileListing)
      .flatMap(file ->
        // fromCallable takes a no-argument Callable, so capture `file` from the outer lambda
        Single.fromCallable(() -> getFileFromRemoteLocation(file))
          .subscribeOn(Schedulers.io())
          .toFlowable() // Flowable.flatMap expects a Publisher
      )
      // backpressure needed here? e.g. after 10 files stop getting more
      .flatMap(localFile ->
        Single.fromCallable(() -> analyzeFile(localFile))
          .subscribeOn(Schedulers.computation())
          .toFlowable()
      )

    How do we communicate appropriate backpressure in this case?

    David Karnok
    @akarnokd
    @alex0ptr You can adjust the concurrency level of flatMap:
    Flowable.fromIterable(files)
    .flatMap(file -> Flowable.fromCallable(() -> getFileFromRemoteLocation(file))
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .map(f -> analyzeFile(f))
    , 10)
    ...
    Alex Krause
    @alex0ptr
    @akarnokd That is surprising? Is there no way I can tell the first flatMap that it should not produce too many elements for the second?
    David Karnok
    @akarnokd
    You have to set the concurrency level of the second flatMap as well, but you can just combine the two into a single one.
    Alex Krause
    @alex0ptr
    Yeah, they are however in two different classes for separation of concerns.
    So if I set both flatMaps to just 10 it would work too? @akarnokd
    David Karnok
    @akarnokd
    Yes.
    Alex Krause
    @alex0ptr
    Awesome. Thanks @akarnokd
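A minimal sketch of the bounded flatMap discussed above, assuming RxJava 2.x. The download method and its 50 ms sleep are hypothetical stand-ins for the remote fetch; the counters exist only to make the concurrency limit observable.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedFlatMapSketch {
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    /** Hypothetical slow download; sleeps to simulate remote I/O. */
    static String download(int id) throws InterruptedException {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max); // remember the highest overlap seen
        Thread.sleep(50);
        inFlight.decrementAndGet();
        return "file-" + id;
    }

    static List<String> run(int maxConcurrency) {
        return Flowable.range(1, 8)
            .flatMap(
                id -> Flowable.fromCallable(() -> download(id))
                    .subscribeOn(Schedulers.io()),
                maxConcurrency) // at most this many inner sources are subscribed at once
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run(2) + " max in flight: " + maxInFlight.get());
    }
}
```

Because flatMap only requests another upstream item when an inner source finishes, the second argument caps how many downloads can be pending at any time. The order of the results is not guaranteed.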
    Nikolai Senchurin
    @try2bajed1
    Hi, I'm interested in how I can pause a chain of observables with an AlertDialog and then continue the paused chain after the OK button is pressed. I started studying Rx not long ago, so any info will be interesting to me, thanks.
    Nikolai Senchurin
    @try2bajed1
    To explain more: I get every observable from Observable.fromCallable( NDK code for an external hardware device ), so I want to run the failed step again after the user confirms by clicking "try again".
    Daniel Tabuenca
    @dtabuenc
    Is there anything like CombineLatest operator that deals with more than just two observables? Like I'd like to have say 10 observables and have a method called with an array of 10 items.
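For reference, a sketch of one way this could look in RxJava 2.x, which has a combineLatest overload that takes a collection of sources and passes an Object[] of the latest values to the combiner. The BehaviorSubject sources and the string formatting are illustrative assumptions, not something from the question.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.BehaviorSubject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CombineManySketch {
    static List<String> run() {
        // Ten sources, each starting with its own index as the latest value.
        List<BehaviorSubject<Integer>> sources = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            sources.add(BehaviorSubject.createDefault(i));
        }

        List<String> seen = new ArrayList<>();
        // The combiner receives an Object[] holding the latest item of every source.
        Observable.combineLatest(sources, latest -> Arrays.toString(latest))
            .subscribe(seen::add);

        sources.get(0).onNext(100); // any single update triggers a new combined array
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The array elements arrive untyped, so the combiner has to cast them back if it needs the concrete types.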
    Luciano
    @lJoublanc
    @dtabuenc Think I asked a similar question on SO, but nobody's answered ... http://stackoverflow.com/questions/41137708/efficiently-sampling-large-number-of-rx-observables-together
    VolodymyrBaisa
    @VolodymyrBaisa
    Hello, retrofit 2, rxJava, mvvm databinding. I receive data from retrofit2 in onNext; how do I subscribe to it or pass it on neatly into the viewModel? There are more than 10 values. I can't give an example, I'm on the metro.
    VolodymyrBaisa
    @VolodymyrBaisa
    Hello, I'm using retrofit 2, rxJava, and mvvm DataBinding. I get data from retrofit 2 in the overridden onNext method, and once I have the data I want to send it to the viewModel. How can I do this nicely?
    Exerosis
    @Exerosis
    if I had an Observable o = source.replay(1, TimeUnit.MINUTES).autoConnect();
    then immediately called o.subscribe(...); then two minutes later called o.subscribe(...); again. Would source.subscribe be called a second time or have I completely misunderstood the replay operator?
    Zak Taccardi
    @ZakTaccardi
    the source is subscribed once
    and is then multicasted
    you can easily test that behavior with a junit test and logging
    Exerosis
    @Exerosis
    Thank you very much!
    Zak Taccardi
    @ZakTaccardi
    anything you add to the stream after autoConnect will be executed again for each subscription
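The junit-style check suggested above could be sketched like this, assuming RxJava 2.x. The counters and the never-completing source are illustrative assumptions; here both subscriptions happen immediately rather than two minutes apart.

```java
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ReplayAutoConnectSketch {
    /** Returns { times the source was subscribed, times the downstream map ran }. */
    static int[] run() {
        AtomicInteger sourceSubscriptions = new AtomicInteger();
        AtomicInteger downstreamCalls = new AtomicInteger();

        Observable<String> source = Observable.create(emitter -> {
            sourceSubscriptions.incrementAndGet();
            emitter.onNext("value"); // stays open, like a long-lived source
        });

        Observable<String> shared = source
            .replay(1, TimeUnit.MINUTES) // buffer items from the last minute
            .autoConnect()               // connect to the source on the first subscriber
            .map(v -> {                  // placed after autoConnect, so it runs per subscriber
                downstreamCalls.incrementAndGet();
                return v;
            });

        shared.subscribe();
        shared.subscribe();
        return new int[] { sourceSubscriptions.get(), downstreamCalls.get() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("source subscriptions: " + counts[0] + ", map calls: " + counts[1]);
    }
}
```

A subscriber arriving after the one-minute window would find the buffer empty, but as far as the replay operator is concerned that only changes what gets replayed, not how often the source is subscribed.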
    Vladimir Tagakov
    @Tagakov

    Hi guys! I am trying to use backpressure to handle data paging. I have an observable which requests N new chunks of data from server for every request(N). I wish to fetch a new chunk of data after user clicks a button. I've tried to use this approach

    Observable.zip(dataFromServer(), userActions(), (data, ø) -> data).subscribe(showData());

    But because of the RxRingBuffer with a size of 16 inside the zip operator, the construction above immediately requests 16 chunks of data from the server after subscription. This is totally inappropriate. Even if I could change the buffer size to 1, the behavior would still be inappropriate: the data would be requested before the first user action, but I want it to be requested only after the first user action. It seems like my approach is totally wrong. Can you help?

    Vladimir Tagakov
    @Tagakov
    I've managed to accomplish appropriate behavior, but it looks ugly. Maybe there is a better way?
    val userActions = Observable.interval(1000, TimeUnit.MILLISECONDS).share();//user clicks every second//
    val dataFromServer = Observable.range(1, 10);//data has 10 pages//
    
    dataFromServer
            .doOnNext(it -> System.out.println("Received from server" + it))
            .flatMap(it -> userActions.take(1).map(ø -> it), 1)
            .doOnNext(it -> System.out.println("Before data showing" + it))
            .subscribe(showData());
    Yilin Wei
    @yilinwei
    @Tagakov I think you want an Observable[Action] and then use a scan to keep track of the state (the page which you're on), then flatMap after your showData call to put it in the widened stream if you want an Observable[Data].
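One possible reading of the scan idea, sketched under the assumption of RxJava 2.x: the click stream drives a page counter, and each new counter value triggers exactly one request. fetchPage, the PublishSubject of clicks, and the requested list are hypothetical names introduced for the illustration.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

final class ClickDrivenPagingSketch {
    /** Hypothetical server call; records which pages were actually requested. */
    static Observable<String> fetchPage(int page, List<Integer> requested) {
        return Observable.fromCallable(() -> {
            requested.add(page);
            return "page-" + page;
        });
    }

    static Observable<String> pages(Observable<Object> clicks, List<Integer> requested) {
        return clicks
            .scan(0, (page, click) -> page + 1) // state: number of the page to load next
            .skip(1)                            // drop the seed so nothing loads before a click
            .concatMap(page -> fetchPage(page, requested)); // one request per click, in order
    }

    /** Returns { pages requested before any click, pages requested after two clicks }. */
    static int[] run() {
        PublishSubject<Object> clicks = PublishSubject.create();
        List<Integer> requested = new ArrayList<>();
        pages(clicks, requested).subscribe();
        int beforeClicks = requested.size();
        clicks.onNext("click");
        clicks.onNext("click");
        return new int[] { beforeClicks, requested.size() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("before clicks: " + counts[0] + ", after two clicks: " + counts[1]);
    }
}
```

Driving the requests from the clicks, instead of zipping clicks with an eager data source, avoids the prefetch that the zip buffer causes.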