    Pietro Galassi
    @pietrogalassi
    Is there a way to send messages in batch using KafkaTemplate? I need to send 20k messages all together.
    24 replies
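    For context, KafkaTemplate.send() is asynchronous, so "sending in batch" usually reduces to looping and letting the producer's linger.ms/batch.size settings do the wire-level batching. A minimal sketch, assuming a KafkaTemplate<String, String> bean and a hypothetical topic name:

    import org.springframework.kafka.core.KafkaTemplate;

    public class BulkSender {

        private final KafkaTemplate<String, String> template;

        public BulkSender(KafkaTemplate<String, String> template) {
            this.template = template;
        }

        public void sendAll(Iterable<String> payloads) {
            for (String payload : payloads) {
                template.send("my-topic", payload); // async; returns a future immediately
            }
            template.flush(); // drain anything still buffered in the producer
        }
    }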
    Tomas Alabes
    @tomasAlabes
    Hi, I'd like to validate a setup that's giving me a headache (using Kafka 2.6.1). All flows below should be transactional.
    I have many @KafkaListeners using a MyListenerContainerFactory, created with a ChainedTM, which has a KafkaTM created with MyListenersProducerFactory. This PF has a static txn id. As part of this consumer flow, I want to produce events, all in one txn.
    Also, I want to produce events from a browser request (produce-only flow), with KTs created by ProducerOnlyFactorys, each with a unique txn id.
    Questions:
    • Using MyListenersProducerFactory to create the KafkaTemplates for the listeners' producers, I'm getting Group X has received offset commits from consumers as well as transactional producers. Mixing both types of offset commits will generally result in surprises and should be avoided. Should these templates be non-transactional? What's the right way to create those producers?
    • Can I avoid having to think "am I in a consumer flow or a producer-only flow?" to know which type of KT to use? It can be error-prone for devs (ideally all KTs would be created the same way by the same transactional PF).
    • If I'm not wrong, with the latest Kafka you can set unique txn ids on all producers and simplify this a bit (sketched below).
    8 replies
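    On that last point, a hedged sketch of giving each producer factory its own transactionIdPrefix (bean names, prefixes, and the producerProps() helper are illustrative, not from the thread):

    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.ProducerFactory;

    // Hedged sketch: distinct transactionIdPrefix values per factory, so
    // listener-initiated and produce-only transactions use distinct txn ids.
    @Bean
    public ProducerFactory<String, String> listenersProducerFactory() {
        DefaultKafkaProducerFactory<String, String> pf =
                new DefaultKafkaProducerFactory<>(producerProps()); // producerProps(): assumed helper
        pf.setTransactionIdPrefix("listener-tx-");
        return pf;
    }

    @Bean
    public ProducerFactory<String, String> produceOnlyProducerFactory() {
        DefaultKafkaProducerFactory<String, String> pf =
                new DefaultKafkaProducerFactory<>(producerProps());
        pf.setTransactionIdPrefix("web-tx-");
        return pf;
    }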
    Tomas Alabes
    @tomasAlabes
    Have you ever faced this exception in brokers when producing events? java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read from segment FileRecords(size=1966, file=/var/lib/kafka/__consumer_offsets-0/00000000000000000000.log, start=0, end=2147483647). I've been trying everything to debug this but I'm not able to pin down what's causing it (I had to update these Kafka broker server properties: https://gist.github.com/tomasAlabes/0bd3e03546e399db6c6e6b8d4a78686b)
    1 reply
    Andev
    @mrmisterious_gitlab

    Hi guys, I think I'm probably missing some theory regarding producing/consuming JSON.
    What I'm trying to achieve is:

    • from the producer microservice, send a JSON object
    • from the consumer microservice, consume the JSON objects sent by any producer microservice
    • set the configuration using application properties

    The point is, I want my consumer microservice to consume multiple JSON objects from multiple producers, so the property spring.kafka.consumer.properties.spring.json.value.default.type is, I think, not enough for this case.

    Now, the problem is, if I don't use spring.kafka.consumer.properties.spring.json.value.default.type, I get a No type information in headers and no default type provided error.

    So after reading the documentation, I thought spring.kafka.consumer.properties.spring.json.type.mapping and spring.kafka.producer.properties.spring.json.type.mapping could match my requirements, but after setting them as

    spring.kafka.producer.properties.spring.json.type.mapping=test:producerpackage.producerobject
    spring.kafka.consumer.properties.spring.json.type.mapping=test:consumerpackage.consumerobject

    I'm still getting the same error.

    Maybe the JSON type mapping property is not the one suited for this purpose, or maybe it's not even possible to achieve this.

    19 replies
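    For reference, the token before the colon ("test") must be the same on both sides, while the class after it is each side's local type; a hedged sketch of the equivalent programmatic consumer configuration, using the package names from the question:

    import java.util.HashMap;
    import java.util.Map;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    // Hedged sketch: same token as the producer side, consumer's local class,
    // and the consumer package trusted for deserialization.
    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "test:consumerpackage.consumerobject");
    consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "consumerpackage");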
    文金龙
    @wenjinlonggithub
    Hi, is there any example of how to switch the language on every HTTP request by setting a special language type in a header, using a Feign interceptor to intercept every HTTP request?
    3 replies
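    A minimal sketch of a Feign interceptor stamping a language header on every outgoing request (the header name and locale source are assumptions):

    import feign.RequestInterceptor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.i18n.LocaleContextHolder;

    // Hedged sketch: add an Accept-Language header to every outgoing Feign call.
    @Bean
    public RequestInterceptor languageInterceptor() {
        return template -> template.header(
                "Accept-Language", LocaleContextHolder.getLocale().toLanguageTag());
    }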
    yuristpsa
    @yuristpsa
    Hey all,
    Is there any way to define different backoffs according to the exception type in the error handler in the listener factory?
    I managed to get close to a solution using a retry template, but it competes with the error handler.
    4 replies
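    If memory serves, spring-kafka 2.6 added a back-off function on the error handler that can pick a BackOff per record/exception; a hedged sketch (the exception types and intervals are illustrative, and "factory" is the assumed listener container factory):

    import org.springframework.dao.TransientDataAccessException;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    // Hedged sketch: choose a BackOff based on the failure's cause.
    SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler();
    handler.setBackOffFunction((record, ex) ->
            ex.getCause() instanceof TransientDataAccessException
                    ? new FixedBackOff(5_000L, 10L)  // retry transient failures patiently
                    : new FixedBackOff(0L, 2L));     // fail fast for everything else
    factory.setErrorHandler(handler);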
    Rajh
    @Rajh

    Hello, I'm using Kafka Streams with props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer); and a DeadLetterPublishingRecoverer, while my application is configured with a String key and an Avro value.

    Once I get a deserialization error, the DLT publication fails with:
    org.apache.kafka.common.errors.SerializationException: Can't convert key of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in key.serializer

    But when I'm using plain Kafka (without Streams) it's working.

    What am I doing wrong? Should I change the KafkaTemplate for the DeadLetterPublishingRecoverer to byte[],byte[]? Will it still work with plain Kafka?

    10 replies
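    A sketch of the byte[],byte[] approach the question mentions: after a deserialization failure the failed record's key and value are raw bytes, so a DLT template that writes bytes sidesteps the serializer mismatch (baseProducerProps is an assumed existing config map):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

    // Hedged sketch: a DLT template that writes raw bytes, so no serializer
    // mismatch can occur regardless of the original key/value types.
    Map<String, Object> props = new HashMap<>(baseProducerProps); // assumed base config
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    KafkaTemplate<Object, Object> dltTemplate =
            new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dltTemplate);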
    Sean Farrow
    @SeanFarrow
    Hi all, are there any examples of using the embedded broker from spring-kafka-test with ScalaTest?
    1 reply
    Johan
    @ninhc1010l
    Hi guys. Any solution to delay a consumer? I just want to publish an event and have it consumed 15 minutes after it is published.
    1 reply
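    There's no built-in delay, but one common workaround is to compare the record timestamp with the desired delay and nack() until the record is due; a hedged sketch assuming manual ack mode, with the topic name and process() as placeholders:

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;

    // Hedged sketch: re-deliver the record until 15 minutes have elapsed.
    @KafkaListener(topics = "delayed-topic")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        long dueAt = record.timestamp() + Duration.ofMinutes(15).toMillis();
        long remaining = dueAt - System.currentTimeMillis();
        if (remaining > 0) {
            ack.nack(Math.min(remaining, 5_000)); // sleep then re-poll; stay under max.poll.interval.ms
        } else {
            process(record); // assumed business logic
            ack.acknowledge();
        }
    }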
    Ashish Meshram
    @ashishjmeshram_twitter
    Hi All, I have two separate applications: one producer and one consumer. The producer sends messages whose value is a custom User object. I am trying to consume that User object in the consumer application but I am getting .MessageConversionException: failed to resolve class name. Class not found.
    These are two different applications, each with a User object but in a different package.
    
    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: group_id
          auto-offset-reset: earliest
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          properties:
            spring.json.trusted.packages: '*'
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(User user) {
        System.out.println(user);
    }
    I am using the latest versions of Spring Boot and Kafka.
    Ashish Meshram
    @ashishjmeshram_twitter
    It works if the same application is producing and consuming the messages.
    1 reply
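    One common fix when the producer and consumer keep the class in different packages is to ignore the producer's type headers and name the local class instead; a hedged sketch extending the YAML above (the class name is an assumption):

    spring:
      kafka:
        consumer:
          properties:
            spring.json.use.type.headers: false
            spring.json.value.default.type: com.example.consumer.User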
    ChandanPadalkar
    @ChandanPadalkar
    Yes, in the same application we can have a producer and a consumer.
    Sean Farrow
    @SeanFarrow
    Hi, is there a way of preventing a Spring app from starting if a Kafka topic does not exist?
    1 reply
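    I'm not aware of a built-in switch for this, but a startup check with an AdminClient can fail the context; a hedged sketch (the topic name is a placeholder):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.KafkaAdmin;

    // Hedged sketch: abort startup when a required topic is absent.
    @Bean
    public ApplicationRunner requiredTopicCheck(KafkaAdmin admin) {
        return args -> {
            try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
                if (!client.listTopics().names().get().contains("required-topic")) {
                    throw new IllegalStateException("Required topic 'required-topic' does not exist");
                }
            }
        };
    }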
    Aleh Danilovich
    @MCPDanilovich
    Hi, is it possible to trigger partition reassignment when we use the cooperative-sticky protocol? I have 1 topic (100 partitions) and 100 consumers in total (5 k8s pods, each with 20 consumer threads and the same group.id). The consumers are divided evenly in general. But when I scale my app up to, for example, 10 pods, all the new consumers stay unassigned to any partition. Protocol = sticky. Is it possible to use some of the consumers from the new pods? For example, 10 pods with 10 consumer threads per pod?
    1 reply
    Tomas Alabes
    @tomasAlabes
    One perf question: I have 2 instances of my service, each with 100 consumers consuming around 30 topics with 128 partitions each (concurrency: 1). As soon as the 2nd instance goes up, the rebalancing spikes the CPU with a loop of Attempt to heartbeat failed since group is rebalancing, making the services unusable (3 CPU limit). Is there any particular tip you could give to reduce such load? I can't use static membership because I have Kafka 2.1.1. Thank you!
    4 replies
    Hushen Savani
    @hushensavani
    Hello, I'm using Kafka listeners in a Spring Boot project. I want to upgrade the Kafka dependencies in Spring Boot... please share some pointers on which dependencies to change.
    I want to upgrade to 2.6.6 in order to use RecordInterceptor.
    1 reply
    Laurensius Bernhard
    @l7777777b
    Hi, not a really technical question. I'm consuming betting transactions (very high throughput) using Spring Kafka. A lot of optimisation is ongoing to support this, since currently it can't handle 100 msg/second (about 40 kbps) fast enough; it's constantly lagging behind. Before optimising my big transactional process, I'm trying to implement a batch listener. Is it going to help?
    2 replies
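    For what it's worth, the switch itself is small; whether it helps depends on where the processing time goes. A hedged sketch with placeholder names, assuming a container factory configured with factory.setBatchListener(true):

    import java.util.List;
    import org.springframework.kafka.annotation.KafkaListener;

    // Hedged sketch: a batch listener receives the whole poll in one call,
    // amortising per-record overhead (e.g. one bulk DB write per batch).
    @KafkaListener(topics = "bets", containerFactory = "batchFactory")
    public void listen(List<String> messages) {
        // process the batch as one unit where possible
    }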
    ashevade1
    @ashevade1
    I am new to using spring-kafka, so please excuse me if this is a basic question. To consume messages, we use the @KafkaListener annotation with topics and a group id specified, e.g. @KafkaListener(topics = "topic1", groupId = "group-1"). This works fine. But my requirement is to get the list of topics at run time, based on some business logic. Is there a way this can be done? E.g. I want something like: List<String> topics = getTopics(); for (String topic : topics) { // start consuming messages for each topic }. Thanks for any help.
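    The topics attribute accepts SpEL, so the list can be computed once at startup (it is not re-evaluated afterwards); a hedged sketch where topicProvider is an assumed bean:

    import org.springframework.kafka.annotation.KafkaListener;

    // Hedged sketch: topics resolved from a bean when the container is created.
    @KafkaListener(topics = "#{@topicProvider.topics()}", groupId = "group-1")
    public void listen(String message) {
        // ...
    }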
    Yuen Fung Lee
    @leeyf1995_gitlab

    Hi all, I've got a technical question about running spring-kafka version 2.2.7 with AWS MSK (Kafka version 2.2.1).

    I have a consumer application with two running instances on the container platform. The consumer application uses one consumer group and listens to two topics, A and B.

    After running normally for a few weeks without any restart, we suddenly ran into an error indicating the application could not commit messages in topic A.
    The commit exception:

    Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

    Right after the commit exception, there are two messages showing:

    1. group coordinator (id rack null) is unavailable or invalid will attempt rediscovery
    
    2. discovered group coordinator

    Since we are using SeekToCurrentErrorHandler, the application could still process the next message; it's just that the processed message could not be committed to Kafka. The same error messages would appear for each message.

    During the error period, messages in the other topic B could be processed totally fine.

    I then rolling-restarted the application runtimes and the problem was resolved.

    The application is deployed to three different QA environments and only one of them has faced this issue.

    2 replies
    mailniranjan
    @mailniranjan
    Currently, in a consumer interceptor, we save some metrics info like when the last record was fetched from Kafka, etc. We have a requirement to save Kafka consumer metrics as well. I know we have KafkaConsumer's metrics() method available to get the metrics info, but in an interceptor we don't have access to the consumer object. Please guide me on how to get access to the consumer object in a consumer interceptor.
    6 replies
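    One alternative to reaching the Consumer from inside the interceptor: the listener containers expose the underlying consumers' metrics directly. A hedged sketch:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;

    // Hedged sketch: pull consumer metrics through the containers instead of
    // from inside the interceptor.
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    public void logConsumerMetrics() {
        registry.getListenerContainers().forEach(container ->
                container.metrics().forEach((clientId, metrics) ->
                        metrics.forEach((name, metric) ->
                                System.out.println(clientId + " " + name.name()
                                        + " = " + metric.metricValue()))));
    }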
    Deepak Emani
    @deepakemani-maersk
    Hi, can you please point to any example of Spring Cloud Contract with an Avro message? It looks like it needs more setup than a simple Kafka container.
    1 reply
    infilitry
    @infilitry
    Hi guys, I keep running into this issue: Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. This is even after making sure the configs on Kafka are fine.
    Any idea what I'm missing...?
    infilitry
    @infilitry
    nvm, I've worked it out
    deepak21singh
    @deepak21singh
    I am trying to implement a Spring Cloud Config server with auto-refreshing properties over the Kafka bus. I am getting the following error while using:
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-config-monitor</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-kafka</artifactId>
    </dependency>

    Description:

    An attempt was made to call a method that does not exist. The attempt was made from the following location:

    org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:598)

    The following method did not exist:

    org.springframework.kafka.listener.ContainerProperties.setAuthorizationExceptionRetryInterval(Ljava/time/Duration;)V

    The method's class, org.springframework.kafka.listener.ContainerProperties, is available from the following locations:

    jar:file:/C:/Users/User/.m2/repository/org/springframework/kafka/spring-kafka/2.3.3.RELEASE/spring-kafka-2.3.3.RELEASE.jar!/org/springframework/kafka/listener/ContainerProperties.class

    It was loaded from the following location:

    file:/C:/Users/User/.m2/repository/org/springframework/kafka/spring-kafka/2.3.3.RELEASE/spring-kafka-2.3.3.RELEASE.jar
    Gary Russell
    @garyrussell
    Your spring-kafka version is too old; that property was added in 2.3.5. You should let Spring Boot's dependency management bring in the right versions for all the dependencies.
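    In practice that means dropping the explicit version and letting the Boot parent/BOM pick it, roughly:

    <!-- Hedged sketch: no <version> element; Spring Boot's dependency
         management supplies the spring-kafka version matching the Boot release. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>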
    Yassine Gharbaoui
    @yascov
    Hello everyone, is there a way to get the assigned partitions within a consumer?
    1 reply
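    A sketch of one way: the listener method can take the Consumer as a parameter and query assignment(), which is only safe on the listener thread itself (the topic name is a placeholder):

    import java.util.Set;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.kafka.annotation.KafkaListener;

    // Hedged sketch: query the consumer's current assignment from the listener.
    @KafkaListener(topics = "my-topic")
    public void listen(String message, Consumer<?, ?> consumer) {
        Set<TopicPartition> assigned = consumer.assignment();
        // ...
    }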
    pyramesh
    @pyramesh
    @garyrussell, I am facing an issue in Kafka where all messages were replayed after a consumer application restart. My configuration can be found in the link below. Could you please guide me? https://stackoverflow.com/questions/66903764/kafka-messages-from-topic-are-being-replayed-after-consumer-restart
    2 replies
    mailniranjan
    @mailniranjan
    I have a requirement to show the last sync from Kafka. What I understand is that Spring for Apache Kafka doesn't call the message listener when there are no records. But I want it to call the message listener even when there are no records. Is this possible?
    vikrantch-hk
    @vikrantch-hk
    @garyrussell, not sure if this is the right channel to ask this question, but I couldn't find much on the web. Is there any way in Spring Kafka or Spring Integration to achieve delay-queue behaviour with Kafka?
    4 replies
    heesuk-ahn
    @heesuk-ahn

    Hi All,

    I have a question about the Kafka consumer rebalance protocol. :)

    If a new consumer joins, a JoinGroupRequest will be sent on the first poll.
    Will the rebalance protocol proceed immediately?

    mailniranjan
    @mailniranjan

    I have a requirement to show the last sync from Kafka. What I understand is that Spring for Apache Kafka doesn't call the message listener when there are no records available in Kafka. But I want it to call the message listener even when no records came back from a poll. Is this possible?

    At the end of every poll, I want to update the last-sync-from-Kafka value in the DB. Please share your thoughts.

    5 replies
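    The container's idle events may fit this: when a poll returns nothing for a configured interval, an application event is published that can drive the last-sync update. A hedged sketch (the repository is a placeholder):

    import java.time.Instant;
    import org.springframework.context.event.EventListener;
    import org.springframework.kafka.event.ListenerContainerIdleEvent;

    // Hedged sketch: enable on the container factory, e.g.
    //   factory.getContainerProperties().setIdleEventInterval(30_000L);
    @EventListener
    public void onIdle(ListenerContainerIdleEvent event) {
        lastSyncRepository.touch(Instant.now()); // assumed persistence helper
    }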
    venkatasreekanth
    @venkatasreekanth
    Quick question: we use Kafka to move data from an MDM system to other systems, and we have separate seed topics for refreshing data from higher environments. I have made the consumer factory a conditional bean but I am not sure how to make the @KafkaListener conditional as well.
    The internet says to set autoStartup but I don't see much documentation about it.
    Hoping someone here can help me.
    1 reply
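    autoStartup takes a property placeholder, so the seed listener can be switched per environment; a hedged sketch (the property and topic names are assumptions):

    import org.springframework.kafka.annotation.KafkaListener;

    // Hedged sketch: the container is created but only started when the
    // property resolves to true.
    @KafkaListener(topics = "seed-topic", autoStartup = "${seed.listener.enabled:false}")
    public void listenSeed(String message) {
        // ...
    }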
    ChandanPadalkar
    @ChandanPadalkar
    What are best practices to handle exceptions related to Kafka?
    1 reply
    Spencer Goldberg
    @goldberg.spencer_gitlab
    What is the right set of versions to use for spring-kafka, Spring Boot, and kafka-streams? I'm getting ClassNotFound exceptions when trying to use spring-kafka 2.6.7, Kafka Streams 2.6.1, Spring Boot starter 2.4.4.
    12 replies
    Dumitru Nicolae Marasoiu
    @nicolae.marasoiu:matrix.org [m]
    Hi, using Boot 2.4.4 and spring-kafka 2.6.7. In "production", the @KafkaListener gets called with the proper CustomEvent payload, but in component tests it fails with Cannot convert from [org.apache.avro.generic.GenericData$Record] to CustomEvent. It's even stranger in that the component tests pass on Mac but not in Docker, yet when I try to debug I always hit Cannot convert from [org.apache.avro.generic.GenericData$Record] to CustomEvent. Where should this conversion happen? Thank you.
    2 replies
    vkpuduran
    @vkpuduran:matrix.org [m]
    Hi, having a strange issue with @KafkaListener. My consumer receives a message from a partition on one thread (org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1), but the message is processed by both KafkaListenerEndpointContainer#0-1-C-1 and KafkaListenerEndpointContainer#0-0-C-1. Basically, a message is processed twice. This happens during load testing. I am using spring-kafka 2.6.5. Any idea how a message received on one thread is being shared with another thread?
    1 reply
    vkpuduran
    @vkpuduran:matrix.org [m]
    Thank you, let me re-check my implementation.
    Adarsh Ramamurthy
    @radarsh
    What version of scala-library is supported by Spring Boot 2.4.4 (Spring Kafka 2.6.7, Kafka clients 2.6.0, Jackson 2.11.4 combo)? I get different ClassNotFound exceptions no matter which scala-library version I use. I found spring-projects/spring-kafka#1623 but it still seems to be the case for Spring Boot 2.4.4.
    6 replies
    deepak643
    @deepak643

    Hi team, I am using spring-kafka 2.5.5.
    I am looking to get the Kafka consumer and producer metrics from my Spring application. This application is not using annotations but is manually polling records and sending records from and to the topics.

    In Prometheus, I can see only the JVM, Tomcat and custom metrics posted, but I am missing the Kafka-provided metrics. Would you please point me at how to enable these metrics? The Kafka consumer is created using:

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
    
    props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    
    protected KafkaConsumer<byte[], Event> getConsumer(List<String> inputTopics) {
        KafkaConsumer<byte[], Event> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(inputTopics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS revoked: " + partitions);
                consumer.commitAsync();
            }
    
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS assigned: " + partitions);
            }
        });
        return consumer;
    }

    The application properties have JMX enabled:

    spring.jmx.enabled=true
    management.endpoints.web.base-path=/
    management.endpoint.metrics.enabled=true
    management.endpoints.web.exposure.include=prometheus
    management.endpoint.prometheus.enabled=true
    management.metrics.export.prometheus.enabled=true

    Am I missing any configuration to expose these metrics? Suggestions, please?

    2 replies
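    For a hand-built KafkaConsumer, nothing registers its metrics with Micrometer automatically; one option is Micrometer's Kafka binder, roughly as below (the meterRegistry injection is assumed, and the binder requires a reasonably recent micrometer-core):

    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;

    // Hedged sketch: bind a manually created consumer's metrics to the
    // Micrometer registry backing the /prometheus endpoint.
    KafkaConsumer<byte[], Event> consumer = getConsumer(inputTopics);
    new KafkaClientMetrics(consumer).bindTo(meterRegistry);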
    deepak643
    @deepak643

    I have changed my implementation to use spring-kafka and I still see the same issue of the Kafka metrics not being exposed:

    @EnableKafka
    @Configuration
    class KafkaConsumerConfig {
        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);
    
        @Autowired
        private GlobalConfig config;
    
        @Bean
        public ConsumerFactory<byte[], RheosEvent> salesTopicConsumerFactory() {
            Map<String, Object> props = commonConfig();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX");
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName() + ".zzz");
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        @Bean(name="salesKafkaListenerContainerFactory")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], RheosEvent>> salesKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<byte[], RheosEvent> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConcurrency(1);
            factory.setBatchListener(true);
            factory.setConsumerFactory(salesTopicConsumerFactory());
            factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<byte[], RheosEvent> behaviorTopicConsumerFactory() {
            Map<String, Object> props = commonConfig();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "YYY");
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        @Bean(name="behaviorKafkaListenerContainerFactory")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], RheosEvent>> behaviorKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<byte[], RheosEvent> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConcurrency(1);
            factory.setBatchListener(true);
            factory.setConsumerFactory(behaviorTopicConsumerFactory());
            factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
    
        private Map<String, Object> commonConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
            props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
            return props;
        }
    
    static class CustomRebalanceListener implements ConsumerAwareRebalanceListener {
            @Override
            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS revoked: " + partitions);
                consumer.commitAsync();
            }
    
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS assigned: " + partitions);
            }
        }
    }

    Please suggest if I am missing any configuration

    2 replies
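    With DefaultKafkaConsumerFactory in play, spring-kafka (2.5+, if I recall correctly) can register consumer metrics itself via a factory listener; a hedged sketch (meterRegistry is assumed to be injected):

    import io.micrometer.core.instrument.MeterRegistry;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.MicrometerConsumerListener;

    // Hedged sketch: every consumer this factory creates gets its metrics
    // bound to the registry.
    DefaultKafkaConsumerFactory<byte[], RheosEvent> cf =
            new DefaultKafkaConsumerFactory<>(props);
    cf.addListener(new MicrometerConsumerListener<>(meterRegistry));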
    Ter Leeloo II
    @ter-leeloo-ii

    I am currently playing with version 2.7.0 because I can pause partitions on the containers, yet I cannot get the BackOff interval to work:
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(10000L, 3L)));

    Before, on version 2.6.7, no problem: a 10s wait between retries. But in 2.7.0 there is no wait at all.

    I saw there was some change related to not delaying container stops, but I didn't see what to configure differently for the interval to work. Any idea? Thank you

    1 reply