Hello!
The document says (same for the consumer)
The following properties are only available for Kafka Streams producers and must be prefixed with
spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
For convenience, if there are multiple output bindings and they all require a common value, that can be configured by using the prefix spring.cloud.stream.kafka.streams.default.producer.
So I tried using it like below, but it doesn't work:
spring.cloud.stream.kafka.streams.default.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.default.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.default.producer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.default.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
This works:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
Is there anyone who's having the same issue?
TimeoutException: Failed to send request after 30000 ms
and DisconnectException: null
and they don't consume any messages from the topic.
I would like to run my consumers in different threads.
Environment:
- springBoot: 2.4.4
- springCloud: 2020.0.2
- springCloudStream: 3.1.1
Configuration:
spring:
  cloud:
    stream:
      bindings:
        createProduct-in-0:
          destination: create_product
          group: playground-service
      function:
        definition: createProduct
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
          configuration:
            num.stream.threads: 2
        bindings:
          createProduct-in-0:
            consumer:
              concurrency: 2
But all consumers still run in one single thread:
2021-03-20 19:17:31.161 INFO [playground-service,03b04f0d730a9a76,768bacebed1187e6] 37454 --- [container-0-C-1] c.p.stream.CreateProductHandler : Received ProductDto(id=null, name=iPhone 8)
2021-03-20 19:17:49.212 INFO [playground-service,57f8ece5553f7aeb,fcf5651f72f43e44] 37454 --- [container-0-C-1] c.p.stream.CreateProductHandler : Received ProductDto(id=null, name=Galaxy A1)
Am I missing any configuration?
Hi, I'm having trouble running the demo code for the Kafka Streams Spring integration documented in https://spring.io/blog/2019/12/02/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-1-programming-model.
Once I start the application I always get the following exception:
Caused by: java.lang.IllegalArgumentException: 'listener' cannot be null
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.5.jar:5.3.5]
at org.springframework.kafka.config.StreamsBuilderFactoryBean.addListener(StreamsBuilderFactoryBean.java:268) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:84) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
... 13 common frames omitted
Looking at the code, the exception is related to KafkaStreamsMicrometerListener
being null, but this class is not mentioned in that documentation.
Is anybody familiar with this problem?
Hi again :) I have a problem. I have a construction like this:
public Consumer<KStream<String, Object>> consume() {
    return obj -> obj.process(() -> new MyProcessor() {
        // my logic with a state store
        // context.forward(key, value, to.child())  <- and here I want to send to a topic
    });
}
But the problem is that when I get the topology from my processor, no node exists. How do I set it up?
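Not the poster's code, just a rough sketch of how this kind of construction is often wired: KStream.process() does not create a downstream stream node, so one option is transform() (which does) followed by to(). The state store name, topic name, and types below are placeholders, and the store itself would still need to be registered (e.g. via a StoreBuilder bean).
import java.util.function.Consumer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ForwardingStreamConfig {

    @Bean
    public Consumer<KStream<String, Object>> consume() {
        return stream -> stream
                .transform(() -> new Transformer<String, Object, KeyValue<String, Object>>() {

                    private ProcessorContext context;

                    @Override
                    public void init(ProcessorContext context) {
                        this.context = context; // look up the state store here if needed
                    }

                    @Override
                    public KeyValue<String, Object> transform(String key, Object value) {
                        // logic with the state store; returning a KeyValue forwards it downstream
                        return KeyValue.pair(key, value);
                    }

                    @Override
                    public void close() {
                    }
                }, "my-state-store")
                .to("output-topic"); // the downstream node that process() alone does not create
    }
}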
I found two ways to handle errors in Kafka message processing. The first is the binder-level DLQ configuration:
spring:
  cloud:
    stream:
      bindings:
        createProduct-in-0:
          destination: create_product
          group: playground-service
          consumer:
            concurrency: 2
      function:
        definition: createProduct
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
        bindings:
          createProduct-in-0:
            consumer:
              autoCommitOnError: true
              autoCommitOffset: true
              enableDlq: true
              dlqName: retry_queue
With this, the failed messages are pushed to the configured DLQ, and if the consumer is restarted while processing, the message is re-processed next time. The second way is a DeadLetterPublishingRecoverer:
DeadLetterPublishingRecoverer(kafkaOperations) { r, _ ->
    TopicPartition("retry_queue", r.partition())
}
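For context, a recoverer like this is typically wired into the binder's listener container through a ListenerContainerCustomizer and a SeekToCurrentErrorHandler. A rough Java sketch under that assumption (spring-kafka 2.6.x APIs; names are illustrative, not the poster's code):
import org.apache.kafka.common.TopicPartition;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> dlqCustomizer(
        KafkaOperations<Object, Object> kafkaOperations) {
    // publish failed records to the "retry_queue" topic, keeping the original partition
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaOperations,
            (record, ex) -> new TopicPartition("retry_queue", record.partition()));
    // after the default number of delivery attempts, hand the record to the recoverer
    return (container, destination, group) ->
            container.setErrorHandler(new SeekToCurrentErrorHandler(recoverer));
}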
With this I am sometimes getting org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic retry_queue not present in metadata after 60000 ms. The message is committed in Kafka, but it is not sent to the DLQ, so there is message loss with this approach.
What is the recommended approach? Is there any way to avoid the commit if there is an error in the DeadLetterPublishingRecoverer?
In the next version of spring-kafka (2.7), the DLPR will throw an exception by default if the send fails. With older versions, you can override publish() and wait for the future to complete.
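A minimal sketch of that override, assuming the spring-kafka 2.6.x publish(ProducerRecord, KafkaOperations) signature (the class name and timeout are illustrative):
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

public class BlockingDlpr extends DeadLetterPublishingRecoverer {

    public BlockingDlpr(KafkaOperations<Object, Object> template) {
        super(template);
    }

    @Override
    protected void publish(ProducerRecord<Object, Object> outRecord,
            KafkaOperations<Object, Object> kafkaTemplate) {
        try {
            // wait for the broker ack so send failures surface as exceptions instead of being logged
            kafkaTemplate.send(outRecord).get(30, TimeUnit.SECONDS);
        }
        catch (Exception ex) {
            throw new IllegalStateException("DLT publish failed for topic " + outRecord.topic(), ex);
        }
    }
}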
When I use DeadLetterPublishingRecoverer, if the topic or partition doesn't exist, it throws org.apache.kafka.common.errors.TimeoutException. Do I need to make sure those exist? With the binder DLQ approach, that is taken care of by Spring.
Correct; auto-provisioning of the dead letter topic is only done when using the binder-level DLQ. You can simply add a NewTopic @Bean to provision the topic; see https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics. A KafkaAdmin is auto-configured by Boot, so you just need the topic bean (and spring.kafka.bootstrap-servers).
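A minimal sketch of such a NewTopic bean (partition and replica counts are placeholders):
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class DlqTopicConfig {

    @Bean
    public NewTopic retryQueue() {
        // the auto-configured KafkaAdmin creates this topic on startup if it does not exist
        return TopicBuilder.name("retry_queue")
                .partitions(3)
                .replicas(1)
                .build();
    }
}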
Great. Tried and it worked. Thanks
Hi Team,
I am new to Cloud Stream, but the documentation is great and easy to understand.
I am stuck on an issue handling errors with the Kinesis binder.
My sample project uses the Kafka and Kinesis (2.1.0) binders with Ilford dependencies.
I am trying to tap <destination-channel>.<Group>.errors and errorChannel with a @ServiceActivator to push the data to a DLQ:
@ServiceActivator(inputChannel = "errorChannel")
public Boolean onError(Message<?> message) {
    return true; // push the failed message to the DLQ here
}
For Kafka failures I am receiving errors on errorChannel, but the Kinesis binder is not publishing any message.
I can see <destination-channel>.<Group>.errors in the logs, and as the default subscriber the system logger prints the error message to the console, but I am not able to capture it in my function.
Can anybody point me in the right direction here?
ERROR [] [SimpleAsyncTaskExecutor-9] o.s.c.s.c.ApplicationJsonMessageMarshallingConverter Failed to convert payload [B@18dc3602
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.example.model.Message` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (123123123)
at [Source: (byte[])"123123123"; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1455)
at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1081)
at com.fasterxml.jackson.databind.deser.ValueInstantiator.createFromInt(ValueInstantiator.java:262)
at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromInt(StdValueInstantiator.java:356)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromNumber(BeanDeserializerBase.java:1359)
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:178)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:166)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3563)
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.lambda$convertParameterizedType$0(ApplicationJsonMessageMarshallingConverter.java:151)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:166)
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:104)
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)
at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.fromMessage(SmartCompositeMessageConverter.java:61)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputMessageIfNecessary(SimpleFunctionRegistry.java:1066)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:864)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:579)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:434)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:79)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:717)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:559)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
...
Hey guys, I have a question. I am using the reactive form of the new Spring Cloud Stream functions for my Kafka consumer and have a requirement to read the partition sequentially, meaning it should not consume multiple messages at the same time. I tried the approach below:
spring.cloud.stream.binding.my-bind.consumer.concurrency: 1
I also tried to use .subscribeOn(Schedulers.single()), but neither of them is working and it consumes multiple messages from the same partition at the same time :(
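Not a definitive answer, just a sketch of one common pattern: with the reactive functional model, per-record ordering is usually enforced inside the pipeline rather than with the concurrency property; concatMap subscribes to one inner publisher at a time, so records are handled strictly in order. The function name, payload type, and handle() helper below are placeholders.
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Configuration
public class SequentialConsumerConfig {

    @Bean
    public Function<Flux<Message<String>>, Mono<Void>> consume() {
        return flux -> flux
                .concatMap(this::handle) // one record at a time, in partition order
                .then();
    }

    private Mono<Void> handle(Message<String> message) {
        // placeholder for the actual processing logic
        return Mono.empty();
    }
}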
Is there a Spring Cloud Stream function way to do a reactive Kafka producer? When I tried StreamBridge, I got an error that it is a blocking method. When I checked this example (https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleProducer.java), it uses KafkaSender for the producer. Is there any way other than using ReactiveKafkaProducerTemplate or KafkaSender for sending messages in a reactive way?
Hi, I am consuming from a 3-partition topic using the Spring Cloud Stream Kafka functional model. I am running the application on Kubernetes. The moment I scale the application to 2 pods, the consumer count on the Confluent dashboard goes to 0. Any idea why this happens? I tried increasing concurrency using spring.cloud.stream.instanceCount=3, but even with just one pod I see just one thread in the application.
Can anyone explain what I am missing?
spring:
  profiles:
    active: ${environment}
  application:
    name: lp-location
  cloud:
    gcp:
      project-id: operations-lower
    stream:
      function:
        definition: processMaster
      bindings:
        processMaster-in-0:
          destination: MASTER
      kafka:
        streams:
          binder:
            functions:
              processLpMaster:
                applicationId: Location-MasterSync
            brokers: localhost:9092
            autoCreateTopics: false
            configuration:
              schema.registry.url: http://localhost:8081
              specific.avro.reader: true
              commit.interval.ms: 1000
          bindings:
            processMaster-in-0:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
I have some questions regarding manual ack on consumer. I'm implementing a retry mechanism which pauses the consumer for a certain duration based on the consumed payload's retry timestamp. I'm trying to disable the consumer's auto commit offset and only acknowledge the offset before I send the payload to retry. So basically if the payload is not yet ready for the retry I would not acknowledge the offset and immediately pause the consumer. After the consumer gets resumed it will be able to consume the same payload (since the offset did not advance) and check the timestamp to determine if the payload is ready for retry.
In my app settings I have the following config: spring.cloud.stream.kafka.bindings.retryProcessor-in-0.consumer.autoCommitOffset=false.
In the Java code I have the following:
MessageHeaders headers = message.getHeaders();
ApiPayload apiPayload = (ApiPayload) message.getPayload();
Consumer<?, ?> consumer = headers.get(KafkaHeaders.CONSUMER, Consumer.class);
String topic = headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
Integer partitionId = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
Acknowledgment acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (apiPayload.getRetryTime() > System.currentTimeMillis()) {
    consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
} else {
    if (acknowledgment != null) {
        acknowledgment.acknowledge();
    }
    // do the retry...
}
However, when I was testing the code, the offset seems to be auto-committed, because after the consumer is resumed it does not consume the message again.
By default SCSt Kafka uses ByteArraySerializer for both key and value. I am using the same for compatibility with existing code.
When I send a message via StreamBridge, the producer and consumer work fine, e.g.:
val message = MessageBuilder.withPayload(product)
    .setHeader("partitionKey", product.name)
    .build()
streamBridge.send(postProductBinding, message)
But when I try to use KafkaSender (https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleProducer.java), I get:
Caused by: java.lang.ClassCastException: class [B cannot be cast to class c.p..dto.ProductDto ([B is in module java.base of loader 'bootstrap'; c.p..dto.ProductDto is in unnamed module of loader 'app')
Here is my producer configuration:
fun buildProducerProperties(): Map<String, Any> {
    val configs = mutableMapOf<String, Any>()
    configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = brokers
    configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
    configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
    return configs
}

@Bean
fun <T> kafkaSender(): KafkaSender<ByteArray, T> {
    val senderOptions: SenderOptions<ByteArray, T> = SenderOptions.create(buildProducerProperties())
    return KafkaSender.create(senderOptions)
}
val kafkaSender: KafkaSender<ByteArray, ByteArray>
val payload = toByteArray(product)
kafkaSender.send(Flux.just(payload).map { p -> SenderRecord.create(ProducerRecord("create_product", p), null) })
    .timeout(Duration.ofMinutes(1))
    .doOnError { throwable -> log.error("Error Sending", throwable) }
    .subscribe { res ->
        val meta = res.recordMetadata()
        log.info(meta.topic() + " " + meta.offset())
    }