    Pietro Galassi
    @pietrogalassi
    How can we avoid logging "FETCH_SESSION_ID_NOT_FOUND"? We are using Spring Boot 2.4.3.
    5 replies
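    FETCH_SESSION_ID_NOT_FOUND is an INFO-level message emitted by the kafka-clients FetchSessionHandler, so the usual remedy is to raise that logger's threshold (e.g. logging.level.org.apache.kafka.clients.FetchSessionHandler=WARN in Spring Boot). A minimal programmatic sketch, assuming Logback as in a default Boot setup:

    import ch.qos.logback.classic.Level;
    import ch.qos.logback.classic.Logger;
    import org.slf4j.LoggerFactory;

    public final class KafkaLogTuning {
        // Raise the fetch-session logger to WARN so the routine
        // FETCH_SESSION_ID_NOT_FOUND INFO messages are suppressed.
        public static void suppressFetchSessionInfo() {
            Logger logger = (Logger) LoggerFactory
                    .getLogger("org.apache.kafka.clients.FetchSessionHandler");
            logger.setLevel(Level.WARN);
        }
    }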
    covid2021corona
    @covid2021corona
    I have a Kafka consumer reading from topic-1. While reading, if a JSON message is invalid (and not failing for other reasons), I need to send that message to the DLQ topic dlq-topic-1. Is that possible?
    2 replies
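    A sketch of the usual spring-kafka recipe for this, assuming the value deserializer is wrapped in ErrorHandlingDeserializer so that an invalid JSON payload raises a DeserializationException instead of poisoning the listener; the recoverer then publishes the raw record to dlq-topic-1:

    import org.apache.kafka.common.TopicPartition;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaOperations;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @Configuration
    public class DlqConfig {

        @Bean
        public SeekToCurrentErrorHandler errorHandler(KafkaOperations<Object, Object> template) {
            DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                    (record, ex) -> new TopicPartition("dlq-topic-1", record.partition()));
            // FixedBackOff(0, 0): no retries -- a malformed payload never deserializes.
            return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 0L));
        }
    }

    The handler is then set on the listener container factory via setErrorHandler(...).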
    Rajh
    @Rajh
    Hello, should ConcurrentMessageListenerContainer.isRunning() return true if its children are not running? Actually it returns true even when ConcurrentMessageListenerContainer.isChildRunning() is false.
    15 replies
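    A minimal sketch for inspecting the children directly (the parent's isRunning() reflects the parent's own lifecycle state):

    // container is the ConcurrentMessageListenerContainer in question
    boolean anyChildRunning = container.getContainers().stream()
            .anyMatch(org.springframework.kafka.listener.KafkaMessageListenerContainer::isRunning);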
    Laurensius Bernhard
    @l7777777b
    Hi guys, I have a question.
    I have an existing ConcurrentKafkaListenerContainerFactory called kafkaListenerContainerFactory, and I want to add a new listener with batch listening turned on, so I created another ConcurrentKafkaListenerContainerFactory called kafkaBatchListenerContainerFactory.
    After that I updated my existing @KafkaListener to use the new batch listener containerFactory; the topic stays the same.
    The problem is that the messages are consumed from the beginning. I just want to continue from the most recent offset, as before. What did I do wrong here?
    In case it's related, my properties are spring.kafka.consumer.group-id: test-group and auto-offset-reset: earliest.
    7 replies
    Cassiel-girl
    @Cassiel-girl
    The Kafka producer sends data to the consumer; must a copy of the data be stored on the broker's disk as it passes through the brokers? Can it be forwarded directly to the consumer over the network without being stored to disk?
    5 replies
    Smark11
    @Smark11
    Hi all, I am trying to figure out the best way to implement "at least once" processing with Spring Kafka. The "processing" that happens in the @KafkaListener does some work and then produces a message to a different Kafka topic. Is the only way to enforce at-least-once semantics to use the batch Kafka listener, flush the producer, and then manually commit the offsets?
    12 replies
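    Batching is not required for at-least-once; it suffices to commit the inbound offset only after the outbound send is acknowledged. A sketch, assuming a container factory ("manualAckFactory", a hypothetical name) configured with AckMode.MANUAL:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.Acknowledgment;

    public class AtLeastOnceListener {

        private final KafkaTemplate<String, String> template;

        public AtLeastOnceListener(KafkaTemplate<String, String> template) {
            this.template = template;
        }

        @KafkaListener(topics = "topic-in", containerFactory = "manualAckFactory")
        public void listen(ConsumerRecord<String, String> in, Acknowledgment ack) throws Exception {
            template.send("topic-out", in.key(), in.value()).get(); // block until the broker acks
            ack.acknowledge(); // commit only after the produce succeeded
        }
    }

    A crash between the send and the acknowledge causes a redelivery (a duplicate send), never a lost message -- which is exactly the at-least-once contract.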
    Ebrahim Khosravani
    @mahbodkh

    Guys, I need to make an SSL connection to Kafka, so I should prepare the Kafka configuration on the server side as well.
    I created the keystore.jks + truststore.jks following this DZone article: https://dzone.com/articles/kafka-ssl-client-authentication-in-multi-tenancy-a

    keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert 
    keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert
    
    keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
    
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial 
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
    
    keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
    keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
    keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed

    I also changed config/server.properties like this:

    listeners=PLAINTEXT://:9092,SSL://:9093
    advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    security.inter.broker.protocol=SSL
    ssl.client.auth=required
    ssl.keystore.location=/home/kafka/kafka/conf/kafka.server.keystore.jks
    ssl.keystore.password=eg12345
    ssl.truststore.location=/home/kafka/kafka/conf/kafka.server.truststore.jks
    ssl.truststore.password=eg12345
    ssl.key.password=eg12345
    ssl.protocol=TLS
    ssl.endpoint.identification.algorithm=
    ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
    #ssl.keymanager.algorithm=SunX509
    #ssl.trustmanager.algorithm=PKIX
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    #authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    #allow.everyone.if.no.acl.found=true
    javax.net.debug=ssl

    Kafka then starts without any issue,
    but whenever I try with the client (configured via the Spring Boot starter), I receive an SSL handshake failed error on both sides, in kafka.log and on the client.
    This is my config on the Spring side:

    spring:
      kafka:
        bootstrap-servers:
          - broker:9093
        ssl:
          key-password: eg12345
          key-store-location: classpath:/jks/kafka.client.keystore.jks
          key-store-password: eg12345
          key-store-type: JKS
          trust-store-location: classpath:/jks/kafka.client.truststore.jks
          trust-store-password: eg12345
          trust-store-type: JKS
          protocol: SSL
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: test-consumer-group
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer

    Error received on the client side:
    [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Bootstrap broker broker:9093 (id: -1 rack: null) disconnected
    Error received on the Kafka side:
    [SocketServer brokerId=0] Failed authentication with /47.251.66.91 (SSL handshake failed) (org.apache.kafka.common.network.Selector)

    2 replies
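    One frequent cause of exactly this pair of errors is client-side hostname verification: the broker config above disables endpoint identification, but the Java client defaults to https, so a certificate whose CN/SAN does not match the bootstrap host ("broker") fails the handshake. A sketch of mirroring the broker setting on the client, assuming Spring Boot 2.x (the same can be done with the property spring.kafka.properties.ssl.endpoint.identification.algorithm: ""):

    import java.util.Map;
    import org.apache.kafka.common.config.SslConfigs;
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    public class ClientSslConfig {

        @Bean
        public ConsumerFactory<String, String> consumerFactory(KafkaProperties properties) {
            Map<String, Object> config = properties.buildConsumerProperties();
            // Empty string disables hostname verification, matching the broker's
            // ssl.endpoint.identification.algorithm= setting above.
            config.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
            return new DefaultKafkaConsumerFactory<>(config);
        }
    }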
    Miguel González
    @magg
    Is it possible to pause/restart a Kafka Streams app? I have only found this discussion https://groups.google.com/g/confluent-platform/c/Nyj3eN-3ZlQ/m/lMH-bFx-AAAJ about using map to call an external service and loop until some condition completes.
    9 replies
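    In the versions discussed here, KafkaStreams itself exposes no pause API, but with Spring the StreamsBuilderFactoryBean lifecycle can be driven manually; a sketch (state is rebuilt from changelogs/committed offsets on the next start):

    import org.springframework.kafka.config.StreamsBuilderFactoryBean;

    public class StreamsLifecycle {

        private final StreamsBuilderFactoryBean factoryBean;

        public StreamsLifecycle(StreamsBuilderFactoryBean factoryBean) {
            this.factoryBean = factoryBean;
        }

        public void pause() { factoryBean.stop(); }   // closes the KafkaStreams instance

        public void resume() { factoryBean.start(); } // builds and starts a fresh instance
    }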
    Miguel González
    @magg

    So I'm trying out the Cloud Stream Kafka Binder:

    I have a question regarding the functional programming model. My topology has two input topics and one output topic, so I'm using a BiFunction. In my case I need to do some other processing on the input KStreams I receive, such as filtering and repartitioning. Can I create two other Functions that process those input streams and have the BiFunction consume their results via function composition, or should I do all that processing in the BiFunction?

    2 replies
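    Only the registered bean has to be a BiFunction; the per-stream pre-processing can live in plain methods applied inside it. A sketch, with hypothetical filterLeft/repartitionRight/merge helpers:

    import java.util.function.BiFunction;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.context.annotation.Bean;

    public class JoinTopology {

        @Bean
        public BiFunction<KStream<String, String>, KStream<String, String>, KStream<String, String>> process() {
            return (left, right) -> merge(filterLeft(left), repartitionRight(right));
        }

        // Hypothetical helpers, purely illustrative:
        private KStream<String, String> filterLeft(KStream<String, String> s) {
            return s.filter((k, v) -> v != null);
        }

        private KStream<String, String> repartitionRight(KStream<String, String> s) {
            return s.repartition();
        }

        private KStream<String, String> merge(KStream<String, String> a, KStream<String, String> b) {
            return a.merge(b);
        }
    }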
    Pietro Galassi
    @pietrogalassi
    How can I handle producer exceptions (for example org.apache.kafka.common.errors.TimeoutException, org.apache.kafka.common.errors.NetworkException)?
    16 replies
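    Producer errors surface asynchronously, after the client's own retries are exhausted, so they are caught either in the send future's failure callback or by blocking on the future. A sketch ("my-topic" is illustrative):

    import org.springframework.kafka.core.KafkaTemplate;

    public class GuardedSender {

        private final KafkaTemplate<String, String> template;

        public GuardedSender(KafkaTemplate<String, String> template) {
            this.template = template;
        }

        public void send(String value) {
            template.send("my-topic", value).addCallback(
                    result -> { /* broker acknowledged the record */ },
                    ex -> {
                        // TimeoutException / NetworkException land here:
                        // log, alert, or hand off to a retry/DLQ path.
                    });
        }
    }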
    Pietro Galassi
    @pietrogalassi
    In a MANUAL_IMMEDIATE scenario, how can I create a rebalance listener that commits what I have already pulled?
    11 replies
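    A sketch using ConsumerAwareRebalanceListener, set on the container properties; note that a bare commitSync() commits the position after the last poll(), i.e. everything pulled so far, whether or not it has been processed:

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

    // containerProperties comes from factory.getContainerProperties()
    containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
        @Override
        public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
                                                    Collection<TopicPartition> partitions) {
            consumer.commitSync(); // runs on the consumer thread, before hand-off
        }
    });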
    Pedro Silva
    @jp-silva
    Hello. I know that this change (spring-projects/spring-kafka#1615) made it possible to set a max age for KafkaProducers when they are created by the DefaultKafkaProducerFactory. Is it possible to achieve the same thing but for the producers created by KafkaStreams/StreamsBuilderFactoryBean ? Thanks
    2 replies
    Kannadasan S
    @Kannadasan89
    How do I ensure a Spring Cloud Stream listener waits to process messages until the Spring Boot application is fully initialized on start?
    1 reply
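    The plain spring-kafka version of this pattern, as a sketch (Spring Cloud Stream has an analogous autoStartup option on the consumer binding): keep the container stopped and start it on ApplicationReadyEvent.

    import org.springframework.boot.context.event.ApplicationReadyEvent;
    import org.springframework.context.event.EventListener;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;

    public class DeferredStartListener {

        private final KafkaListenerEndpointRegistry registry;

        public DeferredStartListener(KafkaListenerEndpointRegistry registry) {
            this.registry = registry;
        }

        @KafkaListener(id = "deferred", topics = "my-topic", autoStartup = "false")
        public void listen(String message) {
            // processing begins only after the application is fully up
        }

        @EventListener(ApplicationReadyEvent.class)
        public void onReady() {
            registry.getListenerContainer("deferred").start();
        }
    }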
    Mateusz Fronczek
    @matfro
    Hey. I'm currently looking into starting a new Kafka transaction from within the thread started by @StreamListener. I do have a ChainedKafkaTransactionManager in the application context. At first I tried to use @Transactional(propagation = Propagation.REQUIRES_NEW, transactionManager = "chainedKafkaTransactionManager"), but to no avail - it failed with "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION". I then attempted kafkaTemplate.executeInTransaction(); however, it seems to be rolled back together with the outer transaction. How should this be configured to achieve separation between the two transactions, where the inner one is created from within the consumer-governed transaction? I also tried setting a new transactionIdSuffix on the KafkaTemplate, still without success. Any help would be much appreciated, thanks!
    8 replies
    Spencer Goldberg
    @goldberg.spencer_gitlab
    What's the appropriate defensive programming for the issues detected in the NetworkClient related to missing listeners or metadata (e.g. UNKNOWN_TOPIC_OR_PARTITION)? My producer has callbacks registered on the kafkaTemplate send() but I don't see them being called (based on logs) at the same time as the NetworkClient warnings in the logs.
    Spencer Goldberg
    @goldberg.spencer_gitlab
    Also, if I choose not to implement onSuccess and onFailure callbacks, is successful delivery guaranteed when send().get() finally returns? Or do you truly need an onFailure callback to detect when the send fails?
    6 replies
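    A sketch of the blocking variant: callbacks are optional, since a successful return from get() means the broker acknowledged the record, and a definitive failure surfaces as an ExecutionException wrapping the producer exception.

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;

    public class BlockingSender {

        public SendResult<String, String> sendBlocking(KafkaTemplate<String, String> template,
                                                       String value) throws Exception {
            try {
                return template.send("my-topic", value).get(30, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                // the send definitively failed after the client's internal retries
                throw new IllegalStateException("send failed", e.getCause());
            }
        }
    }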
    Smark11
    @Smark11
    I have a spring kafka app using @KafkaListener annotation using a ConcurrentKafkaListenerContainer. I have to use ThreadLocal for state, and am trying to clean up state when receiving the ListenerContainerIdle event. I read in the documentation that "By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective.". Does this mean that I can access ThreadLocal when subscribed to the ListenerContainerIdle event alongside the @KafkaListener?
    5 replies
    Pietro Galassi
    @pietrogalassi
    Is there a way to add a shutdown hook so that the event currently being processed in a listener completes BEFORE the pod is shut down? (Even in a pod-kill scenario, e.g. for OOM.)
    1 reply
    Pietro Galassi
    @pietrogalassi
    Does producer retry also take care of disconnects?
    2 replies
    Akshansh Jain
    @akshanshjain95
    Do idle consumers contribute to the CPU utilization of the application? For example, if I have a topic with 2 partitions and 100 consumers running in my application for that topic, would the CPU utilization of my app be higher than if I only had 2 consumers running? (I am creating consumers using the .setConcurrency() method of ConcurrentKafkaListenerContainerFactory.) Spring version - 1.5.6.RELEASE, spring-kafka version - 1.2.2.RELEASE.
    19 replies
    covid2021corona
    @covid2021corona
    Does Spring Kafka use Log4j2 for logging? I'm asking because of this security bug in Log4j2:
    https://www.lunasec.io/docs/blog/log4j-zero-day
    10 replies
    Pietro Galassi
    @pietrogalassi
    Is there a way, during a rebalance, to stop execution of the event currently being processed by a poll so that the event is reprocessed by a different consumer after the rebalance?
    1 reply
    Виктор
    @Vichukano
    I have this issue when starting multiple instances of the application. It does not reproduce with only one instance:
    Exception in thread "ForkJoinPool.common-worker-1" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:743)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:584)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:544)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:519)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:513)
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:683)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:569)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:386)
    at com.nab.ms.hs.lodgement.producer.HardshipCaseSubmitEventProducer.publishHardshipCaseSubmitEvent(HardshipCaseSubmitEventProducer.java:47)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.lambda$publishHardshipCaseSubmitEvent$0(CreateHardshipRequestService.java:108)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.processAccountRequest(CreateHardshipRequestService.java:103)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.processNewHardshipRequest(CreateHardshipRequestService.java:75)
    at com.nab.ms.hs.lodgement.application.HardshipNewRequestService.lambda$processNewHardshipRequest$0(HardshipNewRequestService.java:46)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1728)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
    Caused by: java.lang.ExceptionInInitializerError: null
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:368)
    ... 22 common frames omitted
    Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.subject.TopicNameStrategy for configuration key.subject.name.strategy: Class io.confluent.kafka.serializers.subject.TopicNameStrategy could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:729)
    1 reply
    guybar
    @guybar
    Hello everyone, and a happy new year :-) I have implemented "non-blocking retries" in my Kafka consumers. However, I am trying to find a way to reprocess all failed messages delivered to the DLT, based on the error message reported in the headers of the messages posted to the DLT. The current DLT handler processes messages one at a time as they are posted to the DLT, but I am trying to develop a reprocessing mechanism that will let me do a mass reprocess of messages that failed for a specific reason, once that failure reason has been fixed.
    3 replies
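    A sketch of such a mass-reprocess pass, keying off the exception header that DeadLetterPublishingRecoverer writes; the topic names and the matched string are illustrative, and the listener is kept stopped until a reprocess run is wanted:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.KafkaHeaders;

    public class DltReprocessor {

        private final KafkaTemplate<String, String> template;

        public DltReprocessor(KafkaTemplate<String, String> template) {
            this.template = template;
        }

        @KafkaListener(id = "dltReprocess", topics = "orders.DLT", autoStartup = "false")
        public void reprocess(ConsumerRecord<String, String> record) {
            Header reason = record.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
            if (reason != null && new String(reason.value(), StandardCharsets.UTF_8)
                    .contains("downstream unavailable")) { // the now-fixed failure
                template.send("orders", record.key(), record.value());
            }
        }
    }

    Starting the "dltReprocess" container via the KafkaListenerEndpointRegistry then replays the whole DLT, re-publishing only the matching records.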
    Andras Hatvani
    @andrashatvani
    Hi, is there a way to retrieve and inject the default value serde in a kafka streams app/test?
    2 replies
    Bastien Bouclet
    @bgK
    Hi, when using a batch listener, is there a way to partially positively acknowledge a batch (synchronously commit the offsets up to a point in the batch, then continue processing the remaining records until finally acknowledging the whole batch)?
    8 replies
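    A sketch of one way to do this, assuming AckMode.MANUAL: the listener can also receive the Consumer (the listener is invoked on the consumer thread, so a mid-batch commitSync is safe), and the final acknowledge() covers the remainder. The single checkpoint and the one-partition assumption are illustrative only:

    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;

    public class PartialAckBatchListener {

        @KafkaListener(topics = "my-topic", containerFactory = "batchFactory")
        public void listen(List<ConsumerRecord<String, String>> records,
                           Acknowledgment ack, Consumer<?, ?> consumer) {
            int checkpoint = Math.max(1, records.size() / 2); // illustrative split point
            // ... process records[0 .. checkpoint-1] ...
            ConsumerRecord<String, String> last = records.get(checkpoint - 1);
            consumer.commitSync(Collections.singletonMap(
                    new TopicPartition(last.topic(), last.partition()),
                    new OffsetAndMetadata(last.offset() + 1)));
            // ... process records[checkpoint .. end] ...
            ack.acknowledge(); // commits the rest of the batch
        }
    }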
    shravani154
    @shravani154
    Hi. How do I modify an existing Kafka record header value in producer and consumer records? I don't want to duplicate the key; I want to override the existing key's value instead.
    2 replies
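    A small sketch: Headers permits duplicate keys, so "overriding" is a remove of the existing entries followed by an add of the new value.

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public final class HeaderUtils {

        public static void overrideHeader(ProducerRecord<String, String> record,
                                          String key, String newValue) {
            record.headers().remove(key); // drop all existing values for the key
            record.headers().add(key, newValue.getBytes(StandardCharsets.UTF_8));
        }
    }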
    Josh Wein
    @JoshWein
    I haven't been able to find any docs on this - is it possible to use a Java 17 record as a Kafka message? I'm able to produce a message that is a record, but deserialization using 'org.apache.kafka.common.serialization.StringDeserializer' and 'org.springframework.kafka.support.converter.StringJsonMessageConverter' for the batch listener seems to fail. From my understanding, records are not exactly like ordinary objects when it comes to serialization/deserialization, so that may be why it doesn't work out of the box.
    2 replies
    learningdeveloper2021
    @learningdeveloper2021

    Approaches to handle 10 million records across 10 tables with several joins

    I have 10 million records across 10 tables, joined in several ways. I am looking for the best database design or approach so that reads are very fast, i.e. the query should be fast.

    option#1 - normalize the tables and don't use joins unnecessarily
    option#2 - select all the needed columns in the first query rather than re-querying with different where conditions inside a looping construct
    option#3 - go for a NoSQL database instead of MySQL

    Please advise.

    Thanks

    1 reply
    Rajh
    @Rajh

    Hello.
    The DeadLetterPublishingRecoverer uses a template resolver to get a valid KafkaTemplate to publish accepted messages.
    But according to this:
    https://github.com/spring-projects/spring-kafka/blob/0d430b95ff1c398c7d02978db5a5c0c369901216/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L450

    the template is resolved based only on the value class.

    My messages being <String, String>, I created a template for <String, String> messages.

    But how do I handle the deserialization error cases?

    • <String, String> => Key error, Value ok => <byte[], String>
    • <String, String> => Key ok, Value error => <String, byte[]>
    • <String, String> => Key error, Value error => <byte[], byte[]>


    12 replies
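    A sketch using the Map-based constructor of DeadLetterPublishingRecoverer: since resolution is by value class only, registering templates for both String and byte[] values covers the value-failure cases listed above; for a failed key, the recoverer publishes the original key bytes recovered from the DeserializationException, so the key side needs no resolver of its own.

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.springframework.kafka.core.KafkaOperations;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

    public class DltTemplates {

        DeadLetterPublishingRecoverer recoverer(KafkaOperations<String, String> stringTemplate,
                                                KafkaOperations<String, byte[]> bytesTemplate) {
            Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
            templates.put(String.class, stringTemplate); // value deserialized fine
            templates.put(byte[].class, bytesTemplate);  // value failed -> raw bytes
            return new DeadLetterPublishingRecoverer(templates);
        }
    }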
    MustaphaGheribi
    @MustaphaGheribi

    Hello,
    I use the stateful retry mechanism so I can retry processing messages a few times with a fixed delay between attempts, but it's not working:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<String, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template), 3));
        factory.setRetryTemplate(retryTemplate());
        return factory;
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate template = new RetryTemplate();

        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000);
        template.setBackOffPolicy(backOffPolicy);

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(MAX_ATTEMPTS,
                Collections.singletonMap(RetryableException.class, true),
                true);
        template.setRetryPolicy(retryPolicy);
        return template;
    }

    I'm throwing a RetryableException in the KafkaListener.
    Did I miss something? Thanks

    11 replies
    SauriBabu
    @SauriBabu
    Hi,
    I am using Spring Boot, spring-kafka, and micrometer-prometheus in my project.
    My project is a message consumer.
    Does anyone know how metrics can be exposed for this Kafka listener?
    Is there any public Grafana dashboard which makes use of these metrics?
    3 replies
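    A sketch for the metrics side: spring-kafka can bind the native consumer metrics to Micrometer via MicrometerConsumerListener, after which the actuator Prometheus endpoint exposes them as kafka.consumer.* meters (assumes Boot's KafkaProperties and a MeterRegistry on the classpath):

    import io.micrometer.core.instrument.MeterRegistry;
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.MicrometerConsumerListener;

    public class MetricsConfig {

        @Bean
        public ConsumerFactory<String, String> consumerFactory(KafkaProperties properties,
                                                               MeterRegistry registry) {
            DefaultKafkaConsumerFactory<String, String> factory =
                    new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());
            factory.addListener(new MicrometerConsumerListener<>(registry));
            return factory;
        }
    }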
    Tiger Wang (王豫)
    @tigerinus

    Hello - a best practice we embrace is to fail bean initialization when a connection check to an external resource fails. For a Kafka consumer bean, what's the best way to check for broker availability during its initialization?

    AFAIK there is no straightforward way to catch the Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected warning.

    Thanks!

    Tiger Wang (王豫)
    @tigerinus
    All I can think of is to have a timer check whether onPartitionsAssigned() is called within 60 seconds after startup, and if not, consider something wrong with the broker and terminate the service.
    I just don't feel it's the best approach.

    A better approach is to construct a KafkaAdmin object during bean initialization, try to do something like listing topics, and fail the bean initialization if that can't be done.

    However, I am looking to see whether there is a best practice at all.

    Any recommendations?
    Tiger Wang (王豫)
    @tigerinus
    3 replies
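    A sketch of the AdminClient probe described above: list topics with a short timeout during bean initialization and fail fast if no broker responds.

    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListTopicsOptions;

    public final class BrokerProbe {

        public static void assertBrokerReachable(String bootstrapServers) {
            try (AdminClient admin = AdminClient.create(Map.<String, Object>of(
                    AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                    AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5_000))) {
                admin.listTopics(new ListTopicsOptions().timeoutMs(5_000)).names().get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while probing Kafka", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Kafka broker unavailable at " + bootstrapServers, e);
            }
        }
    }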
    Ebrahim Khosravani
    @mahbodkh

    Hi guys, I'm stuck with KStream and Java 8; I need your help.
    I'm just confused by this method, and I need to create some unit tests for it. Is there any hint or sample?

    public Function<KStream<String, String>, KStream<String, String>[]> handler() {
        return transactionStream -> transactionStream
                .map((key, value) -> extractStatus(key, value))
                .branch((key, value) -> true);
    }

    1 reply
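    A sketch of a unit test with kafka-streams-test-utils, assuming the test class has access to the handler() shown above: wire the function into a topology by hand and drive it with TopologyTestDriver. The topic names are illustrative.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TestOutputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    public class HandlerTest {

        void exercisesHandler() {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> input =
                    builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()));
            KStream<String, String>[] branches = handler().apply(input);
            branches[0].to("out", Produced.with(Serdes.String(), Serdes.String()));

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-test");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
            try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                TestInputTopic<String, String> in = driver.createInputTopic(
                        "in", new StringSerializer(), new StringSerializer());
                TestOutputTopic<String, String> out = driver.createOutputTopic(
                        "out", new StringDeserializer(), new StringDeserializer());
                in.pipeInput("key", "value");
                // assert on out.readKeyValue() ...
            }
        }
    }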
    Pietro Galassi
    @pietrogalassi
    Hi all, is there a way to perform some action right before or after "Shutdown executor service" and "Consumer stopped" are logged by a @KafkaListener?
    3 replies
    SauriBabu
    @SauriBabu
    Hi, in my project, after a spring-kafka upgrade from 2.2.3 to 2.7.3, deserialization started failing.
    1 reply
    We are receiving an Avro message from the producer application.
    I am also using the custom deserializer mentioned here: https://gitter.im/spring-projects/spring-kafka?at=5d7535a6ae44a84124a21c3e
    Any suggestion about changes in this upgrade which could be causing the failure?
    7 replies
    Zane XiaoYe
    @Zane-XY

    Hi @garyrussell
    It seems that the reactive Kafka consumer stops once an exception occurs in the processing logic, such as a WebClientResponseException.

    receiver.receive()
            .flatMap(record -> process(record))
            .doOnError(log::error)
            .subscribe();

    What is the correct way to avoid that?

    9 replies
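    A sketch of one way to keep the pipeline alive (assuming process() returns a Mono): contain the failure per record with onErrorResume on the inner publisher, so an error never cancels the outer receive() flux.

    receiver.receive()
            .flatMap(record -> process(record)
                    .doOnError(e -> log.error("processing failed for record {}", record, e))
                    .onErrorResume(e -> Mono.empty()))
            .subscribe();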
    Pietro Galassi
    @pietrogalassi
    Hi, does @KafkaListener shut down gracefully on SIGTERM (not consuming further messages)? Does that need to be configured?
    14 replies