    Aleh Danilovich
    @MCPDanilovich
    Hi, is it possible to trigger a partition reassignment when we use the cooperative-sticky protocol? I have 1 topic (100 partitions) and 100 consumers in total (5 k8s pods with 20 consumer threads each, all with the same group.id). The consumers are divided evenly in general. But in a situation where I scale my app up to, say, 10 pods, all the new consumers stay unassigned to any partition. Protocol = sticky. Is it possible to use some of the consumers from all the new pods? For example, 10 pods with 10 consumer threads per pod?
    1 reply
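    For reference: within a consumer group, Kafka assigns at most one consumer per partition, so with 100 partitions any consumers beyond 100 stay idle regardless of the protocol. A minimal sketch of enabling cooperative rebalancing (CooperativeStickyAssignor ships with kafka-clients 2.4+; the factory wiring below is an assumption, not taken from the thread):

    Map<String, Object> props = new HashMap<>();
    // incremental (cooperative) rebalancing instead of stop-the-world
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            CooperativeStickyAssignor.class.getName());
    ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(props);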
    Tomas Alabes
    @tomasAlabes
    One perf question: I have 2 instances of my service, each with 100 consumers consuming around 30 topics with 128 partitions each (concurrency: 1). As soon as the 2nd instance goes up, the rebalancing spikes the CPU with a loop of Attempt to heartbeat failed since group is rebalancing, making the services unusable (3 CPU limit). Is there any particular tip you could give to reduce such load? I can't use static membership because I'm on Kafka 2.1.1. Thank you!
    4 replies
    Hushen Savani
    @hushensavani
    Hello, I'm using Kafka listeners in a Spring Boot project. I want to upgrade the Kafka dependencies in Spring Boot; please share some pointers on which dependencies to change.
    I want to upgrade to 2.6.6 in order to use RecordInterceptor.
    1 reply
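    For reference, a minimal sketch of wiring a RecordInterceptor once a spring-kafka version that provides it (2.2.7+) is on the classpath; the bean name and types below are assumptions:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // called before the listener gets each record; return the (possibly modified) record
        factory.setRecordInterceptor(record -> record);
        return factory;
    }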
    Laurensius Bernhard
    @l7777777b
    Hi, not a really technical question: I'm consuming betting transactions (very high throughput) using Spring Kafka. A lot of optimisation is ongoing to support this, since currently it can't handle 100 msg/second (about 40 kbps) fast enough; it's constantly lagging behind. Before I optimise my big transactional process, I'm trying to implement a batch listener. Is it going to help?
    2 replies
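    A batch listener can help when per-record overhead (e.g. one DB round trip per message) can be amortized over the whole poll; a minimal sketch, with the topic and factory names as placeholders:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> cf) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(cf);
        factory.setBatchListener(true); // deliver the whole poll() result in one call
        return factory;
    }

    @KafkaListener(topics = "bets", containerFactory = "batchFactory")
    public void onBatch(List<ConsumerRecord<String, String>> records) {
        // up to max.poll.records messages per invocation; batch the downstream work here
    }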
    ashevade1
    @ashevade1
    I am new to using spring-kafka, so please excuse me if this is a basic question. To consume messages, we use the @KafkaListener annotation with topics and a group id specified, e.g. @KafkaListener(topics = "topic1", groupId = "group-1"). This works fine. But my requirement is to get the list of topics at run time, based on some business logic. Is there a way this can be done? For example, I want something like: List<String> topics = getTopics(); for (String topic : topics) { // start consuming messages for each topic }. Thanks for any help.
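    For reference, the topics attribute accepts property placeholders and SpEL, so one documented pattern is to resolve the topic list when the container starts (my.topics is a hypothetical property; note this is evaluated once at startup, not continuously at run time):

    // application.properties: my.topics=topic1,topic2,topic3
    @KafkaListener(topics = "#{'${my.topics}'.split(',')}", groupId = "group-1")
    public void listen(String message) {
        // one listener subscribed to every resolved topic
    }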
    Yuen Fung Lee
    @leeyf1995_gitlab

    Hi all, I have a technical question about running spring-kafka version 2.2.7 with AWS MSK (Kafka version 2.2.1).

    I have a consumer application with two running instances in the container platform. The consumer application uses one consumer group and listens to two topics, A and B.

    After running normally for a few weeks without any restart, we suddenly ran into an error indicating the application could not commit messages in topic A.
    The commit exception:

    Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

    Right after the commit exception, there are two messages showing

    1. group coordinator (id rack null) is unavailable or invalid will attempt rediscovery
    
    2. discovered group coordinator

    Since we are using the SeekToCurrentErrorHandler, the application could still process the next message; it was just that the processed message could not be committed to Kafka. The same error messages would appear for each message.

    During the error period, messages in the other topic, B, were processed totally fine.

    I then rolling-restarted the application instances and the problem was resolved.

    The application is deployed to three different QA environments and only one of them has faced this issue.

    2 replies
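    As the exception text itself suggests, the usual knobs are giving the poll loop more headroom or shrinking each batch; a sketch (the values are placeholders to tune, set in the consumer configuration map):

    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // more time between polls
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);        // fewer records per poll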
    mailniranjan
    @mailniranjan
    Currently, in a consumer interceptor, we save some metrics info, like when the last record was fetched from Kafka. We have a requirement to save the Kafka consumer metrics as well. I know the KafkaConsumer metrics() method is available to get the metrics info, but in the interceptor we don't have access to the consumer object. Please guide me on how to get access to the consumer object in a consumer interceptor.
    6 replies
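    One possible workaround (a sketch, not necessarily what the replies suggest): instead of reaching the Consumer from the interceptor, pull the consumer metrics from the listener container, which exposes them via metrics(); the listener id is hypothetical:

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    public void dumpConsumerMetrics() {
        MessageListenerContainer container = registry.getListenerContainer("myListenerId");
        Map<String, Map<MetricName, ? extends Metric>> metrics = container.metrics();
        metrics.forEach((clientId, byName) -> byName.forEach((name, metric) ->
                System.out.println(clientId + " " + name.name() + " = " + metric.metricValue())));
    }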
    deepakemani-maersk
    @deepakemani-maersk
    Hi, can you please point to any example of Spring Cloud Contract with an Avro message? It looks like it needs more setup than a simple Kafka container.
    1 reply
    infilitry
    @infilitry
    Hi guys, I keep running into this issue: Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. This is even after making sure the configs on Kafka are fine.
    Any idea what I'm missing...?
    infilitry
    @infilitry
    nvm, I've worked it out
    deepak21singh
    @deepak21singh
    I am trying to implement a Spring Cloud Config server with auto-refreshed properties via the Kafka bus. I am getting the following error while using <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-config-monitor</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
    </dependency>

    Description:

    An attempt was made to call a method that does not exist. The attempt was made from the following location:

    org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:598)

    The following method did not exist:

    org.springframework.kafka.listener.ContainerProperties.setAuthorizationExceptionRetryInterval(Ljava/time/Duration;)V

    The method's class, org.springframework.kafka.listener.ContainerProperties, is available from the following locations:

    jar:file:/C:/Users/User/.m2/repository/org/springframework/kafka/spring-kafka/2.3.3.RELEASE/spring-kafka-2.3.3.RELEASE.jar!/org/springframework/kafka/listener/ContainerProperties.class

    It was loaded from the following location:

    file:/C:/Users/User/.m2/repository/org/springframework/kafka/spring-kafka/2.3.3.RELEASE/spring-kafka-2.3.3.RELEASE.jar
    Gary Russell
    @garyrussell
    Your spring-kafka version is too old; that property was added in 2.3.5. You should let Spring Boot's dependency management bring in the right versions for all the dependencies.
    Yassine Gharbaoui
    @yascov
    Hello everyone, is there a way to get the assigned partitions within a consumer?
    1 reply
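    For reference, a listener method can take the Consumer itself as a parameter and query its current assignment (only safe on the listener thread); implementing ConsumerSeekAware to receive assignment callbacks is another documented option. A sketch with hypothetical names:

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
        Set<TopicPartition> assigned = consumer.assignment(); // currently assigned partitions
        // ...
    }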
    pyramesh
    @pyramesh
    @garyrussell, I am facing an issue in Kafka where all messages were replayed after the consumer application restarted. My configuration can be found in the link below. Could you please guide me? https://stackoverflow.com/questions/66903764/kafka-messages-from-topic-are-being-replayed-after-consumer-restart
    2 replies
    mailniranjan
    @mailniranjan
    I have a requirement to show the last sync from Kafka. What I understand is that Spring for Apache Kafka doesn't call the message listener when there are no records. But I want it to call the message listener even when there are no records. Is this possible?
    vikrantch-hk
    @vikrantch-hk
    @garyrussell not sure if this is the right channel to ask this question, but I couldn't find much on the web. Is there any way in Spring Kafka or Spring Integration to achieve delay-queue behaviour with Kafka?
    4 replies
    heesuk-ahn
    @heesuk-ahn

    Hi All,

    I have a question about the Kafka consumer rebalance protocol. :)

    If a new consumer joins, a JoinGroupRequest will be sent in the first poll; will the rebalance protocol then proceed immediately?

    mailniranjan
    @mailniranjan

    I have a requirement to show the last sync from Kafka. What I understand is that Spring for Apache Kafka doesn't call the message listener when there are no records available in Kafka, but I want it to call the message listener even when a poll returns no records. Is this possible?

    At the end of every poll, I want to update the last-sync-from-Kafka value in the DB. Please share your thoughts.

    5 replies
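    One hedged possibility: the container can publish an idle event when no records arrive for a configured interval, which an application event listener can use to refresh the last-sync value (note it fires per idle interval, not on every empty poll; the factory variable is assumed to be the listener container factory):

    factory.getContainerProperties().setIdleEventInterval(5000L); // publish after 5s with no records

    @EventListener
    public void onIdle(ListenerContainerIdleEvent event) {
        // no records for the configured interval; update the last-sync timestamp in the DB here
    }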
    venkatasreekanth
    @venkatasreekanth
    Quick question: we use Kafka to move data from an MDM system to other systems, and we have separate seed topics for refreshing data from higher environments. I have made the consumer factory a conditional bean, but I am not sure how to make the @KafkaListener conditional as well.
    The internet says to set autoStartup, but I don't see much documentation about it.
    Hoping someone here can help me.
    1 reply
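    For reference, autoStartup on the annotation accepts a property placeholder, so the listener can be enabled per environment and started on demand through the registry; the property and id names below are hypothetical:

    @KafkaListener(id = "seedListener", topics = "seed-topic",
            autoStartup = "${seed.listener.enabled:false}")
    public void listen(String message) {
        // ...
    }

    // start it later, on demand (registry is an injected KafkaListenerEndpointRegistry):
    registry.getListenerContainer("seedListener").start();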
    ChandanPadalkar
    @ChandanPadalkar
    What are the best practices for handling Kafka-related exceptions?
    1 reply
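    One commonly documented baseline (a sketch, not the only practice; spring-kafka 2.3+, and factory is assumed to be the listener container factory): retry the failed record a bounded number of times with a back-off, then let the handler log and skip it:

    // retry twice, 1 second apart, before giving up on the record
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));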
    Spencer Goldberg
    @goldberg.spencer_gitlab
    What is the right set of versions to use for spring-kafka, Spring Boot, and kafka-streams? I'm getting ClassNotFoundException errors when trying to use spring-kafka 2.6.7, kafka-streams 2.6.1, and spring-boot-starter 2.4.4.
    12 replies
    Dumitru Nicolae Marasoiu
    @nicolae.marasoiu:matrix.org
    [m]
    Hi, using Boot 2.4.4 and spring-kafka 2.6.7. In "production", the @KafkaListener gets called with the proper CustomEvent payload, but in component tests it fails with Cannot convert from [org.apache.avro.generic.GenericData$Record] to CustomEvent. It's even stranger in that the component tests pass on Mac but not in Docker, yet when debugging we always hit the same Cannot convert from [org.apache.avro.generic.GenericData$Record] to CustomEvent. Where should this conversion happen? Thank you.
    2 replies
    vkpuduran
    @vkpuduran:matrix.org
    [m]
    Hi, I'm having a strange issue with @KafkaListener. My consumer receives a message from a partition on one thread (org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1), but the message is processed by both KafkaListenerEndpointContainer#0-1-C-1 and KafkaListenerEndpointContainer#0-0-C-1; basically the message is processed twice. This happens during load testing. I am using spring-kafka 2.6.5. Any idea how a message received on one thread ends up shared with another thread?
    1 reply
    vkpuduran
    @vkpuduran:matrix.org
    [m]
    Thank you; let me re-check my implementation.
    Adarsh Ramamurthy
    @radarsh
    What version of scala-library is supported by Spring Boot 2.4.4 (the Spring Kafka 2.6.7, Kafka clients 2.6.0, Jackson 2.11.4 combo)? I get different ClassNotFoundException errors no matter which scala-library version I use. I found spring-projects/spring-kafka#1623, but the problem still seems to exist with Spring Boot 2.4.4.
    6 replies
    deepak643
    @deepak643

    Hi team, I am using spring-kafka 2.5.5.
    I am looking to get the Kafka consumer and producer metrics from my Spring application. This application does not use annotations; it manually polls records and sends records from and to the topics.

    In Prometheus I can see only the JVM, Tomcat, and custom metrics posted, but I am missing the Kafka-provided metrics. Could you please point me to how to enable them? The Kafka consumer is created using:

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
    
    props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    
    protected KafkaConsumer<byte[], Event> getConsumer(List<String> inputTopics) {
        KafkaConsumer<byte[], Event> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(inputTopics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS revoked: " + partitions);
                consumer.commitAsync();
            }
    
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS assigned: " + partitions);
            }
        });
        return consumer;
    }

    The application properties have JMX enabled:

    spring.jmx.enabled=true
    management.endpoints.web.base-path=/
    management.endpoint.metrics.enabled=true
    management.endpoints.web.exposure.include=prometheus
    management.endpoint.prometheus.enabled=true
    management.metrics.export.prometheus.enabled=true

    Am I missing any configuration needed to expose these metrics? Suggestions, please?

    2 replies
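    One possibility (a sketch): the Kafka client metrics are not bound to Micrometer automatically when the KafkaConsumer is created by hand. With spring-kafka 2.5+ a consumer factory listener does the binding, or micrometer-core 1.4+ can bind a hand-built consumer directly; meterRegistry is assumed to be injected from the context:

    // via the consumer factory
    DefaultKafkaConsumerFactory<byte[], Event> cf = new DefaultKafkaConsumerFactory<>(props);
    cf.addListener(new MicrometerConsumerListener<>(meterRegistry));

    // or binding a hand-created consumer directly
    new KafkaClientMetrics(consumer).bindTo(meterRegistry);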
    deepak643
    @deepak643

    I have changed my implementation to use spring-kafka and I still see the same issue of the Kafka metrics not being exposed:

    @EnableKafka
    @Configuration
    class KafkaConsumerConfig {
        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);
    
        @Autowired
        private GlobalConfig config;
    
        @Bean
        public ConsumerFactory<byte[], RheosEvent> salesTopicConsumerFactory() {
            Map<String, Object> props = commonConfig();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX");
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName() + ".zzz");
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        @Bean(name="salesKafkaListenerContainerFactory")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], RheosEvent>> salesKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<byte[], RheosEvent> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConcurrency(1);
            factory.setBatchListener(true);
            factory.setConsumerFactory(salesTopicConsumerFactory());
            factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<byte[], RheosEvent> behaviorTopicConsumerFactory() {
            Map<String, Object> props = commonConfig();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "YYY");
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        @Bean(name="behaviorKafkaListenerContainerFactory")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<byte[], RheosEvent>> behaviorKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<byte[], RheosEvent> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConcurrency(1);
            factory.setBatchListener(true);
            factory.setConsumerFactory(behaviorTopicConsumerFactory());
            factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
    
        private Map<String, Object> commonConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
            props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
            return props;
        }
    
        static class CustomRebalanceListener implements ConsumerAwareRebalanceListener{
            @Override
            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS revoked: " + partitions);
                consumer.commitAsync();
            }
    
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                logger.info("PARTITIONS assigned: " + partitions);
            }
        }
    }

    Please suggest if I am missing any configuration

    2 replies
    Ter Leeloo II
    @ter-leeloo-ii

    I am currently playing with version 2.7.0 because I can use pausePartition on the containers, yet I cannot get the BackOff interval to work:
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(10000L, 3L)));

    On version 2.6.7 there was no problem (a 10 s wait between retries), but in 2.7.0 there is no wait at all.

    I saw there was a change so that stopping containers is no longer delayed, but I didn't see what to configure differently to make the interval work. Any idea? Thank you

    1 reply
    Kevin Lewis
    @kevinrlewis
    Hello, I'm running into an issue when using spring-kafka to produce messages concurrently to Confluent. Using Java 8 parallel streams and sending a message with KafkaTemplate or StreamBridge from spring-cloud-stream, I get Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule when attempting to produce the message. When I remove the "parallel" part of the stream, it works fine and I am able to produce. When I initialize a KafkaProducer myself, without spring-kafka or spring-cloud-stream's wrappers, it works with parallel streams.
    3 replies
    hairinwind
    @hairinwind

    Hello, I have code like this

    ReadOnlyKeyValueStore<String, Double> store = kafkaStreamsBuilder.getKafkaStreams().store(...)

    but kafkaStreamsBuilder.getKafkaStreams() is null. Any idea what is wrong?
    Many thanks

    4 replies
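    For reference, StreamsBuilderFactoryBean.getKafkaStreams() returns null until the factory bean has started, so the store lookup has to happen after the application context (and the streams instance) is running; a sketch with a hypothetical store name:

    @Autowired
    private StreamsBuilderFactoryBean factoryBean;

    public Double lookup(String key) {
        KafkaStreams streams = factoryBean.getKafkaStreams(); // non-null only after start()
        ReadOnlyKeyValueStore<String, Double> store =
                streams.store("store-name", QueryableStoreTypes.keyValueStore());
        return store.get(key);
    }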
    Bulat
    @jerdys
    Hi! Does anyone have an implementation of a dead letter queue in Spring Boot with Kafka? I looked online, but every solution I found doesn't work for some reason :(
    4 replies
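    For reference, a minimal dead-letter setup documented for spring-kafka 2.3+ (a sketch; by default the failed record goes to <originalTopic>.DLT, and Boot wires a single ErrorHandler bean into its auto-configured factory):

    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        // after the retries are exhausted, publish the failed record to the .DLT topic
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }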
    andrshmot
    @andrshmot

    Hi guys, a question regarding handling errors: I'm using a Spring Boot application without a REST layer, so no ControllerAdvice is in place. Now, I'm listening to events with this configuration

    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

    So I have a message converter

    @Bean
    public RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }

    And this listener

        public void executeConsumer(final ConsumerRecord<String, String> cr,
            @Payload final POJO payload) {
    What I want to know is whether there is any way to handle conversion errors like this:
    org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
     at [Source: (String)"{"property1":"value", "property2": 005}"; line: 1, column: 51]
        at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:128) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:132) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:264) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:74) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1310) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1293) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1253) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1234) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1149) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:959) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:766) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:704) ~[spring-kafka-2.2.11.RELEASE.jar:2.2.11.RELEASE]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
    Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
     at [Source: (String)"{"property1":"value", "property2": 005}"; line: 1, column: 51]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.10.jar:2.9.10]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693) ~[jackson-core-2.9.10.jar:2.9.10]
        at com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(
    5 replies
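    One documented approach on 2.2.x (a sketch; the target type is hypothetical): move the JSON parsing from the message converter into the deserializer and wrap it with ErrorHandlingDeserializer2, so a poison record surfaces as a deserialization failure the container's error handler can deal with instead of a ConversionException:

    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, POJO.class.getName());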
    Kaushalya Samarasekera
    @kausnz

    Hi team! If different functional areas of the same application need to talk to the same Kafka cluster, what practice do you follow for the number of KafkaTemplate instances? Do you share a single instance, or do you tend to have a dedicated instance per feature area within the application?

    And if you have to connect your application to two Kafka brokers (two separate clusters), is there a clever way to leverage what's provided by autoconfiguration (spring.kafka.*) without having to initialize the templates with two groups of custom properties?

    1 reply
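    For reference: KafkaTemplate is thread-safe, so a single shared template per cluster is common. For a second cluster, only one set of connection settings can ride on spring.kafka.* autoconfiguration; the other template is typically built by hand (a sketch; the broker address is hypothetical):

    @Bean
    public KafkaTemplate<String, String> secondClusterTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "second-cluster:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }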
    Pietro Galassi
    @pietrogalassi
    Hi all. Please help me figure out this scenario: I have a microservice with more than 1 instance. This microservice reads from a topic (ordersTopic) with at least 24 partitions, aggregates data (sums), and puts the results on another topic (countTopic). This countTopic is then read by the same microservice in order to run some logic on the count. With multiple instances, can I get bugs in the count numbers? Thanks a lot.
    3 replies
    vkpuduran
    @vkpuduran:matrix.org
    [m]
    Hi, I have multiple KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> beans to consume messages from different topics. My code is below:
    @Bean(name = "xxxEventKafkaListenerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> xxxAlertEventKafkaListenerFactory() throws AWSServiceProviderException {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactoryUsingTopicName(ps-mon.0));
    factory.setBatchListener(true);
    factory.setConcurrency(numberOfConcurrentListeners);
    factory.getContainerProperties().setAckMode(AckMode.valueOf(listenerAckMode));
    log.info("Created xxxKafkaListerFactory !!");

    return factory;
    }
    consumerFactoryUsingTopicName(String topicName) returns the default config values, like:
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);

    A few of the KafkaListenerContainerFactories use the same SASL credentials (username and password), org.apache.kafka.common.security.scram.ScramLoginModule required username="%s" password="%s";, whereas another KafkaListenerContainerFactory uses different SASL credentials. This is causing an issue and I am getting the error below.

    2021-05-07 19:31:52 ERROR o.a.k.c.Metadata:283 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - [Consumer clientId=consumer-ps-as-xxx-consumer-local-1, groupId=ps-as-xxx-consumer-local] Topic authorization failed for topics [ps-mon.0]
    2021-05-07 19:31:52 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - Authorization Exception and no authorizationExceptionRetryInterval set
    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [ps-mon.0]
    2021-05-07 19:31:52 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:140 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - Fatal consumer exception; stopping container

    I'm not sure what is missing here. Can't we have different factories with different credentials to access the topics? This is a Spring Boot application using spring-kafka. Any pointers would be really appreciated.

    Gary Russell
    @garyrussell
    Gitter is not a good medium for questions like this; ask on Stack Overflow or in a GitHub issue. Also, learn how to use markdown here (click the M icon on the right); the above is very hard to read. No, you can only have one set of credentials; it's a limitation of the JVM.
    Tomas Alabes
    @tomasAlabes
    Hi! A performance question, to see if you have a recommendation. I have a service of considerable size: it consumes from and publishes to 30 topics with 128 partitions each, with 80 consumer groups and 32 transactional producers. What's happening is that, under load, the number of threads goes crazy because of the producer-to-partition relationship. I started using the producerPerConsumerPartition=false property, but then it starts to freak out and rebalance groups much sooner than with the property set to true. Is there a way to define a pool of threads instead of 1 thread : N partitions or 1 thread : 1 partition? Something in the middle? With 1:N the throughput seems to drop quickly, and the other needs a mega-server to handle all the threads. Is adding more replicas with 1:1 the only way? (Adding replicas with 1:N doesn't seem to change anything.) Using spring-kafka 2.6.7 and Kafka 2.6.0.
    3 replies
    Pietro Galassi
    @pietrogalassi

    Hi all, trying to enable StreamsConfig with this code:
    @Bean
    public StreamsConfig streamsConfig() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaGroupId);
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);

        if (kafkaSSLEnabled) {
            configs.put("security.protocol", "SSL");
            configs.put("ssl.truststore.location", kafkaTrustoreLocation);
            configs.put("ssl.truststore.password", kafkaTrustorePassword);
        }
        return new StreamsConfig(configs);
    }

    It does not connect to the AdminServer. How can I fix this?

    Changing it to:
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)

    gives:
    The bean 'defaultKafkaStreamsConfig', defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [it/galax/injestor/orchestrator/consumers/KafkaStreamConfig.class] and overriding is disabled.

    1 reply
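    For reference: with Boot on the classpath, defaultKafkaStreamsConfig is already registered as a KafkaStreamsConfiguration (not a StreamsConfig), which is why the competing bean definition collides. Without Boot's autoconfiguration, the expected shape is roughly (a sketch):

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaGroupId);
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        return new KafkaStreamsConfiguration(configs);
    }

    With Boot, the same settings usually go under spring.kafka.streams.* properties instead of a hand-rolled bean.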
    Rakesh
    @ravening
    Hi all, is there a way to display the topic name on which a message is received? I have a Kafka consumer listening on at least 10 topics; a message can arrive on any of them, so I want to know which topic the message came in on.
    I have code like this: @KafkaListener(topics = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"})
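    For reference, the source topic is available from the ConsumerRecord itself or via the RECEIVED_TOPIC header (a sketch):

    @KafkaListener(topics = {"a", "b", "c"})
    public void listen(ConsumerRecord<String, String> record,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // record.topic() and the header both carry the topic name
        System.out.println("Received on " + topic + ": " + record.value());
    }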