    Rajneesh
    @RajneeshBiz
    The class 'com.bizongo.ums.kafka.TemplatePayload' is not in the trusted packages: [java.util, java.lang, com.bizongo.service.notification.kafka]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    I'm facing this issue:
    private Map<String, Object> consumerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, Boolean.FALSE);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return config;
    }
    2 replies
    I have the above consumer configuration.
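    A minimal sketch of one way to rule out config-propagation problems, assuming TemplatePayload is the payload class from the error above and the bootstrap address is illustrative: construct and configure the JsonDeserializer instance yourself, so the trusted-packages setting is applied to the exact deserializer the consumer uses.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    @Bean
    public ConsumerFactory<String, TemplatePayload> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");

        // Configure the deserializer instance directly so the trusted-packages
        // setting is applied to the exact instance the consumer will use.
        JsonDeserializer<TemplatePayload> valueDeserializer = new JsonDeserializer<>(TemplatePayload.class);
        valueDeserializer.addTrustedPackages("com.bizongo.ums.kafka"); // or "*" to trust everything

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), valueDeserializer);
    }

    With this approach the spring.json.trusted.packages property is not needed at all.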
    yevsh
    @yevsh

    I am using spring-integration to read from Kafka.
    The message has headers with a type attribute that causes deserialization issues.

    I want to disable the use of type headers - I tried everything:

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        properties.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
        return new DefaultKafkaConsumerFactory<String, String>(properties,
                new StringDeserializer(), new JsonDeserializer(Item[].class, false));
    }

    I tried to pass:
    -Dspring.cloud.stream.kafka.streams.binder.configuration.spring.json.use.type.headers=false
    -Dspring.json.use.type.headers=false
    -Dspring.kafka.consumer.properties.spring.json.add.type.headers=false

    Nothing works; it still reads the type from the header.
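    A minimal sketch in that direction, assuming Item is the payload class and the usual spring-kafka imports: pass a fully constructed JsonDeserializer with useHeadersIfPresent = false and match the factory's value generic to the payload type, so the spring.json.* properties are not involved at all.

    @Bean
    public ConsumerFactory<String, Item[]> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "item-consumer");           // illustrative group id

        // Second constructor argument = useHeadersIfPresent; false means the
        // type headers on the record are never consulted.
        JsonDeserializer<Item[]> valueDeserializer = new JsonDeserializer<>(Item[].class, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }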

    36 replies
    Rajh
    @Rajh

    Hello, I'm using @RetryableTopic and I was wondering if it is possible (depending on the thrown exception's class) to:

    • Send to retry topic
    • Send to DLT directly
    • Use the default error handler (containerFactory.setCommonErrorHandler(new DefaultErrorHandler(new ExponentialBackOff()));) to perform an infinite retry?

    So far I couldn't get the include and exclude attributes to work together.

    Here is the use case:

    • When the listener throws a temporary, recoverable exception => send to the retry topic to retry in 10 seconds
    • When the listener throws an irrecoverable exception => send to the DLT directly
    • When the listener throws a network-related exception => retry indefinitely with exponential backoff
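    A minimal sketch of the first two cases (the exception and topic names are illustrative, and process() is a hypothetical business method): on @RetryableTopic, exceptions listed in exclude are treated as not retryable and should go straight to the DLT, while everything else goes through the retry topic. The third case (infinite in-place retries via the DefaultErrorHandler) is a separate mechanism.

    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 10_000),             // retry roughly every 10 seconds
            exclude = IrrecoverableBusinessException.class) // hypothetical fatal exception: skips retries, goes to the DLT
    @KafkaListener(topics = "orders")                       // illustrative topic name
    public void listen(String payload) {
        process(payload);                                   // hypothetical business method
    }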
    7 replies
    ElenaVuchkova
    @ElenaVuchkova

    Hello, guys!
    I have a task to introduce idempotency keys as part of event consuming.
    For this purpose, I've created an implementation of RecordInterceptor<K, V>.
    Within it, I would like to store the idempotency key in the database.
    It should happen as part of the same transaction as the event processing (within the @KafkaListener),
    so I can guarantee that if an exception occurs during event processing, the stored idempotency key will be rolled back.

    Could you please advise me how I can achieve that?

    Here is my code example:

    @Component
    public class IdempotencyKeyConsumerInterceptor<K,V> implements RecordInterceptor<K, V> {
        @Override
        @Transactional
        public ConsumerRecord<K, V> intercept(final ConsumerRecord<K, V> record) {
            String idempotencyKey = getIdempotencyKey(record.headers());
            return saveIdempotencyKey(idempotencyKey, record);
        }
    }

    And a listener:

    @Component
    public class IdempotencyKeyTestListener {
    
        private final LibraryUserService libraryUserService;
    
        @KafkaListener(topics = JSON_TOPIC_NAME_NEW, groupId = GROUP_ID, autoStartup = "false")
        @Transactional
        public void testListener(@Payload final IdempotencyTestJsonEntity testJsonEntity) {
            log.info("Idempotency test listener invoked.");
            libraryUserService.saveLibraryUserEntity(testJsonEntity.getMessage());
        }
    }

    Thanks in advance.

    45 replies
    Sini
    @Sini_P_twitter

    Hello, we've been using Spring Kafka happily for a long time, but now I have a use-case I couldn't solve yet.
    So we use many @KafkaListeners to listen to a few topics and would like to use only 1 Kafka Consumer in the whole application. As I understand it, Spring Kafka will create many Kafka Consumers in the background; that is generally fine, but in this use case we'd like to use only 1 so that we can use the copartitioning feature of the RangeAssignor rebalancing strategy in the Kafka client.

    Is there a way to annotate the @KafkaListeners or use our own container so that only 1 Kafka Consumer is created in the background?
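    A minimal sketch of one direction, assuming the topics can share a handler and the usual imports: a single @KafkaListener bound to all the topics with concurrency 1 results in exactly one consumer for the whole set, so the RangeAssignor sees every topic from the same group member.

    @KafkaListener(id = "single-consumer", topics = { "topic-a", "topic-b" }, concurrency = "1") // illustrative names
    public void listen(ConsumerRecord<String, String> record) {
        // dispatch on record.topic() if the handling differs per topic
    }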

    5 replies
    Rajh
    @Rajh
    Hello, when using @RetryableTopic, is the containerFactory.setRecordFilterStrategy() configuration ignored?
    4 replies
    Pietro Galassi
    @pietrogalassi
    Hi all, on Spring 2.4.3 we are experiencing VERY high rebalancing times (even 40 minutes!). The current configuration is 24 partitions with consumer concurrency set to 24. We have pods scaling from 2 to 6. When scaling occurs, rebalancing starts and takes a lot of time. Other rebalances also occur during processing. Any advice?
    8 replies
    jvm-man
    @jvm-man
    Hi all! I have a Spring Cloud Stream application with the Kafka binder, and I wanted to black-box test it with another application, which sends in some messages and then asserts on certain results in the DB. The catch is that message ordering matters, and some messages need to go through several processing steps that are too hard to assert on. I've introduced a consumer-lag measurement service, and I only send the next message once the lag has stayed at zero over a window, but the issue is that most of the time the processing is too fast to catch the flip in the values. Do you have any suggestions on how to do this properly?
    1 reply
    kkamath30
    @kkamath30

    I am currently using Spring Kafka with the Spring Cloud Stream Kafka binder and I can produce and consume messages successfully. In addition, I wanted to also connect to Azure Event Hub, but when I add the Spring Cloud Azure binder dependency to the pom.xml, the Spring Kafka code stops working with an APIClient exception and the client gets disconnected in an infinite loop.

    I use the following Spring Cloud Azure version and follow the Azure documentation to connect to Event Hub:

    <spring-cloud-azure.version>4.3.0</spring-cloud-azure.version>

    Using this library and binder, I get the following in the logs:

    Received error INVALID_REQUEST from node -1 when making an ApiVersionsRequest with correlation id 9. Disconnecting.

    Can someone tell me whether I need to move the Azure Event Hub code into a separate Spring Boot application?

    1 reply
    Albert-Guan
    @Albert-Guan
    Hello! I am using the @KafkaListener and @RetryableTopic annotations. I have multiple main topics defined under @KafkaListener. Is there a way to create just one retry topic for all those main topics?
    9 replies
    Tim Ysewyn
    @TYsewyn

    Hi Gary & Artem! 👋
    We noticed that KafkaTemplate always closes the producer if it’s not a transactional template even though the docs state the following:

    When not using Transactions, by default, the DefaultKafkaProducerFactory creates a singleton producer used by all clients, as recommended in the KafkaProducer javadocs. However, if you call flush() on the template, this can cause delays for other threads using the same producer. Starting with version 2.3, the DefaultKafkaProducerFactory has a new property producerPerThread. When set to true, the factory will create (and cache) a separate producer for each thread, to avoid this issue.

    Even though we have one producer we will open and close a new connection to our Kafka cluster for every message we send.
    We were wondering if this is intentional (and the reasoning behind this) or if it’s a bug in the code. Thanks for your feedback!

    See https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java#L703-L70
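    For reference, a minimal sketch of the producerPerThread option the quoted docs mention (address and serializers here are illustrative, not the poster's setup):

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
        factory.setProducerPerThread(true); // each thread gets (and caches) its own producer
        return factory;
    }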

    2 replies
    Ken Gullaksen
    @kenglxn
    Hi. When using DefaultErrorHandler with ExponentialBackOff to retry indefinitely, is there a way via Spring Kafka to log that the listener method failed? I have attempted using the retry listener facility on the DefaultErrorHandler, but it does not seem to log while retrying, only when retries are exhausted and the recoverer is called. In our case we are using the default, which is to retry indefinitely with no recoverer being called.
    It would be great to have each retry logged with some metadata like offset, partition, attempt number and such.
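    For comparison, a minimal sketch of registering a retry listener on the DefaultErrorHandler (assuming a log field and the containerFactory from earlier are available); the callback is meant to fire on every failed delivery attempt:

    DefaultErrorHandler errorHandler = new DefaultErrorHandler(new ExponentialBackOff());
    errorHandler.setRetryListeners((record, ex, deliveryAttempt) ->
            log.warn("Delivery failed, attempt {} for {}-{}@{}: {}",
                    deliveryAttempt, record.topic(), record.partition(), record.offset(), ex.getMessage()));
    containerFactory.setCommonErrorHandler(errorHandler);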
    7 replies
    ikyatov
    @ikyatov
    @garyrussell Good evening! You answered on this issue https://stackoverflow.com/questions/69830098/kafka-producer-is-not-recreated-with-refreshscope. Can you advise if you have found any solution?
    thuongtlh
    @thuongtlh

    Hi everyone, I have an issue regarding Kafka - Event Hub. I made a demo based on https://blog.nebrass.fr/playing-with-spring-boot-and-kafka-on-azure-event-hub/ and it's working fine, I can have a listener listen to the Azure topic. However, when I try to integrate the same code into my company project, it has a problem related to the SSL handshake. Here is the log:
    [2022-08-15 10:25:42][ERROR] o.apache.kafka.clients.NetworkClient.processDisconnection 739 - [AdminClient clientId=first-producer-on-the-planet] Connection to node -1 (<MyNameSpace>.servicebus.windows.net/<myIP>:9093) failed authentication due to: SSL handshake failed

    Event Hub uses SASL_SSL/PLAIN and it works fine in the demo (same Kafka client / Spring Kafka version as the company app). I hope someone can shed some light on this.

    2 replies
    thuongtlh
    @thuongtlh
    Hi Mr. @garyrussell and all,
    I am handling the situation that my application needs to have 2 Kafka configurations at the same time.
    I think it is the same requirement as mentioned in this thread: https://stackoverflow.com/questions/63110194/spring-kafka-configuration-for-2-different-kafka-cluster-setups
    I tried to make a small demo that describes my issue; each config works fine on its own, but I cannot get both configs working at the same time with 2 topics. Could you please take a look and advise me?
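    For context, the usual shape of a two-cluster setup is one consumer factory plus one listener container factory per cluster, with each @KafkaListener pointing at the factory for its cluster. A minimal sketch with illustrative bean names and addresses, assuming the usual spring-kafka imports:

    @Bean
    public ConsumerFactory<String, String> clusterAConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster-a:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "app-on-cluster-a");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> clusterAContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(clusterAConsumerFactory());
        return factory;
    }

    // ...repeat the same pair for cluster B with its own bootstrap servers...

    @KafkaListener(topics = "topic-on-cluster-a", containerFactory = "clusterAContainerFactory")
    public void listenClusterA(String payload) {
        // handle the record
    }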
    9 replies
    kr1929uti
    @kr1929uti
    Hi everyone, I have a kafka setup with postgres as my source and sink. I am trying to implement a scenario where any ddl changes in postgres source connector (such as column addition, deletion, update on column, column type change) should reflect in my sink postgres table. I already have auto.evolve=true in my sink connector configuration (using JDBC sink connector), but it is not fulfilling the requirements. Any suggestion on this?
    kr1929uti
    @kr1929uti
    Hi all,
    Whenever I encounter schema changes (ddl changes coming in), I want to automate the table backup and table deletion and then table creation with the new schema. (I am using postgres db and I have kafka setup). Any suggestions on how to go about automation?
    Jorick Caberio
    @jorick.caberio_gitlab
    How can I set auto.offset.reset in my application.properties file? I tried using spring.kafka.consumer.auto-offset-reset=latest.
    I'm using Kafka Streams and it creates 2 consumers. One has auto.offset.reset = none and the other has auto.offset.reset = earliest, based on the startup logs.
    1 reply
    mrunal-badhe
    @mrunal-badhe

    Hey @garyrussell and everyone,

    Can someone please help me understand what's going wrong here?

    I want to implement a delayed queue (DLQ) using the ConcurrentKafkaListenerContainer pause-resume functionality.
    When a message is received on a DLQ, we check the last processed time and want to pause the container if the message is not aged enough.

    Since the same logic needs to be implemented for all other DLQs as well, I have used MethodKafkaListenerEndpoint to create containers.
    If the message is not aged enough, I throw an exception and call registry.allcontainers.pause, since I want to pause all the other DLQ listeners as well.

    It works fine, the offsets are not committed and I see on the broker for the given group, there's a lag of 1.
    I have set setAckAfterHandle to false in DefaultErrorHandler as well.

    However, when I call registry.allcontainers.resume, this last message is not consumed.
    If I send a new message to the DLQ, it picks up the latest message and resets the lag to 0.
    However, I lose 1 record in this case.

    7 replies
    ankisinha3005
    @ankisinha3005
    org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.InvalidTopicException: Topic 'savings.account.check.eligibility' is invalid
    4 replies
    Getting this InvalidTopicException for the topic "savings.account.check.eligibility". Can someone help?
    AGE 17
    @JaxBanlama_twitter
    I have upgraded my Spring Boot version to 2.7.4, which requires spring-kafka 2.9.0, which in turn requires kafka-clients 3.0.0+ and consequently Kafka 3.0.0+, which we do not have. Our Kafka version is 2.8.1. With Spring Boot 2.7.4, spring-kafka 2.9.0 and Kafka client & streams 3.0.0 the microservice works, but do you see any risk in the future? Thanks a lot.
    1 reply
    p0tat03chipz
    @p0tat03chipz_twitter
    What's the easiest way to add Kafka Streams metrics to Actuator? This
    @Component
    public class Processor {
        @Autowired
        public void process(StreamsBuilder builder) {
            // do stream processing stuff
        }
    }
    p0tat03chipz
    @p0tat03chipz_twitter
    is my existing stream processing code. It works, but I don't know how to expose the Kafka metrics to an Actuator endpoint.
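    A minimal sketch of one approach, assuming Spring Boot's auto-configured StreamsBuilderFactoryBean and MeterRegistry are available: registering a KafkaStreamsMicrometerListener binds the native Kafka Streams metrics to Micrometer, which Actuator then exposes under /actuator/metrics.

    @Bean
    public StreamsBuilderFactoryBeanConfigurer streamsMetricsConfigurer(MeterRegistry meterRegistry) {
        // Bind the KafkaStreams client's metrics to Micrometer when the streams app starts.
        return factoryBean -> factoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry));
    }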
    2 replies
    karanbir8080
    @karanbir8080
    Hi, I need to know whether I can use spring-kafka in a plain Java application without Spring Boot.
    If so, are there any downsides or performance issues that I need to take care of?
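    For illustration, a minimal sketch of a listener container configured programmatically without Boot auto-configuration (address and topic name are illustrative; the usual spring-kafka imports are assumed):

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "plain-java-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
    ContainerProperties containerProps = new ContainerProperties("some-topic");
    containerProps.setMessageListener((MessageListener<String, String>) record ->
            System.out.println("Received: " + record.value()));

    KafkaMessageListenerContainer<String, String> container =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    container.start(); // call stop() on shutdown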
    6 replies
    Abayomi Akinfemiwa
    @olayodepossible_gitlab

    Hi @garyrussell and team, please help; I am a newbie in this space and I have an issue getting my configuration right. I am trying to send the same message to different Kafka topics for different consumers to consume,
    but I am not getting any messages at the consumer end, while the producer sends the message successfully. I guess the problem is in my application.yml file where I have the config set up.

    This is the Producer config

    spring:
      data:
        mongodb:
          authentication-database: admin
          user: ${mongodb_user}
          password: ${mongodb_password}
          host: localhost
          port: 27017
          database: orderServiceDB
    
      application:
        name: OrderService
    
      stream:
        kafka:
          binder:
            brokers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}
            auto-add-partitions: true
            min-partition-count: 2
        bindings:
          orderApproval-in-0:
            destination: vendorServiceTopic, paymentService
            content-type: application/*+avro
            group: vendorServiceGroup
            consumer:
              max-attempts: 4
              back-off-initial-interval: 10000
          sendOrder-out-0:
            destination: orderServiceTopic
            content-type: application/*+avro
            producer:
              partitioned: true
              partition-key-expression: headers['partitionKey']
              partition-count: 2
    
        zipkin:
          base-url: http://localhost:9411/
        sleuth:
          sampler:
            probability: 1

    And these are the Consumers

    spring:
      data:
        mongodb:
          authentication-database: admin
          user: ${mongodb_user}
          password: ${mongodb_password}
          host: localhost
          port: 27017
          database: productServiceDB
      cloud:
        schema-registry-client:
          endpoint: http://${SCHEMA_REGISTRY_HOST:localhost}:${SCHEMA_REGISTRY_PORT:8081}
        stream:
          kafka:
            binder:
              brokers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}
              auto-add-partitions: true
          bindings:
            sendOrder-in-0:
              destination: orderEvent
              content-type: application/*+avro
              group: orderServiceGroup
              consumer:
                instanceCount: 2
                instanceIndex: 2
                max-attempts: 4
                back-off-initial-interval: 10000
    spring:
      data:
        mongodb:
          authentication-database: admin
          user: ${mongodb_user}
          password: ${mongodb_password}
          host: localhost
          port: 27017
          database: customerServiceDB
    
      application:
        name: CustomerService
      cloud:
        schema-registry-client:
          endpoint: http://${SCHEMA_REGISTRY_HOST:localhost}:${SCHEMA_REGISTRY_PORT:8081}
        stream:
          kafka:
            binder:
              brokers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}
              auto-add-partitions: true
          bindings:
            customerService-in-0:
              destination: customerOrderEvent
              content-type: application/*+avro
              group: orderServiceGroup
              consumer:
                max-attempts: 4
                back-off-initial-interval: 10000
    
      zipkin:
        base-url: http://localhost:9411/

    When I run the code I get the following, but I am still confused. Can someone explain it to me like I'm a baby?

    partitions assigned: [customerOrderEvent-in-0-0]

    partitions assigned: [orderEvent-in-0-0]
    thanks

    2 replies
    Francis
    @francis-a

    Hey all, I have a question about a ConsumerInterceptor and why onCommit might not be called.

    I have a consumer app configured with an annotation based listener. Auto-commit is false and the listener is configured with ack-mode set to batch.

    I'm attempting to write an integration test that verifies records are committed. I'm doing this by configuring a producer in a test configuration along with a ConsumerInterceptor. The test opens a transaction and uses the injected KafkaTemplate to publish a message. The test then effectively asserts ConsumerInterceptor::onCommit is called.

    This never happens because it seems like commitSync/async is never called.
    https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java#L3231
    The commits list in KafkaMessageListenerContainer is always empty.

    While the test itself is not super important, I do want to make sure I'm configuring everything correctly. The test could in theory just check the configured ack-mode. That said, I'm really not sure why commit(A)sync is never called. Does anyone happen to have any insights here?

    8 replies
    prapurna-manda
    @prapurna-manda
    Hello all, can we add custom tags for Kafka metrics, similar to DefaultWebMvcTagsProvider in Spring Boot? Or is there any other way to do it? @garyrussell
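    One hook that exists for the listener side, sketched minimally below with illustrative tag values: extra tags can be added to the container's Micrometer timers via ContainerProperties.setMicrometerTags().

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Extra tags added to the container's listener timers.
        factory.getContainerProperties().setMicrometerTags(Map.of("team", "payments", "env", "prod"));
        return factory;
    }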
    6 replies
    cowlike
    @cowlike_gitlab

    I'm having a problem with MirrorMaker. If this isn't the correct forum, can you please let me know where I should ask?

    During the replication process, this line of code is returning null for upstreamTopic, which creates a NullPointerException on the next line. I can't find an associated Kafka issue. Has anyone ever experienced this? It seems like a Kafka bug.

    2 replies
    Adrian Soria
    @adrianSoria

    Hello, we are trying to achieve at-least-once delivery with spring-kafka.
    Consumer:

    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=false
    ackMode = AckMode.RECORD

    Producer

    spring.kafka.producer.acks=all
    spring.kafka.producer.retries=2147483647
    spring.kafka.producer.properties.enable.idempotence=true
    spring.kafka.producer.properties.max.in.flight.requests.per.connection=1

    Will the above config give us at-least-once delivery, or are we missing something?
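    For reference, a minimal sketch of where the ackMode = AckMode.RECORD line above is applied when configuring the container factory in Java (with Boot it can also be set via spring.kafka.listener.ack-mode=record):

    // Assuming an existing ConcurrentKafkaListenerContainerFactory bean named "factory":
    // commit the offset after each record is successfully processed.
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);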

    2 replies
    Stephen Murby
    @stephen-murby

    Just picked up spring-projects/spring-boot#32984 on #spring-boot and I think I need to add a new field to ContainerProperties

    Do I need to submit a pull request to #spring-kafka and have it merged, then update the library version in #spring-boot and make the associated change there?

    Is there a way I can bundle these pull requests together?

    1 reply
    Mahdi Amini
    @bmd007
    Hi, has anyone managed to bring up a single-node Kafka with KRaft (docker compose)? I keep getting "KAFKA_ZOOKEEPER_CONNECT is required".
    Nuno Marujo
    @nhmarujo
    Hi everyone. I am trying Spring Boot 3.0.0-RC2 to slowly prepare for the new version that will soon be available. I am no longer able to find classes like org.springframework.kafka.test.EmbeddedKafkaBroker in spring-kafka-test. I also can't find any documentation saying that it was moved somewhere else or replaced.
    Can you please point me to what changed in that regard? Thank you
    11 replies
    liudianwei
    @liudianwei
    8 replies
    Hi everyone, could we configure max.poll.interval.ms in the yml?
    Pietro Galassi
    @pietrogalassi
    Hi everyone. We are facing a very strange issue. Sometimes our Kafka Streams application stops consuming messages for a long period (maybe missing 1 or 2 messages) and then, after a long pause, it starts consuming again. Looking at the Kafka cluster, there is no consumer while the aggregation is stopped, and there is a consumer when it is alive (CALC_AGGREGATOR_GROUP_ID-6fc16e81-fdd6-4748-896a-d195b4f9b376-StreamThread-1-consumer). How is it possible that the consumer is removed while running?
    1 reply
    Pietro Galassi
    @pietrogalassi
    Hi everyone. Is it possible to start a KafkaConsumer in lazy mode? I think it is impossible (a consumer should listen on the topic). Let me know. Thanks.
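    A minimal sketch of what is possible with listener containers (listener id and topic name are illustrative): the container is created eagerly but left stopped via autoStartup = "false", and started later through the KafkaListenerEndpointRegistry.

    @KafkaListener(id = "lazyListener", topics = "some-topic", autoStartup = "false")
    public void listen(String payload) {
        // process the record
    }

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    public void startConsumingWhenReady() {
        registry.getListenerContainer("lazyListener").start();
    }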
    11 replies
    benitocm
    @benitocm
    Hi folks. I am using Spring Kafka to consume and reply to records with a String key and a JsonNode (Jackson) value. I am using a consumer with the @KafkaListener and @SendTo annotations to reply. The thing is that when the reply is sent, the JsonNode is interpreted as an Iterable and several Kafka records are sent instead of just one (the one with the entire JSON value). Is it possible to configure Spring Kafka to send only one record with the JsonNode value? Thanks in advance
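    A minimal sketch of one knob that looks relevant (topic names are illustrative): @KafkaListener has a splitIterables attribute, and setting it to false makes an Iterable or array reply be sent as a single record instead of one record per element.

    @KafkaListener(topics = "requests", splitIterables = false)
    @SendTo("replies")
    public JsonNode handle(JsonNode request) {
        return request; // sent back as one record, not one per element
    }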
    10 replies
    Priyanshu Shukla
    @priyanshus1
    Hi Folks,
    When using the replying Kafka template, what happens if a consumer rebalance occurs?
    Suppose a reply is expected by Consumer A, Consumer A dies, and Consumer B is assigned to that particular partition.
    Will the event still be consumed and correlated correctly to the respective thread?
    2 replies
    Choubani Amir
    @amirensit

    Hello.
    I'd appreciate some help understanding how RangeAssignor works.
    The javadoc says:
    "
    The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.

    For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.

    The assignment will be:

    C0: [t0p0, t0p1, t1p0, t1p1]
    C1: [t0p2, t1p2]
    "
    Is that correct?
    To me it should be

    C0: [t0p0, t0p1, t1p0]
    C1: [t1p1, t0p2, t1p2], because "the number of partitions to assign to each consumer" is 3.
    Am I wrong?

    3 replies
    Pierpaolo Frasa
    @Fryie

    Hi, if I run a Spring application with @KafkaListener-annotated consumers on multiple server instances, is KafkaListenerEndpointRegistry aware of the consumers on all the instances? Or would it only know about the listener containers on its own server instance?

    I ask because I need to pause a specific topic whenever a particular operation is executing, and I thought of using MessageListenerContainer.pausePartition() for it, but it should pause the partition on consumers on all server instances of course.

    9 replies
    vijaygopal
    @vijaygopal
    I am using Spring Kafka and running into a serialization/deserialization issue. Basically, I have a Spring Boot program with Spring Kafka. As long as I run the program as only a producer or only a consumer, I don't see any issues. But if I run the same program to both produce messages to topic-A and listen to messages coming from topic-B, then I run into deserialization errors in the producer (which is confusing). The producer and consumer have their own configs; the producer serializes one POJO and the consumer deserializes a different POJO, but I am failing to understand why the deserializer is invoked while messages are being produced by the producer. Can someone shed some light on this?
    4 replies