    Ben Christensen
    @benjchristensen
    but why is that work being done from within an Observer.onComplete?
    work should never be done by onComplete.
    (other than cleanup with try/catch around it)
    Mark Paluch
    @mp911de
    Not sure I follow
    Ben Christensen
    @benjchristensen
    Your question was about onCompleted throwing an exception, right?
    Mark Paluch
    @mp911de
    Yeah, in particular how to deal with it as producer/emitter
    Ben Christensen
    @benjchristensen

    Observer.onCompleted should not ever throw. That breaks the contract and puts the system into undefined state. So, it's best to avoid user code that can throw from inside onCompleted.

    So what are you doing inside onCompleted? For example, are you using the onCompleted signal to invoke user code?

    Mark Paluch
    @mp911de
    Ah, ok, now I understand: One of my users uses Hystrix and gRPC to perform some remote RPC work.
    Ben Christensen
    @benjchristensen
    and it is kicked off from inside onComplete of your Observer?
    Mark Paluch
    @mp911de
    I don't know what exactly is done in the user code as I'm just providing the Observable in the framework
    Exactly
    Ben Christensen
    @benjchristensen
    yes, so instead of using the Observer to do the work, I suggest using flatMap (for async) or doOnCompleted (for synchronous side effects) to kick it off
    and your final Observer is JUST for cleanup
    that way an operator triggers everything in a way that error handling can be done
    Mark Paluch
    @mp911de
    Ok, I'll pass those details on. Thanks for giving me the certainty that throwing exceptions in onCompleted is illegal
    Ben Christensen
    @benjchristensen
    This flatMap overload is very powerful, as you can kick off async work (another Observable) on onNext, onError, or onComplete signals: http://reactivex.io/RxJava/javadoc/rx/Observable.html#flatMap(rx.functions.Func1,%20rx.functions.Func1,%20rx.functions.Func0)
    yup, no problem
    Mark Paluch
    @mp911de
    Thanks a lot
    Abhinav Solan
    @eyeced
    I have a problem with scheduling. I want to run things like fetches from a distributed cache in parallel, but when making those calls I need to set some thread locals on the worker threads. One is the Session, which I can manage by passing my own executor. The other is the org context, since we have a multi-org application. I can get the orgId from the objects I am working on, but I need to switch the org context whenever a parallel call is made to our distributed cache. I think RxJavaSchedulersHook might be helpful, but I am not sure. Any help with this?
    Ben Christensen
    @benjchristensen

    Yes you'll need to wrap the schedulers to do the copying of thread locals. The SchedulersHook is a good place to hook in your implementation if you want the default hooks to all pick up your behavior.

    Hystrix does things like this as well if you want to see a similar example: https://github.com/Netflix/Hystrix/wiki/Plugins#concurrencystrategy

    For example: https://github.com/Netflix/Hystrix/blob/master/hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java
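A JDK-only sketch of the wrapping idea described above, not code from the conversation and not the Hystrix implementation: the task is wrapped on the submitting thread so that it carries the caller's thread-local value onto the worker. `OrgContextPropagation`, `ORG_CONTEXT`, and `withCallerContext` are hypothetical names; a scheduler wrapper would presumably apply the same capture-and-restore step to each scheduled action.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class OrgContextPropagation {
    // Hypothetical thread-local standing in for the application's org context.
    static final ThreadLocal<String> ORG_CONTEXT = new ThreadLocal<>();

    /** Wraps a task so it runs with the org context of the thread that called this method. */
    static <T> Callable<T> withCallerContext(Callable<T> task) {
        final String captured = ORG_CONTEXT.get(); // read on the submitting thread
        return () -> {
            final String previous = ORG_CONTEXT.get(); // value already on the worker, if any
            ORG_CONTEXT.set(captured);
            try {
                return task.call();
            } finally {
                // Restore the worker's previous value so pooled threads do not leak context.
                if (previous == null) {
                    ORG_CONTEXT.remove();
                } else {
                    ORG_CONTEXT.set(previous);
                }
            }
        };
    }

    /** Sets the org context on the caller, runs a task on a pool thread, and returns what that thread saw. */
    static String contextSeenByWorker(String orgId) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            ORG_CONTEXT.set(orgId);
            Callable<String> wrapped = withCallerContext(() -> ORG_CONTEXT.get());
            Future<String> seen = pool.submit(wrapped);
            return seen.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ORG_CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(contextSeenByWorker("org-42")); // prints org-42
    }
}
```

Without the wrapper, the pool thread would read its own (empty) thread-local rather than the caller's value.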

    Abhinav Solan
    @eyeced
    Thanks @benjchristensen this would really help
    Abhinav Solan
    @eyeced
    I tried that approach and it fixes the problem. One more thing I wanted to ask: we read messages from Kafka, run them through some validations, and then persist them in Cassandra. To make full use of the asynchronous nature of Cassandra and Kafka, we build the whole pipeline with RxJava. The pipeline is very long because we use a lot of groupBy to split the stream between valid and invalid messages; we have around 10 validations, and we split the stream on each one. For a small batch size everything works, but if the batch size grows to around 2000 or 3000 messages, the stream just hangs somewhere. I have been looking through thread dumps but still cannot find the cause. Has anyone faced this kind of issue?
    private Observable<AssetDataPoint> processInner(String topic, Long orgId, Observable<AssetDataPoint> assetDataPoints) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting asset data points processing");
        }
        return assetDataPoints
                // convert asset data point to cts reading
                .flatMap(this::fromAssetDataPoint)
                // group messages by reading key
                .groupBy(KeyReadings::getReadingKey)
                // within each group, if there are multiple streams and each object in a stream
                // has repeated reads for a single meas type, merge them into one stream,
                // so after this step we have the reading key and all of its reads
                .map(grpObs -> new KeyReadings(grpObs.getKey(), grpObs.flatMap(KeyReadings::getReadings)))
                // map them into a valid state tuple
                .map(keyReadings -> new StateTuple<>(ValidationState.VALID, keyReadings))
                // check the validity of the meas type
                .map(this::validMeasType)
                // split off reads which are not time aligned:
                // if the meas type is an interval type, check whether the read is time aligned
                .flatMap(this::splitReadsIfNotTimeAligned)
                // a reading should not have null flags
                .flatMap(this::validateReadHasNoNullFlags)
                // filter out-of-order reads from the batch
                .flatMap(this::filterOutOfOrderReads)
                // check that the read is not late arriving
                .flatMap(this::splitStreamInOrderAndOutOfOrderOrDuplicate)
                // if the read is late arriving, check it against the last milestone checked in
                .flatMap(this::readAfterMilestoneCheckedIn)
                // based on the state, persist the read into Cassandra
                .map(this::persistInCassandra)
                // convert key and reading back to an asset data point
                .flatMap(this::toAssetDataPoint)
                // log the result to Kafka based on the state
                .flatMap(stateTuple ->
                        logInKafka(stateTuple, topic, orgId, this::getKafkaKey))
                // if this stream fails, retry it up to 3 times
                .retry(3);
    }
    My code looks something like this. Each method also has a 3-4 step pipeline in it.
    persistInCassandra and logInKafka are the parallel calls, where the futures are handled by a ContextScheduler similar to HystrixContextScheduler
    kimifdw
    @kimifdw
    Hello, I want to use RxJava to do something like what java.util.concurrent.Future does (a Future can return an async result). I have no idea how to do it.
    Dorus
    @Dorus
    @kimifdw Observable.just(1, 2, 3).flatMap(e -> getDataA(e)).flatMap(a -> getDataB(a)) etc.
    You just transform the stream each time with a new operator. flatMap takes a function that returns another Observable, so each step can be async.
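For readers coming from `java.util.concurrent.Future`, the chain above has roughly the same shape as `CompletableFuture.thenCompose`, which plays the role for a single async value that flatMap plays for a stream. This is a JDK-only sketch, not RxJava code and not from the conversation; the bodies of `getDataA` and `getDataB` are made-up stand-ins for the calls named in the message above.

```java
import java.util.concurrent.CompletableFuture;

final class ChainedLookups {
    // Hypothetical async call standing in for getDataA.
    static CompletableFuture<String> getDataA(int id) {
        return CompletableFuture.supplyAsync(() -> "a" + id);
    }

    // Hypothetical async call standing in for getDataB.
    static CompletableFuture<Integer> getDataB(String a) {
        return CompletableFuture.supplyAsync(() -> a.length());
    }

    /** Starts getDataA, then feeds its result into getDataB once it is available. */
    static CompletableFuture<Integer> lookup(int id) {
        return getDataA(id).thenCompose(ChainedLookups::getDataB);
    }

    public static void main(String[] args) {
        // join() blocks until the chained result is ready.
        System.out.println(lookup(7).join()); // prints 2 (the length of "a7")
    }
}
```

The difference is that a `CompletableFuture` carries one result, while an Observable chain handles any number of items with the same operators.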
    David Ray
    @cogmission
    @kimifdw I'm going to attempt a "neophyte's" answer to this. In a nutshell you have two parts, the Observable and the Subscriber. Observable.create takes an Observable.OnSubscribe function that gets executed when a client subscribes. Put the functionality that the "server" should execute in that OnSubscribe function, and return the resulting Observable; once subscribed to, it starts pushing results to the subscriber. The asynchronous process runs in the OnSubscribe and then calls onNext(results) on the subscriber. Does that help?
    kimifdw
    @kimifdw
    @cogmission @Dorus It works. Thank you
    David Ray
    @cogmission
    @kimifdw You're welcome!
    Eugene
    @Hronom

    Hello guys, here is the simple code:

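    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.schedulers.Schedulers;
    import rx.subjects.PublishSubject;

    public class App {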
        public static void main(String[] args) {
            PublishSubject<Integer> subject = PublishSubject.create();
            Observable<Integer> observable = subject.observeOn(Schedulers.computation());
            subject.onNext(1);
            observable.subscribe(App::ppp);
            System.out.println("on 2");
            subject.onNext(2);
            System.out.println("on 3");
            subject.onNext(3);
            System.out.println("on 4");
            subject.onNext(4);
            System.out.println("Complete");
            subject.onCompleted();
            // I need to wait here until all ppp calls have finished executing
        }
    
        private static void ppp(Integer ggg) {
            System.out.println(ggg);
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(3));
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("End " + e);
            }
            System.out.println("End " + ggg);
        }
    }

    So the question is: what do I need to put at the end of main to meet my requirement, which is to wait until all ppp calls have finished executing? What is the right way to do this in RxJava?

    Ben Christensen
    @benjchristensen
    You can convert to a BlockingObservable instead of subscribing asynchronously.
    observable.toBlocking() ... and then use one of its blocking methods, such as forEach.
    it will block the main thread for you
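The same wait-for-completion requirement can also be sketched without RxJava, using a `CountDownLatch`. This is a JDK-only illustration and not the `toBlocking()` approach suggested above; `WaitForWorker` and `processAndWait` are hypothetical names, and the single worker thread is an assumption chosen to keep the items in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class WaitForWorker {
    /** Processes items on one worker thread and blocks the caller until every item is done. */
    static List<Integer> processAndWait(List<Integer> items) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        List<Integer> done = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch finished = new CountDownLatch(items.size());
        try {
            for (Integer item : items) {
                worker.execute(() -> {
                    try {
                        done.add(item); // stands in for the real per-item work
                    } finally {
                        finished.countDown(); // always signal, even if the work throws
                    }
                });
            }
            // Block the calling thread until all items have been processed.
            if (!finished.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for worker");
            }
            return new ArrayList<>(done);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processAndWait(Arrays.asList(2, 3, 4))); // prints [2, 3, 4]
    }
}
```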
    Satyarth Sampath
    @droidekas

    Hi There,

    I have a weird issue which I am not able to word properly, but I shall try my best here.

    Observable<Item> stream;
    // getList is a completing Observable: it emits a list of items and then completes
    Observable<List<Item>> getList;

    Observable<List<Item>> getListStream() {
        // Whenever stream emits an item, I need to return the list of items
        // emitted by getList as the onNext of the
    }
    DavidMihola
    @DavidMihola
    So, after each new item you want to emit a list of all items from the first item up to the one that was just emitted?
    Darius Lapūnas
    @DariusL
    scan would work
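If DavidMihola's reading of the question is right, the desired output is a running list that grows by one item per emission. The plain-Java sketch below shows only that accumulation over an in-memory list. It is not RxJava's scan operator, and `RunningLists` and `prefixes` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class RunningLists {
    /** Returns, for each input item, a read-only snapshot of all items seen so far. */
    static <T> List<List<T>> prefixes(List<T> items) {
        List<List<T>> out = new ArrayList<>();
        List<T> soFar = new ArrayList<>();
        for (T item : items) {
            soFar.add(item);
            // Copy the accumulator so later additions do not change earlier snapshots.
            out.add(Collections.unmodifiableList(new ArrayList<>(soFar)));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(prefixes(Arrays.asList("a", "b", "c")));
        // prints [[a], [a, b], [a, b, c]]
    }
}
```

Copying the accumulator on each step matters in a streaming setting too, since a shared mutable list would be visible to downstream consumers while it is still changing.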
    David Karnok
    @akarnokd
    RxJava 2.0.0-RC1 has been released. (It may take a few hours to show up on maven search though.)
    Michael Nitschinger
    @daschl
    @akarnokd awesome! I’m already thinking about how I can get the couchbase sdk to 2.x in the mid term without breaking too many people, while providing a migration path
    David Karnok
    @akarnokd
    Did you do Reactive-Streams support before (for example targeting Reactor-Core)?
    Michael Nitschinger
    @daschl
    no we are currently exposing 1.1.x observables
    David Karnok
    @akarnokd
    Do you have many custom operators on top of Operator or OnSubscribe?
    Srepfler Srdan
    @schrepfler
    :clap:
    Michael Nitschinger
    @daschl
    @akarnokd no custom operators :)
    we are using a custom subject but I can port it and/or get rid of it if there is a better alternative
    I’m more concerned that if I add both to the classpath, people might mess up their code (we have lots of users that are new to reactive programming)
    Enver Balalic
    @Enverbalalic
    Hi guys, just a quick question that I can't seem to solve. Is there a way to resubscribe to the same observable, with the same observer, on every onComplete or onError? I know there is retry() to resubscribe on onError. I also tried doOnCompleted, doOnTerminate, and doAfterTerminate to resubscribe, but I can't get it to work.
    it looks something like this
    Observer<Object> observer = new Observer<Object>() {
         // lifecycle methods
    };
    Observable<Object> observable = function_returns_observable()
         .subscribeOn(Schedulers.newThread())
         .observeOn(AndroidSchedulers.mainThread());
    observable.doAfterTerminate(() -> observable.subscribe(observer));
    observable.subscribe(observer);
    Enver Balalic
    @Enverbalalic
    right now I'm just exploiting the fact that retry() resubscribes on onError
    and instead of calling onComplete when my observable finishes
    I just call onError