I have a ConcurrentKafkaListenerContainerFactory called kafkaListenerContainerFactory, and I want to add a new listener with batch listening turned on, so I created another ConcurrentKafkaListenerContainerFactory called kafkaBatchListenerContainerFactory. I want the @KafkaListener to use the new batch container factory; the topic stays the same, with spring.kafka.consumer.group-id: test-group and auto-offset-reset: earliest.
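Roughly what I have in mind, as a minimal sketch (the topic name my-topic is a placeholder, and I'm assuming Boot auto-configures the ConsumerFactory): the second factory just turns batch mode on, and the listener selects it via the containerFactory attribute.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaBatchListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); // the listener now receives a List of records per poll
    return factory;
}

@KafkaListener(topics = "my-topic", containerFactory = "kafkaBatchListenerContainerFactory")
public void listen(List<String> messages) {
    // process the whole batch at once
}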
Guys, I need to connect to Kafka over SSL, which means I also have to prepare the Kafka configuration on the server side.
I created keystore.jks and truststore.jks following this DZone article: https://dzone.com/articles/kafka-ssl-client-authentication-in-multi-tenancy-a
# Generate the broker key pair in the server keystore
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
# Create a CA (self-signed certificate plus private key)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
# Import the CA certificate into the server and client truststores
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert
# Create a CSR for the broker, sign it with the CA, then import the CA cert and the signed cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
# Same steps for the client keystore
keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed
I also changed config/server.properties like this:
listeners=PLAINTEXT://:9092,SSL://:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.keystore.location=/home/kafka/kafka/conf/kafka.server.keystore.jks
ssl.keystore.password=eg12345
ssl.truststore.location=/home/kafka/kafka/conf/kafka.server.truststore.jks
ssl.truststore.password=eg12345
ssl.key.password=eg12345
ssl.protocol=TLS
ssl.endpoint.identification.algorithm=
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
#ssl.keymanager.algorithm=SunX509
#ssl.trustmanager.algorithm=PKIX
ssl.keystore.type=JKS
ssl.truststore.type=JKS
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#allow.everyone.if.no.acl.found=true
javax.net.debug=ssl
Kafka then starts up without any issue, but whenever I try with a client configured through the Spring Boot starter, I get an SSL handshake failed error on both sides: in kafka.log and on the client.
Also, this is my config on the Spring side:
spring:
  kafka:
    bootstrap-servers:
      - broker:9093
    ssl:
      key-password: eg12345
      key-store-location: classpath:/jks/kafka.client.keystore.jks
      key-store-password: eg12345
      key-store-type: JKS
      trust-store-location: classpath:/jks/kafka.client.truststore.jks
      trust-store-password: eg12345
      trust-store-type: JKS
      protocol: SSL
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test-consumer-group
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Error received on the client side: [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Bootstrap broker broker:9093 (id: -1 rack: null) disconnected
Error received on the Kafka side: [SocketServer brokerId=0] Failed authentication with /47.251.66.91 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
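For what it's worth, here is a minimal sketch of a plain Java client that could isolate the handshake outside Spring (keystore paths are illustrative; eg12345 is the password from above). Since the broker sets ssl.endpoint.identification.algorithm to empty, the client disables hostname verification as well.

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.SslConfigs;

public class SslSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/kafka.client1.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "eg12345");
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/kafka.client1.keystore.jks");
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "eg12345");
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "eg12345");
        // Matches the broker's empty ssl.endpoint.identification.algorithm
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        try (AdminClient admin = AdminClient.create(props)) {
            // Fails fast with an SSL exception if the handshake is broken
            System.out.println(admin.describeCluster().nodes().get());
        }
    }
}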
So I'm trying out the Cloud Stream Kafka Binder:
I have a question regarding the functional programming model. My topology has two input topics and one output topic, so I'm using a BiFunction. In my case I need to do some other processing on the input KStreams I receive, like filtering and repartitioning the streams. Can I create two other Functions that process those input streams and then have the BiFunction use them via function composition, or should I do all that processing inside the BiFunction?
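Something like this is what I have in mind, as a sketch (all names, types and the join logic are made up): the per-stream processing lives in plain helper Functions that are not beans, so the binder only binds the BiFunction to topics, and the BiFunction applies them before joining.

public Function<KStream<String, String>, KStream<String, String>> preProcessLeft() {
    // filtering plus repartition for the first input stream
    return stream -> stream.filter((k, v) -> v != null).repartition();
}

public Function<KStream<String, String>, KStream<String, String>> preProcessRight() {
    return stream -> stream.filter((k, v) -> !v.isEmpty());
}

@Bean
public BiFunction<KStream<String, String>, KStream<String, String>, KStream<String, String>> process() {
    // only this bean is bound to the two input topics and the output topic
    return (left, right) -> preProcessLeft().apply(left)
            .join(preProcessRight().apply(right),
                  (l, r) -> l + r,
                  JoinWindows.of(Duration.ofMinutes(5)));
}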
(I'm using the .setConcurrency() method of ConcurrentKafkaListenerContainerFactory.) Spring version - 1.5.6.RELEASE, spring-kafka version - 1.2.2.RELEASE
Approaches to handle 10 million records of 10 tables each with several joins
I have 10 million records across 10 tables, each queried with several joins. I am looking for the best alternative database or approach so that reads are very fast, i.e., the query should be fast.
option#1 - normalize the tables and avoid unnecessary joins
option#2 - fetch all the needed columns in the first query, rather than re-running queries with where conditions multiple times inside a looping construct
option#3 - go for a NoSQL database instead of MySQL
Please advise
Thanks
Hello.
The DeadLetterPublishingRecoverer uses a template resolver to get a valid KafkaTemplate to publish the failed messages.
But according to this:
https://github.com/spring-projects/spring-kafka/blob/0d430b95ff1c398c7d02978db5a5c0c369901216/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L450
the template is selected based only on the value class.
Since my messages are <String, String>, I created a template for <String, String> messages.
But how do I handle deserialization error cases?
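If it helps, this is the direction I was considering, as a minimal sketch (bean names are made up): the recoverer also takes a map of templates keyed by value class, and a byte[] template would cover records whose value failed deserialization, since the raw value is still a byte[].

@Bean
public DeadLetterPublishingRecoverer recoverer(
        KafkaTemplate<String, String> stringTemplate,
        KafkaTemplate<String, byte[]> bytesTemplate) {
    // LinkedHashMap because the recoverer picks the first assignable match
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(byte[].class, bytesTemplate);   // raw values from failed deserialization
    templates.put(String.class, stringTemplate);  // normally deserialized values
    return new DeadLetterPublishingRecoverer(templates);
}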
Hello,
I use the stateful retry mechanism so I can retry processing messages a few times with a fixed delay between each attempt, but it's not working:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<String, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    // Publish to a dead-letter topic after 3 failed deliveries
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), 3));
    factory.setRetryTemplate(retryTemplate());
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate template = new RetryTemplate();
    // Fixed 1s delay between attempts
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000);
    template.setBackOffPolicy(backOffPolicy);
    // Retry only RetryableException (and subclasses), up to MAX_ATTEMPTS
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(MAX_ATTEMPTS,
            Collections.singletonMap(RetryableException.class, true),
            true);
    template.setRetryPolicy(retryPolicy);
    return template;
}
I'm throwing a RetryableException in the @KafkaListener.
Did I miss something? Thanks
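One thing I wasn't sure about from the reference docs (sketch, assuming a spring-kafka 2.x version that still supports RetryTemplate on the factory): whether the stateful flag also has to be set explicitly on the factory.

// Sketch: with stateful retry enabled, the exception is rethrown to the
// container on each attempt, so the SeekToCurrentErrorHandler re-seeks
// between retries instead of the adapter retrying entirely in memory.
factory.setStatefulRetry(true);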
Hello - a best practice we embrace is to fail bean initialization when a connection check to an external resource fails. For a Kafka consumer bean, what's the best way to check for broker availability during its initialization?
AFAIK there is no straightforward way to catch the Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected warning.
Thanks!
A better approach is to construct a KafkaAdmin object during bean initialization and try something like listing topics, failing the bean initialization if it can't.
However, I'm looking to see if there is a best practice at all.
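Roughly what I meant, as a minimal sketch (MyConsumerBean is a made-up placeholder; buildAdminProperties() assumes Spring Boot 2.x). KafkaAdmin holds the config, while the underlying AdminClient does the actual check:

@Bean
public MyConsumerBean myConsumerBean(KafkaProperties properties) {
    try (AdminClient admin = AdminClient.create(properties.buildAdminProperties())) {
        // Throws (e.g. a TimeoutException) if no broker answers within 5s
        admin.listTopics().names().get(5, TimeUnit.SECONDS);
    }
    catch (Exception e) {
        throw new BeanInitializationException("Kafka broker not reachable", e);
    }
    return new MyConsumerBean();
}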
Hi guys, I'm stuck with KStream and Java 8. I need your help.
I'm confused about this method, and I need to create some unit tests for it. Are there any hints or samples?
public Function<KStream<String, String>, KStream<String, String>[]> handler() {
    return transactionStream -> transactionStream
        .map((key, value) -> extractStatus(key, value))
        .branch((key, value) -> true);
}
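A sketch of the kind of test I'm after (topic names are made up; handler() is the method above), using kafka-streams-test-utils: wire the function into a standalone topology, then pipe records through it with TopologyTestDriver.

StreamsBuilder builder = new StreamsBuilder();
// Apply the function under test to a stream built from a test topic
KStream<String, String>[] branches = handler().apply(builder.stream("input-topic"));
branches[0].to("output-topic");

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    TestInputTopic<String, String> in = driver.createInputTopic(
            "input-topic", new StringSerializer(), new StringSerializer());
    TestOutputTopic<String, String> out = driver.createOutputTopic(
            "output-topic", new StringDeserializer(), new StringDeserializer());
    in.pipeInput("key", "value");
    // assert on out.readKeyValue(), e.g. with JUnit's assertEquals
}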
Hi @garyrussell
It seems that the reactive Kafka consumer stops once an exception occurs in the processing logic, such as a WebClientResponseException.
receiver.receive()
    .flatMap(record -> process(record))
    .doOnError(log::error)
    .subscribe();
What is the correct way to avoid that?
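What I've been considering, as a sketch (process() and log are from my snippet above, and I'm assuming process() returns a Mono): handle the error per record inside the flatMap so one failure doesn't cancel the whole receive Flux.

receiver.receive()
    .flatMap(record -> process(record)
            .doOnError(e -> log.error("failed record", e))
            .onErrorResume(e -> Mono.empty())) // drop or route the failed record
    .retry()                                   // resubscribe if the stream itself errors
    .subscribe();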
Hello, I'm using RetryingBatchErrorHandler for a batch listener with a FixedBackOff, and it seems to work well.
However, when the error handler retries, no retry log is shown, so I have no way to tell whether the handler is actually retrying. It only prints the error log I configured in the ConsumerRecordRecoverer once the retries are exhausted.
Is there any way to log every retry attempt while the error handler is retrying?
Below is the relevant part of the container factory configuration:
factory.setBatchListener(true);
final var errorHandler = new RetryingBatchErrorHandler(new FixedBackOff(3000, 5), (consumerRecord, e) -> {
    log.error("failed record - {}", consumerRecord);
});
factory.setBatchErrorHandler(errorHandler);
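A sketch of an alternative I was looking at, under the assumption that spring-kafka 2.8+ is available: DefaultErrorHandler (the replacement for RetryingBatchErrorHandler) accepts RetryListeners that fire on each failed delivery, which would give the per-retry log.

DefaultErrorHandler errorHandler = new DefaultErrorHandler(
        (consumerRecord, e) -> log.error("failed record - {}", consumerRecord),
        new FixedBackOff(3000, 5));
errorHandler.setRetryListeners(new RetryListener() {
    @Override
    public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
        // called per record when the failure is record-level
        log.warn("retry {} for record {}", deliveryAttempt, record);
    }
    @Override
    public void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
        // called when the whole batch is retried
        log.warn("retry {} for batch of {} records", deliveryAttempt, records.count());
    }
});
factory.setCommonErrorHandler(errorHandler);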