java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read from segment FileRecords(size=1966, file=/var/lib/kafka/__consumer_offsets-0/00000000000000000000.log, start=0, end=2147483647)
I've been trying everything to debug this, but I haven't been able to pin down what's causing it (I had to update this Kafka broker's server properties: https://gist.github.com/tomasAlabes/0bd3e03546e399db6c6e6b8d4a78686b)
Hi guys, I think I'm probably missing some theory about producing/consuming JSON.
What I'm trying to achieve is this: I want my consumer microservice to consume multiple kinds of JSON objects from multiple producers, so I think the property spring.kafka.consumer.properties.spring.json.value.default.type
is not enough for this case.
Now, the problem is that if I don't use spring.kafka.consumer.properties.spring.json.value.default.type,
I get a No type information in headers and no default type provided
error.
So after reading the documentation, I thought spring.kafka.consumer.properties.spring.json.type.mapping
and spring.kafka.producer.properties.spring.json.type.mapping
could match my requirements, but after setting them as
spring.kafka.producer.properties.spring.json.type.mapping=test:producerpackage.producerobject
spring.kafka.consumer.properties.spring.json.type.mapping=test:consumerpackage.consumerobject
I'm still getting the same error.
Maybe the json type mapping property is not the one suitable for this purpose, or maybe it's not even possible to achieve this.
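In case it's clearer, here is a minimal sketch (in Java rather than properties) of how I understand the mapping should line up; ConsumerObject and consumerpackage are placeholders for my real names, and the producer side would map the same test token to its own class via JsonSerializer.TYPE_MAPPINGS so the type header is actually written:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class TypeMappingSketch {

    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // token "test" -> my local class; the token must match what the producer maps to its class
        props.put(JsonDeserializer.TYPE_MAPPINGS, "test:consumerpackage.ConsumerObject");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "consumerpackage");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}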
Hello, I'm using Kafka Streams with props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer);
with a DeadLetterPublishingRecoverer,
while my application is configured with a String key and an Avro value.
Once I get a deserialization error, the DLT publication fails with: org.apache.kafka.common.errors.SerializationException: Can't convert key of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in key.serializer
But when I'm using plain Kafka (without Streams), it's working.
What am I doing wrong? Should I change the KafkaTemplate for the DeadLetterPublishingRecoverer
to a byte[],byte[] one? Will it still work with normal Kafka?
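For reference, this is roughly what I mean by a byte[],byte[] template; just a sketch, with the bootstrap address as a placeholder:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

public class DltRecovererSketch {

    public DeadLetterPublishingRecoverer recoverer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Failed records reach the recoverer as raw bytes, so the DLT template
        // serializes byte[] for both key and value.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        KafkaTemplate<byte[], byte[]> bytesTemplate =
                new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
        return new DeadLetterPublishingRecoverer(bytesTemplate);
    }
}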
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.json.trusted.packages: '*'
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
@KafkaListener(id = "myId", topics = "topic1")
public void listen(User user) {
    System.out.println(user);
}
concurrency: 1
). As soon as the 2nd instance goes up, the rebalancing spikes the CPU with a loop of Attempt to heartbeat failed since group is rebalancing,
making the services unusable (3 CPU limit). Is there any particular tip you could give to reduce such load? I can't use static membership because I'm on Kafka 2.1.1. Thank you!
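For context, the only knobs I can think of on 2.1.1 are the heartbeat/session/poll settings, roughly along these lines (the values here are just illustrative, not what we run):
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RebalanceTuningSketch {

    public Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        // Example values only: a longer session timeout and a less aggressive
        // heartbeat so the broker tolerates a slow rebalance, plus smaller poll
        // batches so each instance gets back to poll() (and re-joins the group) sooner.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        return props;
    }
}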
Hi all, I've got a technical question about running spring-kafka version 2.2.7 with AWS MSK (Kafka version 2.2.1).
I have a consumer application with two running instances in the container platform. The consumer application uses one consumer group and listens to two topics, A and B.
After running normally for a few weeks without any restart, we suddenly ran into an error indicating the application could not commit messages in topic A.
The commit exception:
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Right after the commit exception, there are two messages showing:
1. group coordinator (id rack null) is unavailable or invalid, will attempt rediscovery
2. discovered group coordinator
Since we are using SeekToCurrentErrorHandler, the application could still process the next message; it's just that the processed message could not be committed to Kafka. The same error messages would appear for each message.
During the error period, messages in the other topic, B, could be processed totally fine.
I then rolling-restarted the application instances and the problem was resolved.
The application is deployed to three different QA environments and only one of them has faced this issue.
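For what it's worth, the only mitigation I can read out of the exception text itself is tuning the poll settings, something like this sketch (the numbers are placeholders, not our actual config):
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class CommitFailureTuningSketch {

    public Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        // Placeholder values: allow more time between poll() calls and fetch
        // fewer records per poll so processing a batch cannot outlast the
        // rebalance deadline mentioned in the CommitFailedException.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return props;
    }
}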
Description:
An attempt was made to call a method that does not exist. The attempt was made from the following location:
org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:598)
The following method did not exist:
org.springframework.kafka.listener.ContainerProperties.setAuthorizationExceptionRetryInterval(Ljava/time/Duration;)V
The method's class, org.springframework.kafka.listener.ContainerProperties, is available from the following locations:
jar:file:/C:/Users/User/.m2/repository/org/springframework/kafka/spring-kafka/2.3.3.RELEASE/spring-kafka-2.3.3.RELEASE.jar!/org/springframework/kafka/listener/ContainerProperties.class
It was loaded from the following location:
file:/C:/Users/User/.m2/repository/org/springframework/kafka/spring-kafka/2.3.3.RELEASE/spring-kafka-2.3.3.RELEASE.jar
I have a requirement to show the last sync from Kafka. What I understand is that Spring for Apache Kafka doesn't call the message listener when there are no records available in Kafka. But I want it to call the message listener even when there are no records during a poll. Is this possible?
At the end of every poll, I want to update the "last sync from Kafka" value in the DB. Please share your thoughts.
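To make the requirement concrete, here is a rough sketch of the closest thing I have found so far: the container's idle events, which fire after a configured idleEventInterval with no records rather than literally on every empty poll; updateLastSync is a made-up placeholder for the DB update:
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.stereotype.Component;

@Component
public class LastSyncUpdater {

    // Requires containerProperties.setIdleEventInterval(...) on the listener
    // container factory; the container then publishes this event whenever no
    // records have arrived for that long.
    @EventListener
    public void onIdle(ListenerContainerIdleEvent event) {
        updateLastSync(event.getIdleTime());
    }

    private void updateLastSync(long idleTimeMs) {
        // placeholder for the DB update mentioned above
    }
}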
Hi team, I am using spring-kafka 2.5.5.
I am looking to get the Kafka consumer and producer metrics from my Spring application. This application is not using annotations but is manually polling records from, and sending records to, the topics.
In Prometheus, I can see only JVM, Tomcat and custom metrics, but I am missing the Kafka-provided metrics. Would you please point me to how to enable these metrics? The Kafka consumer is created using:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
protected KafkaConsumer<byte[], Event> getConsumer(List<String> inputTopics) {
    KafkaConsumer<byte[], Event> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(inputTopics, new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS revoked: " + partitions);
            consumer.commitAsync();
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS assigned: " + partitions);
        }
    });
    return consumer;
}
The application properties have JMX enabled:
spring.jmx.enabled=true
management.endpoints.web.base-path=/
management.endpoint.metrics.enabled=true
management.endpoints.web.exposure.include=prometheus
management.endpoint.prometheus.enabled=true
management.metrics.export.prometheus.enabled=true
Am I missing any configuration to expose these metrics? Any suggestions, please?
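In case it matters, the only way I have found so far to get the raw client's metrics into Micrometer is binding them myself, along these lines (assuming micrometer-core 1.4+ is on the classpath and registry is the auto-configured MeterRegistry):
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerMetricsBinding {

    // Registers the consumer's internal Kafka metrics with the Micrometer
    // registry that backs the /prometheus endpoint.
    public void bind(KafkaConsumer<byte[], ?> consumer, MeterRegistry registry) {
        new KafkaClientMetrics(consumer).bindTo(registry);
    }
}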
I have changed my implementation to use spring-kafka and I still see the same issue of the Kafka metrics not being exposed:
@EnableKafka
@Configuration
class KafkaConsumerConfig {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Autowired
    private GlobalConfig config;

    @Bean
    public ConsumerFactory<byte[], RheosEvent> salesTopicConsumerFactory() {
        Map<String, Object> props = commonConfig();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName() + ".zzz");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean(name = "salesKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], RheosEvent>> salesKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<byte[], RheosEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setBatchListener(true);
        factory.setConsumerFactory(salesTopicConsumerFactory());
        factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    @Bean
    public ConsumerFactory<byte[], RheosEvent> behaviorTopicConsumerFactory() {
        Map<String, Object> props = commonConfig();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "YYY");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean(name = "behaviorKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], RheosEvent>> behaviorKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<byte[], RheosEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setBatchListener(true);
        factory.setConsumerFactory(behaviorTopicConsumerFactory());
        factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    private Map<String, Object> commonConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
        return props;
    }

    static class CustomRebalanceListener implements ConsumerAwareRebalanceListener {

        @Override
        public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS revoked: " + partitions);
            consumer.commitAsync();
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS assigned: " + partitions);
        }
    }
}
Please suggest if I am missing any configuration
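One thing I am wondering about (sketch below, assuming a MeterRegistry bean is available): whether the consumer factories need a MicrometerConsumerListener attached before the client metrics show up, e.g. something like this applied to salesTopicConsumerFactory() and behaviorTopicConsumerFactory():
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.MicrometerConsumerListener;

public class FactoryMetricsSketch {

    // Attaches a Micrometer listener so every consumer created by the factory
    // has its Kafka client metrics registered with the given registry.
    public <V> void addMetrics(DefaultKafkaConsumerFactory<byte[], V> factory, MeterRegistry registry) {
        factory.addListener(new MicrometerConsumerListener<>(registry));
    }
}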
I am currently playing with the 2.7.0 version because I can pause partitions on containers, yet I cannot get the BackOff interval to work: factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(10000L, 3L)));
Before, on version 2.6.7, there was no problem (a 10 s wait between retries), but in 2.7.0 there is no wait at all.
I saw there was some change related to not delaying container stops, but I didn't see what to configure differently for that interval to work. Any idea? Thank you
Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule
when attempting to produce the message. When I remove the "parallel" part of the stream, it works fine and I am able to produce. When I initialize a KafkaProducer myself, without spring-kafka or spring-cloud-stream's wrappers, it works with parallel streams.