    Artem Bilan
    @artembilan
    You can also add that org.springframework.integration.history package to the trusted packages (white list) to restore the previous behavior, though: DefaultKafkaHeaderMapper.addTrustedPackages():
    
        /**
         * Add packages to the trusted packages list (default {@code java.util, java.lang}) used
         * when constructing objects from JSON.
         * If any of the supplied packages is {@code "*"}, all packages are trusted.
         * If a class for a non-trusted package is encountered, the header is returned to the
         * application with value of type {@link NonTrustedHeaderType}.
         * @param packagesToTrust the packages to trust.
         */
        public void addTrustedPackages(String... packagesToTrust) {
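    For example, a minimal sketch (the mapper here is just whatever DefaultKafkaHeaderMapper instance you already configure):
    
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        // trust the message-history header type again instead of getting a NonTrustedHeaderType back
        mapper.addTrustedPackages("org.springframework.integration.history");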
    Jorg Heymans
    @jorgheymans
    @artembilan but ChainedTransactionManager is Java configuration only, right?
    Artem Bilan
    @artembilan
    Right. And what is the problem with that?
    Jorg Heymans
    @jorgheymans
    nothing, was just trying to figure out how to use it, the docs are rather brief
    also not sure this actually helps the issue above since my app is producer only
    seems the ChainedTransactionManager helps with listener code and sending offsets in the same tx
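    (For reference, a minimal Java-configuration sketch; the JPA and Kafka transaction manager beans here are illustrative assumptions, not something from this discussion:)
    
        // Sketch: chain the Kafka transaction manager with a JPA one so listener DB work and
        // record/offset sends commit together (best-effort synchronization, not XA).
        @Bean
        public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
                KafkaTransactionManager<String, String> kafkaTransactionManager) {
            // transactions start in the given order and commit/roll back in reverse order
            return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
        }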
    anyway
    Mingliang
    @mingliangguo
    @artembilan thank you so much! I appreciate your help!
    rahulmlokurte
    @rahulmlokurte
    I have written a Spring Boot Kafka consumer with the Avro data format. The "Error deserializing key/value for partition" error is handled by an ErrorHandler, but the error handler threw an exception: java.lang.ArrayIndexOutOfBoundsException: 1 at com.ctl.asynchmessage.configuration.AppConfig$1.handle(AppConfig.java:95) ~[classes/:na]. Any advice?
    String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
    Artem Bilan
    @artembilan
    Looks like some problem in your code.
    Any chance to see that com.ctl.asynchmessage.configuration.AppConfig?
    Also: any chance to move such a story to Stack Overflow for better context and traceability?
    rahulmlokurte
    @rahulmlokurte
    package com.ctl.asynchmessage.configuration;
    
    
    import com.ctl.asynchmessage.utils.AvroDeserializer;
    import io.confluent.connect.avro.ConnectDefault;
    import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.avro.data.Json;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.serializer.Deserializer;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ErrorHandler;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.kafka.support.converter.MessageConverter;
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class AppConfig {
    
        private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
        @Value("${spring.kafka.consumer.max-poll-records}")
        private int maxPollRecords;
        @Value("${spring.kafka.consumer.group-id}")
        private String groupId;
        @Value("${spring.kafka.consumer.client-id}")
        private String clientId;
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;
        @Value("${spring.kafka.consumer.topic}")
        private String topic;
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            props.put("schema.registry.url", "http://127.0.0.1:8081");
            return props;
        }
    
        @Bean
        public ConsumerFactory<String, ConnectDefault> consumerFactory() {
            return new DefaultKafkaConsumerFactory<String, ConnectDefault>(consumerConfigs(),new StringDeserializer(), new JsonDeserializer<>(ConnectDefault.class));
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> kafkaListenerContainerFactory() throws IOException {
            ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
        factory.setErrorHandler(new ErrorHandler() {
    
                @Override
                public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
                    String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
                    String topics = s.split("-")[0] + "-" + s.split("-")[1];
                    int offset = Integer.valueOf(s.split("offset ")[1]);
                    int partition = Integer.valueOf(s.split("-")[2].split(" at")[0]);
                    TopicPartition topicPartition = new TopicPartition(topics, partition);
                    logger.info("Skipping " + topic + "-" + partition + " offset " + offset);
                    consumer.seek(topicPartition, offset + 1);
                }
    
                @Override
                public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
    
                }
    
                @Override
                public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
                    String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
                    String topics = s.split("-")[0];
                    int offset = Integer.valueOf(s.split("offset ")[1]);
                    int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
                    TopicPartition topicPartition = new TopicPartition(topics, partition);
                    logger.info("Skipping " + topic + "-" + partition + " offset " + offset);
                    consumer.seek(topicPartition, offset + 1);
                }
            });
    
    
    
            return factory;
        }
    }
    Artem Bilan
    @artembilan
    What does your .split("Error deserializing key/value for partition ") mean? What do you really split? And what is your goal? What should be at the end of that split?
    And why do you try to split the second part of the first split result: split(". If needed, please seek past the record to continue consumption.")?
    rahulmlokurte
    @rahulmlokurte
    The end goal is to consume Avro messages through the schema registry. When I start the consumer application, I get the error
    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition wstordo-event-0 at offset 23. If needed, please seek past the record to continue consumption
    To move past the record, I have implemented the logic in the ErrorHandler.
    Artem Bilan
    @artembilan
    Right. But why do you try to split that message in your code?
    Do you know what split() does?
    rahulmlokurte
    @rahulmlokurte
    I split it to know the current partition, topic name and the offset, so that I can increment that offset on the partition
    Artem Bilan
    @artembilan
       /**
         * Splits this string around matches of the given <a
         * href="../util/regex/Pattern.html#sum">regular expression</a>.
         *
         * <p> This method works as if by invoking the two-argument {@link
         * #split(String, int) split} method with the given expression and a limit
         * argument of zero.  Trailing empty strings are therefore not included in
         * the resulting array.
         *
         * <p> The string {@code "boo:and:foo"}, for example, yields the following
         * results with these expressions:
         *
         * <blockquote><table cellpadding=1 cellspacing=0 summary="Split examples showing regex and result">
         * <tr>
         *  <th>Regex</th>
         *  <th>Result</th>
         * </tr>
         * <tr><td align=center>:</td>
         *     <td>{@code { "boo", "and", "foo" }}</td></tr>
         * <tr><td align=center>o</td>
         *     <td>{@code { "b", "", ":and:f" }}</td></tr>
         * </table></blockquote>
         *
         *
         * @param  regex
         *         the delimiting regular expression
         *
         * @return  the array of strings computed by splitting this string
         *          around matches of the given regular expression
         *
         * @throws  PatternSyntaxException
         *          if the regular expression's syntax is invalid
         *
         * @see java.util.regex.Pattern
         *
         * @since 1.4
         * @spec JSR-51
         */
        public String[] split(String regex) {
    The split("-") is correct in your code, but I can't understand what you expect from the split use like this:
    .split("Error deserializing key/value for partition ")
    what should be a result?
    You can get topic, partition and offset from the ConsumerRecord
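    Something along these lines in the three-argument handle() callback, as a sketch only (note the record can be null when the failure happened during deserialization itself):
    
        @Override
        public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
            if (data != null) {
                // no string parsing needed: the record itself carries its coordinates
                TopicPartition topicPartition = new TopicPartition(data.topic(), data.partition());
                logger.info("Skipping " + data.topic() + "-" + data.partition() + " offset " + data.offset());
                consumer.seek(topicPartition, data.offset() + 1);
            }
        }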
    Sorry, need to go
    rahulmlokurte
    @rahulmlokurte
    OK. Instead of splitting it, I can directly get the topic, partition and offset. Let me check on that. Thanks.
    rahulmlokurte
    @rahulmlokurte
    When a record is encountered that cannot be deserialized from Avro, I am getting this exception:
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition wstordo-event-4
    Since it cannot be deserialized, the record is not added to the collection returned by poll(). This results in a failure loop, as poll() keeps returning the same record repeatedly with no provided way to get past it.
    Any solution to this?
    rahulmlokurte
    @rahulmlokurte
    Now I am getting the below error:
    2019-09-08 22:23:39.955 ERROR 11448 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null
    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition wstordo-event-4
    Here is the configuration done:
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class);
    props.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, KafkaErrorProvider.class);
    
    @Bean
    public ConsumerFactory<String, ConnectDefault> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new AvroDeserializer<>(ConnectDefault.class));
    }
    Artem Bilan
    @artembilan
    I would say that you need to move that ErrorHandlingDeserializer2 into your DefaultKafkaConsumerFactory definition. Your props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class); doesn't have any effect, because you override that config with an explicit instantiation via the bean definition.
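    For example, roughly (a sketch based on your existing consumerConfigs(); not a definitive config):
    
        @Bean
        public ConsumerFactory<String, ConnectDefault> consumerFactory() {
            Map<String, Object> props = consumerConfigs();
            // configure the error-handling wrapper via properties only; do not pass an explicit
            // value deserializer instance to the factory, or these settings are ignored
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
            props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class);
            props.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, KafkaErrorProvider.class);
            // the delegate named in VALUE_DESERIALIZER_CLASS is created from its class name,
            // so it must have a no-arg constructor
            return new DefaultKafkaConsumerFactory<>(props);
        }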
    rahulmlokurte
    @rahulmlokurte
    Now I am getting the below error:
    Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    Caused by: java.lang.NoSuchMethodException: com.ctl.asynchmessage.utils.AvroDeserializer.<init>()
        at java.lang.Class.getConstructor0(Class.java:3082) ~[na:1.8.0_144]
        at java.lang.Class.newInstance(Class.java:412) ~[na:1.8.0_144]
        ... 25 common frames omitted
    Below is my custom deserializer:
    package com.ctl.asynchmessage.utils;
    
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificRecordBase;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.xml.bind.DatatypeConverter;
    import java.util.Arrays;
    import java.util.Map;
    
    public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
    
        private static final Logger logger = LoggerFactory.getLogger(AvroDeserializer.class);
    
        protected final Class<T> targetType;
    
        public AvroDeserializer(Class<T> targetType) {
            this.targetType = targetType;
        }
    
        @Override
        public void configure(Map<String, ?> map, boolean b) {
            // No-op
        }
    
        @SuppressWarnings("unchecked")
        @Override
        public T deserialize(String topic, byte[] data) {
    
            try {
                T result = null;
    
                if (data != null) {
                    logger.debug("data='{}'", DatatypeConverter.printHexBinary(data));
    
                    DatumReader<GenericRecord> datumReader =
                            new SpecificDatumReader<>(targetType.newInstance().getSchema());
                    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    
                    result = (T) datumReader.read(null, decoder);
                    logger.debug("deserialized data='{}'", result);
                }
                return result;
            } catch (Exception ex) {
                throw new SerializationException(
                        "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
            }
        }
    
        @Override
        public void close() {
            // No-op
        }
    }
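    (A side note on the NoSuchMethodException above: a deserializer configured by class name, as with ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, is instantiated reflectively and so needs a no-arg constructor. A rough sketch of how the custom deserializer could be adapted; the class name and the target-type property key below are illustrative, not part of the project:)
    
        // (imports as in the AvroDeserializer above)
        // Sketch only: same Avro logic, but with a no-arg constructor and the target type
        // resolved in configure(); "value.deserializer.target.class" is a hypothetical key.
        public class ReflectiveAvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
    
            private Class<T> targetType;
    
            public ReflectiveAvroDeserializer() {
                // required when the class is instantiated from a class-name property
            }
    
            @Override
            @SuppressWarnings("unchecked")
            public void configure(Map<String, ?> configs, boolean isKey) {
                try {
                    this.targetType = (Class<T>) Class.forName((String) configs.get("value.deserializer.target.class"));
                }
                catch (ClassNotFoundException e) {
                    throw new IllegalStateException("Cannot resolve Avro target type", e);
                }
            }
    
            @Override
            @SuppressWarnings("unchecked")
            public T deserialize(String topic, byte[] data) {
                if (data == null) {
                    return null;
                }
                try {
                    DatumReader<GenericRecord> datumReader =
                            new SpecificDatumReader<>(this.targetType.newInstance().getSchema());
                    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                    return (T) datumReader.read(null, decoder);
                }
                catch (Exception ex) {
                    throw new SerializationException("Can't deserialize data from topic '" + topic + "'", ex);
                }
            }
    
            @Override
            public void close() {
                // No-op
            }
        }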
    rahulmlokurte
    @rahulmlokurte
    I have removed the custom deserializer and added the KafkaAvroDeserializer from the Confluent repository. It worked.
    Artem Bilan
    @artembilan
    Good. Would you mind sharing the final working config, please?
    rahulmlokurte
    @rahulmlokurte
    But the problem is, now even if I pass a plain string, the Kafka listener still receives a record, with all values set to null as shown below
    wstrdo obj='{"EVENTID": null, "EVENTTYPE": null, "EVENTCATEGORY": null,"JOBID": null, "JOBSTATUS": null}
    If I send proper Avro, then it gives me the below:
    '{"EVENTID": "915463629F0302E7E0530F4A020A9CEB", "EVENTTYPE": null,"EVENTCATEGORY": null, "JOBID": "JOB", "JOBSTATUS": "Status"}'
    After listening to the topic, I am executing some business logic. This causes an issue.
    rahulmlokurte
    @rahulmlokurte
    How do I make the listener ignore an Avro message if it does not conform to the schema, and then move on to the next message?
    rahulmlokurte
    @rahulmlokurte
    Here is the config file:
    package com.ctl.asynchmessage.configuration;
    
    
    import com.ctl.asynchmessage.handler.KafkaErrorProvider;
    import com.ctl.asynchmessage.utils.AvroDeserializer;
    import io.confluent.connect.avro.ConnectDefault;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.ErrorHandler;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class AppConfig {
    
        private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
        @Value("${spring.kafka.consumer.max-poll-records}")
        private int maxPollRecords;
        @Value("${spring.kafka.consumer.group-id}")
        private String groupId;
        @Value("${spring.kafka.consumer.client-id}")
        private String clientId;
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;
        @Value("${spring.kafka.consumer.topic}")
        private String topic;
    
    
        @Bean
        public DefaultKafkaConsumerFactory<String, ConnectDefault> defaultDefaultKafkaConsumerFactory(){
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            props.put("schema.registry.url", "***********CONFIDENTIAL*************");
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,true);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
            props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
            props.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, KafkaErrorProvider.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> kafkaListenerContainerFactory() throws IOException {
            ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(defaultDefaultKafkaConsumerFactory());
            factory.getContainerProperties().setAckOnError(false);
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
            return factory;
        }
    }
    Artem Bilan
    @artembilan
    Shouldn't the logic be processed by that ErrorHandlingDeserializer2:
    public T deserialize(String topic, byte[] data) {
        try {
            return this.delegate.deserialize(topic, data);
        }
        catch (Exception e) {
            return recoverFromSupplier(topic, null, data, e);
        }
    }
    I mean, in case of any exception, your KafkaErrorProvider should process that exception. No?
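    If it helps, a rough sketch of what such a function could look like (returning null here is just illustrative; the listener would need to recognize and skip that value):
    
        import java.util.function.Function;
    
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.kafka.support.serializer.FailedDeserializationInfo;
    
        import io.confluent.connect.avro.ConnectDefault;
    
        // Sketch: ErrorHandlingDeserializer2 calls this instead of propagating the exception,
        // so poll() still returns a record and the container can move past the bad message.
        public class KafkaErrorProvider implements Function<FailedDeserializationInfo, ConnectDefault> {
    
            private static final Logger logger = LoggerFactory.getLogger(KafkaErrorProvider.class);
    
            @Override
            public ConnectDefault apply(FailedDeserializationInfo info) {
                logger.warn("Skipping record from topic {} that could not be deserialized", info.getTopic(), info.getException());
                return null;
            }
        }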