package com.ctl.asynchmessage.configuration;
import io.confluent.connect.avro.ConnectDefault;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Configuration
@EnableKafka
public class AppConfig {
    private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.client-id}")
    private String clientId;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.topic}")
    private String topic;
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put("schema.registry.url", "http://127.0.0.1:8081");
        return props;
    }
    @Bean
    public ConsumerFactory<String, ConnectDefault> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(ConnectDefault.class));
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setErrorHandler(new ErrorHandler() {
            @Override
            public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
                // Fragile: recover topic, partition and offset by parsing the exception message
                String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
                String topics = s.split("-")[0] + "-" + s.split("-")[1];
                long offset = Long.parseLong(s.split("offset ")[1]);
                int partition = Integer.parseInt(s.split("-")[2].split(" at")[0]);
                TopicPartition topicPartition = new TopicPartition(topics, partition);
                logger.info("Skipping " + topics + "-" + partition + " offset " + offset);
                consumer.seek(topicPartition, offset + 1);
            }
            @Override
            public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
                // No-op
            }
            @Override
            public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
                // Same fragile message parsing as above
                String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
                String topics = s.split("-")[0] + "-" + s.split("-")[1];
                long offset = Long.parseLong(s.split("offset ")[1]);
                int partition = Integer.parseInt(s.split("-")[2].split(" at")[0]);
                TopicPartition topicPartition = new TopicPartition(topics, partition);
                logger.info("Skipping " + topics + "-" + partition + " offset " + offset);
                consumer.seek(topicPartition, offset + 1);
            }
        });
        return factory;
    }
}
split(". If needed, please seek past the record to continue consumption.")
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition wstordo-event-0 at offset 23. If needed, please seek past the record to continue consumption
split()
do?
Splits this string around matches of the given regular expression.

This method works as if by invoking the two-argument split(String, int) method with the given expression and a limit argument of zero. Trailing empty strings are therefore not included in the resulting array.

The string "boo:and:foo", for example, yields the following results with these expressions:

Regex ":" yields { "boo", "and", "foo" }
Regex "o" yields { "b", "", ":and:f" }

It returns the array of strings computed by splitting this string around matches of the given regular expression, and throws PatternSyntaxException if the regular expression's syntax is invalid.

public String[] split(String regex)
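Applied to the exception text quoted earlier, the chained split() calls work out like this (a worked example, not from the original thread):

String s = "wstordo-event-0 at offset 23"; // what remains after the first two splits
s.split("-");                              // { "wstordo", "event", "0 at offset 23" }
s.split("-")[0] + "-" + s.split("-")[1];   // "wstordo-event"
s.split("offset ")[1];                     // "23"
s.split("-")[2].split(" at")[0];           // "0"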
split("-")
is correct in your code, but I can't understand what you expect from the split use like this:
.split("Error deserializing key/value for partition ")
what should be a result?
topic
, partition
and offset
from the ConsumerRecord
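If the message must be parsed at all, a single regular expression is less brittle than chained split() calls; a sketch that could replace the body of handle() above (not from the original thread):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Matches "... for partition <topic>-<partition> at offset <offset> ..."
private static final Pattern DESERIALIZATION_ERROR =
        Pattern.compile("Error deserializing key/value for partition (.+)-(\\d+) at offset (\\d+)");

Matcher m = DESERIALIZATION_ERROR.matcher(thrownException.getMessage());
if (m.find()) {
    String failedTopic = m.group(1);                    // "wstordo-event"
    int failedPartition = Integer.parseInt(m.group(2)); // 0
    long failedOffset = Long.parseLong(m.group(3));     // 23
    consumer.seek(new TopicPartition(failedTopic, failedPartition), failedOffset + 1);
}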
Use ErrorHandlingDeserializer2 instead: configure it as the value deserializer, let it delegate to the real one, and give it a fallback function for failures:
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class);
props.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, KafkaErrorProvider.class);
But note the factory you have now:
@Bean
public ConsumerFactory<String, ConnectDefault> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new AvroDeserializer<>(ConnectDefault.class));
}
You need to get the ErrorHandlingDeserializer2 into your DefaultKafkaConsumerFactory definition; the props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class); alone has no effect, because you override that config with the explicit instances in the bean definition.
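If you prefer to keep explicit deserializer instances, you can also wrap the delegate yourself; a sketch, assuming the ErrorHandlingDeserializer2(Deserializer) constructor that spring-kafka 2.2 provides:

@Bean
public ConsumerFactory<String, ConnectDefault> consumerFactory() {
    // The wrapper catches SerializationExceptions from the delegate and routes
    // them to the configured failure function instead of failing poll()
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new StringDeserializer(),
            new ErrorHandlingDeserializer2<>(new AvroDeserializer<>(ConnectDefault.class)));
}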
After switching the value deserializer to ErrorHandlingDeserializer2 with AvroDeserializer as its delegate, the application fails at startup:
Caused by: java.lang.NoSuchMethodException: com.ctl.asynchmessage.utils.AvroDeserializer.<init>()
at java.lang.Class.getConstructor0(Class.java:3082) ~[na:1.8.0_144]
at java.lang.Class.newInstance(Class.java:412) ~[na:1.8.0_144]
... 25 common frames omitted
The reason is that ErrorHandlingDeserializer2 instantiates its delegate reflectively, and AvroDeserializer has no no-arg constructor:
package com.ctl.asynchmessage.utils;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.DatatypeConverter;
import java.util.Arrays;
import java.util.Map;
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
    private static final Logger logger = LoggerFactory.getLogger(AvroDeserializer.class);
    protected final Class<T> targetType;
    // Note: this is the only constructor; there is no no-arg constructor
    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }
    @Override
    public void configure(Map<String, ?> map, boolean b) {
        // No-op
    }
    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            T result = null;
            if (data != null) {
                logger.debug("data='{}'", DatatypeConverter.printHexBinary(data));
                // SpecificDatumReader(Class) derives the schema from the generated class,
                // avoiding the reflective targetType.newInstance() call
                DatumReader<T> datumReader = new SpecificDatumReader<>(targetType);
                Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                result = datumReader.read(null, decoder);
                logger.debug("deserialized data='{}'", result);
            }
            return result;
        } catch (Exception ex) {
            throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
        }
    }
    @Override
    public void close() {
        // No-op
    }
}
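One way to satisfy that reflective instantiation (a sketch, not from the original thread): make targetType non-final, add a no-arg constructor, and resolve the type inside configure() from a custom consumer property; the property name value.deserializer.avro.type is an assumption:

public AvroDeserializer() {
    // required by ErrorHandlingDeserializer2, which creates its delegate via newInstance()
}

@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> configs, boolean isKey) {
    // hypothetical key, e.g. props.put("value.deserializer.avro.type", ConnectDefault.class)
    Object type = configs.get("value.deserializer.avro.type");
    if (type instanceof Class) {
        this.targetType = (Class<T>) type;
    } else if (type instanceof String) {
        try {
            this.targetType = (Class<T>) Class.forName((String) type);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Cannot load Avro target type", e);
        }
    }
}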
'{"EVENTID": "915463629F0302E7E0530F4A020A9CEB", "EVENTTYPE": null,"EVENTCATEGORY": null, "JOBID": "JOB", "JOBSTATUS": "Status"}'
The updated configuration now looks like this:
package com.ctl.asynchmessage.configuration;
import com.ctl.asynchmessage.handler.KafkaErrorProvider;
import io.confluent.connect.avro.ConnectDefault;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class AppConfig {
    private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.client-id}")
    private String clientId;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.topic}")
    private String topic;
    @Bean
    public DefaultKafkaConsumerFactory<String, ConnectDefault> defaultKafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put("schema.registry.url", "***********CONFIDENTIAL*************");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        // ErrorHandlingDeserializer2 wraps the real deserializer and routes failures
        // to the configured function instead of throwing from poll()
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
        props.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, KafkaErrorProvider.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ConnectDefault> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultKafkaConsumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        return factory;
    }
}
KafkaErrorProvider should process that exception, no?
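It should, provided ErrorHandlingDeserializer2 can actually instantiate it: the class named in VALUE_FUNCTION must implement Function<FailedDeserializationInfo, T> and have a no-arg constructor. The original KafkaErrorProvider is not shown, so this is only a sketch of what it might look like:

package com.ctl.asynchmessage.handler;
import io.confluent.connect.avro.ConnectDefault;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.serializer.FailedDeserializationInfo;
import java.util.function.Function;
public class KafkaErrorProvider implements Function<FailedDeserializationInfo, ConnectDefault> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaErrorProvider.class);
    @Override
    public ConnectDefault apply(FailedDeserializationInfo info) {
        // Log the poison pill; returning null hands the listener a null value
        // instead of the container looping forever on the same offset
        logger.error("Failed to deserialize record from topic {}: {}",
                info.getTopic(), info.getException().getMessage());
        return null;
    }
}

A listener can then skip such records by checking for a null value:

@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void listen(ConsumerRecord<String, ConnectDefault> record) {
    if (record.value() == null) {
        return; // deserialization failed; KafkaErrorProvider substituted null
    }
    // normal processing of record.value()
}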
@Transactional(isolation = Isolation.READ_COMMITTED)
@StreamListener(UpaConstants.OUTPUT_TOPIC)
public void listen(@Payload String event) {
    log.info("Consumed {}", event);
}