    Zane XiaoYe
    I’m using @EmbeddedKafka. If I don’t specify the topics inside the annotation, how can I create the topics using the API?
    3 replies
    Md. Amjad Hossain
    Hello, I am trying to configure Spring Cloud Bus with Kafka in my application. The application (yc-account) is a client of the config server, and it logs: o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=yc-account:8082-0, groupId=anonymous.45817d1a-94be-469d-a365-17b2ca165443] Adding newly assigned partitions: springCloudBus-0
    How can I change the groupId from anonymous to yc-account?
    1 reply
    Muhammad Abdurrahman
    Hi, I'm trying to create two separate Kafka streams using two different StreamsBuilderFactoryBean instances, the default one and a custom one. However, as soon as I introduce the new stream with the new builder, the other one stops responding to events, i.e. its processor does not get invoked. Is it possible to create streams using multiple builders in spring-kafka? I'm not getting any errors. When I debug, I can see that the KStream<?,?> beans are being created, but they do not seem to be processing anything. Any ideas? @artembilan
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> config = new HashMap<>();
        return new KafkaStreamsConfiguration(config);
    }

    @Bean(name = B_STREAMS_BUILDER)
    public FactoryBean<StreamsBuilder> myKStreamBuilder(
            @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) KafkaStreamsConfiguration streamsConfig
    ) {
        return new StreamsBuilderFactoryBean(streamsConfig);
    }

    public KStream<?, ?> kStream(@Qualifier(DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilder kStreamBuilder) {
    public KStream<?, ?> kStream(@Qualifier(B_STREAMS_BUILDER) StreamsBuilder kStreamBuilder) {
    3 replies
    Charlie Hubbard

    Hi, I'm trying to connect to an AWS MSK Kafka cluster using IAM authentication, but my client isn't on EC2. I think I have to integrate this library https://github.com/aws/aws-msk-iam-auth, but the instructions they provide are for the plain Kafka client, not Spring Boot. It requires setting the following properties:

    # Sets up TLS for encryption and SASL for authN.
    security.protocol = SASL_SSL
    # Identifies the SASL mechanism to use.
    sasl.mechanism = AWS_MSK_IAM
    # Binds SASL client implementation.
    sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
    # Encapsulates constructing a SigV4 signature based on extracted credentials.
    # The SASL client bound by "sasl.jaas.config" invokes this class.
    sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

    So I'm trying to adapt this to Spring Boot integration now, and I'm not sure how I can load the 'software.amazon.msk.auth.iam.IAMLoginModule' into the spring integration client. What properties would I use? Is this even possible?

    4 replies
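On the MSK IAM question above, a minimal sketch, assuming the aws-msk-iam-auth jar is on the application's classpath. The four settings quoted in the message are ordinary Kafka client properties, so they can be handed to the client as a plain map (for example the map given to a DefaultKafkaConsumerFactory), or, in Spring Boot, set under the spring.kafka.properties.* prefix. The class and method names below are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: collects the four properties quoted in the message
// into a map that can be merged into any Kafka client configuration.
final class MskIamClientProps {

    static Map<String, Object> build() {
        Map<String, Object> props = new LinkedHashMap<>();
        // TLS for encryption, SASL for authentication
        props.put("security.protocol", "SASL_SSL");
        // SASL mechanism provided by the aws-msk-iam-auth library
        props.put("sasl.mechanism", "AWS_MSK_IAM");
        // Login module and callback handler shipped in the same library
        props.put("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.put("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return props;
    }

    public static void main(String[] args) {
        // Print the properties in the same key = value shape as the message
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The classes are referenced by name only, so nothing here needs to compile against the AWS library; it only has to be present at runtime.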
    I have a Kafka consumer with 4 instances running, using the @KafkaListener annotation without specifying concurrency. What is the default concurrency for @KafkaListener? And will all 4 instances be consuming messages? One @KafkaListener is annotated with 2 topics with 5 partitions each.
    3 replies
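For the concurrency question above: when concurrency is not set, a listener container runs a single consumer, so four application instances give four consumers in total. A rough sketch of how 2 topics with 5 partitions each would be spread over them, assuming all instances share one group id and a range-style assignment (partitions are split per topic, with the remainder going to the first members). The class and method names are hypothetical, and this is plain arithmetic, not Kafka's own code.

```java
import java.util.Arrays;

// Hypothetical sketch of range-style partition assignment counts.
final class RangeAssignmentSketch {

    // Returns how many partitions each consumer ends up with.
    static int[] partitionsPerConsumer(int[] partitionsPerTopic, int consumers) {
        int[] counts = new int[consumers];
        for (int partitions : partitionsPerTopic) {
            int base = partitions / consumers;   // every consumer gets at least this many
            int extra = partitions % consumers;  // the first 'extra' consumers get one more
            for (int i = 0; i < consumers; i++) {
                counts[i] += base + (i < extra ? 1 : 0);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // 2 topics with 5 partitions each, 4 consumers (one per instance)
        int[] counts = partitionsPerConsumer(new int[] {5, 5}, 4);
        System.out.println(Arrays.toString(counts)); // prints [4, 2, 2, 2]
    }
}
```

Under these assumptions all four instances consume, but unevenly: one instance handles four partitions and the others two each.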
    Anders Clausen

    Hi all. I've got an AVRO schema and generated my POJOs. I can consume the messages fine but when I want to turn the parent-POJO into a String with either ObjectMapper or ObjectWriter from Jackson, I get weird exceptions like Caused by: org.apache.avro.AvroRuntimeException: Not an array:.

    Any chance anybody has a fix for this?

    Have a great weekend. Cheers

    2 replies

    Hi again guys, I have a problem with how the consumer handles objects: it leads to a ClassCastException. Maybe you can shed some light on the issue.

    First, I have an object generated from a schema (Avro in this case), in the directory build/generated-main-avro-java/com/company/GeneratedObject.java

    Then, I have the following KafkaListenerContainerFactory

        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GeneratedObject>>
                generatedAvroContainerFactory() {
            final ConcurrentKafkaListenerContainerFactory<String, GeneratedObject> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
            return factory;
        }

    With the consumer:

        @KafkaListener(
            topics = "topic",
            containerFactory = "generatedAvroContainerFactory",
            groupId = "${spring.kafka.consumer.group-id}"
        )
        public void consumer(ConsumerRecord<String, GeneratedObject> record, Acknowledgment acknowledgment) {

    Messages arrive at the consumer, but as soon as I try to access any field of the object,
    I get

    Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class com.company.GeneratedObject (org.apache.avro.generic.GenericData$Record and com.company.GeneratedObject are in unnamed module of loader 'app')

    Could it be that I'm missing some configuration? Or is this unrelated to spring-kafka?

    6 replies
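A GenericData$Record arriving where a generated class is expected usually means the deserializer decoded the payload as a generic Avro record. A minimal sketch, assuming the value deserializer is Confluent's KafkaAvroDeserializer (the message does not say which deserializer is in use): that deserializer returns generated classes only when specific.avro.reader is set to true. The helper class below is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: returns a copy of the consumer properties with the
// Confluent Avro deserializer switched to "specific record" mode.
final class SpecificAvroReaderProps {

    static Map<String, Object> withSpecificReader(Map<String, Object> consumerProps) {
        Map<String, Object> copy = new LinkedHashMap<>(consumerProps);
        // Assumption: Confluent's KafkaAvroDeserializer is the value deserializer.
        copy.put("specific.avro.reader", true);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new LinkedHashMap<>();
        base.put("group.id", "example-group"); // placeholder value for the demo
        System.out.println(withSpecificReader(base));
    }
}
```

The resulting map would be the one passed to the consumer factory that backs generatedAvroContainerFactory.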
    Hello,
    Does the Kafka producer/consumer API expose any metrics to monitor? I have the Spring Actuator jars. If not, how can I monitor the metrics?
    1 reply
    Hello, is it possible to generate a Sleuth traceId for each message consumed from Kafka? I am not looking to carry the id in the message headers, just trying to see if we can generate a unique traceId for each consumed message.
    3 replies
    I am using @KafkaListener for the consumer. The producer is sending "spring_json_header_types" with the type set to string for all headers, but the real value is a byte array. This causes an exception to be logged as an error when the consumer tries to convert the headers, even though I am not using those headers.
    Here is the error: "o.s.k.support.DefaultKafkaHeaderMapper : Could not decode json type: PublishProfilePreferences for key: DeviceMessageName"
    This is not causing any functional issue, but it is noise in the logs. Is there a way to avoid this header mapping, that is, the use of "spring_json_header_types"?
    5 replies
    Roussi Abdelghani

    Hello all, I have an issue with my consumer. I get this error:

    o.a.k.c.c.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

    I read that the time taken to process messages between the call to poll() and the offset commit may be longer than auto.commit.interval.ms. Is that the only cause of this problem?
    How can I know whether my consumer is using batch mode or not?

    Here is my configuration (I'm using Spring Cloud Stream, but I think this has nothing to do with Spring Cloud Stream, only with the consumer configuration):

              enabled: true
          source: testProducer
              destination: test-consumer-topic
              group: test-consumer
              destination: test-producer-topic
                useNativeEncoding: true
              auto-create-topics: false
              headerMapperBeanName: kafkaHeaderMapper
                auto.offset.reset: earliest
                '[key.serializer]': org.apache.kafka.common.serialization.StringSerializer
                '[value.serializer]': org.springframework.kafka.support.serializer.JsonSerializer
                  messageKeyExpression: headers['kafka_messageKey']
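On the CommitFailedException above: with a consumer group, the limit that usually matters is max.poll.interval.ms (the maximum time allowed between two poll() calls), rather than auto.commit.interval.ms. A rough sketch of the arithmetic, assuming the Kafka client defaults of max.poll.records=500 and max.poll.interval.ms=300000. The per-record processing times are made-up inputs and the class name is hypothetical.

```java
// Hypothetical back-of-the-envelope check: can one full batch be processed
// before the consumer is considered failed and removed from the group?
final class PollBudget {

    static boolean exceedsPollInterval(int maxPollRecords,
                                       long perRecordMillis,
                                       long maxPollIntervalMs) {
        // Worst case: poll() returns a full batch and records are processed one by one.
        long worstCaseBatchMillis = (long) maxPollRecords * perRecordMillis;
        return worstCaseBatchMillis > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // 500 records at 500 ms each = 250 s, inside the 300 s default
        System.out.println(exceedsPollInterval(500, 500, 300_000)); // prints false
        // 500 records at 700 ms each = 350 s, over the 300 s default
        System.out.println(exceedsPollInterval(500, 700, 300_000)); // prints true
    }
}
```

When the check comes out true, the usual options are a smaller max.poll.records or a larger max.poll.interval.ms.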
    Michael Düsterhus
    Hello, is there a nice way in spring-kafka to check connectivity to Kafka and start/stop listeners accordingly? Otherwise the log is bloated with messages for as long as Kafka is down. The NetworkClient of the Kafka library maintains a connectivity state, but sadly it's not accessible from the outside.
    2 replies
    Zane XiaoYe
    I’m using Reactor to build a data processing pipeline, with Spring Kafka consuming from the source. Is it possible to support backpressure, so that when the downstream is blocked, Spring Kafka slows down consumption?
    3 replies
    Egorka Voronyansky
    Hi, everyone! I've run into the following problem. I checked SO and found nothing applicable, and I have tried every answer. Can someone please help me?
    class org.apache.kafka.common.serialization.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Serializer
    3 replies
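The "is not an instance of ...Serializer" message typically shows up when a producer's key.serializer or value.serializer property points at a deserializer class. A small sketch of a sanity check over a producer property map, using only the standard Kafka property names. The class ProducerConfigCheck is hypothetical, and the name-suffix test is a heuristic, not how Kafka itself validates the setting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical check: flags serializer properties that name a Deserializer.
final class ProducerConfigCheck {

    static List<String> misconfiguredKeys(Map<String, ?> producerProps) {
        List<String> bad = new ArrayList<>();
        for (String key : Arrays.asList("key.serializer", "value.serializer")) {
            Object value = producerProps.get(key);
            if (value == null) {
                continue; // property not set, nothing to check
            }
            // The value may be given as a Class or as a class name
            String name = value instanceof Class
                    ? ((Class<?>) value).getName()
                    : value.toString();
            if (name.endsWith("Deserializer")) {
                bad.add(key);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringDeserializer"); // wrong class
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        System.out.println(misconfiguredKeys(props)); // prints [key.serializer]
    }
}
```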
    I am using Spring Boot 2.4.3 and Spring-Kafka 2.6.6.
    I am using the @KafkaListener annotation in my consumer API. Is it possible to change the concurrency and add topics without restarting the app?
    3 replies
    One more question: is there any major difference between 2.7.x
    and 2.6.x? For some reason I need to use Spring Boot 2.3.x, and when I try Spring Boot 2.3.3.RELEASE, I don't get the methods to pause and resume the Kafka consumer.
    1 reply
    Can someone help me with the latest working microservices demo or sample application to deploy in AWS and test? Please kindly help me. I need to learn it as quickly as possible.
    I am not sure if this is related to Kafka.
    Our Spring Kafka consumer is deployed on PCF with multiple instances (2 data centers). Is it possible to capture which IP address/data center a Kafka message was read from? Is there any kind of hook?
    3 replies
    Adis Bajric
    How do I get an instance of a running consumer? ConsumerFactory only provides createConsumer, addListener and similar methods.
    3 replies
    Hi, is there any sample code available that avoids using @DirtiesContext to clean up the Spring Boot context in Spring tests while using @EmbeddedKafka? We have a lot of integration tests using embedded Kafka, and cleaning the context for every test case is increasing the build time.
    2 replies
    Pietro Galassi
    If a rebalance happens, are offsets committed automatically, in both BATCH and RECORD AckMode? And if an event is being processed when a rebalance occurs, is its offset committed by the rebalance listener, or do you need to extend the listener to do the commit?
    40 replies
    Aleksey Vasin

    Hello @garyrussell, please help us with our problem...

    The goal we want to achieve is to receive all messages from each topic's partitions without using the consumer group, since the application can scale to several instances and each instance must read all messages from all partitions. At the same time, we have another restriction: while application is running, in the case a new topic's partition has been created, application(without restart) should receive messages from this new partition.

    1. How to create a consumer via @KafkaListener without a consumer group, so that the consumer will assign to all topic's partitions?
    2. How to make it so, that a consumer which was created without a consumer group automatically detects a newly created partition in the topic?
    7 replies
    I have a Spring Kafka consumer running in PROD with multiple instances. I read the data in the listener thread, then do the processing in a background thread. How can I shut down the app gracefully, i.e. only once all processing is done in both the background thread and the listener thread? I have exposed 2 endpoints to
    1) resume the Spring Kafka container
    2) pause the Spring Kafka container
    I can pause the listener thread by calling the pause REST endpoint. Is there a way to monitor the background thread?
    10 replies
    Zane XiaoYe

    Hi @garyrussell
    I’m trying the reactor-kafka, with sendTransactionally.

    public Mono<Void> send(Flux<ProducerRecord<String, String>> records) {
        return sender.sendTransactionally(Flux.just(records))
                     .doOnNext(r -> log.info("Producer#send", r.recordMetadata().toString())).then();
    }

    This send is called inside a flatMap, but I get "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION" a lot. What could be the problem? My transactionId is unique across nodes.

    1 reply
    Miguel González
    hello guys, I'm building a kafka streams app, but the issue I have... one of my topics has two event types in the same topic. Is there a way to use a DelegatingDeserializer/DelegatingSerializer as SERDE for a KStream? Otherwise I think it would try to map the array of bytes to the incorrect event type. The idea is that I want to filter out one of the event types and work with the other one. I was thinking on receiving the array of bytes and then transform to protobuf but then I wouldn't be able to use TimestampExtractors.
    6 replies
    Petromir Dzhunev
    Hello guys. Thanks for the great library. I have a task to develop a multi-tenant application, which translates to listening for the same type of event from different Kafka clusters. Is there a way to configure DefaultKafkaConsumerFactory or ConcurrentKafkaListenerContainerFactory to connect to 2 or more Kafka clusters, then bind it to a single @KafkaListener which will process all the events?
    3 replies
    Zane XiaoYe
    How can I specify a custom partitioner in reactor-kafka’s KafkaSender?
    3 replies
    Zane XiaoYe
    How can I increase the processing concurrency of reactor-kafka consumer? suppose I have the following consumer code:
                    .doOnNext(record -> {                   
                        processMessage(record.value()).doOnSuccess(__ -> {
    7 replies
    Miguel González

    hey team, can you confirm when using KafkaTemplate's method send(topic, key, value) it will calculate the partition based on the message key?

    if you go to https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java#L406
    it basically creates a ProducerRecord, like this: https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java#L133
    which seems to send null as partition

    I found this question https://stackoverflow.com/questions/45556142/spring-kafka-partitioning seems to confirm it, but where does it happen?

    5 replies
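On where the partition gets chosen: KafkaTemplate passes a null partition, and the Kafka producer's default partitioning then derives one from the serialized key, as a positive murmur2 hash of the key bytes modulo the number of partitions. The sketch below re-implements that calculation in plain Java for illustration. It is written from memory of Kafka's Utils.murmur2, so treat it as an approximation rather than the library's code. The class name and the sample key are made up.

```java
import java.nio.charset.StandardCharsets;

// Illustrative re-implementation of key-based partition selection.
final class KeyPartitionSketch {

    // 32-bit murmur2 hash over the key bytes.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        // Mix four bytes at a time
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix the remaining 1 to 3 bytes
        int tail = length & ~3;
        int rest = length % 4;
        if (rest == 3) {
            h ^= (data[tail + 2] & 0xff) << 16;
        }
        if (rest >= 2) {
            h ^= (data[tail + 1] & 0xff) << 8;
        }
        if (rest >= 1) {
            h ^= data[tail] & 0xff;
            h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit, then takes the remainder by the partition count.
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition
        System.out.println(partitionForKey("order-42", 5));
        System.out.println(partitionForKey("order-42", 5));
    }
}
```

Because the result depends on the partition count, the mapping from key to partition changes if partitions are added to the topic.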
    Pietro Galassi
    Is it possible to gracefully shut down a Kafka consumer? On Kubernetes, when a pod is deleted, it seems that the event it is currently processing is not finished and therefore not committed. The event is then moved to another pod, so if the first pod was halfway through processing it, some data may already have been written to the database. I know that Kafka event handling "should" be idempotent. How should this scenario be handled? Is it possible to make the consumer finish its processing before shutdown?
    8 replies

    When I set ReceiverOptions.commitInterval in my Java configuration, I don't see any change in the consumer configuration. There is only one property: auto.commit.interval.ms.

    Does ReceiverOptions.commitInterval(Duration d) in reactor-kafka override the Kafka property auto.commit.interval.ms?
    When I use reactor-kafka, can I set the commit interval via Kafka properties, or should I set it explicitly in the ReceiverOptions configuration?

    2 replies
    Can I use the reactor-kafka receiver with Spring WebFlux to process received records concurrently via WebFlux REST calls?
    If I use concatMap to process records via WebFlux, processing happens on only one reactor-netty thread. When I use flatMap, records are processed concurrently by multiple netty threads, but commit order is not preserved. Should I use only concatMap for processing records via WebFlux?
    1 reply
    Pietro Galassi
    How can we avoid logging "FETCH_SESSION_ID_NOT_FOUND"? We are using Spring Boot 2.4.3.
    5 replies
    I have a Kafka consumer reading from topic-1. While reading a message, if the JSON message is invalid (and not for any other reason), I need to send that message to the DLQ topic dlq-topic-1. Is that possible?
    2 replies
    Hello, should ConcurrentMessageListenerContainer.isRunning() return true if its child containers are not running? It currently returns true even if ConcurrentMessageListenerContainer.isChildRunning() is false.
    15 replies
    Laurensius Bernhard
    Hi guys, I have a question.
    I have an existing ConcurrentKafkaListenerContainerFactory called kafkaListenerContainerFactory, and I want to add a new listener with batch listening turned on, so I created another ConcurrentKafkaListenerContainerFactory called kafkaBatchListenerContainerFactory.
    After that I updated my existing @KafkaListener to use the new batch containerFactory; the topic is still the same.
    The problem is that the messages are consumed from the beginning. I just want to continue from the most recent offset, as before. What did I do wrong here?
    In case it's related, my properties are spring.kafka.consumer.group-id: test-group and auto-offset-reset: earliest
    7 replies
    When a Kafka producer sends data to a consumer, must a copy of the data be stored on the broker's disk as it passes through the brokers? Or can it be forwarded directly to the consumer over the network without being written to disk?
    5 replies
    Hi All, I am trying to figure out the best way to implement "at least once" processing with spring kafka. The "processing" that happens in the @KafkaListener does some work and then produces a message to a different kafka topic. Is the only way to enforce at least once semantics to use the Batch Kafka Listener, and flush the producer - then manually commit the offsets?
    12 replies
    Ebrahim Khosravani

    Guys, I need to connect to Kafka over SSL, which means I also have to prepare the server-side Kafka configuration.
    I created keystore.jks and truststore.jks following this DZone article: https://dzone.com/articles/kafka-ssl-client-authentication-in-multi-tenancy-a

    keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert 
    keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial 
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
    keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
    keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
    keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed

    I also changed config/server.properties like this:


    Then Kafka starts without any issue.
    But whenever I try to connect with a client configured through the Spring Boot starter, I get a handshake failed error on both sides, in kafka.log and in the client.
    Here is my config on the Spring side:

          - broker:9093
          key-password: eg12345
          key-store-location: classpath:/jks/kafka.client.keystore.jks
          key-store-password: eg12345
          key-store-type: JKS
          trust-store-location: classpath:/jks/kafka.client.truststore.jks
          trust-store-password: eg12345
          trust-store-type: JKS
          protocol: SSL
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: test-consumer-group
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer

    Error received on the client side:
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Bootstrap broker broker:9093 (id: -1 rack: null) disconnected
    Error received on the Kafka side:
    [SocketServer brokerId=0] Failed authentication with / (SSL handshake failed) (org.apache.kafka.common.network.Selector)

    2 replies
    Miguel González
    is it possible to pause/restart a kafka streams app? I have only found this discussion https://groups.google.com/g/confluent-platform/c/Nyj3eN-3ZlQ/m/lMH-bFx-AAAJ about using map to call an external service and loop until some condition completes
    9 replies
    Miguel González

    So I'm trying out the Cloud Stream Kafka Binder:

    I have a question regarding the functional programming model. My topology has two input topics and one output topic, so I'm using a BiFunction. In my case I need to do some other processing with the input KStreams that I receive, like filtering and repartitioning the streams. Can I create two other Functions that process those input streams and then have the BiFunction use them as parameters via function composition, or should I do all that processing in the BiFunction?

    2 replies
    Pietro Galassi
    How can I handle producer exceptions (for example org.apache.kafka.common.errors.TimeoutException or org.apache.kafka.common.errors.NetworkException)?
    16 replies
    Pietro Galassi
    In a MANUAL_IMMEDIATE scenario, how can I create a rebalance listener that commits what I have already polled?
    11 replies
    Pedro Silva
    Hello. I know that this change (spring-projects/spring-kafka#1615) made it possible to set a max age for KafkaProducers when they are created by the DefaultKafkaProducerFactory. Is it possible to achieve the same thing but for the producers created by KafkaStreams/StreamsBuilderFactoryBean ? Thanks
    2 replies
    Kannadasan S
    How to ensure Spring Cloud Stream Listener to wait to process messages until SpringBoot Application is fully initialized on Start?
    1 reply
    Mateusz Fronczek
    Hey. I'm currently looking into setting up a new Kafka transaction from within the thread started by @StreamListener. I do have a ChainedKafkaTransactionManager in the application context. At first, I tried to utilize @Transactional(propagation = Propagation.REQUIRES_NEW, transactionManager = "chainedKafkaTransactionManager") but to no avail - it failed with "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION". Therefore, I attempted to use kafkaTemplate.executeInTransaction(), however, it seems that it is rolled back together with the outer transaction. How should this be configured in order to achieve separation between two transactions, where the inner one is created from within consumer governed tx? I also tried to set new transactionIdSuffix on KafkaTemplate, still no success. Any help would be much appreciated, thanks!
    8 replies
    Spencer Goldberg
    What's the appropriate defensive programming for the issues detected in the NetworkClient related to missing listeners or metadata (e.g. UNKNOWN_TOPIC_OR_PARTITION)? My producer has callbacks registered on the kafkaTemplate send() but I don't see them being called (based on logs) at the same time as the NetworkClient warnings in the logs.
    Spencer Goldberg
    Also, if I choose not to implement onSuccess and onFailure callbacks, is successful delivery guaranteed when send().get() finally returns? Or do you really need an onFailure callback to detect when the send fails?
    6 replies
    I have a spring kafka app using @KafkaListener annotation using a ConcurrentKafkaListenerContainer. I have to use ThreadLocal for state, and am trying to clean up state when receiving the ListenerContainerIdle event. I read in the documentation that "By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective.". Does this mean that I can access ThreadLocal when subscribed to the ListenerContainerIdle event alongside the @KafkaListener?
    3 replies