hey team, can you confirm that when using KafkaTemplate's send(topic, key, value) method, the partition is calculated based on the message key?
if you go to https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java#L406
it basically creates a ProducerRecord, like this: https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java#L133
which seems to pass null as the partition.
I found this question https://stackoverflow.com/questions/45556142/spring-kafka-partitioning which seems to confirm it, but where does the partitioning actually happen?
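If it helps: as far as I can tell, the partition is chosen client-side by the producer's Partitioner (DefaultPartitioner by default) when the ProducerRecord's partition is null. A minimal sketch of that hashing logic, mirroring my understanding of what DefaultPartitioner does for a non-null key:

import org.apache.kafka.common.utils.Utils;

// Sketch of DefaultPartitioner behavior for a record whose partition is null
// and whose key is non-null: murmur2-hash the serialized key, then take it
// modulo the topic's partition count.
static int partitionForKey(byte[] serializedKey, int numPartitions) {
    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}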
When I set ReceiverOptions.commitInterval in my Java configuration, I don't see any change in the consumer configuration; there is only one related property, auto.commit.interval.ms.
Does ReceiverOptions.commitInterval(Duration d) in reactor-kafka override the Kafka property auto.commit.interval.ms?
When using reactor-kafka, can I set the commit interval via Kafka properties, or do I have to set it in the ReceiverOptions configuration?
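For reference, this is how I'm setting it (a sketch; consumerProps is assumed to be my existing Map<String, Object> of bootstrap/group/deserializer settings):

import java.time.Duration;
import reactor.kafka.receiver.ReceiverOptions;

// Sketch: the commit interval configured on ReceiverOptions rather than via
// consumer properties.
ReceiverOptions<String, String> options = ReceiverOptions
        .<String, String>create(consumerProps)
        .commitInterval(Duration.ofSeconds(5));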
I have a ConcurrentKafkaListenerContainerFactory called kafkaListenerContainerFactory, and I want to add a new listener with batch listening turned on, so I created another ConcurrentKafkaListenerContainerFactory called kafkaBatchListenerContainerFactory and pointed the @KafkaListener at the new batch containerFactory; the topic stays the same. spring.kafka.consumer.group-id: test-group and auto-offset-reset: earliest.
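Roughly what I did (a sketch; the bean name matches the one above, the topic name and listener signature are mine):

import java.util.List;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaBatchListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); // deliver records as a List per poll
    return factory;
}

@KafkaListener(topics = "my-topic", containerFactory = "kafkaBatchListenerContainerFactory")
public void listen(List<String> messages) {
    // batch of values from one poll
}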
Guys, I need to connect to Kafka over SSL, which means I also have to prepare the Kafka configuration on the server side.
I created the keystore.jks + truststore.jks
following this DZone article: https://dzone.com/articles/kafka-ssl-client-authentication-in-multi-tenancy-a
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed
I also changed config/server.properties like this:
listeners=PLAINTEXT://:9092,SSL://:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.keystore.location=/home/kafka/kafka/conf/kafka.server.keystore.jks
ssl.keystore.password=eg12345
ssl.truststore.location=/home/kafka/kafka/conf/kafka.server.truststore.jks
ssl.truststore.password=eg12345
ssl.key.password=eg12345
ssl.protocol=TLS
ssl.endpoint.identification.algorithm=
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
#ssl.keymanager.algorithm=SunX509
#ssl.trustmanager.algorithm=PKIX
ssl.keystore.type=JKS
ssl.truststore.type=JKS
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#allow.everyone.if.no.acl.found=true
javax.net.debug=ssl
Kafka then starts up without any issue, but whenever I try with a client configured via the Spring Boot starter, I get SSL handshake failed errors on both sides: kafka.log and the client log.
This is my config on the Spring side:
spring:
  kafka:
    bootstrap-servers:
      - broker:9093
    ssl:
      key-password: eg12345
      key-store-location: classpath:/jks/kafka.client.keystore.jks
      key-store-password: eg12345
      key-store-type: JKS
      trust-store-location: classpath:/jks/kafka.client.truststore.jks
      trust-store-password: eg12345
      trust-store-type: JKS
      protocol: SSL
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test-consumer-group
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Error received on the client side: [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Bootstrap broker broker:9093 (id: -1 rack: null) disconnected
Error received on the Kafka side: [SocketServer brokerId=0] Failed authentication with /47.251.66.91 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
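For reference, a minimal sketch of the equivalent raw client properties (values taken from the YAML above; paths are placeholders). Testing with a plain consumer built from these can help rule out Spring Boot property mapping as the cause:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;

// Sketch: raw client SSL properties equivalent to the Spring Boot YAML.
Map<String, Object> props = new HashMap<>();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/kafka.client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "eg12345");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/kafka.client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "eg12345");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "eg12345");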
So I'm trying out the Cloud Stream Kafka Binder:
I have a question regarding the functional programming model. My topology has two input topics and one output topic, so I'm using a BiFunction. In my case I need to do some other processing on the input KStreams I receive, like filtering and repartitioning them. Can I create two other Functions that process those input streams and have the BiFunction use them via function composition, or should I do all that processing inside the BiFunction?
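In case it clarifies the question, this is roughly what I mean by doing it all inside the BiFunction, with the per-stream processing factored into plain helper methods (a sketch; all names and the filter/merge bodies are placeholders):

import java.util.function.BiFunction;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;

// One shape for the approach: keep the single bound BiFunction and delegate
// the per-stream processing to helper methods.
@Bean
public BiFunction<KStream<String, String>, KStream<String, String>, KStream<String, String>> process() {
    return (left, right) -> combine(preprocess(left), preprocess(right));
}

private KStream<String, String> preprocess(KStream<String, String> stream) {
    return stream.filter((key, value) -> value != null); // placeholder filter
}

private KStream<String, String> combine(KStream<String, String> left,
                                        KStream<String, String> right) {
    return left.merge(right); // placeholder for the real join logic
}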
(…via the setConcurrency() method of ConcurrentKafkaListenerContainerFactory). Spring version: 1.5.6.RELEASE, spring-kafka version: 1.2.2.RELEASE
Approaches to handle 10 million records across 10 tables, each queried with several joins
I have 10 tables with about 10 million records each, queried with several joins. I am looking for the best alternative, database choice, or approach to read the records very quickly, i.e. the query should be fast.
option#1 - normalize the tables and avoid unnecessary joins
option#2 - select all the needed columns in the first query, instead of re-running queries with the same where conditions inside a looping construct
option#3 - go for a NoSQL database instead of MySQL
Please advise.
Thanks
Hello.
The DeadLetterPublishingRecoverer uses a template resolver to get a valid KafkaTemplate to publish the recovered messages. But according to this:
https://github.com/spring-projects/spring-kafka/blob/0d430b95ff1c398c7d02978db5a5c0c369901216/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L450
the template is resolved based only on the value class. My messages being <String, String>, I created a template for <String, String> messages. But how do I handle deserialization error cases?
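For what it's worth, here is the direction I'm considering (a sketch, assuming ErrorHandlingDeserializer is in play so a failed value reaches the recoverer as byte[], and assuming the DeadLetterPublishingRecoverer constructor that takes a map of templates keyed by value class):

import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

// Sketch: one template per value class; stringTemplate/bytesTemplate are
// assumed KafkaTemplate beans for <String, String> and <String, byte[]>.
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);  // normally deserialized records
templates.put(byte[].class, bytesTemplate);   // values that failed deserialization
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);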
Hello,
I'm using the stateful retry mechanism so I can retry processing messages a few times with a fixed delay between attempts, but it's not working:
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<String, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    // after 3 failed deliveries, publish the record to the dead-letter topic
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), 3));
    factory.setRetryTemplate(retryTemplate());
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate template = new RetryTemplate();
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000); // 1s between attempts
    template.setBackOffPolicy(backOffPolicy);
    // up to MAX_ATTEMPTS attempts, only for RetryableException;
    // the final true means nested causes are inspected too
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(MAX_ATTEMPTS,
            Collections.singletonMap(RetryableException.class, true),
            true);
    template.setRetryPolicy(retryPolicy);
    return template;
}
I'm throwing a RetryableException in the KafkaListener.
Did I miss something? Thanks
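Update: the only other stateful-retry knob I've found on the factory is the statefulRetry flag; I haven't confirmed it's the missing piece, but for reference (a sketch):

// Assumption: with stateful retry enabled, the exception is rethrown to the
// container after each attempt so the SeekToCurrentErrorHandler can re-seek;
// without it, the RetryTemplate retries in-memory within a single delivery.
factory.setStatefulRetry(true);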
Hello - a best practice we embrace is to fail a bean's initialization when a connection check to an external resource fails. For a Kafka consumer bean, what's the best way to check for broker availability during its initialization? AFAIK there is no straightforward way to catch the "Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected" warning.
Thanks!
A better approach is to construct a KafkaAdmin object during bean initialization and try to do something like listing the topics, failing the bean initialization if it can't.
However, I'm looking to see whether there is an established best practice at all.
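Something like this is what I had in mind (a minimal sketch using the plain AdminClient; the bootstrap address and timeout are placeholders):

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Probe broker availability during bean initialization and fail fast if the
// cluster does not answer within the timeout.
try (AdminClient admin = AdminClient.create(
        Map.<String, Object>of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
    admin.listTopics().names().get(10, TimeUnit.SECONDS);
} catch (Exception e) {
    throw new IllegalStateException("Kafka brokers are not reachable", e);
}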
Hi guys, I'm stuck with KStream and Java 8; I need your help.
I'm confused about this method and need to create some unit tests for it. Any hint or sample?
public Function<KStream<String, String>, KStream<String, String>[]> handler() {
    return transactionStream -> transactionStream
            .map((key, value) -> extractStatus(key, value))
            .branch((key, value) -> true);
}
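In case it helps to have something concrete, here is a rough sketch of a test using kafka-streams-test-utils' TopologyTestDriver (topic names are made up, and handler() is the method above):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;

// Build a topology around handler() and drive it without a broker.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String>[] branches = handler().apply(builder.stream("input-topic"));
branches[0].to("output-topic"); // the single branch((k, v) -> true) output

try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    TestInputTopic<String, String> in = driver.createInputTopic(
            "input-topic", new StringSerializer(), new StringSerializer());
    TestOutputTopic<String, String> out = driver.createOutputTopic(
            "output-topic", new StringDeserializer(), new StringDeserializer());
    in.pipeInput("tx-1", "some-payload");
    // assert on whatever extractStatus() maps the record to:
    System.out.println(out.readKeyValue());
}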