@KafkaListener(topics = {"a, b, c,d,e,f,g,h,i,j,k"}
Hi @garyrussell. Would you be able to point me to an example where the static <K,V> KafkaSender<K,V> create(ProducerFactory factory, SenderOptions<K,V> options)
from these docs https://projectreactor.io/docs/kafka/release/api/ is used? I appreciate it's Project Reactor Kafka and not Spring-Kafka but Sergei Egorov told me you're very familiar with that project too and that I should try and ask you.
It's the custom ProducerFactory that causes me a bit of a problem. The documentation says "factory - A custom producer factory other than the default", but does that mean I need to extend some type of base class, or are there certain attributes/characteristics it must have?
The reason we're trying to do this is because we're trying to get kafka metrics for reactive consumer/producer the same way that we would do it for non-reactive (like this):
kafkaConsumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));
Any help is much appreciated. Cheers
Can anyone please tell me how to add headers in Kafka Streams?
I have something like this:
injestorDataStream.groupByKey()
    .aggregate(InjestorWorkerEvent::new, (key, value, aggregate) -> aggregate.addInjested(value),
        Materialized.with(Serdes.String(), new JsonSerde<>(InjestorWorkerEvent.class)))
    .toStream().to(injestionMonitorTopic,
        Produced.with(Serdes.String(), new JsonSerde<>(InjestorWorkerEvent.class)));
But I want to add custom headers to the event produced on the topic. How can I do that? Thanks
Hi,
I’d like to send and receive several different types of JSON messages (ReqA, ReqB, etc.) that all extend one specific type (abstract class Request). I would like to use @JsonTypeInfo and @JsonSubTypes in order to use inheritance.
I have some questions about it and would be grateful for some advice.
On the producer side, I want to use StringSerializer for keys and JsonSerializer for values. I was thinking of covering all cases with a single KafkaTemplate<String, Object>.
Is such a setup (a single KafkaTemplate<String, Object>) sufficient to produce different JSON types that inherit from an abstract class? Is there anything I should keep in mind when using spring-kafka with inheritance?
Can I use a single KafkaTemplate<String, Object> for every type, or do I have to make it specific to each type I produce, i.e. KafkaTemplate<String, A>, KafkaTemplate<String, B>?
For such a use case, should I go for JsonSerializer, or StringSerializer with StringJsonMessageConverter?
How can I distinguish the different types if they are all in a single topic? Are headers the way to go?
Thank you.
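On the last point (telling types apart on one topic), a type header is one common option. As far as I know, spring-kafka's JsonSerializer already adds a type header by default and the JsonDeserializer can map short tokens to classes, while with @JsonTypeInfo the type id travels inside the JSON payload instead. The sketch below is plain Java with no Kafka or Jackson dependency and only illustrates the lookup a consumer would do before deserializing. Request, ReqA and ReqB mirror the names in the question; the header name `requestType`, the tokens `reqA`/`reqB` and the `resolveType` helper are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TypeHeaderSketch {

    // Stand-ins for the message hierarchy from the question.
    abstract static class Request { }
    static class ReqA extends Request { }
    static class ReqB extends Request { }

    // Hypothetical header name, chosen for this example only.
    static final String TYPE_HEADER = "requestType";

    // Hypothetical short tokens mapped to the concrete classes.
    static final Map<String, Class<? extends Request>> TYPE_MAPPING = new HashMap<>();
    static {
        TYPE_MAPPING.put("reqA", ReqA.class);
        TYPE_MAPPING.put("reqB", ReqB.class);
    }

    // Resolves the target class from the raw header bytes, as a consumer would
    // do before handing the payload to a JSON deserializer.
    static Class<? extends Request> resolveType(Map<String, byte[]> headers) {
        byte[] raw = headers.get(TYPE_HEADER);
        if (raw == null) {
            throw new IllegalArgumentException("Missing header: " + TYPE_HEADER);
        }
        String token = new String(raw, StandardCharsets.UTF_8);
        Class<? extends Request> type = TYPE_MAPPING.get(token);
        if (type == null) {
            throw new IllegalArgumentException("Unknown type token: " + token);
        }
        return type;
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put(TYPE_HEADER, "reqB".getBytes(StandardCharsets.UTF_8));
        System.out.println(resolveType(headers).getSimpleName());
    }
}
```

Using short tokens rather than fully qualified class names keeps producer and consumer from having to share package names.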
Hi there, I am trying to implement a circuit breaker with Resilience4j around the Spring Cloud Stream Kafka binder.
@Bean
public java.util.function.Consumer<Message<SomePojo>> process() {
    return message -> {
        var consumer = message.getHeaders().get(KafkaHeaders.CONSUMER, Consumer.class);
        var acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        var currentState = circuitBreakerRegistry.circuitBreaker(BACKEND_SERVICE).getState();
        if (currentState == CircuitBreaker.State.OPEN) {
            var topic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class);
            var partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
            consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
            if (acknowledgment != null) {
                acknowledgment.nack(10);
            }
        }
        // some business logic
        if (acknowledgment != null) {
            log.debug("Acknowledged");
            acknowledgment.acknowledge();
        }
    };
}
The above solution works for one message in the topic. When I have multiple messages, the app fails to poll after around 15 minutes. Did I miss anything?
Thank you.
Hey all, just a quick query; I'm hoping one of you can see where I've gone wrong.
In my application.properties I have
spring.kafka.bootstrap-servers=http://atesturl.com:9092
and in application-test.properties I have
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Now when I run my tests with @ActiveProfiles("test"), I can see that the app runs with the embedded Kafka broker, and I can see the @KafkaListeners being called correctly after publishing messages. But I can also see in the logs that it first tries to connect using "http://atesturl.com:9092" from my default application.properties.
So I end up with this in the logs:
Failed authentication with http://atesturl.com:9092/{the ip address} (Authentication failed)
but afterwards it seems to use the embedded Kafka broker. It's almost as if it starts up with the application.properties version before eventually being overridden by the one in application-test.properties.
I can see from https://stackoverflow.com/questions/50244744/spring-kafka-when-is-exactly-consumer-poll-called-behind-the-hood that Gary has answered the question "When is a Consumer's poll called behind the scenes?", saying:
In 1.3 and above there is a single thread per consumer; the next poll() is performed after the last message from the previous poll has been processed by the listener.
However, if 0 messages are retrieved from Kafka, so there is basically nothing on the queue to process, how often does it poll then?
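My understanding, offered tentatively: each poll() is given a timeout, so with nothing to fetch the call blocks for up to that timeout, returns empty, and the loop polls again straight away. In spring-kafka that timeout should be the container property pollTimeout (5 seconds by default, as far as I know). The sketch below is only a plain-Java analogy using a BlockingQueue in place of the Kafka consumer, not the container's actual code; `IdlePollSketch`, `countEmptyPolls` and the 100 ms timeout are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class IdlePollSketch {

    // Runs a fixed number of poll iterations and returns how many came back empty.
    static int countEmptyPolls(BlockingQueue<String> queue, int iterations, long pollTimeoutMs) {
        int empty = 0;
        for (int i = 0; i < iterations; i++) {
            String record;
            try {
                // Blocks for at most pollTimeoutMs; returns null if nothing arrived in that time.
                record = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop polling
                break;
            }
            if (record == null) {
                empty++; // nothing to process, so the loop goes straight into the next poll
            } else {
                System.out.println("processing " + record);
            }
        }
        return empty;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("only-record");
        long start = System.nanoTime();
        int empty = countEmptyPolls(queue, 4, 100);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(empty + " empty polls in about " + elapsedMs + " ms");
    }
}
```

In this analogy an idle loop polls roughly once per timeout, and a record that arrives mid-wait is picked up immediately rather than at the end of the timeout.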
Hi, I'm trying to build a system with two Spring Boot apps. One (A) sends a file path to a Kafka topic and the other (B) listens to the same topic. After B receives the file path from A, it uploads the file to object storage. The problem is that the upload takes time, since the file can be of any size, so rebalances happen frequently. I don't understand how to implement this in a better way.
What I'm doing now is as follows:
@KafkaListener(
    topics = "topic",
    containerFactory = "configureKafkaListenerContainerFactory",
    errorHandler = "kafkaConsumerErrorHandler")
public void consume(
        ConsumerRecord<String, String> record, Acknowledgment acknowledgment)
        throws Exception {
    UploadService.upload(record.value());
    acknowledgment.acknowledge();
}
Can anyone help me do this in a better way, please?
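A rebalance during a long upload usually means the listener spent longer than max.poll.interval.ms (5 minutes by default in the Kafka consumer) between two polls. One hedged starting point is to fetch fewer records per poll and allow more time between polls, using the standard consumer settings max.poll.records and max.poll.interval.ms. The sketch below is plain Java that only builds such an override map; `SlowListenerConfigSketch`, `consumerOverrides`, the 10-minute worst case and the safety factor of 2 are assumptions for illustration, not measured values.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class SlowListenerConfigSketch {

    // Builds consumer overrides so that one poll's worth of work fits inside max.poll.interval.ms.
    static Map<String, Object> consumerOverrides(int maxPollRecords, Duration worstCasePerRecord) {
        // Worst case for one batch, doubled as a safety margin (the factor 2 is an arbitrary assumption).
        long intervalMs = worstCasePerRecord.toMillis() * maxPollRecords * 2;
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", maxPollRecords);
        // Kafka expects an int for this setting, so clamp before narrowing.
        props.put("max.poll.interval.ms", (int) Math.min(intervalMs, Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        // Assumed numbers: one file per poll, uploads taking up to 10 minutes.
        System.out.println(consumerOverrides(1, Duration.ofMinutes(10)));
    }
}
```

The resulting entries would be merged into the configuration map given to the consumer factory behind configureKafkaListenerContainerFactory. If uploads can take arbitrarily long, no fixed interval is safe, and handing the upload to a separate worker while the consumer stays responsive may be the better direction.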
@Configuration @EnableKafkaStreams. This class creates two beans of type KStream<String, String>. I've configured a shutdown hook which calls (inside a @PreDestroy callback) the method StreamsBuilderFactoryBean::close() (StreamsBuilderFactoryBean is injected with @Autowired). Is this the right approach? Thanks.
`kafkaTemplate.send(
    "test-topic", 1, "body"
).addCallback(
    new ListenableFutureCallback<>() {
        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("failure");
        }
        @Override
        public void onSuccess(SendResult<Integer, String> integerStringSendResult) {
            System.out.println("success");
        }
    }
);`
org.apache.kafka.clients.consumer.ConsumerConfig The configuration 'spring.json.trusted.packages' was supplied but isn't a known config.
The consumer seems to work and the config looks like described here https://docs.spring.io/spring-boot/docs/2.4.5/reference/htmlsingle/#boot-features-kafka-extra-props
@KafkaListener(topics = "in", containerFactory = "testFactory")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String result = process(record);
    kafkaTemplate.send("out", result).addCallback(new ListenableFutureCallback<>() {
        @Override
        public void onFailure(Throwable throwable) {
            LOGGER.error("Failed to send message: {}", result, throwable);
        }
        @Override
        public void onSuccess(SendResult<String, String> stringStringSendResult) {
            LOGGER.debug("Successfully send message: {}", result);
            ack.acknowledge();
        }
    });
}
.get() on to ensure it's performed synchronously, but I want to unit test all the possible scenarios
sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.common.errors.DisconnectException
Hi everyone, I am new to Kafka and am trying to test my application with embedded Kafka. My test class is configured like this:
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka
I am using the following versions:
junit:4.13.2
kafka:2.6.0
kafka confluent avro serializer: 5.2.1
But I get the following error:
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/zookeeper/client/ZKClientConfig
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1786)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:413)
at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:116)
at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:277)
at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:635)
at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:390)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:325)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:123)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:124)
... 25 more
Caused by: java.lang.NoClassDefFoundError: org/apache/zookeeper/client/ZKClientConfig
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1285)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1272)
at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:315)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1845)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1782)
... 34 more
Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.client.ZKClientConfig
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 39 more
Could somebody please help if you have faced such an issue before? Thanks in advance.
Hi, I see that the next release upgrades to Kafka 2.8.0. Should I expect a Kafka version mismatch with the latest 2.7 release?
I'm getting some weird errors like: Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/kafka/common/record/BufferSupplier