    Thomas May
    @cogman
    What would be the proposed behavior of this in 3.0? Would a post-connect subscriber just get an onComplete/onError if the publisher is finished, or would it get something else?
    David Karnok
    @akarnokd
    Please read the linked issue.
    Restmad
    @restmad
    Wow!
    Thomas May
    @cogman

    @akarnokd I guess I was misreading it.

    I think this is the paragraph you are referring to:

    In the fresh state, consumers can pile up and be ready to receive events. An atomic state change to running will begin streaming events until a terminal event is reached. Consumers subscribing in this state will always receive the terminal event, and in case of replay, the cached items as well.

    And I'm guessing the "this state" in the final sentence is referring to the terminal state and not the running state, correct?

    David Karnok
    @akarnokd
    Yes. I've updated the sentence to clear up this ambiguity.
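The lifecycle described in the quoted paragraph (fresh, then running, then terminated) can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions, not RxJava's implementation: `ConnectableSketch`, its methods, and the use of a `List<String>` as a stand-in for a consumer are all hypothetical, and only the non-replay case is modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the fresh -> running -> terminated lifecycle; not RxJava code. */
final class ConnectableSketch {
    enum State { FRESH, RUNNING, TERMINATED }

    private State state = State.FRESH;
    // Each consumer is modeled as a list recording the signals it received.
    private final List<List<String>> consumers = new ArrayList<>();

    /** Subscribing after termination yields only the terminal event. */
    synchronized List<String> subscribe() {
        List<String> received = new ArrayList<>();
        if (state == State.TERMINATED) {
            received.add("onComplete");
        } else {
            consumers.add(received); // fresh or running: wait for future signals
        }
        return received;
    }

    /** The fresh -> running transition succeeds at most once. */
    synchronized boolean connect() {
        if (state != State.FRESH) {
            return false;
        }
        state = State.RUNNING;
        return true;
    }

    /** Items are delivered only while running, to the consumers present at that moment. */
    synchronized void emit(String item) {
        if (state != State.RUNNING) {
            return;
        }
        for (List<String> c : consumers) {
            c.add("onNext(" + item + ")");
        }
    }

    /** The running -> terminated transition delivers the terminal event to everyone. */
    synchronized void complete() {
        if (state != State.RUNNING) {
            return;
        }
        state = State.TERMINATED;
        for (List<String> c : consumers) {
            c.add("onComplete");
        }
        consumers.clear();
    }
}
```

In this model, a consumer that subscribes before `connect()` sees every item plus the terminal event, while one that subscribes after `complete()` sees only `onComplete`, which matches the clarified reading of "this state" as the terminal state.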
    Matteo Moci
    @mox601
    Hi everyone! I am trying to have a Single start only when a list of other Singles completes. The following code seems to work with Flowable, but I can’t find a way to translate it to Single:
    <T> Flowable<T> whenAll(List<Flowable<T>> flowables) {
        return Flowable.fromIterable(flowables).flatMap(a -> a);
    }
    I am not sure that your Flowable code works as you expect, I'd use something like zip for it too
    Oleh Dokuka
    @OlegDokuka
    @mox601 If you need to run all the Singles in parallel and wait until all of them have completed, you should use the following API:
    <T> Completable whenAll(List<Single<T>> singles) {
        return Single.merge(singles).ignoreElements();
    }
    In the above example, the results of the Singles will be ignored entirely, and io.reactivex.Completable will notify you about completion via onError or onComplete. If you are required to return exactly a Single type, you may replace ignoreElements with last:
    <T> Single<T> whenAll(List<Single<T>> singles, T fallback) {
        return Single.merge(singles).last(fallback);
    }
    If your requirement is to execute all the Singles one by one, in the order in which they were passed, refer to the next example:
    <T> Flowable<T> whenAll(List<Single<T>> singles) {
        return Single.concat(singles);
    }
    Phoenix
    @wbinarytree

    Hi, is there an operator that can ignore the onComplete from upstream? For example, I use this:

    Observable.flatMap {
        Observable.just(it)
            .onErrorReturn { error -> ErrorState(error) }
    }

    so it won't complete when an error occurs.

    Oleh Dokuka
    @OlegDokuka
    Unfortunately, as far as I know, there is no such operator out of the box, but in Reactor 3 this feature will be included in the next 3.2.x release.
    Phoenix
    @wbinarytree
    Thanks! What's that operator called then? onErrorReplace or something?
    Actually, it will be called .errorStrategyContinue, but you may join the discussion and propose an additional resume operator with a fallback -> reactor/reactor-core#629
    Phoenix
    @wbinarytree
    I will keep my flatMap then. Thanks. I will check it out in Project Reactor.
    Oleh Dokuka
    @OlegDokuka
    :+1:
    Phoenix
    @wbinarytree

    Another question. Is there an operator in Observable that is equivalent to andThen for Completable? For example, I use this really often:

    API.login
          .doOnNext(/* save the login data */)
          .ignoreElements()
          .andThen(/* other Observable calls */)

    I think concatWith is somewhat close, but it needs the downstream to be the same type.

    David Karnok
    @akarnokd
    Use flatMap to continue with some other type.
    API.login
        .doOnNext(loginData -> { /* ... */ })
        .flatMap(loginData -> API.fetchFavorites(loginData.userId))
    Phoenix
    @wbinarytree
    But it's not guaranteed to be called after completion, no? Also, the upstream may emit more than one item (unlike login), whereas ignoreElements().andThen() will be called only once, after the upstream completes.
    Oleh Dokuka
    @OlegDokuka
    Use .concatMap instead. It guarantees that each call will be executed in the order the items arrive.
    Phoenix
    @wbinarytree
    yes. Then back to the problem of the same signature in concatMap
    Oleh Dokuka
    @OlegDokuka
    So if you make a call to an external API, concat guarantees that the next call will not be executed until the previous one has completed.
    Phoenix
    @wbinarytree
    I think concatMap is not the same. .ignoreElements().andThen() will execute only once, no matter how many items the upstream emits, but concatMap will apply the given transform function to every element.
    Oleh Dokuka
    @OlegDokuka
    Yeah, you are right.
    Mark Galea
    @cloudmark
    @akarnokd I'm reading the Rx code and your blog and I have a question about the merge operator. Why is the main queue an SPSC instance (rather than MPSC), given that inner subscribers can call parent.tryEmit(this, t); concurrently? Or is this impossible?
    David Karnok
    @akarnokd
    That SPSC queue is for the main Observables that turn out to be of scalar origin, such as just. Since these come through onNext, there is guaranteed to be only one producer of them. The drain will then read from this SPSC queue and the queues of the inner Subscribers from a guaranteed single thread at a time.
    Mark Galea
    @cloudmark

    Ah gotcha! So the main queue is only responsible for scalars from the following line:

    if (t instanceof ScalarSynchronousObservable) {
        tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
    }

    Each inner will have its own queue and not use the main. @akarnokd ?

    Mark Galea
    @cloudmark
    Why are scalars prioritised over inners in the emitLoop? From the code it seems that the main queue is dequeued first.
    David Karnok
    @akarnokd
    Yes. There are users who flatMap onto just() and empty() regularly and this shortcut helped their performance greatly.
    Mark Galea
    @cloudmark
    Thanks a lot!
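The exchange above describes a drain loop that runs on one thread at a time and empties the scalar queue before the inner queues. A minimal plain-Java sketch of that idea follows. It is not RxJava's merge implementation: `MergeDrainSketch` and all of its members are hypothetical names, `ConcurrentLinkedQueue` stands in for the specialized SPSC queues, and backpressure (request accounting), errors, and completion are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical queue-drain sketch; not the actual merge operator. */
final class MergeDrainSketch<T> {
    // Stand-in for the main queue that holds values of scalar origin.
    private final Queue<T> scalarQueue = new ConcurrentLinkedQueue<>();
    // One queue per inner source.
    private final List<Queue<T>> innerQueues = new CopyOnWriteArrayList<>();
    // Work-in-progress counter: only the caller that moves it from 0 to 1 drains.
    private final AtomicInteger wip = new AtomicInteger();
    // Stand-in for the downstream consumer; touched only inside the drain loop.
    private final List<T> emitted = new ArrayList<>();

    Queue<T> addInner() {
        Queue<T> q = new ConcurrentLinkedQueue<>();
        innerQueues.add(q);
        return q;
    }

    void offerScalar(T value) {
        scalarQueue.offer(value);
    }

    void offerInner(Queue<T> inner, T value) {
        inner.offer(value);
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another thread is draining and will observe the missed work
        }
        int missed = 1;
        for (;;) {
            T v;
            // Scalars first: the shortcut for sources such as just().
            while ((v = scalarQueue.poll()) != null) {
                emitted.add(v);
            }
            // Then the queue of each inner source.
            for (Queue<T> q : innerQueues) {
                while ((v = q.poll()) != null) {
                    emitted.add(v);
                }
            }
            // Leave only if no drain() call arrived while this pass was running.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    List<T> emitted() {
        return emitted;
    }
}
```

If an inner value and a scalar value are both queued before `drain()` runs, the scalar is emitted first in this sketch, regardless of which one was enqueued earlier.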
    Gordeev Boris
    @gordeevbr
    Greetings! So I wanted to try out Java 9 MrJars. Since DESIGN.md says "we want to use the multi-release jar file support", I decided to practice MrJars by attempting to migrate RxJava. I have succeeded in making a Gradle script that produces a seemingly valid MrJar, with some limitations. It seems to me that there's no way to use this MrJar with Gradle yet, which means that this is probably not a production-ready solution. I have decided not to submit a PR yet, but I would be glad if someone took a look at the changes in my repo.
    Is it going to work if I just provide a link? https://github.com/gordeevbr/RxJava
    So there are old build scripts (jar, sourcesJar, javadocJar) and new ones are mrJar, sourcesMrJar and javadocMrJar. I have tested them out and everything seems to be in order.
    If I just include it into my classpath with gradle using flatDir, I can't seem to use the Java 9 part.
    Thanks in advance!
    David Karnok
    @akarnokd
    @gordeevbr Hi. Please don't submit a PR for multi-release jar. There is currently no value in having twice the JAR size and the duplication of the source files. In addition, you can't add new public API (such as Java 9+ features) via mr jars anyway and I'm not sure the build and release infrastructure could even handle mr jars properly.
    Gordeev Boris
    @gordeevbr
    Yeah, that's why I didn't submit a PR.
    I have made it so that there's no duplication of source files, the new API "extends" the old one, even though it does so in a not obvious manner.
    It seems to me that Gradle can handle creating an MrJar, but I think there's no way to use these Jars yet. That's why I gave up on it right now.
    Gordeev Boris
    @gordeevbr
    That's why I asked if someone could give it a look.
    Gordeev Boris
    @gordeevbr
    Oh wait, I'm totally wrong.
    Yeah, that won't work it seems.
    I have read through this again.