    Gary Russell
    Regardless, this discussion should be moved to the spring-cloud-stream room.
    Pietro Galassi
    (btw, where can I find that Spring 2.1.0 is no longer supported?)
    Pietro Galassi
    Mariusz Wyszomierski

    Hi everybody, I have a few questions.
    In 95% of cases my Kafka listener finishes processing within 15 minutes.
    The average time the listener needs to finish processing is about 5 minutes.
    I set max.poll.interval.ms to 15 minutes and max.poll.records to 1.
    The consumer is transactional.
    I use ConcurrentMessageListenerContainer.
    Spring Kafka is 2.3.11.

    But in the remaining 5% of cases processing takes longer than 15 minutes, and that causes rebalancing.
    I want to be sure that my solution is right.

    First I tried AcknowledgingMessageListener:

        @KafkaListener(
                topics = "topic",
                concurrency = "6",
                groupId = "GROUP_ID")
        public void receiveEvent(ConsumerRecord<String, SomeData> data, Acknowledgment ack) {
            ack.acknowledge();   // FIRST ack
            // then do some work (it may fail and should not be repeated)
        }

    But it doesn't work: the offset wasn't committed after acknowledge() ran.
    After rebalancing, the same record was processed by another node.

    a) As I understand it, the commit is done only after the listener finishes processing?

    Then I used ConsumerAwareMessageListener:

        @KafkaListener(
                topics = "topic",
                concurrency = "6",
                groupId = "GROUP_ID")
        public void receiveEvent(ConsumerRecord<String, SomeData> data, Consumer<String, SomeData> consumer) {
            consumer.commitSync();   // FIRST commit
            // then do some work (it may fail and should not be repeated)
        }

    The above example works; after rebalancing, the record is not consumed again.

    b) Is there another approach to committing the offset at the beginning, to avoid consuming the same record again after rebalancing?
    c) With this approach I shouldn't skip any records, because I commit only the offset from the last poll, and max.poll.records is set to 1.
    But if max.poll.records were more than 1, could I skip some records?

    4 replies
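Regarding question (b), a minimal hedged sketch of committing only the in-flight record's offset up front, using the plain kafka-clients API; the topic, `SomeData`, and the `process` call are placeholders taken from the question:

```java
// Sketch: commit the offset of the record being processed before doing the work,
// so a rebalance during long processing does not cause redelivery.
@KafkaListener(topics = "topic", concurrency = "6", groupId = "GROUP_ID")
public void receiveEvent(ConsumerRecord<String, SomeData> data,
        Consumer<String, SomeData> consumer) {
    consumer.commitSync(Collections.singletonMap(
            new TopicPartition(data.topic(), data.partition()),
            new OffsetAndMetadata(data.offset() + 1))); // commit the NEXT offset first
    process(data); // hypothetical handler; failures here are not redelivered
}
```

Note that with max.poll.records = 1 this commits exactly the one polled record; with a larger batch, committing only one record's offset up front could indeed skip the rest of the batch if processing fails midway.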
    Pietro Galassi
    Is there a way to send messages in batch using KafkaTemplate? I need to send 20k messages all together.
    24 replies
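For reference, a hedged sketch: KafkaTemplate has no batch send() method, but the underlying producer already batches records (batch.size / linger.ms), so a loop of async send() calls followed by flush() is the usual pattern. The topic name and the messages list are hypothetical:

```java
// Sketch: send many records; the producer batches them internally,
// so a loop of async send() calls is effectively "batched".
public void sendAll(KafkaTemplate<String, String> kafkaTemplate,
        List<String> messages) {
    for (String msg : messages) {
        kafkaTemplate.send("my-topic", msg); // asynchronous; returns a future
    }
    kafkaTemplate.flush(); // block until all buffered records are sent
}
```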
    Tomas Alabes
    Hi, I'd like to validate a setup that's giving me a headache (using Kafka 2.6.1). All the flows below should be transactional.
    I have many @KafkaListeners using a MyListenerContainerFactory, created with a ChainedTM, which has a KafkaTM created with MyListenersProducerFactory. This PF has a static txn id. As part of this consumer flow, I want to produce events, all in one txn.
    I also want to produce events from a browser request (produce-only flow), with KTs created by ProducerOnlyFactorys, each with a unique txn id.
    • Using MyListenersProducerFactory to create the KafkaTemplates for the listeners' producers, I'm getting Group X has received offset commits from consumers as well as transactional producers. Mixing both types of offset commits will generally result in surprises and should be avoided. Should these templates be non-transactional? What's the right way to create those producers?
    • Can I avoid thinking about "am I in a consumer flow or a producer-only flow?" to know what type of KT to use? It can be error-prone for devs (ideally all KTs would be created the same way by the same transactional PF).
    • If I'm not wrong, with the latest Kafka you can set unique txn ids for all producers and simplify this a bit.
    8 replies
    Tomas Alabes
    Have you ever faced this exception in brokers when producing events? java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read from segment FileRecords(size=1966, file=/var/lib/kafka/__consumer_offsets-0/00000000000000000000.log, start=0, end=2147483647) I've been trying everything to debug this, but I'm not able to pin down what's causing it (I had to update these Kafka broker server properties: https://gist.github.com/tomasAlabes/0bd3e03546e399db6c6e6b8d4a78686b)
    1 reply

    Hi guys, I think I'm probably missing some theory regarding producing/consuming JSON.
    What I'm trying to achieve is

    • from producer microservice, send json object
    • from consumer microservice, consume the json objects sent by any producer microservice.
    • set the configuration using application properties.

    The point is, I want my consumer microservice to consume multiple JSON objects from multiple producers, so I think the property spring.kafka.consumer.properties.spring.json.value.default.type is not enough for this case.

    Now, the problem is, if I don't use spring.kafka.consumer.properties.spring.json.value.default.type, I get a No type information in headers and no default type provided error.

    So after reading the documentation, I thought spring.kafka.consumer.properties.spring.json.type.mapping and spring.kafka.producer.properties.spring.json.type.mapping could match my requirements, but after setting them as


    I'm still getting the same error.

    Maybe the JSON type mapping property is not the one suitable for this purpose, or maybe it is not even possible to achieve this.

    19 replies
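For reference, a minimal sketch of what the type-mapping properties usually look like; the token before the colon must be identical on both sides, while the class names can differ per application (the package/class names below are hypothetical). This only works if the producer is actually adding type headers:

```properties
# Producer side: map the token "user" to the producer's local class
spring.kafka.producer.properties.spring.json.type.mapping=user:com.producer.model.User
# Consumer side: map the same token to the consumer's local class
spring.kafka.consumer.properties.spring.json.type.mapping=user:com.consumer.model.User
```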
    Hi, Is there any example on how to change multi-language on every HTTP request by setting the header a special language type, using feign interceptor to intercept every HTTP requesting?
    3 replies
    Hey all,
    Is there any way to define different backoffs according to the exception type in the error handler in the listener factory?
    I managed to get close to a solution using a retry template, but it competes with the error handler.
    4 replies
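One hedged sketch (spring-kafka 2.3+ APIs): a delegating error handler that routes to differently configured SeekToCurrentErrorHandler instances depending on the exception type. The exception class and backoff values chosen here are just illustrative:

```java
// Sketch: pick a backoff policy based on the exception type, then delegate.
public class RoutingErrorHandler implements ContainerAwareErrorHandler {

    private final SeekToCurrentErrorHandler quick =
            new SeekToCurrentErrorHandler(new FixedBackOff(1_000L, 3L));
    private final SeekToCurrentErrorHandler slow =
            new SeekToCurrentErrorHandler(new FixedBackOff(30_000L, 5L));

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) {
        // ListenerExecutionFailedException usually wraps the listener's exception
        if (thrownException.getCause() instanceof IllegalStateException) { // example type
            slow.handle(thrownException, records, consumer, container);
        } else {
            quick.handle(thrownException, records, consumer, container);
        }
    }
}
```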

    Hello, I'm using Kafka Streams with props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer); and a DeadLetterPublishingRecoverer, while my application is configured with a String key and an Avro value.

    Once I get a deserialization error, the DLT publication fails with:
    org.apache.kafka.common.errors.SerializationException: Can't convert key of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in key.serializer

    But when I'm using plain Kafka (without Streams) it works.

    What am I doing wrong? Should I change the KafkaTemplate for the DeadLetterPublishingRecoverer to byte[], byte[]? Will it still work with plain Kafka?

    10 replies
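A hedged sketch of the byte[], byte[] approach the question mentions: after a deserialization failure the recoverer only has the raw bytes (the [B in the error), so publishing to the DLT with ByteArraySerializers avoids the StringSerializer mismatch. The bootstrap address is illustrative, and depending on the spring-kafka version you may need a cast or the Map-based DeadLetterPublishingRecoverer constructor:

```java
// Sketch: a dedicated producer that writes raw bytes to the dead-letter topic.
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

KafkaTemplate<byte[], byte[]> dltTemplate =
        new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));

DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(dltTemplate); // version-dependent generics
```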
    Sean Farrow
    Hi all, are there any examples of using the embedded broker from spring-kafka-test with ScalaTest?
    1 reply
    Hi guys. Any solution to delay a consumer? I just want to publish an event and have it consumed 15 minutes after it is published.
    1 reply
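Kafka has no native delay queue; one hedged sketch (spring-kafka 2.3+, manual ack mode) uses Acknowledgment.nack(sleep) to re-seek until the record is old enough. The topic name and process() are hypothetical, and the sleep must stay well below max.poll.interval.ms:

```java
// Sketch: only process records that are at least DELAY_MS old; otherwise
// nack with a short sleep so the record is redelivered on a later poll.
private static final long DELAY_MS = 15 * 60 * 1000L; // 15 minutes

@KafkaListener(topics = "delayed-topic")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
    long age = System.currentTimeMillis() - record.timestamp();
    if (age < DELAY_MS) {
        ack.nack(Math.min(DELAY_MS - age, 5_000L)); // re-seek, sleep briefly
    } else {
        process(record); // hypothetical handler
        ack.acknowledge();
    }
}
```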
    Ashish Meshram
    Hi all, I have two separate applications: one producer and one consumer. The producer is sending messages with a custom User object as the value. I am trying to consume that User object in the consumer application, but I am getting MessageConversionException: failed to resolve class name. Class not found.
    These are two different applications, each with a User class, but in different packages.
    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: group_id
          auto-offset-reset: earliest
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          properties:
            spring.json.trusted.packages: '*'
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(User user) {
        // process the user
    }

    I am using the latest versions of Spring Boot and Kafka.
    Ashish Meshram
    It works if same application is producing and consuming messages
    1 reply
    Yes, in the same application we can have a producer and a consumer.
    Sean Farrow
    Hi, is there a way of preventing a Spring app from starting if a Kafka topic does not exist?
    1 reply
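One hedged approach, assuming a reasonably recent spring-kafka (the missingTopicsFatal container property, added around 2.2/2.3): the listener containers then fail to start when their configured topics are absent, which aborts application startup. Bean and type names below are illustrative:

```java
// Sketch: make container startup fail fast if the configured topics are missing.
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> cf) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf);
    factory.getContainerProperties().setMissingTopicsFatal(true); // fail on missing topic
    return factory;
}
```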
    Aleh Danilovich
    Hi, is it possible to start partition reassignment when we use the cooperative-sticky protocol? I have 1 topic (100 partitions) and 100 consumers in total (5 k8s pods with 20 consumer threads with the same group.id). The consumers are divided evenly in general. But when I scale my app up to, for example, 10 pods, all the new consumers still stay unassigned to any partition. Protocol = sticky. Is it possible to use some of the consumers from the new pods? For example, 10 pods with 10 consumer threads per pod?
    1 reply
    Tomas Alabes
    One perf question: I have 2 instances of my service, each with 100 consumers consuming around 30 topics with 128 partitions each (concurrency: 1). As soon as the 2nd instance goes up, the rebalancing spikes the CPU with a loop of Attempt to heartbeat failed since group is rebalancing, making the services unusable (3 CPU limit). Is there any particular tip you could give to reduce this load? I can't use static membership because I have Kafka 2.1.1. Thank you!
    4 replies
    Hushen Savani
    Hello, I'm using Kafka listeners in a Spring Boot project. I want to upgrade the Kafka dependencies in Spring Boot... please share some pointers on which dependencies to change.
    I want to upgrade to 2.6.6 in order to use RecordInterceptor.
    1 reply
    Laurensius Bernhard
    Hi, not a really technical question. I'm consuming betting transactions (very high throughput) using Spring Kafka. A lot of optimisation is ongoing to support this, since currently it can't handle 100 msg/second (about 40 kbps) fast enough; it's constantly lagging behind. Before optimising my big transactional process, I'm trying to implement a batch listener. Is that going to help?
    2 replies
    I am new to using spring-kafka, so please excuse me if this is a basic question. To consume messages, we use the @KafkaListener annotation with topics and a group id specified, e.g. @KafkaListener(topics = "topic1", groupId = "group-1"). This works fine. But my requirement is to get the list of topics at run time, based on some business logic. Is there a way this can be done? For example, I want something like this: List<String> topics = getTopics(); for (String topic : topics) { // start consuming messages for each topic }. Thanks for any help.
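    One hedged sketch: the topics attribute accepts a SpEL expression, so the list can be resolved from a bean when the listener container is created (not continuously afterwards). The bean and method names here are hypothetical:

```java
// Sketch: resolve the topic list from a bean at container creation time.
@Component("topicProvider")
public class TopicProvider {
    public String[] getTopics() {
        // hypothetical business logic deciding which topics to consume
        return new String[] { "topic1", "topic2" };
    }
}

// Elsewhere: the SpEL expression invokes the bean above at startup.
@KafkaListener(topics = "#{@topicProvider.getTopics()}", groupId = "group-1")
public void listen(String message) {
    // handle the record
}
```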
    Yuen Fung Lee

    Hi all, I got a technical question about running spring-kafka version 2.2.7 with AWS MSK (Kafka version 2.2.1)

    I have a consumer application with two running instances on the container platform. The consumer application uses one consumer group and listens to two topics, A and B.

    After running normally for a few weeks without any restart, we suddenly ran into an error indicating the application could not commit messages in topic A.
    The commit exception:

    Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

    Right after the commit exception, there are two messages showing

    1. group coordinator (id rack null) is unavailable or invalid will attempt rediscovery
    2. discovered group coordinator

    Since we are using the SeekToCurrentErrorHandler, the application could still process the next message; it's just that the processed message could not be committed to Kafka. The same error messages would appear for each message.

    During the error period, messages in another topic B could be processed totally fine.

    I then rolling-restarted the application instances and the problem was resolved.

    The application is deployed to three different QA environments and only one of them has faced this issue.

    2 replies
    Currently, in a consumer interceptor, we save some metrics info, like when the last record was fetched from Kafka, etc. We have a requirement to save Kafka consumer metrics as well. I know the KafkaConsumer's metrics() method is available to get the metrics info, but in the interceptor we don't have access to the consumer object. Please guide me on how to get access to the consumer object in a consumer interceptor.
    6 replies
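A hedged alternative to reaching the Consumer from inside the interceptor: Spring's listener containers expose the underlying consumers' metrics through MessageListenerContainer.metrics(). The listener id "myListener" below is hypothetical:

```java
// Sketch: pull consumer metrics from the container instead of the interceptor.
@Autowired
private KafkaListenerEndpointRegistry registry;

public void logConsumerMetrics() {
    MessageListenerContainer container = registry.getListenerContainer("myListener");
    // Map of client-id -> (MetricName -> Metric) for each underlying consumer
    Map<String, Map<MetricName, ? extends Metric>> metrics = container.metrics();
    metrics.forEach((clientId, m) ->
            System.out.println(clientId + ": " + m.size() + " metrics"));
}
```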
    Deepak Emani
    Hi, can you please point to any example of Spring Cloud Contract with an Avro message? It looks like it needs more setup than a simple Kafka container.
    1 reply
    Hi guys, I keep running into this issue: Connection to node -1 (localhost/ could not be established. Broker may not be available. This is even after making sure the configs on Kafka are fine.
    Any idea what I'm missing...?
    nvm, I've worked it out
    I am trying to implement the Spring Cloud Config server with auto-refresh of properties via the Kafka bus... I am getting the following error while using <dependency>


    An attempt was made to call a method that does not exist. The attempt was made from the following location:


    The following method did not exist:


    The method's class, org.springframework.kafka.listener.ContainerProperties, is available from the following locations:


    It was loaded from the following location:

    Gary Russell
    Your spring-kafka version is too old; that property was added in 2.3.5. You should let Spring Boot's dependency management bring in the right versions for all the dependencies.
    Yassine Gharbaoui
    Hello everyone, is there a way to get the assigned partitions within a consumer?
    1 reply
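A hedged sketch of one common way, assuming spring-kafka 2.x: add a Consumer parameter to the listener method and call assignment() (implementing ConsumerSeekAware's onPartitionsAssigned is another option). The topic name is illustrative:

```java
// Sketch: the Consumer parameter gives access to the current assignment.
@KafkaListener(topics = "topic1")
public void listen(ConsumerRecord<String, String> record,
        Consumer<String, String> consumer) {
    Set<TopicPartition> assigned = consumer.assignment(); // partitions owned right now
    // ... use the assignment, then process the record
}
```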
    @garyrussell, I am facing an issue in Kafka where all messages were replayed after the consumer application restarted. My configuration can be found at the link below. Could you please guide me? https://stackoverflow.com/questions/66903764/kafka-messages-from-topic-are-being-replayed-after-consumer-restart
    2 replies
    I have a requirement to show the last sync from Kafka. What I understand is that Spring for Apache Kafka doesn't call the message listener when there are no records. But I want it to call the message listener even when there are no records. Is this possible?
    @garyrussell, not sure if this is the right channel to ask this question, but I couldn't find much on the web. Is there any way in Spring Kafka or Spring Integration to achieve delay-queue behaviour with Kafka?
    4 replies

    Hi All,

    I have a question about the Kafka consumer rebalance protocol. :)

    If a new consumer joins, a JoinGroupRequest will be sent in the first poll request;
    will the rebalance protocol then proceed immediately?


    I have a requirement to show the last sync from Kafka. What I understand is that Spring for Apache Kafka doesn't call the message listener when there are no records available in Kafka. But I want it to call the message listener even when there are no records in Kafka during the poll. Is this possible?

    At the end of every poll, I want to update the last-sync-from-Kafka value in the DB. Please share your thoughts.

    5 replies
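A hedged sketch of one way to cover the no-records case: the container can publish a ListenerContainerIdleEvent when a poll returns nothing for a configured interval, and the "last sync" row can be updated from that event. The interval and updateLastSync() are illustrative:

```java
// Sketch: emit idle events so "last sync" can be updated even without records.
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> cf) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf);
    factory.getContainerProperties().setIdleEventInterval(60_000L); // 1 minute
    return factory;
}

@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
    // hypothetical DB update: the consumer polled and saw no records
    updateLastSync(event.getListenerId(), System.currentTimeMillis());
}
```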
    Quick question: we use Kafka to move data from an MDM system to other systems, and we have separate seed topics for refreshing data from higher environments. I have made the consumer factory a conditional bean, but I am not sure how to make the @KafkaListener conditional as well.
    The internet says to set autoStartup, but I don't see much documentation about it.
    Hoping someone here can help me.
    1 reply
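For reference, a hedged sketch: the autoStartup attribute of @KafkaListener accepts a property placeholder (since spring-kafka 2.2, if I recall correctly), so the listener can simply stay stopped in environments where the conditional consumer factory is absent. The property and topic names are hypothetical:

```java
// Sketch: the listener only starts when myapp.seed-listener.enabled=true.
@KafkaListener(topics = "seed-topic",
        autoStartup = "${myapp.seed-listener.enabled:false}")
public void listenSeed(String message) {
    // handle seed/refresh records
}
```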
    What are best practices for handling exceptions related to Kafka?
    1 reply
    Spencer Goldberg
    What is the right set of versions to use for spring-kafka, Spring Boot, and kafka-streams? I'm getting ClassNotFoundException when trying to use spring-kafka 2.6.7, kafka-streams 2.6.1, and spring-boot-starter 2.4.4.
    12 replies
    Dumitru Nicolae Marasoiu
    Hi, using Boot 2.4.4 and spring-kafka 2.6.7. In production, the @KafkaListener gets called with the proper CustomEvent payload, but in component tests it fails with Cannot convert from [org.apache.avro.generic.GenericData$Record] to CustomEvent. It is even stranger in that the component tests pass on a Mac but not in Docker, but when trying to debug I always hit Cannot convert from [org.apache.avro.generic.GenericData$Record] to CustomEvent. Where should this conversion happen? Thank you.
    2 replies