    David Whetstone
    @humblehacker
    Here’s roughly what I’m doing:
    button.clicks()
        .compose(bindToLifecycle())
        .doOnNext { something() }
        .flatMap { somethingReturningAPublishSubject() }
        .doFinally { somethingElse() }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeBy(
            onNext = { theMainThing() },
            onError = { handleError() })
    RxJava2 on Android
    David Karnok
    @akarnokd
    Those operators require a finite sequence, but I guess clicks() never completes.
    David Whetstone
    @humblehacker
    Thanks, @akarnokd. That was it. I assumed flatMap sort of replaced the original observable with the result of somethingReturningAPublishSubject(), but I learned that it merges the two, completing only when both the original observable and the inner one complete.
    David Whetstone
    @humblehacker
    I see two ways to handle this. First, use take(1) to make clicks() a finite sequence, but this requires setting up the stream again after every click.
    fun setupButton() {
        button.clicks()
            .take(1)
            .flatMap { somethingReturningAnObservable() }
            .observeOn(AndroidSchedulers.mainThread())
            .doFinally {
                something()
                setupButton()
            }
            .subscribeBy(
                onNext = { theMainThing() },
                onError = { handleError() }
            )
    }
    Or, chain the streams:
      fun setupButton2() {
        button.clicks()
          .subscribeBy(
              onNext = { 
                somethingReturningAnObservable()
                  .observeOn(AndroidSchedulers.mainThread())
                  .doFinally { something() }
                  .subscribeBy(
                      onNext = { theMainThing() },
                      onError = { handleError() }
                  )
              }
          )
      }
    I’m leaning toward the second approach, though in general it complicates error handling. Anyone have any thoughts on these approaches?
    Boris Maslakov
    @bmaslakov

    Greetings, RxJava community. I was wondering if there's a cleaner alternative to something like this:

    obj.method1()
      .flatMap { v -> obj.method2(v).map { v } };

    i.e. we get a result of an operation, start another operation, wait for the second operation to complete and discard its result.

    Option 2 is even more verbose:

    create { subscriber ->
      obj.method1()
        .subscribe { v ->
          obj.method2(v)
            .subscribe {
              subscriber.onNext(v);
            }
        }
    }

    What do you think? Is option 1 the best we could do?

    David Karnok
    @akarnokd
    Nesting subscribe calls like option 2 is definitely discouraged.
    Yannick Lecaillez
    @ylecaillez
    Hi guys! I have a question regarding resource management:
    I have a Single<Connection> which connects to a server each time I subscribe to it.
    Given the following code: singleConnection.timeout(10, SECONDS).subscribe().
    If the timeout occurs, it'll forward an error and cancel() the singleConnection.
    My question: what if the connection finally succeeds after the timeout has triggered but just before cancel() is invoked?
    If nothing is done, I think this race condition will leak the established connection.
    Now I'm wondering what the best way to handle such a case is.
    One way is to close the connection in singleConnection.cancel(), but given that singleConnection has already emitted the connection, shouldn't cancel() be a no-op?
    Yannick Lecaillez
    @ylecaillez
    Maybe I need some synchronization between cancel() (dispose() actually) and the onSuccess() notification.
    Yannick Lecaillez
    @ylecaillez
    Actually, I might have solved this issue with an AtomicBoolean: after dispose() it is guaranteed that the connection either has been forwarded to the subscriber or will never be forwarded to it (silently disconnecting it if the connection was established in the meantime).
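    A minimal sketch of the AtomicBoolean handoff described above, assuming one connecting thread and one canceller. The names ConnectionHandoff, FakeConnection and onConnected are hypothetical and not part of RxJava; only the compare-and-set idea comes from the message. In this variant the side that loses the race does the cleanup:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class ConnectionHandoff {
    /** Hypothetical stand-in for the real connection type. */
    public static final class FakeConnection {
        public volatile boolean closed;
        public void closeSilently() { closed = true; }
    }

    // false = still open for delivery; true = delivered or disposed.
    private final AtomicBoolean done = new AtomicBoolean();
    private final Consumer<FakeConnection> downstream;

    public ConnectionHandoff(Consumer<FakeConnection> downstream) {
        this.downstream = downstream;
    }

    /** Called by the connecting thread once the connection is established. */
    public void onConnected(FakeConnection connection) {
        if (done.compareAndSet(false, true)) {
            // Delivery won the race: the subscriber now owns the connection.
            downstream.accept(connection);
        } else {
            // dispose() won the race: nobody will receive it, so close it here.
            connection.closeSilently();
        }
    }

    /** Called by the canceller, for example when a timeout fires. */
    public void dispose() {
        done.set(true);
    }
}
```

    Exactly one of the two outcomes happens per connection: it is forwarded, or it is closed. This only covers the source's own handoff; an operator further downstream (such as timeout) may still discard a value it has already received, so the sketch narrows the window rather than closing it.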
    Oleh Dokuka
    @OlegDokuka
    @ylecaillez, actually, you can always write something like the following:
    Flux
                    .just("")
                    .flatMap(v -> Flux.just("Opened").delaySubscription(Duration.ofSeconds(2)).doFinally(s -> System.out.println(s)))
                    .timeout(Duration.ofSeconds(2), Flux.just("closed"))
                    .subscribe(System.out::println);
    That way you always have a chance to clean up your resources if the flattened connection has been cancelled.
    This example works like try with finally.
    P.S. This example is written using Reactor 3, but don't worry, RxJava 2 has an equivalent API.
    David Karnok
    @akarnokd
    Unfortunately, you can't solve this properly with RxJava 2 and Java 6. Either you need a resource-aware flow implementation, which only exists on the drawing board, or you need Java 9 and the Cleaner API to handle the connection falling out of scope. However, the latter can get complicated because you may not know all the references to the Connection object.
    Yannick Lecaillez
    @ylecaillez

    I finally wrote something like

            @Override
            public void dispose() {
                if (done.compareAndSet(false, true) && connection != null) {
                    // Handle the case where we've been cancelled while the connection has actually been established but not
                    // yet forwarded to the subscriber (e.g: because of a slow SSL handshake).
                    connection.closeSilently();
                }
            }

    WDYT ?

    Yannick Lecaillez
    @ylecaillez
    Well, i guess this is a bit too abstract out of its context ... :smile: Anyway, thanks !
    Thomas May
    @cogman

    Is this a bug? I would have expected this to eventually terminate but it never does

        ConnectableFlowable<Integer> publish = Flowable.just(1)
                .publish();
    
        Disposable subscribe = publish.subscribe((e)->System.out.println(e));
        publish.connect();
        publish.blockingSubscribe();

    This is with the latest RxJava.

    David Karnok
    @akarnokd
    You deadlock because the publish has completed and is waiting for the next connect() call. Otherwise, it is a shortcoming of the ConnectableX API we aim to fix in 3.x: ReactiveX/RxJava#5628
    Thomas May
    @cogman
    What would be the proposed behavior of this in 3.0? Would the post-connect subscriber just get an onComplete/onError if the publisher is finished, or would it get something else?
    David Karnok
    @akarnokd
    Please read the linked issue.
    Restmad
    @restmad
    Wow!
    Thomas May
    @cogman

    @akarnokd I guess I was misreading it.

    This paragraph, I think, is the one you are referring to:

    In the fresh state, consumers can pile up and be ready to receive events. An atomic state change to running will begin streaming events until a terminal event is reached. Consumers subscribing in this state will always receive the terminal event, and in case of replay, the cached items as well.

    And I'm guessing the "this state" in the final sentence is referring to the terminal state and not the running state, correct?

    David Karnok
    @akarnokd
    Yes. I've updated the sentence to clear up this ambiguity.
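    A toy, single-threaded model of the state description quoted above, using the terminal-state reading that was just confirmed. It is not the RxJava 3 implementation; ToyConnectable, Listener and the State enum are hypothetical names made up for illustration, and error handling and replay are left out. It mirrors the earlier snippet: one consumer subscribes before connect(), another after the source has finished.

```java
import java.util.ArrayList;
import java.util.List;

public class ToyConnectable {
    public enum State { FRESH, RUNNING, TERMINATED }

    /** Hypothetical minimal consumer interface. */
    public interface Listener {
        void onNext(int value);
        void onComplete();
    }

    private final int[] items;
    private final List<Listener> listeners = new ArrayList<>();
    private State state = State.FRESH;

    public ToyConnectable(int... items) {
        this.items = items;
    }

    public void subscribe(Listener listener) {
        if (state == State.TERMINATED) {
            // Late consumer: receives the terminal event right away
            // instead of waiting for another connect() call.
            listener.onComplete();
        } else {
            // Fresh state: consumers pile up until connect() is called.
            // (Subscribing from inside a callback while running is not
            // supported by this toy.)
            listeners.add(listener);
        }
    }

    public void connect() {
        if (state != State.FRESH) {
            return; // already running or finished
        }
        state = State.RUNNING;
        for (int value : items) {
            for (Listener l : listeners) {
                l.onNext(value);
            }
        }
        state = State.TERMINATED;
        for (Listener l : listeners) {
            l.onComplete();
        }
        listeners.clear();
    }
}
```

    Under this model the consumer that subscribes after connect() completes immediately, so a blocking subscribe at that point would return rather than hang.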
    Matteo Moci
    @mox601
    Hi everyone! I am trying to have a Single start only when a list of other Singles completes. The following code seems to work with Flowable, but I can't find a way to translate it to Single:
    <T> Flowable<T> whenAll(List<Flowable<T>> flowables) {
            return Flowable.fromIterable(flowables).flatMap(a -> a);
        }
    I am not sure that your Flowable code works as you expect, I'd use something like zip for it too
    Oleh Dokuka
    @OlegDokuka
    @mox601 If you need to run all the Singles in parallel and wait until all of them have completed, you can use the following API:
    <T> Completable whenAll(List<Single<T>> singles) {
        return Single.merge(singles).ignoreElements();
    }
    In the example above, the results of the Singles are ignored entirely, and io.reactivex.Completable notifies you of completion via onError or onComplete. If you need to return exactly a Single, you can replace ignoreElements with last:
    <T> Single<T> whenAll(List<Single<T>> singles, T fallback) {
        return Single.merge(singles).last(fallback);
    }
    If your requirement is instead to execute all the Singles one by one, in the order in which they were passed, refer to the next example:
    <T> Flowable<T> whenAll(List<Single<T>> singles) {
        return Single.concat(singles);
    }
    Phoenix
    @wbinarytree

    Hi, is there an operator that can ignore the onComplete from upstream? For example, I use this:

    Observable.flatMap {
        Observable.just(it)
            .onErrorReturn { error -> ErrorState(error) }
    }

    It won't complete when an error occurs.

    Oleh Dokuka
    @OlegDokuka
    Unfortunately, as far as I know, there is no such operator out of the box, but in Reactor 3 this feature will be included in the next 3.2.x release.
    Phoenix
    @wbinarytree
    Thanks! What's that operator called then? onErrorReplace or something?
    Actually, it will be called .errorStrategyContinue, but you can join the discussion and propose an additional resume operator with fallback -> reactor/reactor-core#629
    Phoenix
    @wbinarytree
    I will keep my flatMap then. Thanks. I will check it out in Project Reactor.
    Oleh Dokuka
    @OlegDokuka
    :+1:
    Phoenix
    @wbinarytree

    Another question: is there an operator in Observable equivalent to andThen for Completable? For example, I use this really often:

    API.login
          .doOnNext(/* save the login data */)
          .ignoreElements()
          .andThen(/* other Observable calls */)

    I think concatWith is more or less the right one, but it needs the downstream to be the same type.

    David Karnok
    @akarnokd
    Use flatMap to continue with some other type.
    API.login
        .doOnNext(loginData -> { /* ... */ })
        .flatMap(loginData -> API.fetchFavorites(loginData.userId))
    Phoenix
    @wbinarytree
    But it's not guaranteed to be called after completion, is it? Also, the upstream might emit more than one item (unlike login), whereas ignoreElements().andThen() is called only once, after the upstream completes.
    Oleh Dokuka
    @OlegDokuka
    Use .concatMap instead. It guarantees that each call is executed in the order the items arrive.
    Phoenix
    @wbinarytree
    Yes. Then we're back to the problem of the same signature in concatMap.
    Oleh Dokuka
    @OlegDokuka
    So if you make a call to an external API, concatMap guarantees that the next call will not be executed until the previous one has completed.