    Pietro Galassi
    How do I configure EOS (exactly-once semantics)?
    36 replies
    Deepak Emani
    Hi, we are looking at options for replaying messages in a Kafka consumer based on timestamp. We use Spring Kafka in one consumer and reactive Spring Kafka in another. Please point us to any working examples for both types of services.
    9 replies
    Morning, are there any known issues in converting Avro to JSON and back to Avro as part of monitoring?
    2 replies
    Hello! I need to receive messages from one topic, process them, and send them to another. Is it correct to ack a message after the send succeeded, like this:
    @KafkaListener(topics = "in", containerFactory = "testFactory")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String result = process(record);
        kafkaTemplate.send("out", result).addCallback(new ListenableFutureCallback<>() {
            public void onFailure(Throwable throwable) {
                LOGGER.error("Failed to send message: {}", result, throwable);
            }

            public void onSuccess(SendResult<String, String> stringStringSendResult) {
                LOGGER.debug("Successfully sent message: {}", result);
            }
        });
    }
    5 replies
    Jack Wilkinson
    I'm currently trying to write a robust Kafka producer using KafkaTemplate.send. Does anyone have any good resources on how to handle the many exceptions that might be thrown from this call? I am calling .get() on the result to ensure it's performed synchronously, but I want to unit test all the possible scenarios.
    1 reply
    Aleksey Vasin
    Hello, what is the best practice for fixing this? I have one consumer group that consumes from different topics, and I get this: sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    2 replies
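    The two remedies named in that log message can be sketched as plain consumer properties. This is only an illustration: the values below (10 minutes, 50 records) and the names PollTuningSketch and pollTuningProps are assumptions, not recommendations from this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects the two consumer settings the log message mentions.
final class PollTuningSketch {

    static Map<String, Object> pollTuningProps() {
        Map<String, Object> props = new HashMap<>();
        // Allow more time between poll() calls (Kafka's default is 300000 ms).
        // 600000 is an assumed example value.
        props.put("max.poll.interval.ms", 600_000);
        // Return fewer records per poll() (Kafka's default is 500).
        // 50 is an assumed example value.
        props.put("max.poll.records", 50);
        return props;
    }
}
```

    A map like this would normally be merged into the properties handed to the consumer factory. Which of the two knobs helps more depends on how long a single record takes to process.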
    thanks for answers
    Is there any way we can monitor consumer lag programmatically?
    1 reply
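    As a rough sketch of what "lag" means here: per partition it is the log end offset minus the group's committed offset. The helper below only does that arithmetic. How the two offset maps are obtained (for example through the Kafka AdminClient and a consumer's endOffsets call) is left out, and the names LagSketch and lagPerPartition, the "topic-partition" string keys, and the choice to treat a missing committed offset as 0 are all assumptions.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: computes lag from two already-fetched offset maps.
final class LagSketch {

    // Keys are assumed to be "topic-partition" strings such as "orders-0".
    static Map<String, Long> lagPerPartition(Map<String, Long> endOffsets,
                                             Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new TreeMap<>();
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            // Assumption: a partition with no committed offset counts from 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            // Clamp at 0 so a stale end offset never yields negative lag.
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }
}
```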
    Luigi Cerone
    Hello, is there any way I can delete a topic via @NewTopic annotation? Thanks
    4 replies
    When configuring consumption of topics using a Kafka listener, some of the listeners fail with a disconnect exception or timeout exception. Is there a way to retry? The containers seem to simply hang there doing nothing.
    4 replies

    I want to read 3 crore (30 million) rows from a 2 GB CSV file and insert them into MySQL via Java.

    Could someone please tell me the fastest and most memory-efficient way to do this, avoiding an out-of-memory exception and keeping the load time short?

    Please kindly advise.
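    One common way to keep memory flat for a file of this size is to stream it line by line and hand the rows on in fixed-size chunks, rather than loading the whole file. The sketch below shows only that chunking step. The names ChunkedCsvReader and readInChunks are hypothetical, and skipping empty lines is an assumption.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: streams lines and emits them in chunks of at most chunkSize.
final class ChunkedCsvReader {

    // Returns the number of non-empty lines read.
    static long readInChunks(BufferedReader reader, int chunkSize, Consumer<List<String>> sink) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        long count = 0;
        List<String> chunk = new ArrayList<>(chunkSize);
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue; // assumption: blank lines carry no data
                }
                chunk.add(line);
                count++;
                if (chunk.size() == chunkSize) {
                    sink.accept(chunk);                 // hand over a full chunk
                    chunk = new ArrayList<>(chunkSize); // start a fresh one
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!chunk.isEmpty()) {
            sink.accept(chunk); // flush the final partial chunk
        }
        return count;
    }
}
```

    In a real loader the sink would typically bind each chunk to a JDBC PreparedStatement with addBatch() and executeBatch(), so that only one chunk is held in memory at a time. The right chunk size is something to measure rather than assume.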

    Igor Rudenko
    Hi there, I have a problem: Spring Kafka does not send the Kafka headers named Id and system time. How do I override the option so that these fields are included in the headers?
    1 reply
    Can someone help me with links and suggestions for processing a CSV file and sending it to a Kafka topic via Spring Batch/Integration?
    6 replies

    Hi everyone, I am new to Kafka and trying to test my application with embedded Kafka. My class config is like below:


    And I am using the versions below:
    kafka confluent avro serializer: 5.2.1

    But I got the error below:
    Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/zookeeper/client/ZKClientConfig
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1786)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:413)
    at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:116)
    at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:277)
    at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:635)
    at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:390)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:325)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:123)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:124)
    ... 25 more
    Caused by: java.lang.NoClassDefFoundError: org/apache/zookeeper/client/ZKClientConfig
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1285)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1272)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:315)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1845)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1782)
    ... 34 more
    Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.client.ZKClientConfig
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 39 more

    Could somebody please help me if you have faced such an issue before? Thanks in advance.

    1 reply
    Joe McCaffrey

    Hi, I see that the next release has an upgrade to Kafka 2.8.0. Should I expect a Kafka mismatch with the latest 2.7 release?

    I'm having some weird errors like: Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/kafka/common/record/BufferSupplier

    3 replies
    Where can I find a good Spring Integration + Kafka sample that processes a CSV file and writes it to a DB or Kafka based on the configuration?
    7 replies
    What open source tools are available to monitor/go through Kafka messages?
    Jack Wilkinson

    Hey all, is there a way for me to know in my consumer if the Kafka broker is disconnected? It is currently set up and working great via the @KafkaListener annotation on a method.

    My use case is a Kafka broker that has become disconnected, for example: it was once connected and working but no longer is. Is there a way I can hook into that so a method is called and I can log any appropriate details for the operations team?

    14 replies
    What open source tools are available to monitor/go through Kafka messages?
    Mukut Bhattacharjee

    Hi devs

    I have set up a single-node Kafka using bitnami-kafka (https://github.com/bitnami/bitnami-docker-kafka/blob/master/docker-compose.yml) with some modifications as below

    version: "2"
    services:
      zookeeper:
        image: docker.io/bitnami/zookeeper:3.7
        ports:
          - "2181:2181"
        volumes:
          - "/var/data/bitnami:/bitnami"
      kafka:
        image: docker.io/bitnami/kafka:2.7.0
        ports:
          - "9092:9092"
        volumes:
          - "/var/data/bitnami:/bitnami"
        environment:
          - KAFKA_BROKER_ID=1
          - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
        depends_on:
          - zookeeper

    Below is my application.yml for spring boot producer client

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: "my-app-group"
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

    But the producer is not able to connect to the broker and fails with the following exception

    2021-07-29 10:25:30.139 ERROR 14455 --- [ad | producer-1] m.l.springkafka.producer.EventProducer   : Error in  publishing message. Root cause: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation
    org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation
        at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(KafkaTemplate.java:620) ~[spring-kafka-2.7.4.jar:2.7.4]
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:872) ~[spring-kafka-2.7.4.jar:2.7.4]
        at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1366) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:690) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:381) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) ~[kafka-clients-2.7.1.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) ~[kafka-clients-2.7.1.jar:na]
        at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
    Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0:120003 ms has passed since batch creation

    Whereas I am able to produce using kafka-console-producer --topic test --bootstrap-server localhost:9092 and consume successfully using kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

    Can anyone please provide some input here? TIA

    1 reply

    How to handle this specific use case in Spring Batch - Different Names?

    There is a CSV file with many records where the CSV header names and the DB domain object names are different. How do I fetch the CSV data and create a model to persist in the database with Spring Batch? The code below throws errors because the header names and the DB/JPA names are different.

    Example, employee.csv:

    Employee Id, Employee Name, Employee Address, Employee Address 2, Date Of Birth

    Domain object Employee.java:

    public String empId;
    public String empName;
    public String empAddress;
    public String empAddress2;
    public String empDOB;

    Error thrown in the flat file reader:

    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.StepExecutionListener;
    import org.springframework.batch.core.configuration.annotation.JobScope;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.mapping.FieldSetMapper;
    import org.springframework.core.io.FileSystemResource;
    import org.springframework.stereotype.Component;
    import org.springframework.validation.BindException;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.batch.item.file.transform.FieldSet;

    public class EmployeeFlatFileItemReader extends FlatFileItemReader<Employee> implements StepExecutionListener {

        private String fileName;
        private String filePath;

        public void beforeStep(StepExecution stepExecution) {
            fileName = stepExecution.getJobExecution().getJobParameters().getString("fileName");
            filePath = stepExecution.getJobExecution().getJobParameters().getString("filePath");
            setResource(new FileSystemResource(filePath));
        }

        public EmployeeFlatFileItemReader() {
            try {
                DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>() {
                    {
                        setLineTokenizer(new DelimitedLineTokenizer() {
                            {
                                setNames("Employee Id" + "," + "Employee Name" + "," + "Employee Address" + "," + "Employee Address 2" + "," + "Date Of Birth");
                            }
                        });
                        setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                            {
                                setTargetType(Employee.class); // Throws error
                            }
                        });
                    }
                };
            } catch (Exception e) {
            }
        }

        public ExitStatus afterStep(StepExecution stepExecution) {
            return stepExecution.getExitStatus();
        }
    }
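
    One possible way to bridge the header labels and the domain property names from this question is an explicit lookup table, sketched below in plain Java. The label-to-property pairs come from the message above. The names HeaderMappingSketch and propertyNames are hypothetical, and this is an illustration rather than a confirmed fix for the error.

```java
import java.util.Map;

// Hypothetical helper: translates CSV header labels into domain property names.
final class HeaderMappingSketch {

    // CSV header label -> Employee property name, as listed in the question.
    static final Map<String, String> HEADER_TO_PROPERTY = Map.of(
            "Employee Id", "empId",
            "Employee Name", "empName",
            "Employee Address", "empAddress",
            "Employee Address 2", "empAddress2",
            "Date Of Birth", "empDOB");

    // Returns one property name per column, in the order of the header line.
    static String[] propertyNames(String headerLine) {
        String[] labels = headerLine.split(",");
        String[] names = new String[labels.length];
        for (int i = 0; i < labels.length; i++) {
            String label = labels[i].trim();
            String property = HEADER_TO_PROPERTY.get(label);
            if (property == null) {
                throw new IllegalArgumentException("Unmapped CSV header: " + label);
            }
            names[i] = property;
        }
        return names;
    }
}
```

    The resulting array has one entry per column, which is the shape DelimitedLineTokenizer.setNames expects, whereas the constructor above passes a single comma-joined string. BeanWrapperFieldSetMapper then matches those names against bean properties.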


    I am using spring-kafka in production code which makes a REST call to an external service. Now, while writing an integration test using the EmbeddedKafka server, I am using a WireMock server to mock the external API call. My test case runs in the main thread and the Kafka consumer runs in its own thread, so the dependency set up using WireMock is not getting picked up. Can anyone point me in some direction to handle this threading issue?
    Zane XiaoYe
    Hi, is there any configuration that needs to be done in order to get OpenTelemetry instrumentation working?
    1 reply
    Hi, I see there is now a @RetryableTopic feature. But can it be triggered manually to retry just a sublist of messages from a batch?
    1 reply

    Registering error with row details + line number + error message

    I'm processing a CSV file using Spring Batch. I want to register each error with row details + line number + error message in every step where an error occurs. The error may occur in the reading stage, the file input stage, the processing stage, or the writing stage.

    How do I do this? What is the best practice?


    1 reply
    Akshansh Jain
    Hi, I have a use case where I need to define topics of my consumers at runtime. Also, other consumer configs are also provided at runtime. So I am creating a ConcurrentMessageListenerContainer and starting the container. I'm using this container as I need to define the concurrency as well. Now, I also want my consumers to consume messages in a batch. Is that possible with this implementation?
    3 replies
    Zane XiaoYe
    I want to test with @EmbeddedKafka.
    I’m assuming that by putting this annotation on top of my test case,
    all my producer and consumer configuration in the main/java package will point to this in-memory Kafka, right?
    I don’t need to redefine all the consumers/producers in test/java for this embedded Kafka, right?
    5 replies
    Akshansh Jain
    Hi, I have a use case where I want to increase/decrease number of consumers at runtime for a specific topic. As mentioned in the answer here - https://stackoverflow.com/questions/65058534/is-there-a-way-to-update-the-number-of-concurrency-in-concurrentmessagelistenerc, I can change the concurrency by stopping and starting the container. Here, if I use multiple KafkaMessageListenerContainers instead of ConcurrentMessageListenerContainer, and just increase/decrease these containers, will that be a better approach, considering I won't have to stop/start in this case.
    2 replies
    Zane XiaoYe
    What does bootstrapServersProperty = "spring.kafka.bootstrap-servers" mean? Is spring.kafka.bootstrap-servers a special reserved address?
    1 reply
    Jack Wilkinson

    Does anyone have any links to any useful guides for using Kafka Streams within Spring Boot?

    Also, do you have to use Spring Cloud Streams when using Kafka Streams within Spring?

    1 reply
    Zane XiaoYe
    I get the "Topic ... not present in metadata after 60000 ms." error while producing to the EmbeddedKafka. The broker server is set to "localhost:9092".
    1 reply
    Zane XiaoYe
    The only address I specified in the code base is localhost:9092, and I can also telnet to this address. How could I connect to a different broker?
    1 reply
    Israel Perales

    Hi everyone.
    I have an issue: when my microservices lose the connection to the Kafka servers, all of them consume 100% of the CPU.
    Everything works fine once the connection to Kafka is reestablished, and I will try to secure the Kafka server so it never dies.

    But I think this is a bug in the libraries, and I found this link reported in the Kafka clients Jira.


    But I don't know whether upgrading only the Kafka client fixes the bug, or whether this is a bug in spring-kafka too and I need to upgrade both libraries.

    These are my versions:

    spring-kafka: 1.1.7.RELEASE
    kafka-clients: kafka-clients-

    My first approach is to upgrade only kafka-clients to the latest stable zero versions.

    If this does not work, I will upgrade spring-kafka to the latest stable of the 1.X.X versions, 1.3.11.RELEASE.

    Obviously the best practice is to upgrade everything to the latest version, and this is on the roadmap, but at this moment I need to patch this bug because I have a lot of microservices and I can't upgrade all libraries at the same time.

    Does anyone remember anything about this problem?

    2 replies
    Akshansh Jain
    Is there a configuration/setting to create number of consumers according to number of partitions for a specific topic, and autoscale them according to these partitions, in spring kafka?
    5 replies
    Does @KafkaListener use subscribe() internally to subscribe to topics?
    6 replies
    I see in the Apache Kafka code that it collects polling metrics. However, I do not see these metrics exposed through Micrometer. Is there something I need to do to expose these?
    2 replies
    Zane XiaoYe
    I’m using @EmbeddedKafka. If I don’t specify the topics inside the annotation, how can I create the topic using the API?
    3 replies
    Md. Amjad Hossain
    Hello, I am working on configuring Spring Cloud Bus Kafka in my application. The application (yc-account) is a client of the config server. o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=yc-account:8082-0, groupId=anonymous.45817d1a-94be-469d-a365-17b2ca165443] Adding newly assigned partitions: springCloudBus-0
    How can I change the groupId from anonymous to yc-account?
    1 reply
    Muhammad Abdurrahman
    Hi, I'm trying to create two separate Kafka streams using two different StreamsBuilderFactoryBean instances, the default one and a custom one. However, as soon as I introduce the new stream with the new builder, the other does not respond to any events, i.e. the processor does not get invoked. Is it possible to create streams using multiple builders in spring-kafka? I'm not getting any errors. When I debug, I can see that the KStream<?,?> beans are being created, but they do not seem to be processing anything. Any ideas? @artembilan
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> config = new HashMap<>();
        return new KafkaStreamsConfiguration(config);
    }

    @Bean(name = B_STREAMS_BUILDER)
    public FactoryBean<StreamsBuilder> myKStreamBuilder(
            @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) KafkaStreamsConfiguration streamsConfig
    ) {
        return new StreamsBuilderFactoryBean(streamsConfig);
    }

    public KStream<?, ?> kStream(@Qualifier(DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilder kStreamBuilder) {
    public KStream<?, ?> kStream(@Qualifier(B_STREAMS_BUILDER) StreamsBuilder kStreamBuilder) {
    3 replies
    Charlie Hubbard

    Hi, I'm trying to connect to an AWS MSK Kafka cluster using IAM authentication, but my client isn't on EC2. I think I have to integrate this library https://github.com/aws/aws-msk-iam-auth, but the instructions they provide are for the Kafka client, not Spring Boot. It requires setting the following properties:

    # Sets up TLS for encryption and SASL for authN.
    security.protocol = SASL_SSL
    # Identifies the SASL mechanism to use.
    sasl.mechanism = AWS_MSK_IAM
    # Binds SASL client implementation.
    sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
    # Encapsulates constructing a SigV4 signature based on extracted credentials.
    # The SASL client bound by "sasl.jaas.config" invokes this class.
    sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

    So I'm trying to adapt this to Spring Boot integration now, and I'm not sure how I can load the 'software.amazon.msk.auth.iam.IAMLoginModule' into the spring integration client. What properties would I use? Is this even possible?

    4 replies
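    One possible way to carry those four client properties into a Java-configured client is a plain map, sketched below. The keys and values are copied from the message above; the names MskIamPropsSketch and mskIamProps are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: the four client properties quoted in the question, as a map.
final class MskIamPropsSketch {

    static Map<String, Object> mskIamProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        // TLS for encryption and SASL for authentication.
        props.put("security.protocol", "SASL_SSL");
        // SASL mechanism named by the aws-msk-iam-auth instructions.
        props.put("sasl.mechanism", "AWS_MSK_IAM");
        // Login module and callback handler classes from the same instructions.
        props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.put("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return props;
    }
}
```

    In Spring Boot, arbitrary Kafka client properties can generally also be set under the spring.kafka.properties.* prefix. Whether that alone is enough for this library in a given setup is not confirmed here, and the library jar still has to be on the classpath for the two classes to load.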
    I have a Kafka consumer with 4 instances running with the @KafkaListener annotation, without specifying concurrency. So what is the default concurrency for @KafkaListener? And will all 4 instances be consuming messages? One @KafkaListener is annotated with 2 topics with 5 partitions each.
    3 replies
    Anders Clausen

    Hi all. I've got an AVRO schema and generated my POJOs. I can consume the messages fine but when I want to turn the parent-POJO into a String with either ObjectMapper or ObjectWriter from Jackson, I get weird exceptions like Caused by: org.apache.avro.AvroRuntimeException: Not an array:.

    Any chance anybody has a fix for this?

    Have a great weekend. Cheers

    2 replies

    Hi again guys, I have a problem with the usage of consumer objects that leads to a ClassCastException; maybe you can shed some light on the issue.

    First, I have this object generated by a schema (avro in this case), that is generated in the directory build/generated-main-avro-java/com/company/GeneratedObject.java

    Then, I have the following KafkaListenerContainerFactory

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GeneratedObject>>
            generatedAvroContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, GeneratedObject> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        return factory;
    }

    With the consumer:

    @KafkaListener(
            topics = "topic",
            containerFactory = "generatedAvroContainerFactory",
            groupId = "${spring.kafka.consumer.group-id}"
    )
    public void consumer(ConsumerRecord<String, GeneratedObject> record, Acknowledgment acknowledgment) {

    Messages arrive at the consumer, but as soon as I try to access any field of the object,
    I get

    Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class com.company.GeneratedObject (org.apache.avro.generic.GenericData$Record and com.company.GeneratedObject are in unnamed module of loader 'app')

    Could it be that I'm missing some kind of configuration? Or is this unrelated to spring-kafka?

    6 replies
    Hello,
    Does the Kafka Producer/Consumer API expose any metrics to monitor? I have the Spring Actuator jars. Or how else can I monitor the metrics?
    1 reply
    Hello, is it possible to generate a Sleuth traceId for each message consumed from Kafka? I am not looking to carry the ID at the message header level, just trying to see if we can generate a unique traceId for each consumed message.
    3 replies
    I am using @KafkaListener for the consumer. The producer is sending "spring_json_header_types" with the type as string for all the headers, but the real value is a byte array. This causes an exception to be logged as an error when trying to convert the headers in the consumer, even though I am not using those headers.
    Here is the error: "o.s.k.support.DefaultKafkaHeaderMapper : Could not decode json type: PublishProfilePreferences for key: DeviceMessageName"
    This is not causing any issue, but it is noise in the logs. Is there a way to avoid this header mapping, that is, the use of "spring_json_header_types"?
    5 replies
    Roussi Abdelghani

    Hello all, I have an issue with my consumer. I got this error:

    o.a.k.c.c.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

    I read that the time taken by message processing between the call to poll() and the commit of the offset may be longer than auto.commit.interval.ms. Is that the only cause of this problem?
    How can I know whether my consumer is using batch mode or not?

    Here is my configuration (I'm using Spring Cloud Stream, but I think this has nothing to do with Spring Cloud Stream, only with the configuration of the consumer):

              enabled: true
          source: testProducer
              destination: test-consumer-topic
              group: test-consumer
              destination: test-producer-topic
                useNativeEncoding: true
              auto-create-topics: false
              headerMapperBeanName: kafkaHeaderMapper
                auto.offset.reset: earliest
                '[key.serializer]': org.apache.kafka.common.serialization.StringSerializer
                '[value.serializer]': org.springframework.kafka.support.serializer.JsonSerializer
                  messageKeyExpression: headers['kafka_messageKey']
    Michael Düsterhus
    Hello, is there a nice way in spring-kafka to check connectivity to Kafka and start/stop listeners accordingly? Otherwise the log is bloated with log messages as long as Kafka is down. The NetworkClient of the Kafka lib maintains a connectivity state, but sadly it's not accessible from the outside.
    2 replies
    Zane XiaoYe
    I’m using Spring Reactor to build the data processing pipeline, using Spring Kafka to consume from the source. Is it possible to support backpressure, such that when the downstream is blocking, Spring Kafka slows down the consumption speed?
    3 replies
    Egorka Voronyansky
    Hi, everyone! I have run into the following problem. I checked SO and found nothing applicable to my problem. I tried every answer. Can someone please help me?
    class org.apache.kafka.common.serialization.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Serializer
    3 replies