Hi, I have multiple KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> beans to consume messages from different topics. My code is below:
@Bean(name = "xxxEventKafkaListenerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> xxxAlertEventKafkaListenerFactory() throws AWSServiceProviderException {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // The topic name is passed as a String
    factory.setConsumerFactory(consumerFactoryUsingTopicName("ps-mon.0"));
    factory.setBatchListener(true);
    factory.setConcurrency(numberOfConcurrentListeners);
    factory.getContainerProperties().setAckMode(AckMode.valueOf(listenerAckMode));
    log.info("Created xxxKafkaListerFactory !!");
    return factory;
}
consumerFactoryUsingTopicName(String topicName) builds the consumer factory from the default config values, such as:
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
A few of the KafkaListenerContainerFactory beans use the same SASL credentials (username and password): org.apache.kafka.common.security.scram.ScramLoginModule required username="%s" password="%s";
The other KafkaListenerContainerFactory uses different SASL credentials (username and password). This is causing a problem, and I am getting the error below.
2021-05-07 19:31:52 ERROR o.a.k.c.Metadata:283 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - [Consumer clientId=consumer-ps-as-xxx-consumer-local-1, groupId=ps-as-xxx-consumer-local] Topic authorization failed for topics [ps-mon.0]
2021-05-07 19:31:52 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - Authorization Exception and no authorizationExceptionRetryInterval set
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [ps-mon.0]
2021-05-07 19:31:52 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:140 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] - - - Fatal consumer exception; stopping container
I'm not sure what is missing here. Can't we have different factories with different credentials to access the topics? This is a Spring Boot application using spring-kafka. Any pointers would be really appreciated.
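One thing that may be worth checking for the question above: a TopicAuthorizationException means the broker rejected the principal the consumer authenticated as, so each consumer factory needs to carry the credentials that actually have access to its topic. A minimal sketch, assuming each factory gets its own `sasl.jaas.config` entry in its property map. The property keys are standard Kafka client settings; the class name, method name, `SASL_SSL`, `SCRAM-SHA-512`, and all values are hypothetical placeholders.

```java
import java.util.HashMap;
import java.util.Map;

public class SaslConsumerProps {

    // Builds a consumer property map that carries its own SCRAM credentials.
    // Hypothetical helper: names and values are illustrative only.
    public static Map<String, Object> forCredentials(String bootstrapServers,
                                                     String groupId,
                                                     String username,
                                                     String password) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("security.protocol", "SASL_SSL");   // assumption: SASL over TLS
        props.put("sasl.mechanism", "SCRAM-SHA-512"); // assumption: may be SCRAM-SHA-256
        // Per-client JAAS entry, so two factories can log in as different users.
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"%s\" password=\"%s\";", username, password));
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> alerts = forCredentials("broker:9093", "alerts-group", "alertsUser", "secretA");
        Map<String, Object> monitoring = forCredentials("broker:9093", "mon-group", "monUser", "secretB");
        System.out.println(alerts.get("sasl.jaas.config"));
        System.out.println(monitoring.get("sasl.jaas.config"));
    }
}
```

Each map could then be handed to its own `DefaultKafkaConsumerFactory`, which in turn goes into the matching container factory. If the credentials are already separate, the remaining suspect would be the broker-side ACLs for the user reading ps-mon.0.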
producerPerConsumerPartition=false property, but it starts to freak out and rebalance groups much sooner than with the property set to true. Is there a way to define a pool of threads instead of 1 thread : N partitions or 1 thread : 1 partition? Something in the middle? With 1:N the throughput seems to drop quickly, and 1:1 needs a mega-server to handle all the threads. Is adding more replicas with 1:1 the only way? (Adding replicas with 1:N doesn't seem to change anything.) I'm using spring-kafka 2.6.7 and Kafka 2.6.0.
Hi all, I'm trying to enable StreamsConfig with this code:
@Bean
public StreamsConfig streamsConfig() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaGroupId);
    configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    if (kafkaSSLEnabled) {
        configs.put("security.protocol", "SSL");
        configs.put("ssl.truststore.location", kafkaTrustoreLocation);
        configs.put("ssl.truststore.password", kafkaTrustorePassword);
    }
    return new StreamsConfig(configs);
}
It does not connect to the AdminServer. How can I fix this?
Alternatively, changing the annotation to:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
gives:
The bean 'defaultKafkaStreamsConfig', defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [it/galax/injestor/orchestrator/consumers/KafkaStreamConfig.class] and overriding is disabled.
@KafkaListener(topics = {"a, b, c,d,e,f,g,h,i,j,k"}
Hi @garyrussell. Would you be able to point me to an example where the static <K,V> KafkaSender<K,V> create(ProducerFactory factory, SenderOptions<K,V> options) method from these docs https://projectreactor.io/docs/kafka/release/api/ is used? I appreciate it's Project Reactor Kafka and not Spring-Kafka, but Sergei Egorov told me you're very familiar with that project too and that I should try asking you.
It's the custom ProducerFactory that causes me a bit of a problem. The documentation says "factory - A custom producer factory other than the default", but does that mean I need to extend some type of base class, or are there certain attributes/characteristics it must have?
The reason we're trying to do this is because we're trying to get kafka metrics for reactive consumer/producer the same way that we would do it for non-reactive (like this):
kafkaConsumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));
Any help is much appreciated. Cheers
Can anyone please tell me how to add headers in Kafka Streams?
I have something like this:
injestorDataStream.groupByKey()
    .aggregate(InjestorWorkerEvent::new, (key, value, aggregate) -> aggregate.addInjested(value),
        Materialized.with(Serdes.String(), new JsonSerde<>(InjestorWorkerEvent.class)))
    .toStream().to(injestionMonitorTopic,
        Produced.with(Serdes.String(), new JsonSerde<>(InjestorWorkerEvent.class)));
But I want to add custom headers to the event produced on the topic. How can I do it? Thanks
Hi,
I’d like to send and receive several different types of JSON messages (ReqA, ReqB, etc.) that all extend one specific type (abstract class Request). I would like to use @JsonTypeInfo and @JsonSubTypes in order to use inheritance.
I have some questions about it and would be grateful for some pieces of advice.
Concerning producers, I want to use StringSerializer for keys and JsonSerializer for values. Then, I was thinking about covering all cases with a single KafkaTemplate<String, Object>.
Is such a setting (single KT<S, O>) sufficient to produce different (JSON) types that inherit from an abstract class? Is there something I should have in mind when using spring-kafka and inheritance?
Can I use a single KafkaTemplate<String, Object> for every type, or do I have to make it specific in order to produce messages of a given type, i.e. KafkaTemplate<String, A>, KafkaTemplate<String, B>?
For such a use case, should I go for JsonSerializer or StringSerializer with StringJsonMessageConverter?
How can I distinguish different types if they are in a single topic? Are headers the way to go?
Thank you.
Hi there, I am trying to implement a circuit breaker with Resilience4j around the Spring Cloud Stream Kafka Binder.
@Bean
public java.util.function.Consumer<Message<SomePojo>> process() {
    return message -> {
        var consumer = message.getHeaders().get(KafkaHeaders.CONSUMER, Consumer.class);
        var acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        var currentState = circuitBreakerRegistry.circuitBreaker(BACKEND_SERVICE).getState();
        if (currentState == CircuitBreaker.State.OPEN) {
            var topic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class);
            var partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
            consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
            if (acknowledgment != null) {
                acknowledgment.nack(10);
            }
        }
        // some business logic
        if (acknowledgment != null) {
            log.debug("Acknowledged");
            acknowledgment.acknowledge();
        }
    };
}
The above solution works for one message in the topic. When I have multiple messages, the app fails to poll after around 15 minutes. Did I miss anything?
Thank you.
Hey all, I've got a quick query and am hoping one of you might be able to see where I've gone wrong.
In my application.properties I have
spring.kafka.bootstrap-servers=http://atesturl.com:9092
and in application-test.properties I have
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Now when I run my actual tests with @ActiveProfiles("test"), I can see that the app runs with the embedded Kafka broker, and I can see the @KafkaListeners being called correctly after publishing messages. But I can see in the logs that it first tries to connect using the "http://atesturl.com:9092" value from my default application.properties.
So I end up with this in the logs:
Failed authentication with http://atesturl.com:9092/{the ip address} (Authentication failed)
but afterwards it seems to use the embedded Kafka version. It's almost as if it starts up with the application.properties version before eventually being overridden by the one in the application-test.properties file.
I can see from here https://stackoverflow.com/questions/50244744/spring-kafka-when-is-exactly-consumer-poll-called-behind-the-hood that Gary has answered the question "When is a Consumer's poll called behind the scenes", saying:
In 1.3 and above there is a single thread per consumer; the next poll() is performed after the last message from the previous poll has been processed by the listener.
However, if 0 messages are retrieved from Kafka, so there is basically nothing on the queue to process, how often does it poll then?
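As I understand it, the listener container passes its configured poll timeout (`pollTimeout` in `ContainerProperties`, 5 seconds by default) to `poll()`, so an empty poll simply blocks for up to that long, returns nothing, and the loop polls again. The following is only a stand-in simulation of that loop shape using a `BlockingQueue` in place of the broker; it is not spring-kafka code, and the class and method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollLoopSketch {

    // Runs a fixed number of poll iterations against a queue standing in for the broker
    // and returns how many of those polls came back empty.
    public static int runPolls(BlockingQueue<String> broker, int iterations, long pollTimeoutMs) {
        int emptyPolls = 0;
        for (int i = 0; i < iterations; i++) {
            String record;
            try {
                // Blocks for at most pollTimeoutMs, like a consumer poll with a timeout.
                record = broker.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop polling
                break;
            }
            if (record == null) {
                emptyPolls++; // nothing arrived within the timeout: go straight to the next poll
            } else {
                System.out.println("processing " + record); // listener work would happen here
            }
        }
        return emptyPolls;
    }

    public static void main(String[] args) {
        BlockingQueue<String> broker = new LinkedBlockingQueue<>();
        broker.add("m1");
        System.out.println("empty polls: " + runPolls(broker, 3, 50));
    }
}
```

In this sketch the interval between empty polls is roughly the timeout itself, which is the behaviour the question is asking about; any extra idle time between polls would be a separate container setting.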
Hi, I'm trying to build an application consisting of two Spring Boot apps. One (A) sends a file path to a Kafka topic and the other (B) listens to the same topic. After B receives the file path from A, it uploads the file to object storage. The problem is that the upload takes time, as the file can be of any size, so rebalances happen frequently. I don't understand how to implement this in a better way.
What I'm doing now is as follows:
@KafkaListener(
    topics = "topic",
    containerFactory = "configureKafkaListenerContainerFactory",
    errorHandler = "kafkaConsumerErrorHandler")
public void consume(
        ConsumerRecord<String, String> record, Acknowledgment acknowledgment)
        throws Exception {
    UploadService.upload(record.value());
    acknowledgment.acknowledge();
}
Can anyone help me to do it in a better way, please?
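Rebalances during long uploads are commonly a sign that the time between two `poll()` calls exceeded `max.poll.interval.ms` (5 minutes by default), since every record returned by one poll (up to `max.poll.records`, 500 by default) must be processed before the next poll. A minimal sketch of the two consumer settings that could be tuned, assuming they are merged into the property map used by the consumer factory. The keys are standard Kafka consumer settings; the class name, method name, and the values in `main` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class SlowListenerProps {

    // Extra consumer settings for a listener whose per-record work is slow.
    // Hypothetical helper: suitable values depend on the worst-case upload time.
    public static Map<String, Object> forSlowProcessing(int maxPollRecords, int maxPollIntervalMs) {
        Map<String, Object> props = new HashMap<>();
        // Fewer records per poll means less work has to finish before the next poll.
        props.put("max.poll.records", maxPollRecords);
        // Upper bound on the time allowed between two polls before the consumer is evicted.
        props.put("max.poll.interval.ms", maxPollIntervalMs);
        return props;
    }

    public static void main(String[] args) {
        // Assumption: one upload at a time, allowing up to 15 minutes per upload.
        Map<String, Object> props = forSlowProcessing(1, 15 * 60 * 1000);
        System.out.println(props);
    }
}
```

With one record per poll, only a single upload has to fit inside the interval. If uploads can take arbitrarily long, handing the work to another thread and pausing the consumer in the meantime may be a better fit than raising the interval further.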
@Configuration @EnableKafkaStreams. This class creates two beans of type KStream<String, String>. I've configured a shutdown hook which calls (inside a @PreDestroy callback) the method StreamsBuilderFactoryBean::close() (StreamsBuilderFactoryBean is injected with @Autowired). Is this the right approach? Thanks.
kafkaTemplate.send(
    "test-topic", 1, "body"
).addCallback(
    new ListenableFutureCallback<>() {
        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("failure");
        }
        @Override
        public void onSuccess(SendResult<Integer, String> integerStringSendResult) {
            System.out.println("success");
        }
    }
);
org.apache.kafka.clients.consumer.ConsumerConfig The configuration 'spring.json.trusted.packages' was supplied but isn't a known config.
The consumer seems to work and the config looks like what is described here: https://docs.spring.io/spring-boot/docs/2.4.5/reference/htmlsingle/#boot-features-kafka-extra-props
@KafkaListener(topics = "in", containerFactory = "testFactory")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String result = process(record);
    kafkaTemplate.send("out", result).addCallback(new ListenableFutureCallback<>() {
        @Override
        public void onFailure(Throwable throwable) {
            LOGGER.error("Failed to send message: {}", result, throwable);
        }
        @Override
        public void onSuccess(SendResult<String, String> stringStringSendResult) {
            LOGGER.debug("Successfully send message: {}", result);
            ack.acknowledge();
        }
    });
}
.get() on to ensure it's performed synchronously, but I want to unit test all the possible scenarios
sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.