    I am using the KafkaListener annotation in my consumer API. Is it possible to change the concurrency and add topics without restarting the app?
    3 replies
    One more question: is there any major difference between 2.7.x and 2.6.x? For some reason I need to use Spring Boot 2.3.x, and when I try to use Spring Boot 2.3.3.RELEASE, I don't get the methods to pause and resume the Kafka consumer.
    1 reply
    Can someone help me with a recent working microservices demo or sample application to deploy in AWS and test? Please help; I need to learn it as quickly as possible.
    I am not sure if it's related to Kafka.
    Our Spring Kafka consumer is deployed on PCF with multiple instances (2 data centers). Is it possible to capture from which IP address/data center the Kafka message was read? Is there any kind of hook?
    3 replies
    Adis Bajric
    How do I get an instance of a running consumer? ConsumerFactories only provide createConsumer, addListener and similar.
    3 replies
    Hi, is there any sample code available to avoid using @DirtiesContext to clean up the Spring Boot context in Spring tests while using @EmbeddedKafka? We have a lot of integration tests using embedded Kafka, and cleaning the context for every test case is increasing the build time.
    2 replies
    Pietro Galassi
    If a rebalance happens, are offsets automatically committed? In both BATCH and RECORD AckMode? If an event is being processed and a rebalance occurs, is its offset committed by the rebalance listener, or do you need to extend the listener in order to do the commit?
    40 replies
    Aleksey Vasin

    Hello @garyrussell, please help us with our problem...

    The goal we want to achieve is to receive all messages from each topic's partitions without using a consumer group, since the application can scale to several instances and each instance must read all messages from all partitions. We also have another restriction: if a new partition is created for the topic while the application is running, the application should receive messages from this new partition without a restart.

    1. How do we create a consumer via @KafkaListener without a consumer group, so that the consumer is assigned to all of the topic's partitions?
    2. How do we make a consumer that was created without a consumer group automatically detect a newly created partition in the topic?
    7 replies
    I have a Spring Kafka consumer running in PROD with multiple instances. I read the data in the listener thread, then do the processing in a background thread. How can I shut down the app gracefully, i.e. only when all processing is done in both the background thread and the listener thread? I have exposed 2 endpoints to
    1) resume the Spring Kafka container
    2) pause the Spring Kafka container
    I can pause the listener thread by calling the PAUSE REST endpoint. Is there a way to monitor the background thread?
    10 replies
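One way to approach the background-thread half of the question above is to hand the work to an `ExecutorService`, count what is still in flight, and on shutdown stop accepting work and wait for the pool to drain. This is a minimal sketch using only the JDK. The class `BackgroundWorker` and its methods are hypothetical names, and it assumes the listener container has already been paused or stopped before `drain` is called.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: owns the background pool that the listener thread hands records to.
class BackgroundWorker {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger completed = new AtomicInteger();

    // Called from the listener thread for each record.
    void submit(Runnable task) {
        inFlight.incrementAndGet();
        try {
            pool.execute(() -> {
                try {
                    task.run();
                    completed.incrementAndGet();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        } catch (RejectedExecutionException e) {
            // The pool is already shutting down: undo the count and let the caller decide.
            inFlight.decrementAndGet();
            throw e;
        }
    }

    // Values that a monitoring endpoint could expose.
    int inFlight() { return inFlight.get(); }
    int completed() { return completed.get(); }

    // Stop accepting new work, then wait for queued and running tasks to finish.
    // Returns true if everything finished within the timeout.
    boolean drain(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this sketch `inFlight()` is what a monitoring endpoint would report, and `drain` would be called from the application's shutdown path after the container has stopped delivering records.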
    Zane XiaoYe

    Hi @garyrussell
    I’m trying reactor-kafka with sendTransactionally.

    public Mono<Void> send(Flux<ProducerRecord<String, String>> records) {
        return sender.sendTransactionally(Flux.just(records))
                     .doOnNext(r -> log.info("Producer#send {}", r.recordMetadata().toString())).then();
    }

    This send was called inside a flatMap, but I got "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION" a lot. What could be the problem? My transactionId is unique across nodes.

    1 reply
    Miguel González
    Hello guys, I'm building a Kafka Streams app, but I have an issue: one of my topics has two event types in the same topic. Is there a way to use a DelegatingDeserializer/DelegatingSerializer as the SERDE for a KStream? Otherwise I think it would try to map the array of bytes to the incorrect event type. The idea is that I want to filter out one of the event types and work with the other one. I was thinking of receiving the array of bytes and then transforming it to protobuf, but then I wouldn't be able to use TimestampExtractors.
    6 replies
    Petromir Dzhunev
    Hello guys. Thanks for the great library. I have a task to develop a multi-tenant application, which translates to listening for the same type of event from different Kafka clusters. Is there a way to configure DefaultKafkaConsumerFactory or ConcurrentKafkaListenerContainerFactory to connect to 2 or more Kafka clusters, then bind it to a single @KafkaListener which will process all the events?
    3 replies
    Zane XiaoYe
    How can I specify a custom partitioner in reactor-kafka’s KafkaSender?
    3 replies
    Zane XiaoYe
    How can I increase the processing concurrency of a reactor-kafka consumer? Suppose I have the following consumer code:
                    .doOnNext(record -> {                   
                        processMessage(record.value()).doOnSuccess(__ -> {
    7 replies
    Miguel González

    hey team, can you confirm when using KafkaTemplate's method send(topic, key, value) it will calculate the partition based on the message key?

    if you go to https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java#L406
    it basically creates a ProducerRecord, like this: https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java#L133
    which seems to send null as partition

    I found this question https://stackoverflow.com/questions/45556142/spring-kafka-partitioning seems to confirm it, but where does it happen?

    5 replies
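On the "where does it happen" part of the question above: as far as I understand, a `ProducerRecord` built without a partition simply carries `null`, and the partition is chosen later, inside the Kafka producer, by its configured partitioner. For keyed records the default partitioner hashes the serialized key and takes it modulo the partition count. The sketch below only illustrates that decision in plain Java. `KeyPartitionSketch` and `choosePartition` are hypothetical names, null keys are not covered, and `Arrays.hashCode` is a stand-in for the murmur2 hash that Kafka actually uses, so the partition numbers will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of "explicit partition wins, otherwise hash the key".
class KeyPartitionSketch {
    // explicitPartition == null mirrors a record that was created without a partition.
    static int choosePartition(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Stand-in hash (assumption); Kafka's default partitioner uses murmur2 here.
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }
}
```

The property the sketch is meant to show is that the same key always maps to the same partition as long as the partition count does not change.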
    Pietro Galassi
    Is it possible to gracefully shut down a Kafka consumer? On Kubernetes, when a pod is deleted/removed, it seems that the event it is currently processing is not finished and therefore not committed. The event then moves to another pod, and if the first attempt was halfway through processing, some data may already have been written to the database. I know that Kafka event handling "should" be idempotent. How should this scenario be handled? Is it possible to wait for the consumer to finish its processing before shutdown?
    8 replies

    When I set ReceiverOptions.commitInterval in my Java configuration, I don't see changes in the consumer configuration. There is only one property, auto.commit.interval.ms.

    Does ReceiverOptions.commitInterval(Duration d) in reactor-kafka override the Kafka property auto.commit.interval.ms?
    When I use reactor-kafka, can I set the commit interval via Kafka properties, or should I set it explicitly in the ReceiverOptions configuration?

    2 replies
    Can I use a reactor-kafka receiver with Spring WebFlux to process received records concurrently via WebFlux REST calls?
    If I use concatMap to process records via WebFlux, processing happens on only one reactor-netty thread. When I use flatMap, records are processed concurrently by multiple netty threads, but there is no commit order. Should I use only concatMap for processing records via WebFlux?
    1 reply
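The trade-off described above (concurrent processing finishes out of order, while commits need an order) is often handled by tracking which offsets have finished and only committing up to the highest gap-free position. This is a plain-Java sketch of that bookkeeping for a single partition. `OffsetTracker` and `complete` are hypothetical names, not part of reactor-kafka, and the sketch does not perform any real commit.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical single-partition tracker: records may finish in any order,
// but the committable position only advances over a gap-free prefix.
class OffsetTracker {
    private final Set<Long> done = new HashSet<>();
    private long nextToCommit; // first offset that is not yet safe to commit

    OffsetTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    // Marks one offset as processed. Returns the position to commit
    // (last contiguous processed offset + 1), or -1 if nothing new is committable.
    synchronized long complete(long offset) {
        done.add(offset);
        long before = nextToCommit;
        // Advance while the next expected offset has already been processed.
        while (done.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit > before ? nextToCommit : -1;
    }
}
```

With this bookkeeping, a record that finishes early does not move the committed position past an earlier record that is still being processed, so a restart replays the unfinished record instead of skipping it.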
    Pietro Galassi
    How can we avoid logging "FETCH_SESSION_ID_NOT_FOUND"? We are using Spring Boot 2.4.3.
    5 replies
    I have a Kafka consumer reading from topic-1. While reading a message, if the JSON message is invalid (and not for other reasons), I need to send that message to the DLQ topic dlq-topic-1. Is this possible?
    2 replies
    Hello, should ConcurrentMessageListenerContainer.isRunning() return true if its children are not running? It is actually returning true even when ConcurrentMessageListenerContainer.isChildRunning() is false.
    15 replies
    Laurensius Bernhard
    Hi guys, I have a question.
    I have an existing ConcurrentKafkaListenerContainerFactory called kafkaListenerContainerFactory and I want to add a new listener with batch listening turned on, so I created another ConcurrentKafkaListenerContainerFactory called kafkaBatchListenerContainerFactory.
    After that I updated my existing @KafkaListener to use the new batch listener containerFactory; the topic is still the same.
    The problem is that the messages are consumed from the beginning. I just want to continue from the most recent offset, just like before. What did I do wrong here?
    In case it's related, my properties are spring.kafka.consumer.group-id: test-group and auto-offset-reset: earliest
    7 replies
    When a Kafka producer sends data to a consumer, must a copy of the data be stored on the broker's disk as it passes through the brokers? Can it be forwarded directly to the consumer over the network without being stored on disk?
    5 replies
    Hi All, I am trying to figure out the best way to implement "at least once" processing with spring kafka. The "processing" that happens in the @KafkaListener does some work and then produces a message to a different kafka topic. Is the only way to enforce at least once semantics to use the Batch Kafka Listener, and flush the producer - then manually commit the offsets?
    12 replies
    Ebrahim Khosravani

    Guys, I need to connect to Kafka over SSL, which means I need to prepare the Kafka server-side configuration as well.
    I created the keystore.jks + truststore.jks following this DZone article: https://dzone.com/articles/kafka-ssl-client-authentication-in-multi-tenancy-a

    keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert 
    keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial 
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
    keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
    keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
    keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed

    I also changed config/server.properties like this:


    Then Kafka starts up without any issue.
    But whenever I try with a client configured through the Spring starter, I receive handshake failed errors on both sides: in kafka.log and in the client.
    This is my config on the Spring side:

          - broker:9093
          key-password: eg12345
          key-store-location: classpath:/jks/kafka.client.keystore.jks
          key-store-password: eg12345
          key-store-type: JKS
          trust-store-location: classpath:/jks/kafka.client.truststore.jks
          trust-store-password: eg12345
          trust-store-type: JKS
          protocol: SSL
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: test-consumer-group
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer

    Error received on the client side:
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Bootstrap broker broker:9093 (id: -1 rack: null) disconnected
    Error received on the Kafka side:
    [SocketServer brokerId=0] Failed authentication with / (SSL handshake failed) (org.apache.kafka.common.network.Selector)

    2 replies
    Miguel González
    Is it possible to pause/restart a Kafka Streams app? I have only found this discussion https://groups.google.com/g/confluent-platform/c/Nyj3eN-3ZlQ/m/lMH-bFx-AAAJ about using map to call an external service and loop until some condition completes.
    9 replies
    Miguel González

    So I'm trying out the Cloud Stream Kafka Binder:

    I have a question regarding the functional programming model. My topology has two input topics and one output topic, so I'm using a BiFunction. In my case I need to do some other processing with the input KStreams that I receive, like filtering and repartitioning the streams. Can I create two other Functions that process those input streams and then have the BiFunction use them as parameters with function composition, or should I do all that processing in the BiFunction?

    2 replies
    Pietro Galassi
    How can I handle producer exceptions (for example org.apache.kafka.common.errors.TimeoutException, org.apache.kafka.common.errors.NetworkException)?
    16 replies
    Pietro Galassi
    In a MANUAL_IMMEDIATE scenario, how can I create a rebalance listener that commits what I have already pulled?
    11 replies
    Pedro Silva
    Hello. I know that this change (spring-projects/spring-kafka#1615) made it possible to set a max age for KafkaProducers when they are created by the DefaultKafkaProducerFactory. Is it possible to achieve the same thing but for the producers created by KafkaStreams/StreamsBuilderFactoryBean ? Thanks
    2 replies
    Kannadasan S
    How can I ensure that a Spring Cloud Stream listener waits to process messages until the Spring Boot application is fully initialized on startup?
    1 reply
    Mateusz Fronczek
    Hey. I'm currently looking into setting up a new Kafka transaction from within the thread started by @StreamListener. I do have a ChainedKafkaTransactionManager in the application context. At first, I tried to utilize @Transactional(propagation = Propagation.REQUIRES_NEW, transactionManager = "chainedKafkaTransactionManager") but to no avail - it failed with "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION". Therefore, I attempted to use kafkaTemplate.executeInTransaction(), however, it seems that it is rolled back together with the outer transaction. How should this be configured in order to achieve separation between two transactions, where the inner one is created from within consumer governed tx? I also tried to set new transactionIdSuffix on KafkaTemplate, still no success. Any help would be much appreciated, thanks!
    8 replies
    Spencer Goldberg
    What's the appropriate defensive programming for the issues detected in the NetworkClient related to missing listeners or metadata (e.g. UNKNOWN_TOPIC_OR_PARTITION)? My producer has callbacks registered on the kafkaTemplate send() but I don't see them being called (based on logs) at the same time as the NetworkClient warnings in the logs.
    Spencer Goldberg
    Also, if I choose not to implement onSuccess and onFailure callbacks, is successful delivery guaranteed when send().get() finally returns? Or do you need an onFailure callback to detect when the send truly fails?
    6 replies
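For the `send().get()` question above, the general `Future` contract is relevant: `get()` either returns the result or throws an `ExecutionException` that wraps the failure, so a failure is visible to a blocking caller even without callbacks. The sketch below demonstrates that contract with a plain `CompletableFuture` standing in for the future returned by a send. `SendOutcomeSketch`, `fakeSend` and `sendAndWait` are hypothetical names, and no Kafka client is involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SendOutcomeSketch {
    // Hypothetical stand-in for an asynchronous send that either succeeds or fails.
    static CompletableFuture<String> fakeSend(String value, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        } else {
            future.complete("acked:" + value);
        }
        return future;
    }

    // Blocks on the future and reports the outcome as a string.
    static String sendAndWait(String value, boolean fail) {
        try {
            // A normal return means the future completed successfully.
            return fakeSend(value, fail).get(10, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // The failure that a callback would have received arrives here as the cause.
            return "failed:" + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

Using the timed `get` is a design choice in this sketch so that a caller is never blocked indefinitely. What a successful return guarantees on the broker side still depends on the producer's acknowledgement settings.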
    I have a spring kafka app using @KafkaListener annotation using a ConcurrentKafkaListenerContainer. I have to use ThreadLocal for state, and am trying to clean up state when receiving the ListenerContainerIdle event. I read in the documentation that "By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective.". Does this mean that I can access ThreadLocal when subscribed to the ListenerContainerIdle event alongside the @KafkaListener?
    5 replies
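The documentation sentence quoted above comes down to how `ThreadLocal` works: a value can only be read or removed by the thread that set it. The sketch below shows this with two single-thread executors, one playing the listener thread and one playing an async event executor. `ThreadLocalCleanupSketch` and `stateAfterCleanup` are hypothetical names, and no Spring classes are involved.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalCleanupSketch {
    static final ThreadLocal<String> STATE = new ThreadLocal<>();

    // Sets state on the "listener" thread, runs cleanup on that thread or on another
    // one, then returns what the listener thread still sees afterwards.
    static String stateAfterCleanup(boolean cleanupOnListenerThread) {
        ExecutorService listener = Executors.newSingleThreadExecutor();
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            Runnable init = () -> STATE.set("per-thread state");
            Runnable cleanup = STATE::remove;
            Callable<String> read = STATE::get;

            listener.submit(init).get();
            (cleanupOnListenerThread ? listener : other).submit(cleanup).get();
            return listener.submit(read).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            listener.shutdown();
            other.shutdown();
        }
    }
}
```

In this sketch, cleanup on the listener thread leaves `null` behind, while cleanup on the other thread leaves the listener's value untouched. That matches the quoted warning: the idle-event handler can only clear the state if it runs on the same thread that set it.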
    Pietro Galassi
    Is there a way to add a shutdown hook so that the event currently being processed in a listener is completed BEFORE the pod is shut down? (Even in a scenario where the pod is killed for OOM.)
    1 reply
    Pietro Galassi
    Does the producer retry also take care of disconnects?
    2 replies
    Akshansh Jain
    Do idle consumers account for CPU utilization of application? For ex. I have a topic with 2 partitions and have 100 consumers running in my application for this topic, then the CPU utilization of my app would be higher than if I only had 2 consumers running. (I am creating consumers using .setConcurrency() method of ConcurrentKafkaListenerContainerFactory). Spring version - 1.5.6.RELEASE, spring-kafka version - 1.2.2.RELEASE
    19 replies
    Does Spring Kafka use Log4j2 for logging? I'm asking because of the recent security bug in Log4j2.
    10 replies
    Pietro Galassi
    Is there a way, during a rebalance, to stop the execution of the event currently being processed from a poll, so that this event would be reprocessed by a different consumer after the rebalance?
    1 reply
    I have this issue when I start multiple instances of the application. When I have only one instance, it does not reproduce:
    Exception in thread "ForkJoinPool.common-worker-1" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:743)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:584)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:544)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:519)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:513)
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:683)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:569)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:386)
    at com.nab.ms.hs.lodgement.producer.HardshipCaseSubmitEventProducer.publishHardshipCaseSubmitEvent(HardshipCaseSubmitEventProducer.java:47)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.lambda$publishHardshipCaseSubmitEvent$0(CreateHardshipRequestService.java:108)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.processAccountRequest(CreateHardshipRequestService.java:103)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.processNewHardshipRequest(CreateHardshipRequestService.java:75)
    at com.nab.ms.hs.lodgement.application.HardshipNewRequestService.lambda$processNewHardshipRequest$0(HardshipNewRequestService.java:46)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1728)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
    Caused by: java.lang.ExceptionInInitializerError: null
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:368)
    ... 22 common frames omitted
    Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.subject.TopicNameStrategy for configuration key.subject.name.strategy: Class io.confluent.kafka.serializers.subject.TopicNameStrategy could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:729)
    1 reply
    Hello everyone, and a happy new year :-) I have implemented "non blocking retries" in my Kafka consumers. However, I am trying to find a way to reprocess all failed messages delivered to the DLT, based on the error message reported in the headers of the messages posted to the DLT. The current DLT handler handles one message at a time when they are posted to the DLT, but I am trying to develop a reprocess mechanism that will enable me to do a mass reprocess of messages that failed for a specific reason, once the failure reason has been fixed.
    3 replies
    Andras Hatvani
    Hi, is there a way to retrieve and inject the default value serde in a kafka streams app/test?
    2 replies
    Bastien Bouclet
    Hi, when using a batch listener, is there a way to partially positively acknowledge a batch (synchronously commit the offsets up to a point in the batch, and then continue processing the records until finally acknowledging the whole batch) ?
    8 replies
    Hi. How can I modify an existing Kafka record header value in producer and consumer records? I don't want to duplicate the header key; instead I want to override the existing value for that key.
    2 replies
    Josh Wein
    I haven't been able to find any docs on this - is it possible to use a Java 17 Record as a Kafka message? I'm able to produce a message that is a record, but deserialization using 'org.apache.kafka.common.serialization.StringDeserializer' and 'org.springframework.kafka.support.converter.StringJsonMessageConverter' for the batch listener, seems to fail. From my understanding Records are not exactly like Objects when it comes to serialization/deserialization so that may be why it doesn't work out of the box
    2 replies

    Approaches to handle 10 million records of 10 tables each with several joins

    I have 10 million records of 10 tables each with several joins. I am looking for the best alternative, DB design, or approach to read the records very quickly; that is, the queries should be fast.

    option#1 - normalize the tables, don't go for joins unnecessarily
    option#2 - add all the columns in 1st query where multiple times the where conditions will be used in the looping construct
    option#3 - go for nosql database instead of mysql

    Please advise