    vkpuduran
    @vkpuduran:matrix.org
    [m]
    Hi, having a strange issue with @KafkaListener. My consumer receives a message from a partition on one thread (org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1), but the message is processed by both KafkaListenerEndpointContainer#0-1-C-1 and KafkaListenerEndpointContainer#0-0-C-1 - basically, the message is processed twice. This happens during load testing. I am using spring-kafka 2.6.5. Any idea how a message received on one thread is being shared with another thread?
    1 reply
    vkpuduran
    @vkpuduran:matrix.org
    [m]
    Thank you, and let me re-check my implementation
    Adarsh Ramamurthy
    @radarsh
    What version of scala-library is supported by Spring Boot 2.4.4 (the Spring Kafka 2.6.7, Kafka clients 2.6.0, Jackson 2.11.4 combo)? I get different ClassNotFoundExceptions no matter which scala-library version I use. I found spring-projects/spring-kafka#1623, but the problem still seems to be present in Spring Boot 2.4.4.
    6 replies
    deepak643
    @deepak643

    Hi team, I am using spring-kafka 2.5.5.
    I am trying to get the Kafka consumer and producer metrics from my Spring application. This application is not using annotations; it manually polls records from, and sends records to, the topics.

    In Prometheus, I can see only the JVM, Tomcat and custom metrics; the Kafka-provided metrics are missing. Could you please point me to how to enable these metrics? The Kafka consumer is created using:

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
    
    props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    
    protected KafkaConsumer<byte[], Event> getConsumer(List<String> inputTopics) {
        KafkaConsumer<byte[], Event> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(inputTopics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS revoked: " + partitions);
                consumer.commitAsync();
            }
    
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS assigned: " + partitions);
            }
        });
        return consumer;
    }

    The application properties have JMX enabled:

    spring.jmx.enabled=true
    management.endpoints.web.base-path=/
    management.endpoint.metrics.enabled=true
    management.endpoints.web.exposure.include=prometheus
    management.endpoint.prometheus.enabled=true
    management.metrics.export.prometheus.enabled=true

    Am I missing any configuration to expose these metrics? Any suggestions?
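
    One likely cause, for what it's worth: client metrics only reach Micrometer once the consumer is bound to the MeterRegistry, and a consumer created directly with new KafkaConsumer<>(props) is invisible to it. A minimal sketch, assuming Micrometer 1.4+ on the classpath (the meterRegistry parameter and the surrounding wiring are illustrative):

    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;

    protected KafkaConsumer<byte[], Event> getConsumer(List<String> inputTopics, MeterRegistry meterRegistry) {
        KafkaConsumer<byte[], Event> consumer = new KafkaConsumer<>(props);
        // bind the client's internal metrics to Micrometer so they appear alongside the jvm/tomcat metrics
        new KafkaClientMetrics(consumer).bindTo(meterRegistry);
        consumer.subscribe(inputTopics);
        return consumer;
    }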

    2 replies
    deepak643
    @deepak643

    I have changed my implementation to use spring-kafka and I still see the same issue of Kafka metrics not being exposed:

    @EnableKafka
    @Configuration
    class KafkaConsumerConfig {
        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);
    
        @Autowired
        private GlobalConfig config;
    
        @Bean
        public ConsumerFactory<byte[], RheosEvent> salesTopicConsumerFactory() {
            Map<String, Object> props = commonConfig();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX");
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName() + ".zzz");
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        @Bean(name="salesKafkaListenerContainerFactory")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], RheosEvent>> salesKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<byte[], RheosEvent> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConcurrency(1);
            factory.setBatchListener(true);
            factory.setConsumerFactory(salesTopicConsumerFactory());
            factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<byte[], RheosEvent> behaviorTopicConsumerFactory() {
            Map<String, Object> props = commonConfig();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "YYY");
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        @Bean(name="behaviorKafkaListenerContainerFactory")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], RheosEvent>> behaviorKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<byte[], RheosEvent> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConcurrency(1);
            factory.setBatchListener(true);
            factory.setConsumerFactory(behaviorTopicConsumerFactory());
            factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
    
        private Map<String, Object> commonConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
            props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
            return props;
        }
    
        static class CustomRebalanceListener implements ConsumerAwareRebalanceListener {
            @Override
            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS revoked: " + partitions);
                consumer.commitAsync();
            }
    
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS assigned: " + partitions);
            }
        }
    }

    Please suggest if I am missing any configuration
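
    A hedged sketch of one fix: with spring-kafka, the consumer factory needs a MicrometerConsumerListener (spring-kafka 2.5+) before consumer metrics are registered; the MeterRegistry parameter below is assumed to be the auto-configured bean:

    @Bean
    public ConsumerFactory<byte[], RheosEvent> salesTopicConsumerFactory(MeterRegistry meterRegistry) {
        Map<String, Object> props = commonConfig();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX");
        DefaultKafkaConsumerFactory<byte[], RheosEvent> factory = new DefaultKafkaConsumerFactory<>(props);
        // register each consumer created by this factory with Micrometer
        factory.addListener(new MicrometerConsumerListener<>(meterRegistry));
        return factory;
    }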

    2 replies
    Ter Leeloo II
    @ter-leeloo-ii

    I am currently playing with version 2.7.0 because I can pause partitions on the containers, yet I cannot get the BackOff interval to work:
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(10000L, 3L)));

    Before, on version 2.6.7, there was no problem - a 10 s wait between retries - but in 2.7.0 there is no wait at all.

    I saw there was some change related to stopping containers without delay, but I didn't see what to configure differently for that interval to work. Any idea? Thank you

    1 reply
    Kevin Lewis
    @kevinrlewis
    Hello, I'm running into an issue when using spring-kafka to produce messages concurrently to Confluent. Using Java 8 parallel streams and sending a message with KafkaTemplate, or StreamBridge from spring-cloud-stream, I get Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule when attempting to produce the message. When I remove the "parallel" part of the stream, it works fine and I am able to produce. When I initialize a KafkaProducer myself, without spring-kafka or spring-cloud-stream's wrappers, it works with parallel streams.
    3 replies
    hairinwind
    @hairinwind

    Hello, I have code like this

    ReadOnlyKeyValueStore<String, Double> store = kafkaStreamsBuilder.getKafkaStreams().store(...)

    but kafkaStreamsBuilder.getKafkaStreams() is null. Any idea what is wrong?
    Many thanks
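
    For context: getKafkaStreams() stays null until the StreamsBuilderFactoryBean has been started, so the store has to be queried after the application is fully up. A sketch under that assumption (the store name and value type are illustrative):

    @Autowired
    private StreamsBuilderFactoryBean kafkaStreamsBuilder;

    // call only after the context has started, e.g. from an ApplicationRunner
    Double lookup(String key) {
        KafkaStreams kafkaStreams = kafkaStreamsBuilder.getKafkaStreams(); // non-null once started
        ReadOnlyKeyValueStore<String, Double> store = kafkaStreams.store(
                StoreQueryParameters.fromNameAndType("store-name", QueryableStoreTypes.keyValueStore()));
        return store.get(key);
    }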

    4 replies
    Bulat
    @jerdys
    Hi! Does anyone have an implementation of a dead letter queue in Spring Boot with Kafka? I looked around online, but every solution I found doesn't work for some reason :(
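
    For reference, the stock spring-kafka recipe is a SeekToCurrentErrorHandler whose recoverer republishes the failed record to a <topic>.DLT topic; a minimal sketch for the 2.3-2.7 line, assuming an auto-configured KafkaTemplate:

    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaOperations<Object, Object> template) {
        // try a record 3 times in total (1s apart), then publish it to <topic>.DLT
        return new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2L));
    }

    The handler then needs to be set on the listener container factory via factory.setErrorHandler(...).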
    4 replies
    andrshmot
    @andrshmot

    Hi guys, a question regarding handling errors: I'm using a Spring Boot application without a REST layer, so no ControllerAdvice is in place. Now, I'm listening to events with this configuration

    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

    So I have a message converter

    @Bean
    public RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }

    And this listener

        public void executeConsumer(final ConsumerRecord<String, String> cr,
            @Payload final POJO payload) {
    What I want to know is if there is any possibility to handle conversion errors like this:
    org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
     at [Source: (String)"{"property1":"value", "property2": 005}"; line: 1, column: 51]
        at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:128) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:132) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:264) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:74) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1310) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1293) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1253) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1234) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1149) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:959) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:766) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:704) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
    Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
     at [Source: (String)"{"property1":"value", "property2": 005}"; line: 1, column: 51]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.10.jar:2.9.10]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693) ~[jackson-core-2.9.10.jar:2.9.10]
        at com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(
    5 replies
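
    One hedged option on the 2.2.x line is to move the JSON parsing out of the converter and into an ErrorHandlingDeserializer2 delegating to JsonDeserializer, so a poison record fails in the deserializer, where spring-kafka can route the failure to the container's error handler rather than throwing inside the converter (the POJO type below is illustrative):

    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
    spring.kafka.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.properties.spring.json.value.default.type=com.example.POJO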
    Kaushalya Samarasekera
    @kausnz

    Hi team! If different functional areas of the same application need to talk to the same Kafka cluster, what practice do you follow when it comes to the number of KafkaTemplate instances? Do you share a single instance, or do you tend to have a dedicated instance per feature area within the application?

    And if you have to connect your application to two Kafka brokers (two separate clusters), is there a clever way to leverage what's provided by autoconfiguration (spring.kafka.*) without having to initialize the templates with two groups of custom properties?
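
    On the second question, one common pattern - sketched here with an illustrative bean name and address - is to let spring.kafka.* drive the first cluster and build the second template from the same auto-configured properties with only the bootstrap servers overridden:

    @Bean
    public KafkaTemplate<String, String> secondClusterKafkaTemplate(KafkaProperties properties) {
        Map<String, Object> props = properties.buildProducerProperties();
        // reuse everything from spring.kafka.*, pointing only at the other cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "second-cluster:9092");
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }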

    1 reply
    Pietro Galassi
    @pietrogalassi
    Hi all. Please help me figure out this scenario: I have a microservice with more than one instance. This microservice reads from a topic (ordersTopic) with at least 24 partitions, aggregates the data (sums), then puts the results on another topic (countTopic). This countTopic is then read by the same microservice in order to handle some logic on the count. With multiple instances, can I get bugs in the count numbers? Thanks a lot.
    3 replies
    vkpuduran
    @vkpuduran:matrix.org
    [m]
    Hi, I have multiple KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> beans to consume messages from different topics. My code is as below:
    @Bean(name = "xxxEventKafkaListenerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> xxxAlertEventKafkaListenerFactory() throws AWSServiceProviderException {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactoryUsingTopicName(ps-mon.0));
    factory.setBatchListener(true);
    factory.setConcurrency(numberOfConcurrentListeners);
    factory.getContainerProperties().setAckMode(AckMode.valueOf(listenerAckMode));
    log.info("Created xxxKafkaListerFactory !!");

    return factory;
    }
    consumerFactoryUsingTopicName(String topicName) returns the default config values, like:
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);

    A few of the KafkaListenerContainerFactories use the same SASL credentials (username and password) - org.apache.kafka.common.security.scram.ScramLoginModule required username="%s" password="%s";
    whereas another KafkaListenerContainerFactory uses different SASL credentials. This is causing an issue and I am getting the below error.

    2021-05-07 19:31:52 ERROR o.a.k.c.Metadata:283 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - [Consumer clientId=consumer-ps-as-xxx-consumer-local-1, groupId=ps-as-xxx-consumer-local] Topic authorization failed for topics [ps-mon.0]
    2021-05-07 19:31:52 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - Authorization Exception and no authorizationExceptionRetryInterval set
    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [ps-mon.0]
    2021-05-07 19:31:52 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:140 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - Fatal consumer exception; stopping container

    I am not sure what is missing here. Can't we have different factories with different credentials to access the topics? This is a Spring Boot application using spring-kafka. Any pointers would be really appreciated.

    Gary Russell
    @garyrussell
    Gitter is not a good medium for questions like this; ask on Stack Overflow or in a GitHub issue. Also, learn how to use markdown here - click the M icon on the right; the above is very hard to read. No, you can only have one set of credentials; it's a limitation of the JVM.
    Tomas Alabes
    @tomasAlabes
    Hi! Performance question, to see if you have a recommendation. I have a service of considerable size: it consumes/publishes to 30 topics with 128 partitions each, 80 consumer groups, 32 transactional producers. What's happening is that, under load, the number of threads goes crazy because of the producer:partition relationship. I started using the producerPerConsumerPartition=false property, but then it starts to freak out and rebalance groups much sooner than with the property set to true. Is there a way to define a pool of threads instead of 1 thread:N partitions or 1 thread:1 partition? Something in the middle? With 1:N the throughput seems to go down quickly, and the other needs a mega-server to handle all the threads. Is adding more replicas with 1:1 the only way? (Adding replicas with 1:N doesn't seem to change anything.) Using spring-kafka 2.6.7 and kafka 2.6.0.
    3 replies
    Pietro Galassi
    @pietrogalassi

    Hi all, trying to enable StreamsConfig with this code:

    @Bean
    public StreamsConfig streamsConfig() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaGroupId);
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);

        if (kafkaSSLEnabled) {
            configs.put("security.protocol", "SSL");
            configs.put("ssl.truststore.location", kafkaTrustoreLocation);
            configs.put("ssl.truststore.password", kafkaTrustorePassword);
        }
        return new StreamsConfig(configs);
    }

    but it does not connect to the AdminServer. How can I fix this?

    Alternatively, changing it to:
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)

    gives:
    The bean 'defaultKafkaStreamsConfig', defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [it/galax/injestor/orchestrator/consumers/KafkaStreamConfig.class] and overriding is disabled.
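
    The clash is with Boot's own defaultKafkaStreamsConfig bean, which is of type KafkaStreamsConfiguration rather than StreamsConfig. One documented way out is to declare a KafkaStreamsConfiguration under that name yourself, which Boot then backs off from; a sketch:

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaGroupId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        // the SSL settings can go into the same map as above
        return new KafkaStreamsConfiguration(props);
    }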

    2 replies
    Rakesh
    @ravening
    Hi all, is there a way to display the topic name on which a message is received? I have a Kafka consumer listening on at least 10 topics... a message can arrive on any topic, so I want to know which topic the message is coming in on.
    I have code like this: @KafkaListener(topics = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"})
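
    The received topic is available per record, either from the ConsumerRecord itself or via a header; a sketch:

    @KafkaListener(topics = {"a", "b", "c"})
    public void listen(ConsumerRecord<String, String> record,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // record.topic() carries the same information as the header
        log.info("received {} from topic {}", record.value(), topic);
    }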
    3 replies
    Tiger Wang (王豫)
    @tigerinus
    Hello - we have an issue where, when Kafka brokers must be taken offline, no consumer service has any idea about it and keeps running. We had to manually terminate all consumer services and re-launch them. Q: how does a consumer know that all Kafka brokers are offline?
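
    One hedged option: spring-kafka can publish a NonResponsiveConsumerEvent when a consumer stops polling, provided monitorInterval/noPollThreshold are configured on the ContainerProperties. A sketch of the listening side:

    @EventListener
    public void onNonResponsive(NonResponsiveConsumerEvent event) {
        // fired when a consumer has not polled within noPollThreshold * pollTimeout
        log.error("Consumer appears stuck or the brokers are unreachable: {}", event);
    }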
    4 replies
    andrshmot
    @andrshmot
    Morning guys, is there any way to log the container properties and the listener ones set in application.properties? In the console, only ProducerConfig gets logged by default.
    7 replies
    Anders Clausen
    @AndersClausen

    Hi @garyrussell. Would you be able to point me to an example where the static <K,V> KafkaSender<K,V> create(ProducerFactory factory, SenderOptions<K,V> options) from these docs https://projectreactor.io/docs/kafka/release/api/ is used? I appreciate it's Project Reactor Kafka and not Spring-Kafka but Sergei Egorov told me you're very familiar with that project too and that I should try and ask you.
    It's the custom ProducerFactory that causes me a bit of a problem. The documentation says "factory - A custom producer factory other than the default", but does that mean I need to extend some type of base class, or are there certain attributes/characteristics it must have?
    The reason we're trying to do this is because we're trying to get kafka metrics for reactive consumer/producer the same way that we would do it for non-reactive (like this):

    kafkaConsumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));

    Any help is much appreciated. Cheers

    4 replies
    Pietro Galassi
    @pietrogalassi

    Can anyone please tell me how to add headers in Kafka Streams?
    I have something like this:

        injestorDataStream.groupByKey()
                .aggregate(InjestorWorkerEvent::new, (key, value, aggregate) -> aggregate.addInjested(value),
                        Materialized.with(Serdes.String(), new JsonSerde<>(InjestorWorkerEvent.class)))
                .toStream().to(injestionMonitorTopic,
                        Produced.with(Serdes.String(), new JsonSerde<>(InjestorWorkerEvent.class)));

    But I want to add custom headers to the event produced to the topic. How do I do that? Thanks
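
    The DSL has no direct header API; the usual workaround is a transformValues stage, where the ProcessorContext exposes the headers of the record being forwarded. A sketch that could be spliced in before the .to(...) call (the header name and value are illustrative):

    .toStream()
    .transformValues(() -> new ValueTransformer<InjestorWorkerEvent, InjestorWorkerEvent>() {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public InjestorWorkerEvent transform(InjestorWorkerEvent value) {
            // headers added here travel with the record to the sink topic
            context.headers().add("source", "injestor".getBytes(StandardCharsets.UTF_8));
            return value;
        }

        @Override
        public void close() {
        }
    })
    .to(injestionMonitorTopic, Produced.with(Serdes.String(), new JsonSerde<>(InjestorWorkerEvent.class)));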

    10 replies
    Jagmohan Sharma
    @JagmohanSharma
    Hi team, can you please help me find which Kafka consumer property can be used to increase the maximum reconnection attempts while Kafka server nodes are restarting, so that a Spring Boot based Kafka consumer can reconnect whenever a server node comes back up?
    Currently we are facing issues if it takes longer to bring a server up.
    1 reply
    robertszooba
    @robertszooba

    Hi,

    I'd like to send and receive several different types of JSON messages (ReqA, ReqB, etc.) that all extend one specific type (abstract class Request). I would like to use @JsonTypeInfo and @JsonSubTypes in order to use inheritance.


    I have some questions about it and would be grateful for some pieces of advice.

    Concerning producers, I want to use StringSerializer for keys and JsonSerializer for values. Then, I was thinking about covering all cases with a single KafkaTemplate<String, Object>.


    1. Is such a setting (a single KT<S, O>) sufficient to produce different (JSON) types that inherit from an abstract class? Is there something I should keep in mind when using spring-kafka and inheritance?

    2. Can I use a single KafkaTemplate<String, Object> for every type, or do I have to make it specific in order to produce messages of a given type, i.e. KafkaTemplate<String, A>, KafkaTemplate<String, B>?

    3. For such a use case, should I go for JsonSerializer, or StringSerializer with a StringJsonMessageConverter?

    4. How can I distinguish different types if they are in a single topic? Are headers the way to go?

    Thank you.
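
    For 1, 2 and 4, a single KafkaTemplate<String, Object> with JsonSerializer generally suffices, since the concrete type travels with the message - via Jackson's discriminator from @JsonTypeInfo and/or the __TypeId__ header that JsonSerializer adds by default. A sketch of the Jackson side (the type and property names are illustrative):

    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
    @JsonSubTypes({
            @JsonSubTypes.Type(value = ReqA.class, name = "reqA"),
            @JsonSubTypes.Type(value = ReqB.class, name = "reqB")
    })
    public abstract class Request {
    }

    On the consuming side, JsonDeserializer can pick the concrete class from the __TypeId__ header, which also answers 4: headers are a reasonable way to distinguish types sharing one topic.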

    1 reply
    Pietro Galassi
    @pietrogalassi
    Using a @KafkaListener, how can I set commits to sync?
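
    For reference, commit behaviour lives on the container properties; syncCommits already defaults to true, but it can be set explicitly:

    factory.getContainerProperties().setSyncCommits(true); // commitSync() rather than commitAsync()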
    12 replies
    thomasGuerin3
    @thomasGuerin3
    (image attachments)
    1 reply
    sl3329
    @sl3329
    This message was deleted
    3 replies
    Ming Li
    @danwslightinc

    Hi there, I am trying to implement a circuit breaker with Resilience4j around the Spring Cloud Stream Kafka binder.

        @Bean
        public java.util.function.Consumer<Message<SomePojo>> process() {
            return message -> {
                var consumer = message.getHeaders().get(KafkaHeaders.CONSUMER, Consumer.class);
                var acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
                var currentState = circuitBreakerRegistry.circuitBreaker(BACKEND_SERVICE).getState();
                if (currentState == CircuitBreaker.State.OPEN) {
                    var topic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class);
                    var partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
                    consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
                    if (acknowledgment != null) {
                        acknowledgment.nack(10);
                    }
                }
                // some business logic
                if (acknowledgment != null) {
                    log.debug("Acknowledged");
                    acknowledgment.acknowledge();
                }
            };
        }

    The above solution works for one message in the topic. When I have multiple messages, the app fails to poll after around 15 minutes. Did I miss anything?

    Thank you.

    13 replies
    Jack Wilkinson
    @guacjack

    Hey all, just a quick query - hoping one of you might be able to see where I've gone wrong.

    In my application.properties I have

    spring.kafka.bootstrap-servers=http://atesturl.com:9092

    and then in application-test.properties I have

    spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

    Now when I run my actual tests with @ActiveProfiles("test"), I can see that the app runs with the embedded Kafka broker, and I can see the @KafkaListeners being called correctly after publishing messages... but I can also see in the logs that it first tries to connect using the http://atesturl.com:9092 from my default application.properties.

    So i end up with this in the logs

    Failed authentication with http://atesturl.com:9092/{the ip address} (Authentication failed)

    but then afterwards it seems to use the embedded Kafka one. It's almost like it's starting up with the application.properties version before eventually being overridden by the one in the application-test.properties file.
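
    One hedged way around the early connection attempt, assuming spring-kafka-test's @EmbeddedKafka is in use, is to have the embedded broker overwrite the main property directly instead of relying on profile ordering:

    @EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
    @SpringBootTest
    @ActiveProfiles("test")
    class MyKafkaIntegrationTest {
    }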

    24 replies
    Jack Wilkinson
    @guacjack
    Anyone know if there is any kind of function or callback I can hook into for when my Kafka consumer has connected to the broker? I am connecting via @KafkaListener and everything works fine; I'm just trying to write an integration test at the moment to verify that it's connected.
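
    For integration tests, spring-kafka-test ships a helper that blocks until the container has its partitions assigned, which is usually the "connected" signal wanted here; a sketch assuming the listener is given id = "myListener" and the registry/broker beans are injected:

    MessageListenerContainer container = registry.getListenerContainer("myListener");
    // blocks until the consumer has joined the group and received its assignment
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());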
    14 replies
    Knut Schleßelmann
    @kschlesselmann
    Hi! Are there currently any known issues with health checks in recent releases? I notice the health check response time in one of our services goes up from a couple of ms to 10–20 seconds(!). Right now Kafka is my prime suspect. It doesn't matter if I choose Spring Boot 2.4.7 or 2.5.1; with 2.4.6 everything seems fine. I'm still debugging to get some better info :-/
    1 reply
    Jack Wilkinson
    @guacjack
    Does Spring-Kafka change the default max.poll.records from 500? Also, is there a way to see the default consumer properties that Spring Kafka is applying?
    52 replies
    Jack Wilkinson
    @guacjack
    Also, how does spring-kafka deal with gracefully shutting down a consumer? If, for example, a SIGTERM is sent, does it process all existing records from the last poll before exiting?
    11 replies
    Jack Wilkinson
    @guacjack

    I can see from here https://stackoverflow.com/questions/50244744/spring-kafka-when-is-exactly-consumer-poll-called-behind-the-hood that Gary has answered the question of when a consumer's poll is called behind the scenes, saying

    In 1.3 and above there is a single thread per consumer; the next poll() is performed after the last message from the previous poll has been processed by the listener.

    However, if there are 0 messages retrieved from Kafka - so basically nothing on the queue to process - how often does it poll then?

    2 replies
    Pritam Roy
    @pritamroy:matrix.org
    [m]

    Hi, I'm trying to build an application in which there will be two Spring Boot apps. One (A) will send a file path to a Kafka topic, and another app (B) will listen to the same topic. After B receives the file path from A, it uploads the file to object storage. Now the problem is that the upload takes time, since the file can be of any size, so rebalances happen many times. I don't understand how to implement this in a better way.

    What I'm doing now is as follows:

    
    @KafkaListener(
            topics = "topic",
            containerFactory = "configureKafkaListenerContainerFactory",
            errorHandler = "kafkaConsumerErrorHandler")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment)
            throws Exception {
        UploadService.upload(record.value());
        acknowledgment.acknowledge();
    }
    Can anyone help me to do it in a better way, please?
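
    The rebalances are most likely the consumer exceeding max.poll.interval.ms while the upload runs; sizing that property to the worst-case upload (and shrinking the batch per poll) is the usual first step. A sketch of the consumer config change, with an illustrative 30-minute ceiling:

    // allow up to 30 minutes between polls before the broker evicts the consumer
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1_800_000);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);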
    7 replies
    plmakas
    @plmakas
    Hi, I'm deploying an application with @KafkaListener to Kubernetes. When there's an upgrade, the pods being terminated start to flood the logs with the warning "Connection to node xxx could not be established. Broker may not be available." until they are fully terminated. This makes browsing logs difficult in my centralized logging solution. Is there a way to avoid this warning in my case?
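
    One hedged mitigation is to stop the listener containers before the pod's networking is torn down, e.g. from a preStop hook or a @PreDestroy callback, so the consumers stop trying to reconnect (the registry wiring is assumed):

    @PreDestroy
    public void stopListeners() {
        // stop polling (and reconnect attempts) ahead of pod termination
        kafkaListenerEndpointRegistry.stop();
    }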
    1 reply
    Pritam Roy
    @pritamroy:matrix.org
    [m]
    Thanks @artembilan @garyrussell for your help.
    Rahul
    @rahul-raj-1
    Hello Team,
    What's the best strategy to read data from multiple topics? I have to read data from 5 topics and perform different processing based on the message. Should I have 1 KafkaListenerContainerFactory, or 5 (i.e. 1 for each topic)? Is there any recommendation from Spring Kafka?
    3 replies
    winkidzz
    @winkidzz
    Hello, I am using Spring Boot Kafka to create a streaming application. The spring.kafka.streams.state-dir property in application.yaml does not get picked up by Spring Kafka. Any help?
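
    For what it's worth, the property nests in application.yaml as below; note it only takes effect through Boot's auto-configured defaultKafkaStreamsConfig, so a hand-built StreamsConfig/KafkaStreamsConfiguration bean would ignore it (the directory shown is illustrative):

    spring:
      kafka:
        streams:
          state-dir: /tmp/kafka-streams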
    29 replies
    Luigi Cerone
    @LuigiCerone
    Hello everyone, what is the best way to gracefully shut down Kafka Streams created with this library?
    I have a config class annotated with @Configuration @EnableKafkaStreams. This class creates two beans of type KStream<String, String>. I've configured a shutdown hook which calls (inside a @PreDestroy callback) the method StreamsBuilderFactoryBean::close() (the StreamsBuilderFactoryBean is injected with @Autowired). Is this the right approach? Thanks.
    5 replies