    Bastien Bouclet
    @bgK
    Hi, when using a batch listener, is there a way to positively acknowledge part of a batch (synchronously commit the offsets up to a point in the batch, and then continue processing the remaining records until finally acknowledging the whole batch)?
    8 replies
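
    A sketch of the closest built-in I can point to, assuming spring-kafka 2.3+ and AckMode.MANUAL: on a batch listener, Acknowledgment.nack(index, sleep) commits the offsets of the records before the index and redelivers the rest after the sleep, i.e. a partial positive acknowledgment with a replay rather than a plain continue (process() is a placeholder; newer versions take a Duration instead of milliseconds).

    @KafkaListener(topics = "my-topic", containerFactory = "batchFactory")
    public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (int i = 0; i < records.size(); i++) {
            try {
                process(records.get(i));
            }
            catch (Exception e) {
                ack.nack(i, 1000); // commits records[0..i-1]; records[i..] are re-polled after 1s
                return;
            }
        }
        ack.acknowledge(); // commit the whole batch
    }
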
    shravani154
    @shravani154
    Hi. How can I modify an existing Kafka record header value in producer and consumer records? I don't want to add a duplicate key; instead I want to override the existing key's value.
    2 replies
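
    A small sketch of one way to do this with the plain Kafka Headers API (the header name and values are illustrative): remove(key) deletes all headers with that key, so a following add(key, value) effectively overrides it.

    Headers headers = record.headers(); // works for ProducerRecord and ConsumerRecord
    headers.remove("traceId");
    headers.add("traceId", "new-value".getBytes(StandardCharsets.UTF_8));
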
    Josh Wein
    @JoshWein
    I haven't been able to find any docs on this: is it possible to use a Java 17 record as a Kafka message? I'm able to produce a message that is a record, but deserialization using 'org.apache.kafka.common.serialization.StringDeserializer' and 'org.springframework.kafka.support.converter.StringJsonMessageConverter' for the batch listener seems to fail. From my understanding, records are not exactly like regular classes when it comes to serialization/deserialization, so that may be why it doesn't work out of the box.
    2 replies
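
    For what it's worth, Jackson 2.12+ can bind Java records, so a sketch that deserializes straight into a record with spring-kafka's JsonDeserializer (the record shape and package name are illustrative):

    public record UserEvent(String id, String name) { }

    JsonDeserializer<UserEvent> valueDeserializer = new JsonDeserializer<>(UserEvent.class);
    valueDeserializer.addTrustedPackages("com.example.events");
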
    learningdeveloper2021
    @learningdeveloper2021

    Approaches to handle 10 million records across 10 tables, each queried with several joins

    I have 10 million records across 10 tables, each queried with several joins. I am looking for the best alternative DB design or approach to read the records very quickly, i.e. the query should be fast.

    option#1 - normalize the tables; don't go for joins unnecessarily
    option#2 - select all the needed columns in the first query, rather than re-running queries whose WHERE conditions are evaluated repeatedly inside a looping construct
    option#3 - go for a NoSQL database instead of MySQL

    Please advise

    Thanks

    1 reply
    Rajh
    @Rajh

    Hello.
    The DeadLetterPublishingRecoverer uses a template resolver to get a valid KafkaTemplate to publish the accepted messages. But according to this:
    https://github.com/spring-projects/spring-kafka/blob/0d430b95ff1c398c7d02978db5a5c0c369901216/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L450

    the template is only chosen based on the value class.

    My messages being <String, String>, I created a template for <String, String> messages.

    But how should the deserialization error cases be handled? (A sketch follows below the list.)

    • <String, String> => Key error, Value ok => <byte[], String>
    • <String, String> => Key ok, Value error => <String, byte[]>
    • <String, String> => Key error, Value error => <byte[], byte[]>


    12 replies
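
    A sketch of the multi-template pattern from the reference docs, which at least covers the value cases above: the recoverer routes by the (possibly failed) value type, so a value that failed deserialization (left as byte[]) gets the byte[] template. How the key-failure cases interact with the value-only resolution is exactly the open question here.

    @Bean
    public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, String> stringTemplate,
            KafkaTemplate<String, byte[]> bytesTemplate) {
        Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
        templates.put(String.class, stringTemplate);  // value deserialized fine
        templates.put(byte[].class, bytesTemplate);   // value failed deserialization
        return new DeadLetterPublishingRecoverer(templates);
    }
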
    MustaphaGheribi
    @MustaphaGheribi

    Hello,
    I use the stateful retry mechanism so I can retry processing messages a few times with a fixed delay between attempts, but it's not working:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<String, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template), 3));
        factory.setRetryTemplate(retryTemplate());
        return factory;
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate template = new RetryTemplate();

        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000);
        template.setBackOffPolicy(backOffPolicy);

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(MAX_ATTEMPTS,
                Collections.singletonMap(RetryableException.class, true),
                true);
        template.setRetryPolicy(retryPolicy);
        return template;
    }

    I'm throwing a RetryableException in the @KafkaListener.
    Did I miss something? Thanks

    11 replies
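
    One guess, based on the stateful-retry section of the reference docs: the factory may also need the stateful flag; without it the RetryTemplate retries in memory and the SeekToCurrentErrorHandler never sees the intermediate failures.

    // hedged addition to the factory bean above
    factory.setStatefulRetry(true);
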
    SauriBabu
    @SauriBabu
    Hi,
    I am using spring-boot, spring-kafka, and micrometer-prometheus in my project.
    My project is a message consumer.
    Does anyone know how metrics can be exposed for this Kafka listener?
    Is there any public Grafana dashboard which makes use of these metrics?
    3 replies
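
    A sketch of one way to get per-consumer Kafka client metrics into Micrometer (and thus the Prometheus endpoint), assuming spring-kafka 2.5+; consumerProps() is a placeholder for the existing consumer configuration:

    @Bean
    public ConsumerFactory<String, String> consumerFactory(MeterRegistry registry) {
        DefaultKafkaConsumerFactory<String, String> cf =
                new DefaultKafkaConsumerFactory<>(consumerProps());
        // binds kafka.consumer.* metrics for each consumer the factory creates
        cf.addListener(new MicrometerConsumerListener<>(registry));
        return cf;
    }
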
    Tiger Wang (王豫)
    @tigerinus

    Hello - a best practice we embrace is to fail a bean's initialization when a connectivity check to an external resource fails. For a Kafka consumer bean, what's the best way to check broker availability during its initialization?

    AFAIK there is no straightforward way to catch the Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected warning.

    Thanks!

    Tiger Wang (王豫)
    @tigerinus
    All I can think of is to have a timer check whether onPartitionsAssigned() is called within 60 seconds of startup, and if not, consider something wrong with the broker and terminate the service.
    I just don't feel that's the best approach.

    A better approach would be to construct a KafkaAdmin object during bean initialization, try to do something like listing the topics, and fail the bean initialization if that can't be done (see the sketch below).

    However, I am looking to see if there is a best practice at all.

    Any recommendation?
    3 replies
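
    A sketch of that KafkaAdmin idea using the plain AdminClient; the bootstrap address and the 10-second timeout are illustrative:

    try (AdminClient admin = AdminClient.create(
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
        // any cheap cluster call works; this fails fast if no broker answers
        admin.describeCluster().nodes().get(10, TimeUnit.SECONDS);
    }
    catch (Exception e) {
        throw new IllegalStateException("Kafka broker not reachable", e);
    }
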
    Ebrahim Khosravani
    @mahbodkh

    Hi guys, I'm stuck with KStream and Java 8 and need your help.
    I'm confused about this method and need to write some unit tests for it. Is there any hint or sample?

    public Function<KStream<String, String>, KStream<String, String>[]> handler() {
        return transactionStream -> transactionStream
                .map((key, value) -> extractStatus(key, value))
                .branch((key, value) -> true);
    }

    1 reply
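
    A sketch of a unit test using kafka-streams-test-utils' TopologyTestDriver; the topic names, the streamsProps() helper, and the asserted value are illustrative:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> in = builder.stream("input");
    handler().apply(in)[0].to("output");

    try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsProps())) {
        TestInputTopic<String, String> input = driver.createInputTopic(
                "input", new StringSerializer(), new StringSerializer());
        TestOutputTopic<String, String> output = driver.createOutputTopic(
                "output", new StringDeserializer(), new StringDeserializer());
        input.pipeInput("key", "value");
        assertEquals("expected", output.readValue()); // whatever extractStatus produces
    }
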
    Pietro Galassi
    @pietrogalassi
    Hi all, is there a way to perform some action right before or after "Shutdown executor service" and "Consumer stopped" are logged by a @KafkaListener?
    3 replies
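
    One possibility, sketched on the assumption that a container lifecycle event is close enough to "right after": spring-kafka publishes ConsumerStoppedEvent (and ContainerStoppedEvent) as regular Spring application events.

    @EventListener
    public void onConsumerStopped(ConsumerStoppedEvent event) {
        // runs when a listener container's consumer has stopped
        log.info("consumer stopped: {}", event.getSource());
    }
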
    SauriBabu
    @SauriBabu
    Hi, in my project, deserialization started failing after a spring-kafka upgrade from 2.2.3 to 2.7.3.
    1 reply
    We are receiving Avro messages from the producer application.
    I am also using the custom deserializer mentioned here: https://gitter.im/spring-projects/spring-kafka?at=5d7535a6ae44a84124a21c3e
    Any suggestions about changes in this upgrade which could be causing this failure?
    7 replies
    Zane XiaoYe
    @Zane-XY

    Hi @garyrussell
    It seems that the reactive Kafka consumer stops once an exception occurs in the processing logic, such as a WebClientResponseException.

      receiver.receive()
              .flatMap(record -> process(record))
              .doOnError(log::error)
              .subscribe();

    what is the correct way to avoid that?

    9 replies
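
    A sketch of the usual fix, assuming process() returns a Mono: handle the error inside the flatMap, so a failing record completes its own inner publisher instead of cancelling the outer receive() flux.

      receiver.receive()
              .flatMap(record -> process(record)
                      .onErrorResume(e -> {
                          log.error("processing failed", e);
                          return Mono.empty(); // drop (or dead-letter) the failed record
                      }))
              .subscribe();
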
    Pietro Galassi
    @pietrogalassi
    Hi, does @KafkaListener go into graceful shutdown on SIGTERM (not consuming further messages)? Does it need to be configured?
    14 replies
    yaboong
    @yaboong

    Hello, I'm using RetryingBatchErrorHandler for a batch listener with a FixedBackOff and it seems to work well.
    However, when the error handler retries, no retry log is shown, so there's no way for me to tell whether the handler is retrying or not. It only shows the error log that I configured in the ConsumerRecordRecoverer, and only once the retries are exhausted.

    Is there any way to print a log on every retry while the error handler is retrying?

    Below is the relevant part of the container factory configuration:

    factory.setBatchListener(true);
    final var errorHandler = new RetryingBatchErrorHandler(new FixedBackOff(3000, 5), (consumerRecord, e) -> {
        log.error("failed record - {}", consumerRecord);
    });
    factory.setBatchErrorHandler(errorHandler);
    3 replies
    pivotghub
    @pivotghub
    @garyrussell Hello Gary, I have seen places where StringSerDeSer is used where JsonSerDeSer could be used, and vice versa, for an object, let us say User.class. I have not seen it specified anywhere what to use and when. Please can you help? Thanks.
    8 replies
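
    For context, a sketch of the JSON option with spring-kafka's JsonSerializer/JsonDeserializer (the String pair is for payloads you map to/from text yourself; the property keys below are the real spring-kafka constants, while the package name is illustrative):

    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class.getName());
    consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.model");
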
    Billy Conner Jr.
    @billyconnerjr
    I am fairly new to Kafka. I have a producer sending data to a broker, and I can use the console consumer to view the messages sent. However, it seems that after a few days those messages are no longer there. I thought data sent to Kafka would remain there indefinitely. Is there a configuration setting I have wrong?
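
    Likely relevant: brokers default to 7 days of retention (log.retention.hours=168), which matches "after a few days". A sketch that raises one topic's retention to 30 days with the Admin client (the topic name and bootstrap address are illustrative; assumes the surrounding method declares throws Exception):

    try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
        AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("retention.ms", "2592000000"), AlterConfigOp.OpType.SET);
        admin.incrementalAlterConfigs(Map.of(topic, List.of(op))).all().get();
    }
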
    Apocalypse
    @apocalypse32
    Hi, I'm curious how the Spring Kafka client resolves the exact broker address.
    Is there any documentation?
    9 replies
    Miguel González
    @magg

    Hello guys, I'm using ackMode=MANUAL_IMMEDIATE and receiving an Acknowledgment as a parameter on my listener, but I sometimes see this error in my logs: "Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group". Full stack trace at:

    https://gist.githubusercontent.com/magg/e1e39d07c1c52010ff7e0b57636f4a71/raw/f3dc440f1f6f30b6b12629ea9c38ffb64e58c774/gistfile1.txt

    In the listener method I'm calling acknowledgment.acknowledge() (the error comes from this line), and when I catch a custom exception I call acknowledgment.nack().

    Any suggestions on how to avoid this?

    9 replies
    Miguel González
    @magg
    Hey guys, if I use manual ack, should I remove the SeekToCurrentErrorHandler from my consumer config? If something goes wrong, would it be retried twice if I have both configured (i.e. acknowledgment.nack() and SeekToCurrentErrorHandler)?
    1 reply
    yaboong
    @yaboong
    Hello! I'm currently using a Kafka batch consumer and it works fine in the local environment. But in the development environment (on AWS), it shows this error log and doesn't work at all, and I don't know what the log means...
    Does anyone know something about this?
    2022-03-12 19:14:12.176 ERROR 37634 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
    
    java.lang.UnsupportedOperationException: Container should never call this
        at org.springframework.kafka.listener.ListenerInvokingBatchErrorHandler.handle(ListenerInvokingBatchErrorHandler.java:36) ~[spring-kafka-2.6.7.jar!/:2.6.7]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1431) ~[spring-kafka-2.6.7.jar!/:2.6.7]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar!/:2.6.7]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
    7 replies
    Tiger Wang (王豫)
    @tigerinus
    Hello all - does anyone have any update regarding reactive Kafka support in Spring? I can't find any reference doc, so I'm assuming it's still in a premature state, but I'd like to confirm. Thanks!
    4 replies
    Oleksii Suprun
    @aleksey-suprun

    Hi All! Does anyone know how to configure spring.json.trusted.packages for Kafka Streams? I'm using the following config:

    @Configuration
    @EnableKafka
    @EnableKafkaStreams
    public class KafkaStreamsConfig {
    
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfig(KafkaProperties kafkaProperties) {
            return new KafkaStreamsConfiguration(Map.of(
                    StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getStreams().getApplicationId(),
                    StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getStreams().getBootstrapServers(),
                    StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                    StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                    StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()
            ));
        }
    }

    My application.yml is the following:

    spring:
      kafka:
        streams:
          bootstrap-servers: XXX.XXX.XXX.XXX:9092
          application-id: spring-kafka-test
          properties:
            processing.guarantee: exactly_once
            spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*

    But when I change the package of the entity published into Kafka, I'm receiving the following exception:

    Caused by: java.lang.IllegalArgumentException: The class 'com.mypackage.streams.entity.kafka.raw.Entity' is not in the trusted packages: [java.util, java.lang, com.mypackage.streams.entity.kafka, com.mypackage.streams.entity.kafka.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
        at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
        at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
        at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        ... 9 more

    Looks like the property added to the application.yml does not take effect.

    2 replies
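
    A guess at the cause, with a sketch: the hand-built map bypasses the Boot-managed spring.kafka.streams.properties.* entries, which is where spring.json.trusted.packages from application.yml ends up. Merging them in, assuming Boot 2.x's KafkaProperties.buildStreamsProperties():

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig(KafkaProperties kafkaProperties) {
        // starts from application.yml (bootstrap servers, application id, properties.*)
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildStreamsProperties());
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }
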
    Rajh
    @Rajh
    Hello, when an Authentication/Authorization Exception with no authExceptionRetryInterval set triggers "Fatal consumer exception; stopping container", is that considered a normal termination as far as isInExpectedState() is concerned?
    1 reply
    Diogo Almeida
    @diogojpb

    Hello everyone,
    We made some refactorings to the Kafka client configurations in the sub-modules (Gradle) of our project, and we started to suffer from IllegalStateException("Expected 1 but got 0 partitions") during the integration tests.
    Inside a @BeforeAll function, we expect containers to be properly assigned to topics and call ContainerTestUtils.waitForAssignment() for that. However, we are getting the exception mentioned above, and there had been no change to our clients' settings.
    We use spring-boot:2.6.5 (with spring-kafka:2.8.4) and our test broker runs on Docker.

    Any suggestions in this regard? The only ways we found to work around the situation were adding a Thread.sleep(10_000) in @BeforeAll (which is not an option, of course), or manually defining the topic and partition in the @KafkaListeners.

    4 replies
    Laurensius Bernhard
    @l7777777b
    Hi, I'm using Confluent Kafka and Spring Kafka. My question is: how are active connections counted? If a single app consumes 2 topics with 5 partitions in total, with a concurrency of 2, and produces to 1 topic with 3 partitions, how many connections will I create? I feel like too many connections are showing in the metric, around 10-15 for that single app.
    1 reply
    Miguel González
    @magg
    Hey guys, I have an app deployed on Kubernetes with two containers, in which I programmatically pause the MessageListenerContainer using the pausePartition() method. I'm facing a strange issue where the listener is losing some assigned partitions with no rebalance (at least according to the logs). The containers are assigned 8 partitions each on deploy; getAssignedPartitions() at first reports 8 partitions for each container, but after some time some are lost, while most of the other ones stay paused. Is there a way to debug these kinds of problems? I also tried to stop and start the MessageListenerContainer, hoping to trigger a rebalance and have those lost partitions return, but that didn't help. Is there a way to trigger a rebalance programmatically?
    6 replies
    Miguel González
    @magg
    Hello, can a KafkaListener be multi-threaded? If so, will a single thread be assigned to a single partition?
    11 replies
    Miguel González
    @magg
    After setting concurrency greater than 1, I'm wondering if I can pause the listeners the same way I was doing before. I was using:
    KafkaListenerEndpointRegistry registry;
    MessageListenerContainer beListener = registry.getListenerContainer(ID);
    beListener.pausePartition(topicPartition);
    8 replies
    Kannadasan S
    @Kannadasan89
    Hi, how can I periodically update or refresh metadata in the background every N minutes, to avoid a forced metadata update when it expires while calling send()?
    2 replies
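
    Possibly relevant, sketched as a guess: the producer's metadata.max.age.ms (default 5 minutes) bounds how long metadata may go without a refresh, so lowering it makes the client refresh more often on its own.

    Map<String, Object> producerProps = new HashMap<>();
    // "metadata.max.age.ms": force a refresh at least every 2 minutes
    producerProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 120000);
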
    Adrian Soria
    @adrianSoria
    We are using spring-kafka's producerTemplate and calling the send() function asynchronously. Can we potentially lose messages using async producer send? Should we use a synchronous send with send().get() if we don't want to lose messages?
    5 replies
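
    A sketch for the async path, assuming spring-kafka 2.x where send() returns a ListenableFuture: an async send only loses messages silently if the failure callback is ignored, so attach one (delivery guarantees also depend on producer settings such as acks and retries).

    template.send("topic", key, value).addCallback(
            result -> log.debug("sent, offset {}", result.getRecordMetadata().offset()),
            ex -> log.error("send failed", ex)); // retry, alert, or dead-letter here
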
    Smark11
    @Smark11
    Hi, is there a best practice for having visibility into the health of all KafkaListeners? Our team is currently trying to create a health-check endpoint that shows the health of all listeners.
    6 replies
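
    A sketch of one way to do this with an actuator HealthIndicator over the listener registry (the class and detail names are illustrative): report down if any registered container is not running.

    @Component
    public class KafkaListenersHealthIndicator implements HealthIndicator {

        private final KafkaListenerEndpointRegistry registry;

        public KafkaListenersHealthIndicator(KafkaListenerEndpointRegistry registry) {
            this.registry = registry;
        }

        @Override
        public Health health() {
            for (MessageListenerContainer container : registry.getListenerContainers()) {
                if (!container.isRunning()) {
                    return Health.down().withDetail("stoppedListener", container.getListenerId()).build();
                }
            }
            return Health.up().build();
        }
    }
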
    Rajh
    @Rajh

    Hi, I had a thread blocked in a KafkaConsumer in my application (see the StackOverflow post).

    I was wondering if there is anything that could detect this and restart the container, or update an actuator health check to trigger a pod restart in a Kubernetes environment.

    1 reply
    Gary Russell
    @garyrussell
    @gerald24 Please move this to the spring-cloud-stream room.
    1 reply
    Zane XiaoYe
    @Zane-XY
    Hi @garyrussell, do you know if reactor-kafka supports flow-control mechanisms similar to what Alpakka offers? https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations
    4 replies
    Billy Conner Jr.
    @billyconnerjr
    I am sending data to a Kafka broker in Avro format. I have a consumer that will sink this data to MySQL Database. However, the connector fails during deserialization because it's trying to use the incorrect schema (using a different schema ID and displays the incorrect schema name). My connector config specifies which topic it should use, but it still will not use the correct schema. Has anyone else seen this behavior? I can't think of why or how this would happen.
    1 reply
    WissamSa
    @WissamSa
    Hey guys, I'm stuck with a Spring Kafka Streams windowing error on consOprStream-KSTREAM-MAP-0000000012-repartition: "A serializer (key: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer / value: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer) is not compatible to the actual key or value type (key type: com.kafka.OPERATION_LAST_TIMESTAMP_KEY / value type: com.kafka.OperationLastTimestamp.envelopes.MessageImpl)".
    1 reply
    Zane XiaoYe
    @Zane-XY
    Hi @garyrussell
    What is the difference between the maxInFlight option in the reactor-kafka sender and Kafka's max.in.flight.requests.per.connection?
    Why doesn't reactor-kafka take Kafka's configuration as the indicator for backpressure?
    1 reply
    Josh Wein
    @JoshWein

    Is there an easy way to copy a Message exactly, in order to forward it to another topic?
    I've found that for a Message received using

    @KafkaListener(
          id = "my-group-id",
          topics = "topic",
          containerFactory = "containerFactory"
    )
    public void listen(Message<Object> message,
            Acknowledgment acknowledgment) {
        // listener body omitted in the original snippet
    }

    the headers all have kafka_ or kafka_received prepended to them.
    So if I try to forward this message, it fails because topic, id, partition, etc. are all null.

    2 replies
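
    A sketch of rebuilding the outbound message rather than forwarding the inbound one, assuming spring-kafka 2.x header names (they become KafkaHeaders.KEY/RECEIVED_KEY in 3.0); the target topic is illustrative:

    Message<Object> outbound = MessageBuilder
            .withPayload(message.getPayload())
            .setHeader(KafkaHeaders.TOPIC, "forward-topic")
            .setHeader(KafkaHeaders.MESSAGE_KEY,
                    message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY))
            .build();
    kafkaTemplate.send(outbound);
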