    Ebrahim Khosravani

    Guys, I need to connect to Kafka over SSL, which means I also have to prepare the Kafka configuration on the server side.
    I created the keystore.jks + truststore.jks following this DZone article: https://dzone.com/articles/kafka-ssl-client-authentication-in-multi-tenancy-a

    keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert 
    keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial 
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
    keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
    keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
    keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed

    I also changed config/server.properties like this:


    Kafka then comes up without any issue,
    but whenever I try with the client, which is configured with the Spring Boot starter, I get an SSL handshake failed error on both sides (kafka.log and the client log).
    This is my config on the Spring side:

          - broker:9093
          key-password: eg12345
          key-store-location: classpath:/jks/kafka.client.keystore.jks
          key-store-password: eg12345
          key-store-type: JKS
          trust-store-location: classpath:/jks/kafka.client.truststore.jks
          trust-store-password: eg12345
          trust-store-type: JKS
          protocol: SSL
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: test-consumer-group
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer

    Error received on the client side:
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Bootstrap broker broker:9093 (id: -1 rack: null) disconnected
    Error received on the Kafka side:
    [SocketServer brokerId=0] Failed authentication with / (SSL handshake failed) (org.apache.kafka.common.network.Selector)
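    The server.properties contents are not shown above, but a typical SSL section looks roughly like this (paths and passwords are placeholders, not taken from the message). A frequent cause of this exact symptom is hostname verification: since Kafka 2.0 clients check that the broker certificate's CN/SAN matches the advertised host (`broker` here), while the keystore above was generated against `localhost`.

```properties
# Hypothetical server.properties SSL section (placeholder paths/passwords)
listeners=PLAINTEXT://:9092,SSL://:9093
advertised.listeners=PLAINTEXT://broker:9092,SSL://broker:9093
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=eg12345
ssl.key.password=eg12345
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=eg12345

# Clients verify the certificate hostname by default (Kafka 2.0+); if the
# certificate CN/SAN does not match "broker", the handshake fails.
# Disabling verification on the client is a diagnostic step, not a fix:
# ssl.endpoint.identification.algorithm=
```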

    2 replies
    Miguel González
    is it possible to pause/restart a kafka streams app? I have only found this discussion https://groups.google.com/g/confluent-platform/c/Nyj3eN-3ZlQ/m/lMH-bFx-AAAJ about using map to call an external service and loop until some condition completes
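    For a Spring-managed Kafka Streams app, one option (a sketch, assuming Spring for Apache Kafka's StreamsBuilderFactoryBean manages the topology) is to drive the factory bean's lifecycle directly:

```java
// Sketch: pausing/resuming a Kafka Streams app by stopping and restarting
// the StreamsBuilderFactoryBean (assumes Spring for Apache Kafka).
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Component;

@Component
public class StreamsLifecycle {

    private final StreamsBuilderFactoryBean factoryBean;

    public StreamsLifecycle(StreamsBuilderFactoryBean factoryBean) {
        this.factoryBean = factoryBean;
    }

    public void pause() {
        // Closes the underlying KafkaStreams instance; consumption stops.
        factoryBean.stop();
    }

    public void resume() {
        // Builds a fresh KafkaStreams instance from the same topology.
        factoryBean.start();
    }
}
```

    Note this is a full stop/start of the streams instance (triggering a rebalance), not an in-place pause of consumption.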
    9 replies
    Miguel González

    So I'm trying out the Cloud Stream Kafka Binder:

    I have a question regarding the functional programming model. My topology has two input topics and one output topic, so I'm using a BiFunction. In my case I need to do some other processing on the input KStreams that I receive, like filtering and repartitioning the streams. Can I create two other Functions that process those input streams and then have the BiFunction use them as parameters with function composition, or should I do all that processing in the BiFunction?
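    For what it's worth, nothing prevents doing the per-stream processing inline in the BiFunction; a sketch (topic wiring, filter predicate, and repartition names are all hypothetical):

```java
// Sketch: a single BiFunction bean that pre-processes each input KStream
// before combining them (spring-cloud-stream Kafka Streams binder assumed).
import java.util.function.BiFunction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopologyConfig {

    @Bean
    public BiFunction<KStream<String, String>, KStream<String, String>, KStream<String, String>> process() {
        return (left, right) -> {
            // Filter and repartition each input stream first...
            KStream<String, String> cleanLeft = left
                    .filter((k, v) -> v != null)
                    .repartition(Repartitioned.as("left-repartitioned"));
            KStream<String, String> cleanRight = right
                    .filter((k, v) -> v != null)
                    .repartition(Repartitioned.as("right-repartitioned"));
            // ...then combine them (merge here; a join would work the same way).
            return cleanLeft.merge(cleanRight);
        };
    }
}
```

    Extracting the per-stream steps into plain Java helper methods keeps them unit-testable without needing separate Function beans.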

    2 replies
    Pietro Galassi
    How can I handle producer exceptions (for example org.apache.kafka.common.errors.TimeoutException or org.apache.kafka.common.errors.NetworkException)?
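    One common pattern (a sketch, assuming the pre-3.0 spring-kafka ListenableFuture API and a hypothetical topic name) is to attach a callback to the send future and inspect the cause:

```java
// Sketch: handling producer exceptions (TimeoutException, NetworkException)
// through the send future's callback.
import org.apache.kafka.common.errors.TimeoutException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class ProducerErrorHandling {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public ProducerErrorHandling(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String value) {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send("my-topic", value);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                // Broker acknowledged the record.
            }

            @Override
            public void onFailure(Throwable ex) {
                // spring-kafka wraps the failure; the Kafka error is the cause.
                if (ex.getCause() instanceof TimeoutException) {
                    // retry, alert, or route to a fallback here
                }
            }
        });
    }
}
```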
    16 replies
    Pietro Galassi
    In a MANUAL_IMMEDIATE scenario, how can I create a rebalance listener that commits what I have already pulled?
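    A sketch of one way to do it, assuming the listener keeps its own map of fully-processed offsets (the `processedOffsets` bookkeeping is hypothetical, not from the message):

```java
// Sketch: committing already-processed offsets when partitions are revoked.
// Registered via ContainerProperties.setConsumerRebalanceListener(...).
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {
        // "processedOffsets" is whatever map the listener maintains of
        // offsets it has processed but not yet acknowledged.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : partitions) {
            Long offset = processedOffsets.get(tp);
            if (offset != null) {
                // Commit the NEXT offset to consume, hence +1.
                toCommit.put(tp, new OffsetAndMetadata(offset + 1));
            }
        }
        if (!toCommit.isEmpty()) {
            consumer.commitSync(toCommit);
        }
    }
});
```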
    11 replies
    Pedro Silva
    Hello. I know that this change (spring-projects/spring-kafka#1615) made it possible to set a max age for KafkaProducers when they are created by the DefaultKafkaProducerFactory. Is it possible to achieve the same thing but for the producers created by KafkaStreams/StreamsBuilderFactoryBean ? Thanks
    2 replies
    Kannadasan S
    How to ensure Spring Cloud Stream Listener to wait to process messages until SpringBoot Application is fully initialized on Start?
    1 reply
    Mateusz Fronczek
    Hey. I'm currently looking into setting up a new Kafka transaction from within the thread started by @StreamListener. I do have a ChainedKafkaTransactionManager in the application context. At first, I tried to utilize @Transactional(propagation = Propagation.REQUIRES_NEW, transactionManager = "chainedKafkaTransactionManager") but to no avail - it failed with "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION". Therefore, I attempted to use kafkaTemplate.executeInTransaction(), however, it seems that it is rolled back together with the outer transaction. How should this be configured in order to achieve separation between two transactions, where the inner one is created from within consumer governed tx? I also tried to set new transactionIdSuffix on KafkaTemplate, still no success. Any help would be much appreciated, thanks!
    8 replies
    Spencer Goldberg
    What's the appropriate defensive programming for the issues detected in the NetworkClient related to missing listeners or metadata (e.g. UNKNOWN_TOPIC_OR_PARTITION)? My producer has callbacks registered on the kafkaTemplate send() but I don't see them being called (based on logs) at the same time as the NetworkClient warnings in the logs.
    Spencer Goldberg
    Also, if I choose not to implement onSuccess and onFailure callbacks, is successful delivery guaranteed when send().get() finally returns? Or do you truly need an onFailure callback to detect when the send truly fails?
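    With a synchronous get(), a failed send surfaces as an exception, so callbacks are not strictly required to detect failure; a sketch (topic name and timeout are hypothetical):

```java
// Sketch: synchronous send. Reaching the line after get() means the broker
// acknowledged the record (subject to the producer's "acks" setting).
try {
    SendResult<String, String> result =
            kafkaTemplate.send("my-topic", "value").get(10, TimeUnit.SECONDS);
    // delivered
} catch (ExecutionException e) {
    // The actual Kafka error (e.g. TimeoutException) is the cause.
    handleSendFailure(e.getCause());
} catch (java.util.concurrent.TimeoutException | InterruptedException e) {
    // get() itself timed out, or the waiting thread was interrupted.
    Thread.currentThread().interrupt();
}
```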
    6 replies
    I have a spring kafka app using @KafkaListener annotation using a ConcurrentKafkaListenerContainer. I have to use ThreadLocal for state, and am trying to clean up state when receiving the ListenerContainerIdle event. I read in the documentation that "By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective.". Does this mean that I can access ThreadLocal when subscribed to the ListenerContainerIdle event alongside the @KafkaListener?
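    Based on the quoted documentation, with the default synchronous multicaster the idle event is delivered on the consumer thread itself, so the ThreadLocal set in the @KafkaListener is visible there; a minimal sketch (the ThreadLocal name is hypothetical):

```java
// Sketch: cleaning up ThreadLocal state on container idle. With the default
// (synchronous) event multicaster this runs on the same consumer thread that
// invoked the @KafkaListener, so its ThreadLocal is accessible here.
@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
    myThreadLocal.remove(); // hypothetical ThreadLocal used by the listener
}
```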
    5 replies
    Pietro Galassi
    Is there a way to add a shutdown hook so that the event currently being processed by a listener is completed BEFORE the pod is shut down? (Even in a pod-kill scenario due to OOM.)
    1 reply
    Pietro Galassi
    Does producer retry also take care of disconnects?
    2 replies
    Akshansh Jain
    Do idle consumers account for CPU utilization of application? For ex. I have a topic with 2 partitions and have 100 consumers running in my application for this topic, then the CPU utilization of my app would be higher than if I only had 2 consumers running. (I am creating consumers using .setConcurrency() method of ConcurrentKafkaListenerContainerFactory). Spring version - 1.5.6.RELEASE, spring-kafka version - 1.2.2.RELEASE
    19 replies
    Does Spring Kafka use Log4j2 for logging? I'm asking because of the security bug in Log4j2.
    10 replies
    Pietro Galassi
    Is there a way, during a rebalance, to stop execution of the event currently being processed by a poll, so that the event would be reprocessed by a different consumer after the rebalance?
    1 reply
    I have this issue when starting multiple instances of the application. With only one instance it does not reproduce:
    Exception in thread "ForkJoinPool.common-worker-1" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:743)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:584)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:544)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:519)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:513)
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:683)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:569)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:386)
    at com.nab.ms.hs.lodgement.producer.HardshipCaseSubmitEventProducer.publishHardshipCaseSubmitEvent(HardshipCaseSubmitEventProducer.java:47)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.lambda$publishHardshipCaseSubmitEvent$0(CreateHardshipRequestService.java:108)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.processAccountRequest(CreateHardshipRequestService.java:103)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.processNewHardshipRequest(CreateHardshipRequestService.java:75)
    at com.nab.ms.hs.lodgement.application.HardshipNewRequestService.lambda$processNewHardshipRequest$0(HardshipNewRequestService.java:46)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1728)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
    Caused by: java.lang.ExceptionInInitializerError: null
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:368)
    ... 22 common frames omitted
    Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.subject.TopicNameStrategy for configuration key.subject.name.strategy: Class io.confluent.kafka.serializers.subject.TopicNameStrategy could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:729)
    1 reply
    Hello everyone, and a happy new year :-) I have implemented "non-blocking retries" in my Kafka consumers. However, I am trying to find a way to reprocess all failed messages delivered to the DLT, based on the error message reported in the headers of the messages posted to the DLT. The current DLT handler handles one message at a time as messages are posted to the DLT, but I am trying to develop a reprocessing mechanism that will let me do a mass reprocess of messages that failed for a specific reason, once the failure reason has been fixed.
    3 replies
    Andras Hatvani
    Hi, is there a way to retrieve and inject the default value serde in a kafka streams app/test?
    2 replies
    Bastien Bouclet
    Hi, when using a batch listener, is there a way to partially positively acknowledge a batch (synchronously commit the offsets up to a point in the batch, and then continue processing the records until finally acknowledging the whole batch) ?
    8 replies
    Hi. How can I modify an existing Kafka record header value in producer and consumer records? I don't want to duplicate the key; I want to override the existing key's value instead.
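    A sketch with the kafka-clients Headers API (header name and value are hypothetical): add() alone appends a duplicate entry, so an override means removing the key first.

```java
// Sketch: overriding a header value instead of appending a duplicate.
// Headers.remove(key) drops all existing values for that key.
ProducerRecord<String, String> record =
        new ProducerRecord<>("my-topic", "key", "value");
record.headers()
      .remove("my-header")
      .add("my-header", "new-value".getBytes(StandardCharsets.UTF_8));
```

    The same remove-then-add works on a ConsumerRecord's headers() before republishing.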
    2 replies
    Josh Wein
    I haven't been able to find any docs on this - is it possible to use a Java 17 Record as a Kafka message? I'm able to produce a message that is a record, but deserialization using 'org.apache.kafka.common.serialization.StringDeserializer' and 'org.springframework.kafka.support.converter.StringJsonMessageConverter' for the batch listener, seems to fail. From my understanding Records are not exactly like Objects when it comes to serialization/deserialization so that may be why it doesn't work out of the box
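    Records do work as payloads in my experience when the JSON mapping is done by Jackson (record support landed in Jackson 2.12), so pairing the record with spring-kafka's JsonSerializer/JsonDeserializer is usually simpler than StringDeserializer plus a message converter; a sketch (the record itself is hypothetical):

```java
// Sketch: a Java 17 record as a message payload. Jackson 2.12+ can
// serialize/deserialize records, so spring-kafka's JsonSerializer and
// JsonDeserializer handle it without extra code.
public record OrderEvent(String id, int quantity) { }
```

    On the consumer side the deserializer needs a target type, e.g. via the spring.json.value.default.type property or trusted type headers.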
    2 replies

    Approaches to handle 10 million records across 10 tables with several joins

    I have 10 million records across 10 tables with several joins. I am looking for the best database design or approach so that the records can be read very quickly, i.e. the queries should be fast.

    option#1 - normalize the tables and don't use joins unnecessarily
    option#2 - add all the needed columns to the 1st query, whose
    where-conditions will be used multiple times in the looping construct
    option#3 - go for a NoSQL database instead of MySQL

    Please advise


    1 reply

    The DeadLetterPublishingRecoverer uses a template resolver to get a valid KafkaTemplate for publishing the failed messages.
    But according to this

    The template is resolved based only on the value class.

    My messages being <String, String>, I created a template for <String, String> messages.

    But how do I handle the deserialization error cases?

    • <String, String> => Key error, Value ok => <byte[], String>
    • <String, String> => Key ok, Value error => <String, byte[]>
    • <String, String> => Key error, Value error => <byte[], byte[]>
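    One way to cover the cases above (a sketch; the template bean names are hypothetical) is the DeadLetterPublishingRecoverer constructor that takes a map of templates keyed on the value class, with byte[] covering values that failed deserialization:

```java
// Sketch: resolving the template by (possibly raw) value class. The map is
// traversed in insertion order and the first assignable entry wins, so use
// a LinkedHashMap.
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate); // value deserialized fine
templates.put(byte[].class, bytesTemplate);  // value failed deserialization
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
```

    Key-side failures don't need their own template: the recoverer publishes the original key bytes and records the DeserializationException details in headers.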


    12 replies

    I use the stateful retry mechanism so I can retry processing messages a few times with a fixed delay between attempts, but it's not working.

        @Bean
        public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory,
                KafkaTemplate<String, Object> template) {
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            configurer.configure(factory, kafkaConsumerFactory);
            // The RetryTemplate must be registered on the factory, otherwise it is never used.
            factory.setRetryTemplate(retryTemplate());
            factory.setErrorHandler(new SeekToCurrentErrorHandler(
                    new DeadLetterPublishingRecoverer(template), 3));
            return factory;
        }

        private RetryTemplate retryTemplate() {
            RetryTemplate template = new RetryTemplate();
            FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
            template.setBackOffPolicy(backOffPolicy); // default back-off period is 1000 ms
            SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(MAX_ATTEMPTS,
                    Collections.singletonMap(RetryableException.class, true));
            template.setRetryPolicy(retryPolicy);
            return template;
        }

    I'm throwing a RetryableException in the KafkaListener.
    Did I miss something? Thanks

    11 replies
    I am using spring-boot, spring-kafka, and micrometer-prometheus in my project.
    My project is a message consumer.
    Does anyone know how metrics can be exposed for this Kafka listener?
    Is there any public Grafana dashboard that makes use of these metrics?
    3 replies
    Tiger Wang (王豫)

    Hello - a best practice we embrace is to fail a bean initialization when connection check to an external resource fails. For a Kafka consumer bean, what's the best way to check for broker availability during its initialization?

    AFAIK there is no straightforward way to catch Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected warning.


    Tiger Wang (王豫)
    All I can think of is to have a timer that checks whether onPartitionsAssigned() is called within 60 seconds after startup, and if not, consider something wrong with the broker and terminate the service.
    I just don't feel it's the best approach.

    A better approach is to construct a KafkaAdmin object during bean initialization, and try to do something like list topics and fail the bean initialization if it can't.

    However I am looking to see if there is a best practice at all.

    Any recommendation?
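    The KafkaAdmin idea above can be sketched like this (bootstrap address and timeouts are placeholders), failing initialization with a bounded wait instead of hanging:

```java
// Sketch: fail bean initialization if the cluster is unreachable, using
// AdminClient with explicit timeouts.
@PostConstruct
public void checkBroker() {
    Map<String, Object> props =
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
        admin.describeCluster(new DescribeClusterOptions().timeoutMs(5000))
             .nodes()
             .get(10, TimeUnit.SECONDS); // throws if no broker answers in time
    } catch (Exception e) {
        throw new IllegalStateException("Kafka broker not reachable", e);
    }
}
```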
    Tiger Wang (王豫)
    3 replies
    Ebrahim Khosravani

    Hi guys, I'm stuck with KStream and Java 8. I need your help.
    I'm confused about this method, and I need to create some unit tests for it. Is there any hint or sample?

    public Function<KStream<String, String>, KStream<String, String>[]> handler() {
        return transactionStream -> transactionStream
                .map((key, value) -> extractStatus(key, value))
                .branch((key, value) -> true);
    }
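    A sketch of a unit test for a function like this with TopologyTestDriver (topic names, the streams properties, and what extractStatus() returns are all assumptions):

```java
// Sketch: exercising the handler's topology with TopologyTestDriver.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String>[] branches = handler().apply(builder.stream("input-topic"));
branches[0].to("output-topic");

// "streamsProps" must configure application.id, bootstrap.servers (dummy),
// and default serdes.
try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsProps)) {
    TestInputTopic<String, String> in = driver.createInputTopic(
            "input-topic", new StringSerializer(), new StringSerializer());
    TestOutputTopic<String, String> out = driver.createOutputTopic(
            "output-topic", new StringDeserializer(), new StringDeserializer());
    in.pipeInput("key", "value");
    // assert on out.readKeyValue() according to what extractStatus() produces
}
```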

    1 reply
    Pietro Galassi
    Hi all, is there a way to perform some action right before or after "Shutdown executor service" and "Consumer stopped" are logged by a @KafkaListener ?
    3 replies
    Hi, in my project, after a spring-kafka upgrade from 2.2.3 to 2.7.3, deserialization started failing.
    1 reply
    We are receiving Avro messages from the producer application.
    I am also using the custom deserializer mentioned here: https://gitter.im/spring-projects/spring-kafka?at=5d7535a6ae44a84124a21c3e
    Any suggestion about changes in this upgrade that could be causing the failure?
    7 replies
    Zane XiaoYe

    Hi @garyrussell
    It seems that the reactive Kafka consumer stops once an exception occurs in the processing logic, such as a WebClientResponseException.

                    .flatMap(record -> process(record))

    What is the correct way to avoid that?
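    One common approach (a sketch, not from the thread) is to contain the error inside the inner publisher, so a failing record does not cancel the outer consumer flux:

```java
// Same flatMap as above, with the error handled inside the inner publisher.
.flatMap(record -> process(record)
        .onErrorResume(e -> {
            log.error("failed to process record {}", record, e);
            return Mono.empty(); // skip this record (or route to a DLT) and keep consuming
        }))
```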

    9 replies
    Pietro Galassi
    Hi, does @KafkaListener shut down gracefully on SIGTERM (not consuming further messages)? Or does that have to be configured?
    14 replies

    Hello, I'm using RetryingBatchErrorHandler for a batch listener with a FixedBackOff and it seems to work well.
    However, when the error handler retries, no log is emitted per attempt, so I have no way to tell whether the handler is retrying or not. It only shows the error log that I configured in the ConsumerRecordRecoverer once the retries are exhausted.

    Is there any way to print a log for every retry attempt while the error handler is retrying?

    Below is the part of the container factory configuration

    final var errorHandler = new RetryingBatchErrorHandler(new FixedBackOff(3000L, 5), (consumerRecord, e) -> {
        log.error("failed record - {}", consumerRecord);
    });
    3 replies
    @garyrussell Hello Gary, I have seen places where StringSerDeSer is being used where JsonSerDeSer could be used, and vice versa, for an object, let's say User.class. I haven't seen it specified anywhere what to use and when. Please can you help? Thanks.
    8 replies
    Billy Conner Jr.
    I am fairly new to Kafka. I have a producer sending data to a broker. I can use the console consumer to view the messages sent ok. However, it seems that after a few days those messages are no longer there. I thought data sent to kafka would remain there indefinitely. Is there a configuration setting I have incorrect?
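    This is almost certainly retention rather than a bug: Kafka deletes data after the configured retention period, which defaults to 7 days. The relevant settings (values illustrative) are:

```properties
# Broker-wide default (server.properties); 168 hours = 7 days is the default.
log.retention.hours=168

# Per-topic override via the CLI (retention.ms in milliseconds):
# kafka-configs --bootstrap-server localhost:9092 --alter \
#   --entity-type topics --entity-name my-topic \
#   --add-config retention.ms=604800000
```

    Setting retention.ms=-1 on a topic keeps its data indefinitely.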
    Hi, I'm curious how the Spring Kafka client resolves the exact broker address.
    Is there any documentation?
    9 replies
    Miguel González

    Hello guys, I'm using ackMode=MANUAL_IMMEDIATE and receiving an Acknowledgment as a parameter on my listener, but I sometimes see this error in my logs: "Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group". Full stack trace at:


    In the listener method I'm using acknowledgment.acknowledge() (the error comes from this line), and when I catch a custom exception I call acknowledgment.nack().

    Any suggestions on how to avoid this?

    9 replies
    Miguel González
    hey guys, if I use manual ack, should I remove the SeekToCurrentErrorHandler from my consumer config? If something goes wrong, would it be retried twice if I have both configurations (i.e. acknowledgment.nack and SeekToCurrentErrorHandler)?
    1 reply