I am using Spring Integration, where I read from Kafka.
The message has headers with a type attribute that causes deserialization issues.
I want to disable the use of type headers. I have tried everything:
@Bean
public ConsumerFactory<String, Item[]> consumerFactory() {
    Map<String, Object> properties = new HashMap<>(); // plus the usual bootstrap/group settings
    properties.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
    // second constructor argument is useHeadersIfPresent, so type headers should be ignored
    return new DefaultKafkaConsumerFactory<>(properties, new StringDeserializer(), new JsonDeserializer<>(Item[].class, false));
}
I also tried passing:
-Dspring.cloud.stream.kafka.streams.binder.configuration.spring.json.use.type.headers=false
-Dspring.json.use.type.headers=false
-Dspring.kafka.consumer.properties.spring.json.add.type.headers=false
Nothing works; it still reads the type from the header.
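For reference, this is my understanding of the property-driven variant of the same configuration (constants from JsonDeserializer; the broker address and bean name are placeholders):
@Bean
public ConsumerFactory<String, Item[]> consumerFactoryViaProperties() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Item[].class);
    props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // "spring.json.use.type.headers"
    return new DefaultKafkaConsumerFactory<>(props);
}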
Hello, I'm using @RetryableTopic and I was wondering whether it is possible, depending on the thrown exception class, to perform an infinite retry (e.g. with containerFactory.setCommonErrorHandler(new DefaultErrorHandler(new ExponentialBackOff()));)?
Actually, I couldn't use include and exclude together.
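A minimal sketch of the classification I have in mind, assuming the DefaultErrorHandler classification API (the exception class is a placeholder):
ExponentialBackOff backOff = new ExponentialBackOff(); // default maxElapsedTime is unlimited, so retries keep going
DefaultErrorHandler handler = new DefaultErrorHandler(backOff);
// retry indefinitely only for this exception type, fail immediately for everything else
handler.defaultFalse();
handler.addRetryableExceptions(TransientDownstreamException.class); // placeholder exception
containerFactory.setCommonErrorHandler(handler);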
Here is the use case:
Hello, guys!
I have a task to introduce idempotency keys as part of event consumption.
For this purpose, I've created an implementation of RecordInterceptor<K, V>.
Within it, I would like to store the idempotency key in the database.
It should happen as part of the same transaction as the event processing (within the @KafkaListener), so I can guarantee that if an exception occurs during event processing, storing the idempotency key will be rolled back.
Could you please advise me how I can achieve that?
Here is my code example:
@Component
public class IdempotencyKeyConsumerInterceptor<K, V> implements RecordInterceptor<K, V> {

    @Override
    @Transactional
    public ConsumerRecord<K, V> intercept(final ConsumerRecord<K, V> record) {
        String idempotencyKey = getIdempotencyKey(record.headers());
        return saveIdempotencyKey(idempotencyKey, record);
    }
}
And a listener:
@Component
public class IdempotencyKeyTestListener {

    private final LibraryUserService libraryUserService;

    @KafkaListener(topics = JSON_TOPIC_NAME_NEW, groupId = GROUP_ID, autoStartup = "false")
    @Transactional
    public void testListener(@Payload final IdempotencyTestJsonEntity testJsonEntity) {
        log.info("Idempotency test listener invoked.");
        libraryUserService.saveLibraryUserEntity(testJsonEntity.getMessage());
    }
}
Thanks in advance.
Hello, we've been using Spring Kafka happily for a long time, but now I have a use case I couldn't solve yet.
We use many @KafkaListeners to listen to a few topics and would like to use only one Kafka consumer in the whole application. As I understand it, Spring Kafka creates many Kafka consumers in the background; that is generally fine, but in this use case we'd like to use only one so that we can use the copartitioning feature of the RangeAssignor rebalancing strategy in the Kafka client.
Is there a way to annotate the @KafkaListeners, or to use our own container, so that only one Kafka consumer is created in the background?
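For illustration, this is roughly the shape we are after: a single listener subscribed to both topics, so that (with concurrency 1) only one Kafka consumer exists. Topic and group names are made up:
@Component
public class CoPartitionedListener {

    // one @KafkaListener -> one container; with concurrency 1 that is a single Kafka consumer
    @KafkaListener(id = "coPartitioned", topics = {"orders", "payments"}, groupId = "co-partitioned-group", concurrency = "1")
    public void onRecord(ConsumerRecord<String, String> record) {
        // record.topic() tells us which topic the record came from
    }
}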
I am currently using Spring Kafka with the Spring Cloud Stream Kafka binder, and I am successfully able to produce and consume messages. In addition, I wanted to also connect to Azure Event Hubs, but when I add the Spring Cloud Azure binder dependency to the pom.xml, the Spring Kafka code stops working with an API client exception and the client getting disconnected in an infinite loop.
I use the following Spring Cloud Azure version and follow the Azure documentation to connect to Event Hubs:
<spring-cloud-azure.version>4.3.0</spring-cloud-azure.version>
Using this library and binder, I get the following in the logs:
Received error INVALID_REQUEST from node -1 when making an ApiVersionsRequest with correlation id 9. Disconnecting.
Can someone tell me whether I need to separate the Azure Event Hubs project into a separate Spring Boot application?
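For reference, the Kafka-over-Event-Hubs connection settings described in the Azure docs look roughly like this (namespace and connection string are placeholders):
spring.kafka.bootstrap-servers=<my-namespace>.servicebus.windows.net:9093
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<connection-string>";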
Hi Gary & Artem! 👋
We noticed that KafkaTemplate always closes the producer if it's not a transactional template, even though the docs state the following:
When not using Transactions, by default, the DefaultKafkaProducerFactory creates a singleton producer used by all clients, as recommended in the KafkaProducer javadocs. However, if you call flush() on the template, this can cause delays for other threads using the same producer. Starting with version 2.3, the DefaultKafkaProducerFactory has a new property producerPerThread. When set to true, the factory will create (and cache) a separate producer for each thread, to avoid this issue.
Even though we have one producer, we open and close a new connection to our Kafka cluster for every message we send.
We were wondering whether this is intentional (and the reasoning behind it) or whether it's a bug in the code. Thanks for your feedback!
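For reference, a minimal sketch of the producerPerThread setting the quoted docs mention (the properties map is a placeholder):
Map<String, Object> props = new HashMap<>(); // bootstrap servers, serializers, etc.
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
factory.setProducerPerThread(true); // one cached producer per thread instead of the shared singleton
KafkaTemplate<String, String> template = new KafkaTemplate<>(factory);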
Hi everyone, I have an issue regarding Kafka and Event Hubs. I made a demo based on https://blog.nebrass.fr/playing-with-spring-boot-and-kafka-on-azure-event-hub/ and it's working fine; I can have a listener listening to the Azure topic. However, when I try to integrate the same code into my company project, there is a problem related to the SSL handshake. Here is the log:
[2022-08-15 10:25:42][ERROR] o.apache.kafka.clients.NetworkClient.processDisconnection 739 - [AdminClient clientId=first-producer-on-the-planet] Connection to node -1 (<MyNameSpace>.servicebus.windows.net/<myIP>:9093) failed authentication due to: SSL handshake failed
Event Hubs uses SASL_SSL/PLAIN, and it works fine in the demo (same Kafka client / Spring Kafka version as the company app). Hope someone can shed some light.
How do I set auto.offset.reset in my application.properties file? I tried using spring.kafka.consumer.auto-offset-reset=latest, but based on the startup logs one consumer shows auto.offset.reset = none and the other one shows auto.offset.reset = earliest.
Hey @garyrussell and everyone,
Can someone please help me understand what's going wrong here?
I want to implement a delayed queue (DLQ) using the ConcurrentKafkaListenerContainer pause/resume functionality.
When a message is received on a DLQ, we check the last processed time and want to pause the container if the message is not aged enough.
Since the same logic needs to be implemented for all the other DLQs as well, I have used MethodKafkaListenerEndpoint to create the containers.
If the message is not aged enough, I throw an exception and pause all containers via the registry, since I want to pause all the other DLQ listeners as well.
It works fine: the offsets are not committed, and on the broker I see a lag of 1 for the given group.
I have also set setAckAfterHandle to false on the DefaultErrorHandler.
However, when I resume all containers via the registry, this last message is not consumed.
If I then send a new message to the DLQ, it picks up the latest message and resets the lag to 0.
However, I am losing one record in this case.
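What I mean by pausing/resuming all containers via the registry is roughly this (a sketch; the method names are mine, the registry is the autowired KafkaListenerEndpointRegistry):
@Autowired
private KafkaListenerEndpointRegistry registry;

private void pauseAllDlqContainers() {
    registry.getAllListenerContainers().forEach(MessageListenerContainer::pause);
}

private void resumeAllDlqContainers() {
    registry.getAllListenerContainers().forEach(MessageListenerContainer::resume);
}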
@Component
public class Processor {

    @Autowired
    public void process(StreamsBuilder builder) {
        // do stream processing stuff
    }
}
Hi @garyrussell and team, please, I need help; I am a newbie in this space and I have an issue getting my configuration right. I am trying to send the same message to different Kafka topics for different consumers to consume,
but I am not getting any messages at the consumer end, while the producer is sending the message successfully. I guess it's my application.yml file where I have the config set up. This is the producer config:
spring: data: mongodb: authentication-database: admin user: ${mongodb_user} password: ${mongodb_password} host: localhost port: 27017 database: orderServiceDB application: name: OrderService stream: kafka: binder: brokers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} auto-add-partitions: true min-partition-count: 2 bindings: orderApproval-in-0: destination: vendorServiceTopic, paymentService content-type: application/*+avro group: vendorServiceGroup consumer: max-attempts: 4 back-off-initial-interval: 10000 sendOrder-out-0: destination: orderServiceTopic content-type: application/*+avro producer: partitioned: true partition-key-expression: headers['partitionKey'] partition-count: 2 zipkin: base-url: http://localhost:9411/ sleuth: sampler: probability: 1
And these are the Consumers
spring: data: mongodb: authentication-database: admin user: ${mongodb_user} password: ${mongodb_password} host: localhost port: 27017 database: productServiceDB cloud: schema-registry-client: endpoint: http://${SCHEMA_REGISTRY_HOST:localhost}:${SCHEMA_REGISTRY_PORT:8081} stream: kafka: binder: brokers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} auto-add-partitions: true bindings: sendOrder-in-0: destination: orderEvent content-type: application/*+avro group: orderServiceGroup consumer: instanceCount: 2 instanceIndex: 2 max-attempts: 4 back-off-initial-interval: 10000
spring: data: mongodb: authentication-database: admin user: ${mongodb_user} password: ${mongodb_password} host: localhost port: 27017 database: customerServiceDB application: name: CustomerService cloud: schema-registry-client: endpoint: http://${SCHEMA_REGISTRY_HOST:localhost}:${SCHEMA_REGISTRY_PORT:8081} stream: kafka: binder: brokers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} auto-add-partitions: true bindings: customerService-in-0: destination: customerOrderEvent content-type: application/*+avro group: orderServiceGroup consumer: max-attempts: 4 back-off-initial-interval: 10000 zipkin: base-url: http://localhost:9411/
When I run the code I get the following, but I am still confused; can someone explain it to me in very simple terms?
partitions assigned: [customerOrderEvent-in-0-0]
partitions assigned: [orderEvent-in-0-0]
thanks
Hey all, I have a question about a ConsumerInterceptor and why onCommit might not be called.
I have a consumer app configured with an annotation-based listener. Auto-commit is false and the listener is configured with ack-mode set to batch.
I'm attempting to write an integration test that verifies records are committed. I'm doing this by configuring a producer in a test configuration along with a ConsumerInterceptor. The test opens a transaction and uses the injected KafkaTemplate to publish a message. The test then effectively asserts that ConsumerInterceptor::onCommit is called.
This never happens, because it seems commitSync/commitAsync is never called:
https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java#L3231
The commits list in KafkaMessageListenerContainer is always empty.
While the test itself here is not super important, I do want to make sure I'm configuring everything correctly. The test could in theory just check the configured ack-mode. That said, I'm really not sure why commitSync/commitAsync is never called. Does anyone happen to have any insights here?
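For context, this is roughly how I register the interceptor in the test, via the standard interceptor.classes consumer property (the interceptor class name is a placeholder):
Map<String, Object> consumerProps = new HashMap<>(); // plus bootstrap servers, group id, deserializers
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CommitTrackingInterceptor.class.getName()); // placeholder ConsumerInterceptor implementation
ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);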
I'm having a problem with MirrorMaker. If this isn't the correct forum, can you please let me know where I should ask?
During the replication process, this line of code is returning null for upstreamTopic, which creates a NullPointerException on the next line. I can't find an associated Kafka issue. Has anyone ever experienced this? It seems like a Kafka bug.
Hello, we are trying to achieve at-least-once delivery with spring-kafka.
Consumer:
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
ackMode = AckMode.RECORD
Producer
spring.kafka.producer.acks=all
spring.kafka.producer.retries=2147483647
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.max.in.flight.requests.per.connection=1
Will the above config give us at-least-once, or are we missing something?
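For completeness, this is how I understand the ackMode = AckMode.RECORD part is applied on the container factory (a sketch; consumer factory wiring is abridged):
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // commit the offset after each record is processed (works with enable-auto-commit=false)
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    return factory;
}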
I just picked up spring-projects/spring-boot#32984 on #spring-boot, and I think I need to add a new field to ContainerProperties.
Do I need to submit a pull request to #spring-kafka and have it merged, then update the library version in #spring-boot and make the associated change there?
Is there a way I can bundle these pull requests together?
I'm upgrading to Spring Boot 3.0.0-RC2 to slowly prepare for the new version that will soon be available. I am no longer able to find classes like org.springframework.kafka.test.EmbeddedKafkaBroker in spring-kafka-test. I also can't find any documentation referring to it being moved somewhere else or replaced.
Hello. Any help, please, with how RangeAssignor works?
From the javadoc, they say:
"
The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.
For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
The assignment will be:
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
"
Is that correct?
To me it should be:
C0: [t0p0, t0p1, t1p0]
C1: [t1p1, t0p2, t1p2]
because "the number of partitions to assign to each consumer" is 3.
Am I wrong?
Hi, if I run a Spring application with @KafkaListener-annotated consumers on multiple server instances, is KafkaListenerEndpointRegistry aware of the consumers on all the instances, or does it only know about the listener containers on its own instance?
I ask because I need to pause a specific topic whenever a particular operation is executing, and I thought of using MessageListenerContainer.pausePartition() for that, but of course it would need to pause the partition for consumers on all server instances.
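For reference, this is roughly how I imagined the local pause would look (the listener id, topic, and partition are placeholders):
MessageListenerContainer container = registry.getListenerContainer("myListenerId"); // placeholder listener id
if (container != null) {
    // only affects the container on this instance, which is exactly my concern
    container.pausePartition(new TopicPartition("my-topic", 0)); // placeholder topic/partition
}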