Where communities thrive

  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
Repo info
    Mark Paluch
    Ben Christensen
    yes, so instead of using the Observer to do, I suggest using flatMap (for async), or doOnCompleted (for synchronous side effects) to kick off work
    and your final Observer is JUST for cleanup
    that way an operator triggers everything in a way that error handling can be done
    Mark Paluch
    Ok, I'll pass that details on. Thanks for giving me the certainty that throwing exceptions in onCompleted is illegal
    Ben Christensen
    This flatMap overload is very powerful, as you can kick off async work (another Observable) on onNext, onError, or onComplete signals: http://reactivex.io/RxJava/javadoc/rx/Observable.html#flatMap(rx.functions.Func1,%20rx.functions.Func1,%20rx.functions.Func0)
    yup, no problem
    Mark Paluch
    Thanks a lot
    Abhinav Solan
    I have problem regarding scheduling, I want to run like fetching things from a distributed cache in parallel, but here when making the call I would need to set some thread locals in the subsequent threads, one is Session which I can manage by passing my own executor, other thread local is the org context as we have a multi org application, orgId I can fetch from the objects I am working on, but would need to switch the org context whenever the parallel call is made to the distributed cache we have. I think RxJavaSchedulersHook might be helpful but I am not sure, any help around this ?
    Ben Christensen

    Yes you'll need to wrap the schedulers to do the copying of thread locals. The SchedulersHook is a good place to hook in your implementation if you want the default hooks to all pick up your behavior.

    Hystrix does things like this as well if you want to see a similar example: https://github.com/Netflix/Hystrix/wiki/Plugins#concurrencystrategy

    For example: https://github.com/Netflix/Hystrix/blob/master/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java

    Abhinav Solan
    Thanks @benjchristensen this would really help
    Abhinav Solan
    I tried with that approach and that fixes it, one thing I wanted to ask .. what we are doing is we are reading messages from Kafka .. and then these messages have to go through some validations and then persisted in Cassandra, and to completely make use of the Cassandra and Kafka asynchronous nature in between we are completely using RxJava to create pipelines, which is very long because we use a lot of group by to split the stream between valid and invalid, we have around say 10 validation so we split the stream on each validation .. now for a small batch size everything works but if the batch size increases to let say around 2000 or 3000 messages the stream just hangs somewhere, I am trying to look through thread dump and all but still not able to find something. Had anyone faced this kind of issue ?
    private Observable<AssetDataPoint> processInner(String topic, Long orgId, Observable<AssetDataPoint> assetDataPoints) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Starting asset data points processing");
            return assetDataPoints
                    // convert asset data point to cts reading.
                    // group message by reading key
                    // in grouped messages, if there are multiple streams and each object in stream
                    // has repeated reads for single meas type then group them up into one stream
                    // so after this grouping we would have reading key and all reads in them.
                    .map(grpObs -> new KeyReadings(grpObs.getKey(), grpObs.flatMap(KeyReadings::getReadings)))
                    // map them into a valid state tuple
                    .map(keyReadings -> new StateTuple<>(ValidationState.VALID, keyReadings))
                    // check the validity of the meas type
                    // split reads which are not time aligned
                    // if the meas type is of interval then check that the read is time aligned or not
                    // reading should not have null flags
                    // filter out of order reads from the batch
                    // read is not late arriving
                    // if the read is late arriving then check for the last milestone checked
                    // on the base of the state persist the read accordingly into cassandra
                    // convert key and reading to asset data point
                    // log the result to kafka on basis of the state
                    .flatMap(stateTuple ->
                            logInKafka(stateTuple, topic, orgId, this::getKafkaKey))
                    // if this stream fail retry it again 3 more times
    my code looks something like this .. each method also have a 3-4 pipeline in it.
    persistInCassandra and loginKafka are the parallel calls where the futures are handled by the ContextScheduler like HystrixContextScheduler
    hello,I want to use rxjava to do some thing like java.util.concurrent.Future do(Future can return async result).I have no idea to do it
    @kimifdw Rx.Observable.just(1,2,3).flatMap(e -> getDataA(e)).flatMap(a -> getDataB(a)) etc.
    You just transform the stream each time with a new operator. flatMap takes an async function.
    David Ray
    @kimifdw I'm going to attempt a "neophytes" answer to this: In a Nutshell you have two parts, the Observable and the Subscriber. The create of the Observable takes an Observable.OnSubscribe function that gets executed when a client subscribes. Put your functionality that gets executed by the "server" in that OnSubscribe function, and then return an Observable that when subscribed to will begin to be pushed executed results. If that helps? The asynchronous process executes in the OnSubscribe and then calls onNext(results) on the subscriber.
    @cogmission @Dorus It woks.thank you
    David Ray
    @kimifdw You're welcome!

    Hello guys, here is the simple code:

    public class App {
        public static void main(String[] args) {
            PublishSubject<Integer> subject = PublishSubject.create();
            Observable<Integer> observable = subject.observeOn(Schedulers.computation());
            System.out.println("on 2");
            System.out.println("on 3");
            System.out.println("on 4");
            // I need here wait until all ppp completes execution ?
        private static void ppp(Integer ggg) {
            try {
            } catch (InterruptedException e) {
                System.out.println("End " + e);
            System.out.println("End " + ggg);

    So the question is what I need to put in the end of main to meet my requirements: wait until all ppp completes execution ? What is the right way in RxJava ?

    Ben Christensen
    You can convert to a BlockingObservable instead of subscribing asynchronously.
    observable.toBlocking() ... and then use one of its blocking methods, such as forEach.
    it will block the main thread for you
    Satyarth Sampath

    Hi There,

    I have a weird issue which I am not able to word properly,but I shall try my best here.

    Observable<Item> stream;
    //getList is a completing observable,it returns a list of items and marks as completed
    Observable<List<Item>>  getList;
    function Observable<List<Item>> getListStream(){
            //Whenever _stream_ emits an item,
    I need to return the list of items emitted by getList as the onNext of the 
    So, after each new item you want to emit a list of all items from the first item up to the one that was just emitted?
    Darius Lapūnas
    scan would work
    David Karnok
    RxJava 2.0.0-RC1 has been released. (It may take a few hours to show up on maven search though.)
    Michael Nitschinger
    @akarnokd awesome! I’m already thinking how I can get the couchbase sdk to 2.x in the mid term without breaking too much people and providing a migration path
    David Karnok
    Did you do Reactive-Streams support before (for example targeting Reactor-Core)?
    Michael Nitschinger
    no we are currently exposing 1.1.x observables
    David Karnok
    Do you have many custom operators on top of Operator or OnSubscribe?
    Srepfler Srdan
    Michael Nitschinger
    @akarnokd no custom operators :)
    we are using a custom subject but I can port it and/or get rid of it if there is a better alternative
    I’m more concerned about if I add both to the classpath people might mess up their code (we have lots of users that are new to reactive programming)
    Enver Balalic
    Hi guys, just a quick question that I cant seem to solve. Is there a way to resubscribe to the same observable, with the same observer on every onComplete or onError, I know that there is a retry() option to resubscribe onError, I also tried doOnCompleted, doOnTerminate, doAfterTerminate to resubscribe but i cant seem to solve this
    it looks something like this
    Observer<Object> observer = new Observer<Object>() {
         lifecycle methods
    Observable<Object> observable = function_returns_observable()
    observable.doAfterTerminate(()-> observable.subscribe(observer);
    Enver Balalic
    right now im just exploiting the onError retry() parameter
    and instead of calling onComplete when my observable finishes
    i just call onError
    and the retry() parameter catches that
    which is, horrible....
    @Enverbalalic retry() is only for retrying when error occurs. use repeat()
    How can two Observables be zipped and return a new Observable, not an Observable<Observable>? Observable.zip() returns an Observable of an Observable (i.g. Observable<Observable<Integer>>) when only the Observable (i.e. Observable<Integer>) is desired.
    it shouldnt
    can you show your code?
    Sure. I'll rip out the unnecessary stuff and pbin it.
    i can share some general ideas. For exmaple you should use flatMap no map to create the ziped observable.
      .flatMap(e =>
          (first, second) => ({first, second}) // optional selector to collect results from inner functions
        ), (e, {first, second}) => ({e, first, second}) // optional selector to collect result and combine them with original.