    Kannadasan S
    How can I make a Spring Cloud Stream listener wait to process messages until the Spring Boot application is fully initialized on startup?
    1 reply
    Mateusz Fronczek
    Hey. I'm currently looking into setting up a new Kafka transaction from within the thread started by @StreamListener. I do have a ChainedKafkaTransactionManager in the application context. At first, I tried to utilize @Transactional(propagation = Propagation.REQUIRES_NEW, transactionManager = "chainedKafkaTransactionManager") but to no avail - it failed with "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION". Therefore, I attempted to use kafkaTemplate.executeInTransaction(), however, it seems that it is rolled back together with the outer transaction. How should this be configured in order to achieve separation between two transactions, where the inner one is created from within consumer governed tx? I also tried to set new transactionIdSuffix on KafkaTemplate, still no success. Any help would be much appreciated, thanks!
    8 replies
    Spencer Goldberg
    What's the appropriate defensive programming for the issues detected in the NetworkClient related to missing listeners or metadata (e.g. UNKNOWN_TOPIC_OR_PARTITION)? My producer has callbacks registered on the kafkaTemplate send() but I don't see them being called (based on logs) at the same time as the NetworkClient warnings in the logs.
    Spencer Goldberg
    Also, if I choose not to implement onSuccess and onFailure callbacks, is successful delivery guaranteed when send().get() finally returns? Or do you need an onFailure callback to detect when the send truly fails?
    6 replies
    I have a spring kafka app using @KafkaListener annotation using a ConcurrentKafkaListenerContainer. I have to use ThreadLocal for state, and am trying to clean up state when receiving the ListenerContainerIdle event. I read in the documentation that "By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective.". Does this mean that I can access ThreadLocal when subscribed to the ListenerContainerIdle event alongside the @KafkaListener?
    5 replies
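The quoted sentence can be illustrated with plain JDK classes, without Spring. This is only a minimal sketch: `STATE`, `publishSync` and `publishAsync` are hypothetical names, and the "multicaster" is modeled as a direct call versus a hand-off to an executor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Plain-JDK illustration; none of these names are spring-kafka API.
class ThreadLocalVisibility {
    static final ThreadLocal<String> STATE = new ThreadLocal<>();

    // Models a synchronous multicaster: the listener runs on the calling thread.
    static String publishSync(Supplier<String> listener) {
        return listener.get();
    }

    // Models an async multicaster: the listener runs on an executor thread.
    static String publishAsync(Supplier<String> listener) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = listener::get;
            return executor.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        STATE.set("per-thread-state");
        System.out.println("sync:  " + publishSync(STATE::get));  // sees the calling thread's value
        System.out.println("async: " + publishAsync(STATE::get)); // sees null on the executor thread
        STATE.remove();
    }
}
```

If the idle event is delivered on the consumer thread, as the quoted documentation says is the default, the listener should reach the same ThreadLocal that the @KafkaListener thread used.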
    Pietro Galassi
    Is there a way to add a shutdown hook so that the event currently being processed by a listener is completed BEFORE the pod is shut down? (Even when the pod is killed because of an OOM)
    1 reply
    Pietro Galassi
    Does producer retry also take care of disconnects?
    2 replies
    Akshansh Jain
    Do idle consumers contribute to the application's CPU utilization? For example, if I have a topic with 2 partitions and 100 consumers running in my application for this topic, would the CPU utilization of my app be higher than if I only had 2 consumers running? (I am creating consumers using the .setConcurrency() method of ConcurrentKafkaListenerContainerFactory.) Spring version: 1.5.6.RELEASE, spring-kafka version: 1.2.2.RELEASE
    19 replies
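The numbers in the question can be worked through as simple arithmetic. A minimal sketch, assuming all consumers belong to one consumer group and relying on Kafka's rule that a partition is assigned to at most one consumer in a group; the class and method names are made up for illustration.

```java
// Worked arithmetic; assumes a single consumer group.
class IdleConsumerCount {
    // A partition is assigned to at most one consumer in a group,
    // so at most `partitions` consumers can receive records at once.
    static int active(int partitions, int concurrency) {
        return Math.min(partitions, concurrency);
    }

    // Consumers beyond the partition count get no assignment.
    static int idle(int partitions, int concurrency) {
        return concurrency - active(partitions, concurrency);
    }

    public static void main(String[] args) {
        // 2 partitions, concurrency 100, as in the question.
        System.out.println("active=" + active(2, 100) + " idle=" + idle(2, 100));
    }
}
```

With these inputs, 98 of the 100 consumers would have nothing assigned. Each of them still runs its own polling thread and stays in the group, so they are unlikely to be entirely free in terms of CPU and memory.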
    Does Spring Kafka use Log4j2 for logging? The reason for asking is this security bug in Log4j2.
    10 replies
    Pietro Galassi
    Is there a way, during a rebalance, to stop the processing of the event currently being handled from a poll, so that this event would be reprocessed by a different consumer after the rebalance?
    1 reply
    I get this issue when I start multiple instances of the application. With only one instance it does not reproduce:
    Exception in thread "ForkJoinPool.common-worker-1" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:743)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:584)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:544)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:519)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:513)
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:683)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:569)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:386)
    at com.nab.ms.hs.lodgement.producer.HardshipCaseSubmitEventProducer.publishHardshipCaseSubmitEvent(HardshipCaseSubmitEventProducer.java:47)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.lambda$publishHardshipCaseSubmitEvent$0(CreateHardshipRequestService.java:108)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.processAccountRequest(CreateHardshipRequestService.java:103)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.processNewHardshipRequest(CreateHardshipRequestService.java:75)
    at com.nab.ms.hs.lodgement.application.HardshipNewRequestService.lambda$processNewHardshipRequest$0(HardshipNewRequestService.java:46)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1728)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
    Caused by: java.lang.ExceptionInInitializerError: null
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:368)
    ... 22 common frames omitted
    Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.subject.TopicNameStrategy for configuration key.subject.name.strategy: Class io.confluent.kafka.serializers.subject.TopicNameStrategy could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:729)
    1 reply
    Hello everyone, and a happy new year :-) I have implemented "non blocking retries" in my Kafka consumers. However, I am trying to find a way to reprocess all failed messages delivered to the DLT, based on the error message reported in the headers of the messages posted to the DLT. The current DLT handler handles one message at a time when they are posted to the DLT, but I am trying to develop a reprocess mechanism that will enable me to do a mass reprocess of messages that failed for a specific reason, once the cause of the failure has been fixed.
    3 replies
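The selection step of such a mass reprocess could look roughly like the sketch below. It is a plain-JDK model, not a working consumer: each DLT record is represented only by a map of its headers, and the header name is assumed to be the one spring-kafka's DeadLetterPublishingRecoverer writes for the exception message (`kafka_dlt-exception-message`), with the value stored as UTF-8 bytes. `DltReprocessFilter` and `select` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-JDK model of the selection step; a real implementation would read the
// DLT with a consumer and inspect each record's headers instead of these maps.
class DltReprocessFilter {
    // Assumed header name for the exception message on dead-lettered records.
    static final String EXCEPTION_MESSAGE_HEADER = "kafka_dlt-exception-message";

    // Returns the indexes of the records whose exception message contains `reason`.
    static List<Integer> select(List<Map<String, byte[]>> headersPerRecord, String reason) {
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < headersPerRecord.size(); i++) {
            byte[] raw = headersPerRecord.get(i).get(EXCEPTION_MESSAGE_HEADER);
            if (raw != null && new String(raw, StandardCharsets.UTF_8).contains(reason)) {
                selected.add(i);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Map<String, byte[]>> records = new ArrayList<>();
        records.add(Map.of(EXCEPTION_MESSAGE_HEADER, "Connection refused".getBytes(StandardCharsets.UTF_8)));
        records.add(Map.of(EXCEPTION_MESSAGE_HEADER, "Invalid payload".getBytes(StandardCharsets.UTF_8)));
        records.add(Map.of()); // a record without the header is skipped
        System.out.println(select(records, "Connection"));
    }
}
```

The selected records would then be republished to the original topic by whatever mechanism the application already uses; that part is left out here.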
    Andras Hatvani
    Hi, is there a way to retrieve and inject the default value serde in a kafka streams app/test?
    2 replies
    Bastien Bouclet
    Hi, when using a batch listener, is there a way to partially positively acknowledge a batch (synchronously commit the offsets up to a point in the batch, and then continue processing the records until finally acknowledging the whole batch) ?
    8 replies
    Hi. How can I modify an existing Kafka record header value in producer and consumer records? I don't want to add a duplicate header key; instead, I want to override the value of the existing key.
    2 replies
    Josh Wein
    I haven't been able to find any docs on this - is it possible to use a Java 17 Record as a Kafka message? I'm able to produce a message that is a record, but deserialization using 'org.apache.kafka.common.serialization.StringDeserializer' and 'org.springframework.kafka.support.converter.StringJsonMessageConverter' for the batch listener, seems to fail. From my understanding Records are not exactly like Objects when it comes to serialization/deserialization so that may be why it doesn't work out of the box
    2 replies

    Approaches to handle 10 million records of 10 tables each with several joins

    I have 10 million records in each of 10 tables, with several joins between them. I am looking for the best alternative, DB design, or approach to read the records very quickly; that is, the queries should be fast.

    option#1 - normalize the tables, don't go for joins unnecessarily
    option#2 - add all the columns in the 1st query, where the where conditions will be used multiple times in the looping construct
    option#3 - go for a NoSQL database instead of MySQL

    Please advise


    1 reply

    The DeadLetterPublishingRecoverer uses a template resolver to get a valid KafkaTemplate to publish accepted messages.
    But according to this

    The template is only based on the value class.

    My messages being <String, String> I created a template for <String, String> messages.

    But how do I handle deserialization error cases?

    • <String, String> => Key error, Value ok => <byte[], String>
    • <String, String> => Key ok, Value error => <String, byte[]>
    • <String, String> => Key error, Value error => <byte[], byte[]>


    12 replies

    I use the stateful retry mechanism so that I can retry processing messages a few times with a fixed delay between attempts, but it's not working.

        public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory,
                KafkaTemplate<String, Object> template) {
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            configurer.configure(factory, kafkaConsumerFactory);
            // Failed records are handed to the DeadLetterPublishingRecoverer once the failure limit (3) is reached.
            factory.setErrorHandler(new SeekToCurrentErrorHandler(
                    new DeadLetterPublishingRecoverer(template), 3));
            return factory;
        }

        private RetryTemplate retryTemplate() {
            RetryTemplate template = new RetryTemplate();
            FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
            // Retry only when a RetryableException is thrown.
            SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(MAX_ATTEMPTS,
                    Collections.singletonMap(RetryableException.class, true));
            template.setBackOffPolicy(backOffPolicy);
            template.setRetryPolicy(retryPolicy);
            return template;
        }

    I'm throwing a RetryableException in the KafkaListener.
    Did I miss something? Thanks

    11 replies
    I am using spring-boot, spring-kafka and micrometer-prometheus in my project.
    My project is a message consumer.
    Does anyone know how metrics could be exposed for this Kafka listener?
    Is there any public Grafana dashboard which makes use of these metrics?
    3 replies
    Tiger Wang (王豫)

    Hello - a best practice we embrace is to fail a bean initialization when connection check to an external resource fails. For a Kafka consumer bean, what's the best way to check for broker availability during its initialization?

    AFAIK there is no straightforward way to catch the "Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected" warning.


    Tiger Wang (王豫)
    All I can think of is to have a timer that checks whether onPartitionsAssigned() is called within 60 seconds after startup, and if not, to consider something wrong with the broker and terminate the service.
    I just don't feel it's the best approach.
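The timer idea can be sketched with a plain `CountDownLatch`. This is only an illustration of the approach described in the message, under the assumption that a rebalance listener is registered and calls back into the check; `StartupAssignmentCheck`, `partitionsAssigned` and `awaitAssignment` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch; the class and its method names are hypothetical.
class StartupAssignmentCheck {
    private final CountDownLatch assigned = new CountDownLatch(1);

    // Call this from the rebalance listener's onPartitionsAssigned(...) callback.
    void partitionsAssigned() {
        assigned.countDown();
    }

    // Blocks up to the timeout; returns false if no assignment was seen in time.
    boolean awaitAssignment(long timeout, TimeUnit unit) {
        try {
            return assigned.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        StartupAssignmentCheck check = new StartupAssignmentCheck();
        new Thread(check::partitionsAssigned).start(); // simulates the callback
        if (!check.awaitAssignment(60, TimeUnit.SECONDS)) {
            throw new IllegalStateException("No partitions assigned within 60 seconds");
        }
        System.out.println("assigned");
    }
}
```

A missing assignment does not necessarily mean the broker is down (for example, the group may have more members than partitions), so this check can give false alarms.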

    A better approach is to construct a KafkaAdmin object during bean initialization, and try to do something like list topics and fail the bean initialization if it can't.

    However I am looking to see if there is a best practice at all.

    Any recommendation?
    3 replies
    Ebrahim Khosravani

    Hi guys, I'm stuck with KStream and Java 8. I need your help.
    I am confused about this method, and I need to create some unit tests for it. Are there any hints or samples?

    public Function<KStream<String, String>, KStream<String, String>[]> handler() {
        return transactionStream -> transactionStream
                .map((key, value) -> extractStatus(key, value))
                .branch((key, value) -> true);
    }

    1 reply
    Pietro Galassi
    Hi all, is there a way to perform some action right before or after "Shutdown executor service" and "Consumer stopped" are logged by a @KafkaListener ?
    3 replies
    Hi, in my project, deserialization started failing after the spring-kafka upgrade from 2.2.3 to 2.7.3.
    1 reply
    We are receiving Avro messages from a producer application.
    I am also using the custom deserializer mentioned here: https://gitter.im/spring-projects/spring-kafka?at=5d7535a6ae44a84124a21c3e
    Any suggestions about which changes in this upgrade are causing this failure?
    7 replies
    Zane XiaoYe

    Hi @garyrussell
    It seems that the reactive Kafka consumer stops once an exception occurs in the processing logic, such as a WebClientResponseException.

                    .flatMap(record -> process(record))

    What is the correct way to avoid that?

    9 replies
    Pietro Galassi
    Hi, does @KafkaListener shut down gracefully on SIGTERM (without consuming further messages)? Does it need to be configured?
    14 replies

    Hello, I'm using RetryingBatchErrorHandler for a batch listener with the FixedBackOff and it seems to work well.
    However, when the error handler retries, no retry log is shown, so there's no way for me to know whether the handler is retrying or not. It only shows the error log that I configured as the ConsumerRecordRecoverer, and only when the retries are exhausted.

    Is there any way to log every retry while the error handler is retrying?

    Below is part of the container factory configuration:

    final var errorHandler = new RetryingBatchErrorHandler(new FixedBackOff(3000, 5), (consumerRecord, e) -> {
        // Recoverer: invoked only after the retries are exhausted.
        log.error("failed record - {}", consumerRecord);
    });
    3 replies
    @garyrussell Hello Gary, I have seen places where StringSerDeSer is being used where JsonSerDeSer could be used, and vice versa, for an object, let us say User.class. I do not see it specified anywhere which one to use and when. Can you please help? Thanks.
    8 replies
    Billy Conner Jr.
    I am fairly new to Kafka. I have a producer sending data to a broker. I can use the console consumer to view the messages sent ok. However, it seems that after a few days those messages are no longer there. I thought data sent to kafka would remain there indefinitely. Is there a configuration setting I have incorrect?
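What is described here looks like time-based retention, assuming the broker runs with default settings: `log.retention.hours` defaults to 168, and a topic can override it with `retention.ms` (where `-1` disables the time limit). A small sketch of the arithmetic; the 30-day target is a made-up example and `RetentionMath` is a hypothetical name.

```java
import java.time.Duration;

// Worked arithmetic for retention settings; the 30-day target is only an example.
class RetentionMath {
    // Broker default: log.retention.hours=168.
    static final Duration DEFAULT_RETENTION = Duration.ofHours(168);

    // Value to use for the topic-level retention.ms setting.
    static long retentionMs(long days) {
        return Duration.ofDays(days).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(DEFAULT_RETENTION.toDays() + " days by default");          // 7 days by default
        System.out.println("retention.ms for 30 days = " + retentionMs(30));           // 2592000000
    }
}
```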
    Hi, I'm curious how the Spring Kafka client resolves the exact broker address.
    Is there any documentation?
    9 replies
    Miguel González

    Hello guys, I'm using ackMode=MANUAL_IMMEDIATE and receiving on my listener Acknowledgment as a parameter, but I keep seeing this error sometimes on my logs "Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group" full stack trace on:


    In the listener method I'm using acknowledgment.acknowledge() (the error is from this line), and when I catch a custom exception, I call acknowledgment.nack.

    Any suggestions on how to avoid this?

    9 replies
    Miguel González
    Hey guys, if I use manual ack, should I remove the SeekToCurrentErrorHandler from my consumer config? If something goes wrong, would it be retried twice if I have both configurations (i.e. acknowledgment.nack and SeekToCurrentErrorHandler)?
    1 reply
    Hello! I'm currently using kafka batch consumer and it works fine on local environment. But on the development environment (on AWS), it shows this error log and doesn't work at all but I don't know what this log means...
    Is there anyone who knows something about this?
    2022-03-12 19:14:12.176 ERROR 37634 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
    java.lang.UnsupportedOperationException: Container should never call this
        at org.springframework.kafka.listener.ListenerInvokingBatchErrorHandler.handle(ListenerInvokingBatchErrorHandler.java:36) ~[spring-kafka-2.6.7.jar!/:2.6.7]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1431) ~[spring-kafka-2.6.7.jar!/:2.6.7]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar!/:2.6.7]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
    7 replies
    Tiger Wang (王豫)
    Hello all - Does anyone have any update regarding Reactive Kafka support in Spring? I can't find any reference doc, so I am assuming it's still at an early stage, but I'd like to confirm. Thanks!
    4 replies
    Oleksii Suprun

    Hi All! Does anyone know how to configure spring.json.trusted.packages for Kafka Streams? I'm using the following config:

    public class KafkaStreamsConfig {
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfig(KafkaProperties kafkaProperties) {
            return new KafkaStreamsConfiguration(Map.of(
                    StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getStreams().getApplicationId(),
                    StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getStreams().getBootstrapServers(),
                    StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                    StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                    StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()
            ));
        }
    }

    My application.yml is the following:

    spring:
      kafka:
        streams:
          bootstrap-servers: XXX.XXX.XXX.XXX:9092
          application-id: spring-kafka-test
          properties:
            processing.guarantee: exactly_once
            spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*

    But when I change the package of the entity published into Kafka, I'm receiving the following exception:

    Caused by: java.lang.IllegalArgumentException: The class 'com.mypackage.streams.entity.kafka.raw.Entity' is not in the trusted packages: [java.util, java.lang, com.mypackage.streams.entity.kafka, com.mypackage.streams.entity.kafka.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
        at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
        at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
        at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        ... 9 more

    Looks like the property added to the application.yml does not work.

    2 replies
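The exception message lists `com.mypackage.streams.entity.kafka.*` among the trusted packages, which suggests the property was read but the `.*` entry did not cover the `raw` sub-package. The sketch below is a simplified model of such a check, not spring-kafka's actual code: it assumes entries are compared to the class's package by exact name, with a lone `*` meaning "trust all", as the message text hints. Behaviour may differ between spring-kafka versions, and `TrustedPackagesModel` is a hypothetical name.

```java
import java.util.Set;

// Simplified model of the check, NOT spring-kafka's code: it assumes entries are
// compared against the class's package by exact name, with "*" meaning trust all.
class TrustedPackagesModel {
    static String packageOf(String className) {
        int lastDot = className.lastIndexOf('.');
        return lastDot < 0 ? "" : className.substring(0, lastDot);
    }

    static boolean isTrusted(Set<String> trustedPackages, String className) {
        return trustedPackages.contains("*")
                || trustedPackages.contains(packageOf(className));
    }

    public static void main(String[] args) {
        String entity = "com.mypackage.streams.entity.kafka.raw.Entity";
        Set<String> fromException = Set.of("java.util", "java.lang",
                "com.mypackage.streams.entity.kafka", "com.mypackage.streams.entity.kafka.*");
        System.out.println(isTrusted(fromException, entity)); // false
        System.out.println(isTrusted(Set.of("com.mypackage.streams.entity.kafka.raw"), entity)); // true
    }
}
```

Under that assumption, listing the exact package of the entity (here `com.mypackage.streams.entity.kafka.raw`) would be the thing to try first.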
    Hello, when an Authentication/Authorization exception occurs with no authExceptionRetryInterval set, triggering "Fatal consumer exception; stopping container", is that considered a normal termination as far as isInExpectedState() is concerned?
    1 reply
    Diogo Almeida

    Hello everyone,
    We have refactored the Kafka client configurations in the sub-modules (Gradle) of our project, and we have started to get IllegalStateException("Expected 1 but got 0 partitions") during the integration tests.
    Inside a @BeforeAll function, we expect containers to be properly assigned to topics and call ContainerTestUtils.waitForAssignment() for that. However, we are getting the exception mentioned above and initially, there has been no change in our clients' settings.
    We use spring-boot:2.6.5 (with spring-kafka:2.8.4) and our test broker runs on docker.

    Any suggestion in this regard? The only way we found to work around the situation was by adding a Thread.sleep(10_000) (which is not an option, of course) in @BeforeAll or manually defining the Topic and Partition in @KafkaListeners.

    4 replies
    Laurensius Bernhard
    Hi, I'm using Confluent Kafka and Spring Kafka. My question is: how are active connections counted? If a single app consumes 2 topics with a total of 5 partitions and a concurrency value of 2, and produces to 1 topic with 3 partitions, how many connections will I create? I feel like too many connections are showing in the metrics, probably around 10-15 connections for that single app.
    1 reply