These are chat archives for ReactiveX/RxJava

4th Aug 2016
Abhinav Solan
@eyeced
Aug 04 2016 17:52
I tried that approach and it fixes it. One thing I wanted to ask: we are reading messages from Kafka, the messages have to go through some validations, and then they are persisted in Cassandra. To make full use of the asynchronous nature of Cassandra and Kafka, we use RxJava end to end in between to build the pipelines. The pipeline is very long because we use a lot of groupBy to split the stream between valid and invalid; we have around 10 validations, so we split the stream on each validation. For a small batch size everything works, but if the batch size increases to, say, around 2000 or 3000 messages, the stream just hangs somewhere. I am looking through thread dumps and so on but still can't find anything. Has anyone faced this kind of issue?
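For context, one known way an RxJava 1.x stream built around groupBy can hang is a GroupedObservable that is never subscribed: its items stay buffered, upstream requests are never replenished, and once the internal buffer (128 items by default) fills, the whole stream stalls, which matches "works for small batches, hangs for large ones". A minimal sketch of that failure mode, independent of the pipeline below:

import rx.Observable;

public class GroupByStall {
    public static void main(String[] args) {
        Observable.range(1, 3000)
                // two groups: even and odd numbers
                .groupBy(i -> i % 2 == 0)
                // filtering a group out without subscribing to it leaves its
                // items buffered and stops upstream request replenishment,
                // so the stream stalls once enough odd items have queued up
                .filter(g -> g.getKey())
                .flatMap(g -> g)
                .subscribe(
                        i -> { },
                        Throwable::printStackTrace,
                        () -> System.out.println("done")); // may never print
    }
}

The usual fix in 1.x is to make sure every group is consumed; the groupBy javadoc itself suggests applying an operator like take(0) to groups you want to discard rather than simply ignoring them.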
private Observable<AssetDataPoint> processInner(String topic, Long orgId, Observable<AssetDataPoint> assetDataPoints) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting asset data points processing");
        }
        return assetDataPoints
                // convert asset data point to cts reading.
                .flatMap(this::fromAssetDataPoint)
                // group messages by reading key
                .groupBy(KeyReadings::getReadingKey)
                // within each group, flatten the repeated reads for a single
                // meas type into one stream, so after this step we have a
                // reading key together with all of its reads.
                .map(grpObs -> new KeyReadings(grpObs.getKey(), grpObs.flatMap(KeyReadings::getReadings)))
                // map them into a valid state tuple
                .map(keyReadings -> new StateTuple<>(ValidationState.VALID, keyReadings))
                // check the validity of the meas type
                .map(this::validMeasType)
                // split reads which are not time aligned
                // if the meas type is of interval then check that the read is time aligned or not
                .flatMap(this::splitReadsIfNotTimeAligned)
                // reading should not have null flags
                .flatMap(this::validateReadHasNoNullFlags)
                // filter out of order reads from the batch
                .flatMap(this::filterOutOfOrderReads)
                // read is not late arriving
                .flatMap(this::splitStreamInOrderAndOutOfOrderOrDuplicate)
                // if the read is late arriving then check for the last milestone checked
                .flatMap(this::readAfterMilestoneCheckedIn)
                // on the basis of the state, persist the read accordingly into Cassandra
                .map(this::persistInCassandra)
                // convert key and reading to asset data point
                .flatMap(this::toAssetDataPoint)
                // log the result to Kafka on the basis of the state
                .flatMap(stateTuple ->
                        logInKafka(stateTuple, topic, orgId, this::getKafkaKey))
                // if this stream fails, retry it up to 3 more times
                .retry(3);
    }
My code looks something like this; each method also has a 3-4 step pipeline in it (one possible shape is sketched below).
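For illustration only, since the real method bodies are not shown in the chat, one of those per-step pipelines could look roughly like this. It reuses StateTuple, ValidationState, and KeyReadings from the snippet above, but getState, getValue, getFlags, and ValidationState.INVALID are hypothetical names that may not match the real code:

private Observable<StateTuple<KeyReadings>> validateReadHasNoNullFlags(StateTuple<KeyReadings> tuple) {
    // tuples that an earlier step already marked invalid pass through
    // untouched, so later steps can route them to the invalid branch
    if (tuple.getState() != ValidationState.VALID) {
        return Observable.just(tuple);
    }
    // hypothetical check: any read carrying a null flag invalidates the tuple
    return tuple.getValue().getReadings()
            .exists(read -> read.getFlags() == null)
            .map(hasNullFlag -> hasNullFlag
                    ? new StateTuple<>(ValidationState.INVALID, tuple.getValue())
                    : tuple);
}

The point of the shape is that invalid reads are tagged with a state rather than dropped, so both branches stay in one stream and can be routed later (persisted vs. logged).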
persistInCassandra and logInKafka are the parallel calls; their futures are handled by a context scheduler such as HystrixContextScheduler.
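A hedged sketch of how such a future might be bridged into the stream: with the Datastax driver, executeAsync returns a ResultSetFuture, and RxJava 1.x's Observable.from(Future, Scheduler) waits on it using the supplied scheduler, which is where a HystrixContextScheduler would slot in. The class, method, and parameter names here are illustrative, not the code from the chat:

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import rx.Observable;
import rx.Scheduler;

public class CassandraRx {
    // bridges an async Cassandra call into the Rx pipeline; the blocking
    // future.get() happens on the supplied scheduler, not on the thread
    // that is running the pipeline
    public static Observable<ResultSet> persistAsync(Session session, Statement statement, Scheduler scheduler) {
        ResultSetFuture future = session.executeAsync(statement);
        return Observable.from(future, scheduler);
    }
}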