    Artem Bilan
    @artembilan
    Any chance to see that com.ctl.asynchmessage.configuration.AppConfig?
    Also: any chance to move such a story to Stack Overflow, for better context and traceability?
    Rahul Lokurte
    @rahulmlokurte
    package com.ctl.asynchmessage.configuration;
    
    
    import com.ctl.asynchmessage.utils.AvroDeserializer;
    import io.confluent.connect.avro.ConnectDefault;
    import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.avro.data.Json;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.serializer.Deserializer;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ErrorHandler;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.kafka.support.converter.MessageConverter;
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class AppConfig {
    
        private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
        @Value("${spring.kafka.consumer.max-poll-records}")
        private int maxPollRecords;
        @Value("${spring.kafka.consumer.group-id}")
        private String groupId;
        @Value("${spring.kafka.consumer.client-id}")
        private String clientId;
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;
        @Value("${spring.kafka.consumer.topic}")
        private String topic;
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            props.put("schema.registry.url", "http://127.0.0.1:8081");
            return props;
        }
    
        @Bean
        public ConsumerFactory<String, ConnectDefault> consumerFactory() {
            return new DefaultKafkaConsumerFactory<String, ConnectDefault>(consumerConfigs(),new StringDeserializer(), new JsonDeserializer<>(ConnectDefault.class));
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> kafkaListenerContainerFactory() throws IOException {
            ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setErrorHandler(new ErrorHandler() {
    
                @Override
                public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
                    String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
                    String topics = s.split("-")[0] + "-" + s.split("-")[1];
                    int offset = Integer.valueOf(s.split("offset ")[1]);
                    int partition = Integer.valueOf(s.split("-")[2].split(" at")[0]);
                    TopicPartition topicPartition = new TopicPartition(topics, partition);
                    logger.info("Skipping " + topic + "-" + partition + " offset " + offset);
                    consumer.seek(topicPartition, offset + 1);
                }
    
                @Override
                public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
    
                }
    
                @Override
                public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
                    String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
                    String topics = s.split("-")[0];
                    int offset = Integer.valueOf(s.split("offset ")[1]);
                    int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
                    TopicPartition topicPartition = new TopicPartition(topics, partition);
                    logger.info("Skipping " + topic + "-" + partition + " offset " + offset);
                    consumer.seek(topicPartition, offset + 1);
                }
            });
    
    
    
            return factory;
        }
    }
    Artem Bilan
    @artembilan
    What does your .split("Error deserializing key/value for partition ") mean? What do you really split? And what is your goal? What should be at the end of that split?
    And why do you try to split the second part of the first split result: split(". If needed, please seek past the record to continue consumption.")?
    Rahul Lokurte
    @rahulmlokurte
    The end goal is to consume Avro messages through the schema registry. When I start the consumer application, I get the error
    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition wstordo-event-0 at offset 23. If needed, please seek past the record to continue consumption
    To move past the record, I have implemented the logic in the ErrorHandler.
    Artem Bilan
    @artembilan
    Right. But why do you try to split that message in your code?
    Do you know what split() does?
    Rahul Lokurte
    @rahulmlokurte
    I split it to find the current partition, topic name, and offset, so that I can increment that offset on the partition.
    Artem Bilan
    @artembilan
       /**
         * Splits this string around matches of the given <a
         * href="../util/regex/Pattern.html#sum">regular expression</a>.
         *
         * <p> This method works as if by invoking the two-argument {@link
         * #split(String, int) split} method with the given expression and a limit
         * argument of zero.  Trailing empty strings are therefore not included in
         * the resulting array.
         *
         * <p> The string {@code "boo:and:foo"}, for example, yields the following
         * results with these expressions:
         *
         * <blockquote><table cellpadding=1 cellspacing=0 summary="Split examples showing regex and result">
         * <tr>
         *  <th>Regex</th>
         *  <th>Result</th>
         * </tr>
         * <tr><td align=center>:</td>
         *     <td>{@code { "boo", "and", "foo" }}</td></tr>
         * <tr><td align=center>o</td>
         *     <td>{@code { "b", "", ":and:f" }}</td></tr>
         * </table></blockquote>
         *
         *
         * @param  regex
         *         the delimiting regular expression
         *
         * @return  the array of strings computed by splitting this string
         *          around matches of the given regular expression
         *
         * @throws  PatternSyntaxException
         *          if the regular expression's syntax is invalid
         *
         * @see java.util.regex.Pattern
         *
         * @since 1.4
         * @spec JSR-51
         */
        public String[] split(String regex) {
    The split("-") is correct in your code, but I can't understand what you expect from the split use like this:
    .split("Error deserializing key/value for partition ")
    What should the result be?
    You can get topic, partition and offset from the ConsumerRecord
    Sorry, need to go
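    For reference, a minimal sketch (not from the chat) of what Artem suggests: reading the coordinates from the ConsumerRecord itself in the handle(Exception, ConsumerRecord, Consumer) override above, instead of parsing the exception message. It assumes the failed record is actually available; for deserialization failures it may be null.
    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        if (data != null) {
            // use the record's own topic/partition/offset rather than String.split()
            TopicPartition tp = new TopicPartition(data.topic(), data.partition());
            logger.info("Skipping {}-{} offset {}", data.topic(), data.partition(), data.offset());
            consumer.seek(tp, data.offset() + 1); // move past the failed record
        }
    }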
    Rahul Lokurte
    @rahulmlokurte
    OK. Instead of splitting it, I can directly get the topic, partition and offset. Let me check on that. Thanks.
    Rahul Lokurte
    @rahulmlokurte
    When a record is encountered that cannot be deserialized from Avro, I am getting the exception
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition wstordo-event-4
    It cannot be deserialized. The record is not added to the collection returned by poll(). This results in a failure loop, as poll() will continue returning the same record repeatedly with no provided way to get past it.
    Any solution to this?
    Rahul Lokurte
    @rahulmlokurte
    Now I am getting the below error:
    2019-09-08 22:23:39.955 ERROR 11448 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null
    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition wstordo-event-4
    Here is the configuration:
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class);
    props.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, KafkaErrorProvider.class);

    @Bean
    public ConsumerFactory<String, ConnectDefault> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new AvroDeserializer<>(ConnectDefault.class));
    }
    Artem Bilan
    @artembilan
    I would say that you need to move that ErrorHandlingDeserializer2 into your DefaultKafkaConsumerFactory definition. Your props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class); doesn't have any effect, because you override that config with the explicit instantiation in the bean definition.
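    A minimal sketch of that suggestion, reusing the consumerConfigs() bean posted earlier: configure ErrorHandlingDeserializer2 (with KafkaAvroDeserializer as its delegate) purely through the properties and pass no explicit Deserializer instances to the factory, so the property-based config is actually honored.
    @Bean
    public ConsumerFactory<String, ConnectDefault> consumerFactory() {
        Map<String, Object> props = consumerConfigs();
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
        // no explicit deserializer instances here, so the two properties above take effect
        return new DefaultKafkaConsumerFactory<>(props);
    }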
    Rahul Lokurte
    @rahulmlokurte
    Now I am getting the below error:
    Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    Caused by: java.lang.NoSuchMethodException: com.ctl.asynchmessage.utils.AvroDeserializer.<init>()
        at java.lang.Class.getConstructor0(Class.java:3082) ~[na:1.8.0_144]
        at java.lang.Class.newInstance(Class.java:412) ~[na:1.8.0_144]
        ... 25 common frames omitted
    Below is my custom deserializer:
    package com.ctl.asynchmessage.utils;
    
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificRecordBase;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.xml.bind.DatatypeConverter;
    import java.util.Arrays;
    import java.util.Map;
    
    public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
    
        private static final Logger logger = LoggerFactory.getLogger(AvroDeserializer.class);
    
        protected final Class<T> targetType;
    
        public AvroDeserializer(Class<T> targetType) {
            this.targetType = targetType;
        }
    
        @Override
        public void configure(Map<String, ?> map, boolean b) {
            // No-op
        }
    
        @SuppressWarnings("unchecked")
        @Override
        public T deserialize(String topic, byte[] data) {
    
            try {
                T result = null;
    
                if (data != null) {
                    logger.debug("data='{}'", DatatypeConverter.printHexBinary(data));
    
                    DatumReader<GenericRecord> datumReader =
                            new SpecificDatumReader<>(targetType.newInstance().getSchema());
                    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    
                    result = (T) datumReader.read(null, decoder);
                    logger.debug("deserialized data='{}'", result);
                }
                return result;
            } catch (Exception ex) {
                throw new SerializationException(
                        "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
            }
        }
    
        @Override
        public void close() {
            // No-op
        }
    }
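    The NoSuchMethodException above is reported because Kafka instantiates a deserializer that is configured by class name reflectively, through a public no-arg constructor, and only afterwards calls configure(). A minimal sketch (not the poster's final solution) of how the custom class could be adapted; the property name "avro.deserializer.target.type" is made up for illustration, and the existing constructor, deserialize() and close() stay as shown above.
    private Class<T> targetType; // drop 'final' so configure() can set it

    public AvroDeserializer() {
        // public no-arg constructor, required for reflective instantiation
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // "avro.deserializer.target.type" is a hypothetical consumer property
        Object type = configs.get("avro.deserializer.target.type");
        if (type instanceof Class) {
            this.targetType = (Class<T>) type;
        }
    }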
    Rahul Lokurte
    @rahulmlokurte
    I have removed the custom deserializer and added KafkaAvroDeserializer from the Confluent repository. It worked.
    Artem Bilan
    @artembilan
    Good. Would you mind sharing the final working config, please?
    Rahul Lokurte
    @rahulmlokurte
    But the problem is, now even if I pass a plain string, the Kafka listener still receives it, with all values set to null as shown below:
    wstrdo obj='{"EVENTID": null, "EVENTTYPE": null, "EVENTCATEGORY": null,"JOBID": null, "JOBSTATUS": null}
    If I send proper Avro, then it gives me the below:
    '{"EVENTID": "915463629F0302E7E0530F4A020A9CEB", "EVENTTYPE": null,"EVENTCATEGORY": null, "JOBID": "JOB", "JOBSTATUS": "Status"}'
    After listening to the topic, I am executing some business logic. This causes an issue.
    Rahul Lokurte
    @rahulmlokurte
    How do I make the listener ignore an Avro message if it does not conform to the schema, and then move on to the next message?
    Rahul Lokurte
    @rahulmlokurte
    Here is the config file:
    package com.ctl.asynchmessage.configuration;
    
    
    import com.ctl.asynchmessage.handler.KafkaErrorProvider;
    import com.ctl.asynchmessage.utils.AvroDeserializer;
    import io.confluent.connect.avro.ConnectDefault;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.ErrorHandler;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class AppConfig {
    
        private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
        @Value("${spring.kafka.consumer.max-poll-records}")
        private int maxPollRecords;
        @Value("${spring.kafka.consumer.group-id}")
        private String groupId;
        @Value("${spring.kafka.consumer.client-id}")
        private String clientId;
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;
        @Value("${spring.kafka.consumer.topic}")
        private String topic;
    
    
        @Bean
        public DefaultKafkaConsumerFactory<String, ConnectDefault> defaultDefaultKafkaConsumerFactory(){
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            props.put("schema.registry.url", "***********CONFIDENTIAL*************");
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,true);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
            props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
            props.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, KafkaErrorProvider.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> kafkaListenerContainerFactory() throws IOException {
            ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(defaultDefaultKafkaConsumerFactory());
            factory.getContainerProperties().setAckOnError(false);
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
            return factory;
        }
    }
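    A minimal sketch (not from the chat) of how a listener could act on this config to answer the earlier question about ignoring messages that don't match the schema: with ErrorHandlingDeserializer2 delegating to KafkaAvroDeserializer, a record that fails deserialization reaches the listener with a null value (assuming the configured function returns null, or no function is set), so the listener can skip it and keep consuming. The listener method is illustrative, not the poster's code.
    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void listen(ConsumerRecord<String, ConnectDefault> record) {
        if (record.value() == null) {
            // deserialization failed for this record; log and move on to the next one
            logger.warn("Skipping undeserializable record {}-{} offset {}",
                    record.topic(), record.partition(), record.offset());
            return;
        }
        // business logic for valid Avro records goes here
    }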
    Artem Bilan
    @artembilan
    Shouldn't the logic be processed by that ErrorHandlingDeserializer2:
    public T deserialize(String topic, byte[] data) {
        try {
            return this.delegate.deserialize(topic, data);
        }
        catch (Exception e) {
            return recoverFromSupplier(topic, null, data, e);
        }
    }
    I mean, in case of any exception, your KafkaErrorProvider should process that exception. No?
    Rahul Lokurte
    @rahulmlokurte
    Yes. It does
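    For readers following along, a minimal sketch of what such a function might look like in spring-kafka 2.2.x: the class named in ErrorHandlingDeserializer2.VALUE_FUNCTION implements Function<FailedDeserializationInfo, T> and supplies the value handed to the listener when the delegate deserializer throws. This is illustrative, not the poster's actual KafkaErrorProvider.
    package com.ctl.asynchmessage.handler;

    import java.util.function.Function;

    import io.confluent.connect.avro.ConnectDefault;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.support.serializer.FailedDeserializationInfo;

    public class KafkaErrorProvider implements Function<FailedDeserializationInfo, ConnectDefault> {

        private static final Logger logger = LoggerFactory.getLogger(KafkaErrorProvider.class);

        @Override
        public ConnectDefault apply(FailedDeserializationInfo info) {
            // log the failure and hand a null value to the listener so the record can be skipped
            logger.warn("Failed to deserialize record on topic {}", info.getTopic(), info.getException());
            return null;
        }
    }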
    Marcel Koopman
    @marcelkoopman
    Hi, I'm having some issues with Kafka transactions. The producer seems to use a transaction, but the MessageChannel send method wraps everything into a transaction. Is this specific to Spring Cloud Stream Kafka?
    Gary Russell
    @garyrussell
    You mean it starts a new transaction? There was a bug in older versions; what version are you using?
    Marcel Koopman
    @marcelkoopman
    I think the most recent version, but I will check. Do you remember if that bug was starting a new transaction with every message send?
    Gary Russell
    @garyrussell
    Yes, the producer binding didn't detect the transaction that was started by the consumer so each send ran in its own tx.
    Marcel Koopman
    @marcelkoopman
    OK, great, thanks @garyrussell
    One last thing: which project are we talking about that contained that bug?
    Gary Russell
    @garyrussell
    It was fixed in spring-integration-kafka 3.1.2 spring-projects/spring-integration-kafka#260
    Marcel Koopman
    @marcelkoopman
    nice
    Gary Russell
    @garyrussell
    But there is a workaround for earlier versions (I just have to find it).
    Marcel Koopman
    @marcelkoopman
    I can upgrade, no problem.
    Marcel Koopman
    @marcelkoopman
    My Kafka producer aborts a transaction, but my output consumer still receives the events. I thought it would consume only when the transaction committed.
    @Transactional(isolation = Isolation.READ_COMMITTED)
    @StreamListener(UpaConstants.OUTPUT_TOPIC)
    public void listen(@Payload String event) {
        log.info("Consumed {}", event);
    }
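    Worth noting, although it is not stated in the chat: the isolation attribute on @Transactional governs the Spring-managed (e.g. database) transaction, not how the Kafka consumer fetches records. Filtering out records from aborted producer transactions is controlled by the Kafka consumer property isolation.level, as set in the configs earlier in this thread:
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // the Kafka default is "read_uncommitted"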
    Gnurt
    @HuaTrung
    Hi guys. I'm facing a problem when dealing with ErrorHandlingDeserializer2. I followed this: https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#error-handling-deserializer. But my failedDeserializationFunction in the ErrorHandlingDeserializer2 wrapper is always null, therefore I can't use my custom FailedDeserializationFunction.
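    For comparison, the property-based wiring from the linked documentation section (spring-kafka 2.2.x); MyFailedDeserializationFunction is a placeholder name. One thing to check, offered as a guess rather than a diagnosis: the function class is only resolved in configure(), i.e. when Kafka creates the ErrorHandlingDeserializer2 from the consumer properties, not when a ready-made instance is passed to the consumer factory.
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
    props.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, MyFailedDeserializationFunction.class);
    // pass only the props to DefaultKafkaConsumerFactory; do not also pass an ErrorHandlingDeserializer2 instance
    return new DefaultKafkaConsumerFactory<>(props);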