    Jagmohan Sharma
    @JagmohanSharma
    Hi @garyrussell, I need some input regarding an issue with a Kafka consumer, as I could not find anything relevant. If the Kafka consumer goes down or drops out of the consumer group for any reason, how can we restart the consumer without restarting our container? Currently we deploy our microservice as a Docker container, which requires a restart on such issues.
    2 replies
    Jagmohan Sharma
    @JagmohanSharma
    1 reply
    alex gerhalt
    @lagerholt_gitlab
    Is there a Spring guideline that dictates whether ConcurrentKafkaListenerContainerFactoryConfigurer has package-private get/set methods? I would like to write my own auto-configuration mapping without duplicating this class, and either make the methods protected or public. Please let me know if I can make a pull request. Thank you
    1 reply
    cmadho
    @cmadho
    How can we set Producer props before publishing to DLT?
    14 replies
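    A minimal sketch of one way to do this, assuming spring-kafka 2.5+ and a record error handler; the DeadLetterPublishingRecoverer publishes with whatever KafkaTemplate you hand it, so give that template its own producer properties (names and values here are illustrative):

    Map<String, Object> dltProps = new HashMap<>();
    dltProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    dltProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    dltProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    dltProps.put(ProducerConfig.ACKS_CONFIG, "all"); // any DLT-specific producer prop

    KafkaTemplate<String, String> dltTemplate =
            new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(dltProps));
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dltTemplate);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer));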
    ankur jadiya
    @jadiyaankur_twitter
    My MDC logging context is lost in the Kafka callback, which runs on another thread. How do I retain the MDC context in the Kafka callback?
    1 reply
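    A minimal sketch of the usual workaround, since MDC is thread-local: capture the context map before the async send and restore it inside the callback (the logger and topic name are illustrative):

    Map<String, String> mdc = MDC.getCopyOfContextMap(); // org.slf4j.MDC
    kafkaTemplate.send("some-topic", payload).addCallback(
            result -> {
                if (mdc != null) {
                    MDC.setContextMap(mdc); // restore MDC on the callback thread
                }
                log.info("sent ok");
            },
            ex -> {
                if (mdc != null) {
                    MDC.setContextMap(mdc);
                }
                log.error("send failed", ex);
            });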
    Jorg Heymans
    @jorgheymans
    Hi, I am wondering if producer metrics can be enabled for a streams producer in spring-kafka, similar to producerFactory.addListener(new MicrometerProducerListener(meterRegistry)) for a "normal" non-streams producer?
    2 replies
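    A sketch, assuming spring-kafka 2.5.3 or later, where KafkaStreamsMicrometerListener (org.springframework.kafka.streams) was added; it binds the KafkaStreams client metrics, including the embedded producer's, to the registry:

    @Autowired
    void bindStreamsMetrics(StreamsBuilderFactoryBean factoryBean, MeterRegistry meterRegistry) {
        factoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry));
    }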
    cmadho
    @cmadho
    Hi, I am using @EmbeddedKafka for testing. I want to test whether the faulty records are going to the DLT. How can I test that? As of now I just tried creating a new Kafka consumer that subscribes to the DLT.
    4 replies
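    A minimal sketch of that "extra test consumer" approach (the DLT name "my-topic.DLT" and the faulty payload are hypothetical); KafkaTestUtils does most of the work:

    Map<String, Object> props = KafkaTestUtils.consumerProps("dlt-test-group", "false", embeddedKafkaBroker);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    Consumer<String, String> dltConsumer = new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(), new StringDeserializer()).createConsumer();
    embeddedKafkaBroker.consumeFromAnEmbeddedTopic(dltConsumer, "my-topic.DLT");

    // publish the faulty record through the listener's topic, then:
    ConsumerRecord<String, String> dlt = KafkaTestUtils.getSingleRecord(dltConsumer, "my-topic.DLT");
    assertThat(dlt.value()).isEqualTo(faultyPayload);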
    mphmelsen
    @mphmelsen
    I was searching for an example of mocking the schema registry in a spring-kafka test, but I could not find an example or description of how to go about this. Can you tell me the best way to handle it?
    sreekanthjanapati
    @sreekanthjanapati
    How can we specify a poll interval with @KafkaListener? I found a solution in this [link](spring-projects/spring-kafka#819), but it shows an error if I add poll-interval=1000, because that property is not implemented on @KafkaListener. I'm using the latest version, 2.6.1. Could anyone please help me add a poll-interval property to @KafkaListener?
    2 replies
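    For what it's worth, @KafkaListener has no poll-interval attribute; a sketch of the closest spring-kafka equivalent, the idleBetweenPolls container property (added around 2.3, if I recall correctly), which inserts a pause between polls:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setIdleBetweenPolls(1000L); // ~1 second between polls
        return factory;
    }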
    parameswaranvv
    @parameswaranvv
    @mphmelsen I actually used an embedded WireMock to stub the schema registry calls.
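    Another hedged option, if you are on Confluent serializers 5.3 or later: a schema.registry.url with the mock:// scheme resolves to an in-memory MockSchemaRegistryClient, so no HTTP stubbing is needed at all (the scope name is arbitrary):

    Map<String, Object> props = new HashMap<>();
    props.put("schema.registry.url", "mock://test-scope");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);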
    Piotr Joński
    @sta-szek
    Hi guys, is there any metric from spring-kafka that will tell me the status of the consumer thread pool?
    Use case: when I set kafka.concurrency to something other than 1, e.g. 5, I would like to monitor whether the pool is depleted and raise an alert. In other words, I would like to know when all threads in that pool are active and there are still Kafka events to process (e.g. Kafka lag).
    So far I have found only spring_kafka_listener_seconds_sum, which gives me an overview of handling time, but not pool condition.
    Piotr Joński
    @sta-szek
    Is any thread pool used for that? :arrow_up:
    18 replies
    Anders Clausen
    @AndersClausen
    Hi all. I was wondering if anybody had a suggestion for the best way to assemble multi-part messages from Kafka. Is this best achieved with Kafka Streams, or do you have something better to recommend? Cheers
    8 replies
    Pedro Alvillar
    @PedroPan182_twitter
    Hello all. I was wondering if there is any issue with using the @KafkaListener annotation twice on the same method. In other words:
    @KafkaListener(id = "someId")
    @KafkaListener(id = "someOtherId")
    public void onMessage(ConsumerRecord<String, SomeClass> record) {
        // some logic to process here
    }
    3 replies
    sreekanthjanapati
    @sreekanthjanapati
    Here I want to read 5 records on every poll (1 second) from the topic. I wrote the code below, but it's not working as expected: it polls 1 record per second and misses some records from the topic. Can anyone please help me resolve this?
    @KafkaListener(topics = { "dax-topic" }, properties = { "max.poll.records:5" })
    public void onMessage(Consumer consumer, ConsumerRecord<String, String> consumerRecord) {
        consumer.poll(60000);
        log.info("ConsumerRecord : {} ", consumerRecord);
    }
    8 replies
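    A sketch of the batch-listener shape this is usually written as; the container owns the poll loop, so the listener should not call consumer.poll() itself (this requires factory.setBatchListener(true) on the container factory):

    @KafkaListener(topics = "dax-topic", properties = "max.poll.records:5")
    public void onMessage(List<ConsumerRecord<String, String>> records) {
        // up to 5 records per poll are delivered together
        records.forEach(rec -> log.info("ConsumerRecord : {}", rec));
    }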
    Eric Kolotyluk
    @kolotyluk
    This message was deleted
    1 reply
    Anders Clausen
    @AndersClausen
    Can somebody please confirm something for me: can I have two consumer groups (each listening to one topic) registered with spring-kafka in one Spring Boot service? Cheers
    2 replies
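    A minimal sketch confirming the shape (topic and group names hypothetical): two listeners in one Boot service, each with its own consumer group, works fine:

    @KafkaListener(topics = "topic-a", groupId = "group-a")
    public void onTopicA(String value) { /* ... */ }

    @KafkaListener(topics = "topic-b", groupId = "group-b")
    public void onTopicB(String value) { /* ... */ }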
    Anders Clausen
    @AndersClausen

    I've written a Kafka consumer with the help of spring-kafka, but I can't get my test with @EmbeddedKafka to work. It never enters the method annotated with @KafkaListener. I'm getting the following error message at the end, but I'm not sure if it's a red herring:

    org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 0 (/127.0.0.1:9092) could not be established. Broker may not be available.

    These are the annotations we're using:

    @ExtendWith(SpringExtension.class)
    @EmbeddedKafka(partitions = 1, topics = {"MSK.ebdd.topic.v1",
            "MSSebdd.topic.v1"},
            brokerProperties = {"listeners=PLAINTEXT://127.0.0.1:9092", "port=9092", "offsets.topic.replication.factor=1"})
    @SpringBootTest(classes = { GDSKafkaConsumer.class, KafkaConsumerConfig.class }, properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")

    and here is how we set up the producer and the producer record:

            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TPDOC_TOPIC_NAME, 0, "key", payload);
    
            Headers headers = producerRecord.headers();
            addHeaders(producerRecord.headers(), JSON_HEADERS_TPDOC, "1",
                    "b535a84d-e762-4557-9665-5f721feb73", "0");
    
        RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("MSK.ebdd.topic.v1", 0, "test-key", payload, headers)).get();
            producer.flush();

    Who can spot what I'm doing wrong?

    I'm using JUnit 5, but can't really find many examples with spring-kafka and JUnit 5 - is that because it's not as easy to use as JUnit 4?

    10 replies
    Anders Clausen
    @AndersClausen
    Hi @garyrussell. Any chance you can tell me what the latest is on this ticket: spring-projects/spring-kafka#1123? Really keen on using reactive Kafka if possible :-)
    2 replies
    Marcel Koopman
    @marcelkoopman
    Hello, what can be done about endless Kafka rebalancing? On our production server we have had a rebalance running all night; it never seems to end. Which settings should we look at, or is there a recent issue related to this? We are on Kafka broker 2.0.
    Marcel Koopman
    @marcelkoopman
    Spring Kafka 2.5.5.RELEASE
    2 replies
    mailniranjan
    @mailniranjan
    Hi Gary. This is regarding the usage of the FetcherMetricsRegistry class mentioned in https://stackoverflow.com/questions/56540759/how-to-monitor-kafka-consumer-lag-for-transactional-consumers. It is part of the org.apache.kafka.clients.consumer.internals package. Is it fine to use classes under the internals package, given that they could be made private in the future? My requirement is to get metrics like records-lag-max and the others mentioned in https://docs.confluent.io/current/kafka/monitoring.html. If there is some other way to fetch these metrics, please share the details.
    5 replies
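    A hedged alternative that avoids the internals package entirely: Consumer.metrics() is public API and exposes the fetch metrics, including records-lag-max:

    consumer.metrics().forEach((name, metric) -> {
        if ("records-lag-max".equals(name.name())) {
            System.out.println(name.group() + " records-lag-max = " + metric.metricValue());
        }
    });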
    luissangge
    @luissangge

    Hello, I'm having a problem with concurrency. If I call factory.setConcurrency(5) everything works fine, but if I set the concurrency in application properties (spring.kafka.listener.concurrency=5) it's not working.

    @Bean(name = "listenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setIdleEventInterval(60000L);
        factory.setAckDiscarded(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        factory.setConcurrency(5);
        factory.setErrorHandler(currentErrorHandler());
        return factory;
    }

    16 replies
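    A sketch of the likely cause and fix: spring.kafka.listener.concurrency only configures Boot's auto-configured factory, so a hand-built factory bean has to read the property itself, e.g.:

    @Bean(name = "listenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            @Value("${spring.kafka.listener.concurrency:1}") Integer concurrency) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency); // the Boot property now reaches the custom factory
        return factory;
    }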
    Anders Clausen
    @AndersClausen
    Hi @garyrussell. We've been following your test example from this SO post https://stackoverflow.com/questions/52783066/how-to-write-unit-test-for-kafkalistener and were wondering what could cause the inconsistent results we get - one minute the test passes and the next it fails (same code).
    This is what the test looks like:
        @Test
        @DisplayName("Tests that a message sent to the TpDoc topic is processed by the consumer")
        public void consumerShouldProcessTpDocGDSMessages() throws Exception {
    
            ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry
                    .getListenerContainer("TPDOC-LISTENER");
            container.stop();
            @SuppressWarnings("unchecked")
            AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container
                    .getContainerProperties().getMessageListener();
            CountDownLatch latch = new CountDownLatch(2);
            container.getContainerProperties()
                    .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, String>() {
    
                        @Override
                        public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment,
                                              Consumer<?, ?> consumer) {
                            messageListener.onMessage(data, acknowledgment, consumer);
                            latch.countDown();
                        }
    
                    });
            container.start();
    
            final Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafkaBroker);
            Producer<String, String> producer = new DefaultKafkaProducerFactory<>(producerProps,
                    new StringSerializer(), new StringSerializer()).createProducer();
    
            String payload = TestUtils
                    .readFileToString("gds/tpdoc/Import_Party_Code_Changed_header_and_data_tags_removed.json");
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME_TPDOC, 0, "test-tpdoc-key", payload);
    
            addHeaders(producerRecord.headers(), JSON_HEADERS_TPDOC, "1",
                    "b535a84d-e762-4557-9665-5f721fbbeb73", "0");
    
            producer.send(producerRecord).get();
            RecordMetadata recordMetadata = producer.send(producerRecord).get();
            System.out.println("Producer record metadata -> " + recordMetadata);
    
            assertThat(latch.await(20, TimeUnit.SECONDS))
                    .describedAs("Messages successfully consumed")
                    .isTrue();
    
            producer.flush();
        }
    2 replies
    Jagmohan Sharma
    @JagmohanSharma
    Hi Team / @garyrussell, I am looking for some clarification regarding the properties that can be used to avoid producer timeouts, caused either by too much time elapsing since batch creation (a blocked batch) or by a timeout reading metadata. I am confused whether I should increase max.block.ms or delivery.timeout.ms. And do we also need to set buffer.memory along with these timeouts to avoid memory-blockage issues?
    1 reply
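    For reference, a sketch of what each knob bounds (the values shown are the defaults, not recommendations): max.block.ms caps how long send() may block on metadata or a full buffer; delivery.timeout.ms caps the total time from send() until success or failure, including retries; buffer.memory caps the memory held in unsent batches:

    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);         // send()-blocking bound
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // end-to-end delivery bound
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);    // unsent-batch memory cap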
    orkhanmirzayev
    @orkhanmirzayev
    Hi Team / @garyrussell. I want to get the response back to the controller, but I am unable to do it. Here is my producer and consumer code:

    @Configuration
    public class KafkaProducerConfig {

    @Value("${kafka.boot.server}")
    private String kafkaServer;
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerConfig());
    }
    
    @Bean
    public ProducerFactory<String, String> producerConfig() {
        // TODO Auto-generated method stub
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        //Uncomment the below if you want to send String instead of an Object through Kafka
        //config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    }

    @RestController
    @RequestMapping("/kafkaProducer")
    public class KafkaProducerController {

    private final KafkaSender sender;
    
    public KafkaProducerController(KafkaSender sender) {
        this.sender = sender;
    }
    
    @PostMapping(value = "/callback")
    public ResponseEntity<String> sendDataCallback(@RequestBody Student student) throws ExecutionException, InterruptedException {
        sender.sendDataCallback(student);
        System.out.println("COMPLETED");
        return new ResponseEntity<>("Data sent to Kafka", HttpStatus.OK);
    }

    }

    @Service
    public class KafkaSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSender.class);
    
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    @Value("${kafka.topic.name}")
    private String topicName;
    
    public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void sendDataCallback(Student student) throws ExecutionException, InterruptedException {
        Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaHeaders.TOPIC, topicName);
    
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(new GenericMessage<>(student, headers));
        future.addCallback(new StudentCallback(student));
        LOGGER.info("Data - " + student.toString() + " sent to Kafka Topic - " + topicName);
    
    }

    }

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

    private final KafkaProperties kafkaProperties;
    
    public KafkaConsumerConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }
    
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        final JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer
        );
    }
    @Value("${kafka.boot.server}")
    private String kafkaServer;
    
    @Bean
    public ProducerFactory<String, String> producerConfig() {
        // TODO Auto-generated method stub
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        //Uncomment the below if you want to send String instead of an Object through Kafka
        //config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, String> kafkaProducerFactory) {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        MessagingMessageConverter messageConverter = new MessagingMessageConverter();
        messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
        kafkaTemplate.setMessageConverter(messageConverter);
        return kafkaTemplate;
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Student> kafkaListenerContainerFactory(KafkaTemplate kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, Student> listener = new ConcurrentKafkaListenerContainerFactory<>();
        listener.setConsumerFactory(consumerFactory());
        listener.setReplyTemplate(kafkaTemplate);
        return listener;
    }

    }

    @Service
    public class KafkaReciever {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReciever.class);
    
    @KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.consumer.group.id}")
    @SendTo
    public Student recieveData(@Payload  Student student) throws InterruptedException {
        LOGGER.info("Data - " + student.toString() + " recieved");
        student.setFirstName("Farid");
        return student;
    }
    }
    How do I get the updated Student object back to my controller?
    2 replies
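    A sketch of the request/reply pattern that returns the updated object to the controller; it pairs a ReplyingKafkaTemplate with the @SendTo listener above (the topic names "student-topic" and "student-replies" are hypothetical):

    @Bean
    public ReplyingKafkaTemplate<String, Student, Student> replyingKafkaTemplate(
            ProducerFactory<String, Student> pf,
            ConcurrentKafkaListenerContainerFactory<String, Student> factory) {
        ConcurrentMessageListenerContainer<String, Student> replies =
                factory.createContainer("student-replies");
        replies.getContainerProperties().setGroupId("student-replies-group");
        return new ReplyingKafkaTemplate<>(pf, replies);
    }

    // in the controller:
    RequestReplyFuture<String, Student, Student> future =
            replyingTemplate.sendAndReceive(new ProducerRecord<>("student-topic", student));
    Student updated = future.get(10, TimeUnit.SECONDS).value();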
    mailniranjan
    @mailniranjan
    Hi Gary. What is the purpose of the client id that can be set in container properties? And how is the client id related to the member id returned by the consumer's group metadata? In my case the member id was equal to the client id followed by a random GUID. I am not able to understand this; please help. I am using KafkaMessageListenerContainer.
    4 replies
    CruncherBigData
    @CruncherBigData
    @garyrussell, in regard to this question https://stackoverflow.com/questions/64289817/defaultkafkaheadermapper-could-not-decode-json-type, how do you want me to show the headers? The key spring_json_header_types is coming through as a byte array!
    52 replies
    Ghenadii Batalski
    @ghenadiibatalski
    Hello all, I need to replay some messages. Is it possible to reset offsets on a running consumer group? Regards, Gena
    5 replies
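    A sketch of the in-application route (spring-kafka 2.3+, if I recall correctly): a listener extending AbstractConsumerSeekAware can rewind its own assigned partitions at runtime, e.g. triggered from a REST endpoint; the CLI alternative (kafka-consumer-groups --reset-offsets) requires the group to be inactive:

    public class ReplayingListener extends AbstractConsumerSeekAware {

        @KafkaListener(topics = "some-topic", groupId = "some-group")
        public void listen(String value) {
            // normal processing
        }

        public void replayFromBeginning() {
            seekToBeginning(); // rewinds all assigned partitions
        }
    }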
    mailniranjan
    @mailniranjan
    Hi all. I have a question; it is not related to spring-kafka: https://stackoverflow.com/questions/64389519/what-should-be-the-kafka-serde-configuration-when-we-use-kafka-streams. I haven't gotten a response on Stack Overflow for the past 2 days. If anyone knows the answer, please reply.
    2 replies
    Aleksey Vasin
    @youagree
    Hello, does anybody know whether a transaction is rolled back if an uncaught exception occurs while sending a message?
    This is in a Spring Boot application, with @Transactional on the method.
    1 reply
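    A minimal sketch of the behavior in question, assuming a transactional producer factory (a transactionIdPrefix is configured) and a KafkaTransactionManager behind @Transactional: an uncaught runtime exception after the send aborts the Kafka transaction, so the record never becomes visible to read_committed consumers (mightThrow() is a hypothetical stand-in):

    @Transactional
    public void process(String payload) {
        kafkaTemplate.send("some-topic", payload);
        mightThrow(); // uncaught RuntimeException here -> Kafka transaction aborted
    }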
    hanoisteve
    @hanoisteve
    So I am using the request/reply ReplyingKafkaTemplate to call from Service A to Service B. I get a timeout on Service A's request to Service B, which only seems to get resolved by changing Service A's consumer group and redeploying Service A. Not an optimal way to resolve this issue. Does anyone have an idea as to why this is occurring?
    hanoisteve
    @hanoisteve
    6 replies
    shrutipabboju
    @shrutipabboju
    Hi all, while consuming a message from Kafka in Avro format, the consumer throws this error: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.avro.generic.GenericData$Record] to [com.sample.sample.Style] for GenericMessage. Can someone help with this?
    shrutipabboju
    @shrutipabboju
    This is my consumer:
    @StreamListener(Processor.INPUT)
    public void consumeStyleDetails(Style styleDetails) {
        logger.info("Processing Style Details: {}", styleDetails);
        logger.info("Sample: {}", styleDetails.getMarketCode());
    }
    These two lines are associated with the producer:
    Message<Style> message = MessageBuilder.withPayload(style).build();
    source.output().send(message);
    Gary Russell
    @garyrussell
    Please learn how to format code in Gitter - see the Markdown link on the right.
    Use ``` fences before and after the code
    This question is more about Avro than Spring - we are not Avro experts here.
    It's probably some configuration problem with the Avro Deserializer.
    In any case, this room is for spring-kafka - use the spring-cloud-stream room for questions about that project.
    Gary Russell
    @garyrussell
    Also, you'll find a much bigger audience on Stack Overflow, tagged with avro, etc.