.setConcurrency() method of ConcurrentKafkaListenerContainerFactory). Spring version - 1.5.6.RELEASE, spring-kafka version - 1.2.2.RELEASE.
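For reference, a minimal sketch of setting container concurrency on the factory (bean and type names are assumptions, not taken from the original message):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Creates 3 listener containers; effective parallelism is capped by the topic's partition count.
    factory.setConcurrency(3);
    return factory;
}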
Approaches to handle 10 tables of 10 million records each, with several joins
I have 10 tables of 10 million records each, with several joins between them. I am looking for the best alternative, DB design, or approach to read the records very quickly, i.e. the query should be fast.
option#1 - normalize the tables; don't use joins unnecessarily
option#2 - select all the needed columns in the first query, since the WHERE conditions will be applied multiple times in the looping construct
option#3 - go for a NoSQL database instead of MySQL
Please advise.
Thanks
Hello.
The DeadLetterPublishingRecoverer uses a template resolver to get a valid KafkaTemplate for publishing the failed records. But according to this:
https://github.com/spring-projects/spring-kafka/blob/0d430b95ff1c398c7d02978db5a5c0c369901216/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L450
the template is resolved based only on the value class. My messages being <String, String>, I created a template for <String, String> messages. But how do I handle deserialization error cases?
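For illustration only (bean names assumed): the recoverer also has a constructor that takes several templates keyed by value class, so a byte[]-capable template can be used for records whose value failed to deserialize and is therefore published as the raw bytes:

@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, String> stringTemplate,
        KafkaTemplate<String, byte[]> bytesTemplate) {
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate); // values that deserialized successfully
    templates.put(byte[].class, bytesTemplate);  // raw payloads from deserialization failures
    return new DeadLetterPublishingRecoverer(templates);
}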
Hello,
I use the stateful retry mechanism so I can retry processing messages a few times with a fixed delay between attempts, but it's not working:
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<String, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), 3));
    factory.setRetryTemplate(retryTemplate());
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate template = new RetryTemplate();
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000);
    template.setBackOffPolicy(backOffPolicy);
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(MAX_ATTEMPTS,
            Collections.singletonMap(RetryableException.class, true),
            true);
    template.setRetryPolicy(retryPolicy);
    return template;
}
I'm throwing a RetryableException in the KafkaListener. Did I miss something? Thanks
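Not a definitive fix, but for reference: the stateful-retry setup described in the spring-kafka 2.x documentation also turns stateful retry on at the factory, roughly like this (to be added to the factory configuration above):

// With stateful retry, the exception is rethrown to the container on each attempt,
// so the record is re-seeked and re-delivered between retries instead of being
// retried in-memory within a single poll.
factory.setStatefulRetry(true);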
Hello - a best practice we embrace is to fail a bean's initialization when a connection check to an external resource fails. For a Kafka consumer bean, what's the best way to check for broker availability during its initialization?
AFAIK there is no straightforward way to catch the "Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected" warning.
Thanks!
A better approach might be to construct a KafkaAdmin object during bean initialization, try to do something like listing the topics, and fail the bean initialization if that call fails. However, I am looking to see whether there is an established best practice at all.
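A rough sketch of that idea (bean name, timeout, and the choice to expose an AdminClient bean are assumptions, not a recommended practice):

@Bean
public AdminClient kafkaAdminClient(KafkaAdmin kafkaAdmin) {
    AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties());
    try {
        // Fail this bean's initialization if the cluster does not answer within the timeout.
        client.describeCluster().nodes().get(10, TimeUnit.SECONDS);
    }
    catch (Exception ex) {
        client.close();
        throw new IllegalStateException("Kafka broker is not reachable", ex);
    }
    return client;
}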
Hi guys, I'm stuck with KStream and Java 8. I need your help.
I'm confused about this method and need to create some unit tests for it. Is there any hint or sample?
public Function<KStream<String, String>, KStream<String, String>[]> handler() {
    return transactionStream -> transactionStream
            .map((key, value) -> extractStatus(key, value))
            .branch((key, value) -> true);
}
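Not an authoritative answer, but one common way to unit-test such a function is to wire it into a topology and drive it with kafka-streams-test-utils (TopologyTestDriver); topic names and the String/String shape of extractStatus are assumptions:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic",
        Consumed.with(Serdes.String(), Serdes.String()));
// Apply the function under test and route its single branch to an output topic.
handler().apply(input)[0].to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    TestInputTopic<String, String> in = driver.createInputTopic("input-topic",
            new StringSerializer(), new StringSerializer());
    TestOutputTopic<String, String> out = driver.createOutputTopic("output-topic",
            new StringDeserializer(), new StringDeserializer());
    in.pipeInput("txn-1", "some-payload");
    // assert that out.readKeyValue() matches what extractStatus should produce
}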
Hi @garyrussell
It seems that the reactive Kafka consumer stops once an exception occurs in the processing logic, such as a WebClientResponseException.
receiver.receive()
        .flatMap(record -> process(record))
        .doOnError(log::error)
        .subscribe();
What is the correct way to avoid that?
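Not an authoritative recommendation, but one common pattern (assuming process() returns a Mono) is to handle errors per record so a single failure does not cancel the outer flux:

receiver.receive()
        .flatMap(record -> process(record)
                .doOnError(e -> log.error("Failed to process record", e))
                // Swallow the per-record failure so the consuming flux keeps running;
                // a real application might retry or publish to a dead-letter topic here.
                .onErrorResume(e -> Mono.empty()))
        .subscribe();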
Hello, I'm using RetryingBatchErrorHandler for a batch listener with a FixedBackOff and it seems to work well.
However, when the error handler retries, no retry log is shown, so I have no way to confirm whether the handler is actually retrying. It only shows the error log that I configured in the ConsumerRecordRecoverer, and only once the retries are exhausted.
Is there any way to print a log for every retry while the error handler is retrying?
Below is the relevant part of the container factory configuration:
factory.setBatchListener(true);
final var errorHandler = new RetryingBatchErrorHandler(new FixedBackOff(3000, 5), (consumerRecord, e) -> {
    log.error("failed record - {}", consumerRecord);
});
factory.setBatchErrorHandler(errorHandler);
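A workaround sketch rather than a feature of the error handler: since each retry re-delivers the same batch to the listener after the back-off, a log statement at the top of the listener method records every attempt (topic and payload types below are assumptions):

@KafkaListener(topics = "my-topic")
public void listen(List<ConsumerRecord<String, String>> records) {
    // Logged once per delivery attempt, including each retry of the same batch.
    log.info("Received batch of {} records, first offset {}", records.size(), records.get(0).offset());
    // ... processing that may throw ...
}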
Hello guys, I'm using ackMode=MANUAL_IMMEDIATE and receiving an Acknowledgment as a parameter in my listener, but I sometimes keep seeing this error in my logs: "Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group". Full stack trace on:
In the listener method I'm calling acknowledgment.acknowledge() (the error comes from this line); and when I catch a custom exception, I call acknowledgment.nack(...).
Any suggestions on how to avoid this?
2022-03-12 19:14:12.176 ERROR 37634 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.ListenerInvokingBatchErrorHandler.handle(ListenerInvokingBatchErrorHandler.java:36) ~[spring-kafka-2.6.7.jar!/:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1431) ~[spring-kafka-2.6.7.jar!/:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar!/:2.6.7]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
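Not a diagnosis, just a sketch of the usual knobs when a consumer is kicked out because processing a poll takes too long; the values are illustrative and would be merged into the existing consumer configuration:

Map<String, Object> consumerProps = new HashMap<>();
// Allow more time between polls before the broker considers the consumer failed (default 300000 ms).
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);
// Fetch fewer records per poll so each batch finishes well within that window (default 500).
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);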
Hi All! Does anyone know how to configure spring.json.trusted.packages for Kafka Streams? I'm using the following config:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig(KafkaProperties kafkaProperties) {
        return new KafkaStreamsConfiguration(Map.of(
                StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getStreams().getApplicationId(),
                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getStreams().getBootstrapServers(),
                StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()
        ));
    }
}
My application.yml is the following:
spring:
  kafka:
    streams:
      bootstrap-servers: XXX.XXX.XXX.XXX:9092
      application-id: spring-kafka-test
      properties:
        processing.guarantee: exactly_once
        spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*
But when I change the package of the entity published into Kafka, I'm receiving the following exception:
Caused by: java.lang.IllegalArgumentException: The class 'com.mypackage.streams.entity.kafka.raw.Entity' is not in the trusted packages: [java.util, java.lang, com.mypackage.streams.entity.kafka, com.mypackage.streams.entity.kafka.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
... 9 more
Looks like the property added to the application.yml does not work.
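Not a definitive answer, but the exception itself suggests the ".*" entry is treated as a literal package name rather than a wildcard (it appears in the trusted list yet does not match the ".raw" sub-package), so one sketch is to list each sub-package explicitly, or trust everything with "*":

spring:
  kafka:
    streams:
      properties:
        # Exact package names; the ".raw" sub-package is listed explicitly.
        spring.json.trusted.packages: com.mypackage.streams.entity.kafka,com.mypackage.streams.entity.kafka.raw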
Hello everyone,
We made some refactorings to the Kafka client configuration in the sub-modules (Gradle) of our project, and we started to suffer from IllegalStateException("Expected 1 but got 0 partitions") during the integration tests.
Inside a @BeforeAll function we expect the containers to be properly assigned to the topics and call ContainerTestUtils.waitForAssignment() for that. However, we are getting the exception mentioned above, even though, at first sight, nothing has changed in our clients' settings.
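For context, a sketch of the kind of wait being described (registry injection and the partition count are assumptions):

@Autowired
private KafkaListenerEndpointRegistry registry;

// Non-static @BeforeAll requires @TestInstance(Lifecycle.PER_CLASS) on the test class.
@BeforeAll
void waitForAssignment() {
    for (MessageListenerContainer container : registry.getListenerContainers()) {
        // Throws IllegalStateException("Expected 1 but got 0 partitions") if the assignment never arrives.
        ContainerTestUtils.waitForAssignment(container, 1);
    }
}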
We use spring-boot:2.6.5 (with spring-kafka:2.8.4) and our test broker runs on Docker.
Any suggestions in this regard? The only workaround we found was adding a Thread.sleep(10_000) (which is not an option, of course) in @BeforeAll, or manually defining the topic and partition in the @KafkaListeners.