spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.json.trusted.packages: '*'
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
@KafkaListener(id = "myId", topics = "topic1")
public void listen(User user) {
    System.out.println(user);
}
The listener container runs with concurrency: 1. As soon as the 2nd instance goes up, the rebalancing spikes the CPU with a loop of "Attempt to heartbeat failed since group is rebalancing", making the services unusable (3 CPU limit). Is there any particular tip you could give to reduce such load? I can't use static membership because I have Kafka 2.1.1. Thank you!
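One hedged thing to experiment with, since static membership is off the table on Kafka 2.1.1, is widening the group's timing windows so members survive a rebalance with less heartbeat churn. This is only a sketch; the values are illustrative, not recommendations:

// Merged into the existing consumer configuration; values are assumptions to tune.
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);    // default 10000; gives members longer to rejoin
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // keep at roughly 1/3 of the session timeout
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // upper bound on processing time between polls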
Hi all, I have a technical question about running spring-kafka version 2.2.7 with AWS MSK (Kafka version 2.2.1).
I have a consumer application running with two instances on our container platform. The application uses one consumer group and listens to two topics, A and B.
After running normally for a few weeks without any restart, we suddenly ran into an error indicating the application could not commit a message in topic A.
The commit exception:
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Right after the commit exception, two log messages appear:
1. group coordinator (id rack null) is unavailable or invalid will attempt rediscovery
2. discovered group coordinator
Since we are using SeekToCurrentErrorHandler, the application could still process the next message; the processed message just could not be committed to Kafka. The same error messages would appear for each message.
During the error period, messages in the other topic B were processed totally fine.
I then did a rolling restart of the application runtimes and the problem was resolved.
The application is deployed to three different QA environments and only one of them has faced this issue.
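For what it's worth, here is a hedged sketch of the tuning the exception message itself suggests; the values are illustrative and would need sizing against the real per-record processing latency:

// Merged into the existing consumer configuration.
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);        // default 500; smaller batches finish within the interval
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // default 300000; widens the allowed processing window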
Description:
An attempt was made to call a method that does not exist. The attempt was made from the following location:
org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:598)
The following method did not exist:
org.springframework.kafka.listener.ContainerProperties.setAuthorizationExceptionRetryInterval(Ljava/time/Duration;)V
The method's class, org.springframework.kafka.listener.ContainerProperties, is available from the following locations:
jar:file:/C:/Users/User/.m2/repository/org/springframework/kafka/spring-kafka/2.3.3.RELEASE/spring-kafka-2.3.3.RELEASE.jar!/org/springframework/kafka/listener/ContainerProperties.class
It was loaded from the following location:
file:/C:/Users/User/.m2/repository/org/springframework/kafka/spring-kafka/2.3.3.RELEASE/spring-kafka-2.3.3.RELEASE.jar
I have a requirement to show the last sync from Kafka. What I understand is that Spring for Apache Kafka doesn't call the message listener when there are no records available. But I want it to call the message listener even when a poll returns no records. Is this possible?
At the end of every poll, I want to update the "last sync from Kafka" value in the DB. Please share your thoughts.
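The listener itself isn't invoked on an empty poll, but a hedged alternative sketch: the container can publish a ListenerContainerIdleEvent when no records arrive for a configured interval, and that event can drive the "last sync" update (bean names here are illustrative):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setIdleEventInterval(5000L); // publish an event after 5s with no records
    return factory;
}

// org.springframework.kafka.event.ListenerContainerIdleEvent
@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
    // update the "last sync from Kafka" timestamp in the DB here
}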
Hi team, I am using spring-kafka 2.5.5.
I would like to get the Kafka consumer and producer metrics from my Spring application. This application is not using annotations; it is manually polling records and sending records from and to the topics.
In Prometheus, I can see only the JVM, Tomcat and custom metrics posted, but I am missing the Kafka-provided metrics. Would you please point me to how to enable these metrics? The Kafka consumer is created using:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
protected KafkaConsumer<byte[], Event> getConsumer(List<String> inputTopics) {
    KafkaConsumer<byte[], Event> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(inputTopics, new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS revoked: " + partitions);
            consumer.commitAsync();
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS assigned: " + partitions);
        }
    });
    return consumer;
}
application.properties has JMX enabled:
spring.jmx.enabled=true
management.endpoints.web.base-path=/
management.endpoint.metrics.enabled=true
management.endpoints.web.exposure.include=prometheus
management.endpoint.prometheus.enabled=true
management.metrics.export.prometheus.enabled=true
Am I missing any configuration to expose these metrics? Suggestions, please?
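One hedged possibility, assuming Micrometer is on the classpath: a manually created KafkaConsumer is not instrumented automatically, but its client metrics can be bound to the registry explicitly:

// io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics (Micrometer 1.4+);
// meterRegistry is Boot's autoconfigured MeterRegistry, injected wherever this runs.
KafkaConsumer<byte[], Event> consumer = new KafkaConsumer<>(props);
new KafkaClientMetrics(consumer).bindTo(meterRegistry);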
I have changed my implementation to use spring-kafka and still see the same issue of the Kafka metrics not being exposed.
@EnableKafka
@Configuration
class KafkaConsumerConfig {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Autowired
    private GlobalConfig config;

    @Bean
    public ConsumerFactory<byte[], RheosEvent> salesTopicConsumerFactory() {
        Map<String, Object> props = commonConfig();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName() + ".zzz");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean(name = "salesKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], RheosEvent>> salesKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<byte[], RheosEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setBatchListener(true);
        factory.setConsumerFactory(salesTopicConsumerFactory());
        factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    @Bean
    public ConsumerFactory<byte[], RheosEvent> behaviorTopicConsumerFactory() {
        Map<String, Object> props = commonConfig();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "YYY");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean(name = "behaviorKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], RheosEvent>> behaviorKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<byte[], RheosEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setBatchListener(true);
        factory.setConsumerFactory(behaviorTopicConsumerFactory());
        factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    private Map<String, Object> commonConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
        return props;
    }

    static class CustomRebalanceListener implements ConsumerAwareRebalanceListener {
        @Override
        public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS revoked: " + partitions);
            consumer.commitAsync();
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS assigned: " + partitions);
        }
    }
}
Please suggest if I am missing any configuration.
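A hedged sketch for spring-kafka 2.5.x: the consumer factory can be given a MicrometerConsumerListener so that every consumer it creates reports its client metrics to the MeterRegistry (meterRegistry would be injected; the rest mirrors the factories above):

// org.springframework.kafka.core.MicrometerConsumerListener, spring-kafka 2.5+
DefaultKafkaConsumerFactory<byte[], RheosEvent> cf = new DefaultKafkaConsumerFactory<>(commonConfig());
cf.addListener(new MicrometerConsumerListener<>(meterRegistry));
return cf;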
I am currently playing with the 2.7.0 version because I can use pausing partitions on containers, yet I cannot get the BackOff interval to work: factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(10000L, 3L)));
Before, on version 2.6.7, there was no problem - a 10s wait between retries - but in 2.7.0 there is no wait at all.
I saw there was some change related to container stops no longer being delayed, but I didn't see what to configure differently for the interval to work. Any idea? Thank you
I get
Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule
when attempting to produce the message. When I remove the "parallel" part of the stream, it works fine and I am able to produce. When I initialize a KafkaProducer myself without spring-kafka or spring-cloud-stream's wrappers, it works using parallel streams.
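A hedged hypothesis and workaround sketch, not a confirmed diagnosis: parallel streams run on the ForkJoinPool common pool, whose threads can carry a different context classloader than the application threads (notably inside a Spring Boot fat jar), which can break the JAAS LoginModule lookup. Submitting the same work to an explicitly created pool, whose threads inherit the submitting thread's classloader, is one way to test that theory (records and template are illustrative names):

ExecutorService pool = Executors.newFixedThreadPool(8);
List<CompletableFuture<Void>> sends = records.stream()
        .map(r -> CompletableFuture.runAsync(() -> template.send("topic", r), pool))
        .collect(Collectors.toList());
sends.forEach(CompletableFuture::join); // wait for all sends before shutting the pool down
pool.shutdown();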
Hi guys, a question regarding handling errors: I'm using a Spring Boot application without a REST layer, so no ControllerAdvice is in place. Now, I'm listening to events with this configuration
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
So I have a message converter
@Bean
public RecordMessageConverter messageConverter() {
    return new StringJsonMessageConverter();
}
And this listener:
public void executeConsumer(final ConsumerRecord<String, String> cr,
        @Payload final POJO payload) {
When a message with a malformed payload arrives, the conversion fails with:
org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
at [Source: (String)"{"property1":"value", "property2": 005}"; line: 1, column: 51]
at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:128) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:132) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:264) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:74) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1310) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1293) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1253) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1234) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1149) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:959) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:766) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:704) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
at [Source: (String)"{"property1":"value", "property2": 005}"; line: 1, column: 51]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.10.jar:2.9.10]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693) ~[jackson-core-2.9.10.jar:2.9.10]
at com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(
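For container-level handling without a REST layer, here is a hedged sketch (assuming spring-kafka 2.2+, matching the versions in the trace): a SeekToCurrentErrorHandler caps the retries and a DeadLetterPublishingRecoverer forwards the unconvertible record to a dead-letter topic, so a poison message cannot block the partition. The bean wiring is illustrative:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setMessageConverter(new StringJsonMessageConverter());
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), 3)); // 3 attempts, then publish to <topic>.DLT
    return factory;
}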
Hi team! If different functional areas of the same application need to talk to the same Kafka cluster, what practice do you follow when it comes to the number of KafkaTemplate instances? Would you share a single instance, or do you tend to have a dedicated instance per feature area within the application?
And if you have to connect your application to two Kafka brokers (two separate clusters), is there a clever way to leverage what's provided by autoconfiguration (spring.kafka.*) without having to initialize the templates with two groups of custom properties?
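One hedged sketch for the second question: Boot's KafkaProperties can serve as the shared base, with only the bootstrap servers overridden for the second cluster (the bean name and address are illustrative):

@Bean
public KafkaTemplate<String, String> secondClusterTemplate(KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties(); // everything from spring.kafka.*
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "second-cluster:9092"); // the only override
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}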
Hi, I have multiple KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> beans to consume messages from different topics. My code is as below:
@Bean(name = "xxxEventKafkaListenerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> smartAlertEventKafkaListenerFactory() throws AWSServiceProviderException {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryUsingTopicName(xxxAlertsTopic));
factory.setBatchListener(true);
factory.setConcurrency(numberOfConcurrentListeners);
factory.getContainerProperties().setAckMode(AckMode.valueOf(listenerAckMode));
log.info("Created xxxKafkaListerFactory !!");
return factory;
}
A few of the KafkaListenerContainerFactories use the same SASL credentials (username and password) - org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";
Whereas the other KafkaListenerContainerFactory uses different SASL credentials (username and password). This is causing some issues and I am getting the error below.
2021-05-07 19:31:52 ERROR o.a.k.c.Metadata:283 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - [Consumer clientId=consumer-ps-alerts-xxx-consumer-local-1, groupId=ps-alerts-xxx-consumer-local] Topic authorization failed for topics [ps-mon.0]
2021-05-07 19:31:52 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - Authorization Exception and no authorizationExceptionRetryInterval set
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [ps-mon.0]
2021-05-07 19:31:52 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:140 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - Fatal consumer exception; stopping container
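A hedged sketch of one common way to keep the credentials from clashing: give each consumer factory its own sasl.jaas.config (with a matching mechanism and protocol) in its property map, instead of relying on a single JVM-wide JAAS configuration. The mechanism/protocol values and the commonConsumerConfig() helper below are placeholders for whatever the brokers actually require:

Map<String, Object> props = new HashMap<>(commonConsumerConfig()); // hypothetical shared base config
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); // or SASL_PLAINTEXT, per cluster
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256"); // match the broker's mechanism
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
        username, password));
return new DefaultKafkaConsumerFactory<>(props);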