terancet
@terancet
Could anybody point me in the right direction for further investigation?
terancet
@terancet
Also, should I check the configuration property auto.offset.reset and set its value to earliest in order to consume messages from the first offset?
Gary Russell
@garyrussell
I suggest DEBUG logging for org.apache.kafka. It will automatically be set to earliest if the binding has a group property. You can see the ConsumerConfig in the INFO log emitted by the kafka-clients.
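
For reference, a minimal sketch of what Gary describes (binding, topic, and group names here are placeholders):

spring:
  cloud:
    stream:
      bindings:
        myHandler-in-0:
          destination: my-topic
          group: my-group   # with a group set, the binder defaults auto.offset.reset to earliest

logging:
  level:
    org.apache.kafka: DEBUG   # the ConsumerConfig is also printed at INFO by kafka-clients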
Anonymous
@keirjames

Hello everybody! Quick & easy question from me:
Is it possible to use Spring Cloud Stream with the Kafka Streams binder to consume as a KStream from one Kafka broker, and produce the KStream to a different Kafka broker?

i.e. BrokerA -> Application -> BrokerB

Thanks!

5 replies
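One approach worth sketching (an assumption on my part; the thread's answer is in the collapsed replies): a single Kafka Streams topology connects to one cluster, but with the message-channel Kafka binder you can declare two named binders and point each binding at a different cluster. Broker addresses and binding names below are placeholders:

spring:
  cloud:
    stream:
      binders:
        brokerA:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: brokerA:9092
        brokerB:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: brokerB:9092
      bindings:
        process-in-0:
          destination: input-topic
          binder: brokerA
        process-out-0:
          destination: output-topic
          binder: brokerB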
pedrorlmarques
@pedrorlmarques
Hello, I'm using Spring Cloud Stream Kafka binder 3.1.0, and I'm seeing behaviour that I don't know is normal or not.
I have the following configuration: ackMode: MANUAL_IMMEDIATE, and I also configured dlqName and set enableDlq to true.
The use case is the following:
1 - The listener receives a message, processes it successfully, and acknowledges it.
2 - The listener receives a message that throws an exception; it is sent directly to the DLQ, and I'm assuming it is automatically acknowledged.
3 - The listener receives a message, processes it successfully, and acknowledges it.
4 - In the end, if I check the spring.cloud.stream.binder.kafka.offset metric, the value is 0, but then it immediately changes to 2.
The consumer doesn't process these 2 unconsumed messages until I restart the consumer or it successfully processes another message.
Is this valid behaviour?
24 replies
Kevin Lewis
@kevinrlewis
Hello, I'm running into an issue using spring-cloud-stream Kafka with the jasypt Spring Boot starter. When I start my application, regardless of what it does, it starts sending configProps to Kafka on a configPropsSingleton-out-0 topic. Is there any way to disable this? It seems there is some connection with Spring Integration invoking the message send: bean 'configPropsSingleton_integrationflow.router#0'. Let me know if I need to provide more information.
25 replies
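If the root cause is function auto-detection binding an unintended bean (an assumption; the actual resolution is in the collapsed replies), explicitly limiting the function definition is one thing to try; myFunction is a placeholder name:

spring:
  cloud:
    function:
      definition: myFunction   # only beans listed here get bound; configPropsSingleton would be ignored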
eplouhinec
@eplouhinec
Hello! I'm trying to add correlationId handling to my @StreamListener methods, which listen to Kafka events. I use headers to carry the correlationId; in my listener, I read the header value and put it in the MDC, and at the end of the method I remove it from the MDC.
I'm not convinced of the robustness of this approach. Furthermore, I have to do it in each @StreamListener method, which is very redundant. Do you have any tips for me? Is AOP the only way to factor this out?
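A minimal sketch of one alternative to AOP: a global Spring Integration channel interceptor that populates and clears the MDC around each message (the header name and channel pattern are assumptions):

import org.slf4j.MDC;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor correlationIdInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            Object id = message.getHeaders().get("correlationId");
            if (id != null) {
                MDC.put("correlationId", id.toString()); // visible to the listener on a DirectChannel
            }
            return message;
        }

        @Override
        public void afterSendCompletion(Message<?> message, MessageChannel channel,
                boolean sent, Exception ex) {
            MDC.remove("correlationId"); // runs after the (synchronous) handler completes
        }
    };
}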
Swethaa Ramesh
@swethaaramesh_twitter
Hello, I'm trying to develop an application that uses a @StreamListener interface and calls different classes to handle the event based on a header. I'm trying to test this using mocks but am unable to do so. Are there any testing examples with mocks?
4 replies
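One possible starting point (a sketch, not the thread's answer): the test binder that ships with spring-cloud-stream's test artifact lets you push messages in and mock the handler classes; MyApp and the header name are placeholders:

import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.support.MessageBuilder;

try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
        TestChannelBinderConfiguration.getCompleteConfiguration(MyApp.class)).run()) {
    InputDestination input = context.getBean(InputDestination.class);
    // send a message carrying the routing header, then assert on the (mocked) handler beans
    input.send(MessageBuilder.withPayload("{}".getBytes())
            .setHeader("eventType", "typeA").build());
}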
ashevade1
@ashevade1
Hi, I am using the Cloud Stream Kafka binder, version 3.0.1.RELEASE. I am trying to fetch the RecordMetadata when a message is published. I am not clear on the documentation where it says this can be achieved by specifying the recordMetadataChannel, which is a bean to which the record metadata is sent. Does anyone have any samples of how to do this? Thanks.
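A sketch of how the documented recordMetadataChannel property can be wired (the binding name output and channel name metadataChannel are placeholders):

spring.cloud.stream.kafka.bindings.output.producer.recordMetadataChannel=metadataChannel

@Bean
public MessageChannel metadataChannel() {
    return new DirectChannel();
}

@ServiceActivator(inputChannel = "metadataChannel")
public void result(Message<?> sent) {
    RecordMetadata meta = sent.getHeaders()
            .get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class);
    // meta.topic(), meta.partition(), meta.offset() of the published record
}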
jay
@legendjaks

I would like to run my consumers in different threads.

Environment:
- springBoot: 2.4.4
- springCloud: 2020.0.2
- springCloudStream: 3.1.1

Configuration:

spring:
  cloud:
    stream:
      bindings:
        createProduct-in-0:
          destination: create_product
          group: playground-service
      function:
        definition: createProduct
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
          configuration:
            num.stream.threads: 2
        bindings:
          createProduct-in-0:
            consumer:
              concurrency: 2

but all consumers still run in a single thread:

2021-03-20 19:17:31.161  INFO [playground-service,03b04f0d730a9a76,768bacebed1187e6] 37454 --- [container-0-C-1] c.p.stream.CreateProductHandler  : Received ProductDto(id=null, name=iPhone 8)
2021-03-20 19:17:49.212  INFO [playground-service,57f8ece5553f7aeb,fcf5651f72f43e44] 37454 --- [container-0-C-1] c.p.stream.CreateProductHandler  : Received ProductDto(id=null, name=Galaxy A1)

Am I missing any configuration?

2 replies
verified that these 2 messages are from different partitions
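For what it's worth (an observation, not a reply from the thread): concurrency is a core binding consumer property, while num.stream.threads is a Kafka Streams setting that the message-channel binder ignores. A sketch of where concurrency would normally live:

spring:
  cloud:
    stream:
      bindings:
        createProduct-in-0:
          destination: create_product
          group: playground-service
          consumer:
            concurrency: 2   # yields container threads container-0-C-1 and container-1-C-1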
Hau Van
@redondoxx
This message was deleted
7 replies
Hau Van
@redondoxx
Hi guys, when I declare the producer and the consumer in the same Spring application, Spring Boot doesn't fire the message to Kafka, but the consumer still receives the message. Why is that?
Swethaa Ramesh
@swethaaramesh_twitter
Hi, quick question: if I want to listen on a topic that may have different event types, can I use a generic object in the @StreamListener interface, something like this:
@StreamListener(ChannelBindings.inTopic)
public void handle(@Headers MessageHeaders headers, @Payload Object event)
Or would we need to specify the exact type for conversion?
5 replies
antunonline
@antunonline

Hi, I'm having trouble running the demo code for the Kafka Streams Spring integration, as documented at https://spring.io/blog/2019/12/02/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-1-programming-model.
Once I start the application, I always get the following exception:

Caused by: java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.5.jar:5.3.5]
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.addListener(StreamsBuilderFactoryBean.java:268) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:84) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    ... 13 common frames omitted

Looking at the code, the exception is related to KafkaStreamsMicrometerListener being null, but this class is not mentioned in the documentation above.
Is anybody familiar with this problem?

1 reply
antunonline
@antunonline
I have found a solution to this problem. For some reason, if Spring Actuator is not enabled, the Spring Kafka Streams module won't work.
So to make it work, just include Spring Actuator in your pom.xml.
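For completeness, the standard dependency that pulls in Actuator:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>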
Igor Rudenko
@fac30ff

Hi again :) I have a problem. I have a construction like:

public Consumer<KStream<String, Object>> consume() {
    return stream -> stream.process(() -> new MyProcessor() {
        // ... my logic with the state store ...
        // context.forward(key, value, To.child("sink")); <- and here I want to send to a topic
    });
}

but the problem is that when I get the topology from my processor, no child node exists. How do I set it up?

3 replies
alqh
@alqh
Hello. I'm trying to use Kafka Streams, but I want to batch a few messages together and then process them all together. Is there a way I can do this?
2 replies
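If this is the message-channel Kafka binder rather than Kafka Streams (an assumption), batch mode is one documented option; the binding and payload type names are placeholders:

spring.cloud.stream.bindings.process-in-0.consumer.batch-mode: true

@Bean
public Consumer<List<MyEvent>> process() {
    return batch -> {
        // the whole poll() result arrives here as one list
    };
}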
jay
@legendjaks

I found two ways to handle errors in Kafka message processing:

1 - Use DLQ

spring:
  cloud:
    stream:
      bindings:
        createProduct-in-0:
          destination: create_product
          group: playground-service
          consumer:
            concurrency: 2
      function:
        definition: createProduct
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
        bindings:
          createProduct-in-0:
            consumer:
              autoCommitOnError: true
              autoCommitOffset: true
              enableDlq: true
              dlqName: retry_queue

With this, the failed messages are pushed to the configured DLQ. If the consumer is restarted during processing, the message is re-processed next time.

2 - Using DeadLetterPublishingRecoverer

DeadLetterPublishingRecoverer(kafkaOperations) { r, _ ->
            TopicPartition("retry_queue", r.partition())
        }

With this I sometimes get org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic retry_queue not present in metadata after 60000 ms. The message is committed in Kafka, but it is not sent to the DLQ, so there is message loss with this approach.

What would be the recommended approach? Is there any way to avoid the commit if there is an error in DeadLetterPublishingRecoverer?

Gary Russell
@garyrussell
In the next version of spring-kafka (2.7), the DLPR will throw an exception by default if the send fails. With older versions, you can override publish() and wait for the future to complete.
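A sketch of that pre-2.7 workaround, overriding publish() and blocking on the send future (the exact protected signature may vary by spring-kafka version; this assumes 2.6.x):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaOperations,
        (rec, ex) -> new TopicPartition("retry_queue", rec.partition())) {

    @Override
    protected void publish(ProducerRecord<Object, Object> outRecord,
            KafkaOperations<Object, Object> kafkaTemplate) {
        try {
            // block until the DLQ send completes so a failure surfaces as an exception
            kafkaTemplate.send(outRecord).get(30, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            throw new KafkaException("DLQ publish failed", e);
        }
    }
};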
jay
@legendjaks

In the next version of spring-kafka (2.7), the DLPR will throw an exception by default if the send fails. With older versions, you can override publish() and wait for the future to complete.

Thanks. As per your suggestion I have overridden the publish and it worked

jay
@legendjaks

In the next version of spring-kafka (2.7), the DLPR will throw an exception by default if the send fails. With older versions, you can override publish() and wait for the future to complete.

When I use DeadLetterPublishingRecoverer, if the topic or partition doesn't exist, it throws org.apache.kafka.common.errors.TimeoutException. Do I need to make sure those exist? With the DLQ approach, that is taken care of by Spring.

Gary Russell
@garyrussell
Correct; auto-provisioning of the dead letter topic is only done when using binder-level DLQ; you can simply add a NewTopic @Bean to provision the topic. See https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics
Boot auto-configures a KafkaAdmin so you just need the topic bean (and spring.kafka.bootstrap-servers).
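A minimal sketch of the NewTopic bean Gary refers to (name, partition count, and replication factor are placeholders):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.config.TopicBuilder;

@Bean
public NewTopic retryQueue() {
    return TopicBuilder.name("retry_queue")
            .partitions(3)   // match the source topic if you map partitions 1:1
            .replicas(1)
            .build();
}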
jay
@legendjaks

Correct; auto-provisioning of the dead letter topic is only done when using binder-level DLQ; you can simply add a NewTopic @Bean to provision the topic. See https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics

Great. Tried and it worked. Thanks

ms0205
@ms0205
I am getting "Backoff none exhausted for ConsumerRecord" from org.springframework.kafka.listener.SeekToCurrentErrorHandler. I am guessing this happens after my application closes following an exception: "message":"Failed to stop bean 'inputBindingLifecycle'". Is there any way to clean these bindings or recover from this?
3 replies
I am using the Kafka binder
parthashah
@Parthashah

Hi Team,

I am new to Cloud Stream, but the documentation is great and easy to understand.
I am stuck on an issue handling errors with the Kinesis binder.

My sample project uses the Kafka and Kinesis (2.1.0) binders with the Ilford dependencies.
I am trying to tap <destination-channel>.<group>.errors and errorChannel, with the help of a @ServiceActivator, to push the data to a DLQ:

@ServiceActivator(inputChannel = "errorChannel")
public void onError(Message<?> message) {
    // push the failed message to the DLQ
}

For Kafka failures I receive errors on errorChannel,
but the Kinesis binder is not publishing any message.
I can see <destination-channel>.<group>.errors in the logs, and its default subscriber (the System Logger) prints the error message to the console, but I am not able to capture it in my function.

Can anybody point me in the right direction!

4 replies
parthashah
@Parthashah
Console logs for reference
 ERROR [] [SimpleAsyncTaskExecutor-9] o.s.c.s.c.ApplicationJsonMessageMarshallingConverter  Failed to convert payload [B@18dc3602
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.example.model.Message` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (123123123)
 at [Source: (byte[])"123123123"; line: 1, column: 1]

at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1455)
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1081)
    at com.fasterxml.jackson.databind.deser.ValueInstantiator.createFromInt(ValueInstantiator.java:262)
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromInt(StdValueInstantiator.java:356)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromNumber(BeanDeserializerBase.java:1359)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:178)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:166)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3563)
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.lambda$convertParameterizedType$0(ApplicationJsonMessageMarshallingConverter.java:151)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:166)
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:104)
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)
    at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.fromMessage(SmartCompositeMessageConverter.java:61)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputMessageIfNecessary(SimpleFunctionRegistry.java:1066)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:864)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:579)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:434)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:79)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:717)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:559)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at
Sidney Lee
@sesidlee_twitter
hi, guys.
i was wondering if anyone could point me to some best practice or a good example for configuring multiple event types for the same topic. we have a need where event ordering is important in two rather different use cases. our event types can contain a lot of fields, so common avros or unions won't work (and we are moving away from confluent as a platform, if that is relevant).
any tips would be greatly appreciated.
40 replies
Igor Rudenko
@fac30ff
Hi, ladies and gentlemen.
I have a question: is it possible, with the functional style (Spring Cloud Stream Kafka Streams), to consume an Avro object (with an Avro Serde), store an intermediate object in a state store as JSON (with a JSON Serde), and then produce Avro again? If you know about this approach, please answer in the thread :)
7 replies
Navid Ghahremani
@ghahramani
This message was deleted

Hey guys, I have a question. I am using the reactive form of the new Spring Cloud Stream functions for Kafka and have a requirement to read a partition sequentially: it should not consume multiple messages at the same time. I tried the approach below:

spring.cloud.stream.binding.my-bind.consumer.concurrency: 1

I also tried to use

.subscribeOn(Schedulers.single())

but neither of them works, and it still consumes multiple messages from the same partition at the same time :(
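One reactive-side sketch that serializes processing inside the pipeline regardless of binder threading (names are placeholders; as an aside, the property prefix is normally spring.cloud.stream.bindings, plural — an observation, not from the thread):

@Bean
public Function<Flux<Message<String>>, Mono<Void>> myBind() {
    return flux -> flux
            .concatMap(this::handle) // concatMap subscribes one at a time, preserving order
            .then();
}

private Mono<Void> handle(Message<String> msg) {
    // process each message sequentially here
    return Mono.empty();
}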

Leonardo Torres
@leonardo_ta_twitter
Hello guys, good morning. I am looking for a way to use a different serializer for each binding rather than declaring a default one at the binder level.
Is it possible with a ClientFactoryCustomizer? I want to try, but can't find any example.
I am using Spring Cloud Stream Kafka with native encoding/decoding.
5 replies
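With native encoding enabled, per-binding serializers can also be set through the binding-level Kafka producer configuration, without a ClientFactoryCustomizer (binding names below are placeholders):

spring:
  cloud:
    stream:
      bindings:
        orders-out-0:
          producer:
            useNativeEncoding: true
        audit-out-0:
          producer:
            useNativeEncoding: true
      kafka:
        bindings:
          orders-out-0:
            producer:
              configuration:
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          audit-out-0:
            producer:
              configuration:
                value.serializer: org.apache.kafka.common.serialization.StringSerializer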
jay
@legendjaks
I want to use the Spring Cloud Stream function style with a reactive Kafka producer. When I tried StreamBridge, I got an error that it is a blocking method. When I checked this example (https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleProducer.java), it uses KafkaSender for the producer. Is there any way other than ReactiveKafkaProducerTemplate and KafkaSender to send messages reactively?
4 replies
hnazmatrix
@hnazmatrix:matrix.org
[m]

Hi, I am consuming from a 3-partition topic using the Spring Cloud Stream Kafka functional model. I am running the application on Kubernetes. The moment I scale the application to 2 pods, the consumer count on the Confluent dashboard goes to 0. Any idea why this happens? I tried increasing concurrency using spring.cloud.stream.instanceCount=3, but even with just one pod I see just one thread in the application.

Can anyone explain what I am missing?

spring:
  profiles:
    active: ${environment}
  application:
    name: lp-location
  cloud:
    gcp:
      project-id: operations-lower
    stream:
      function:
        definition: processMaster
      bindings:
        processMaster-in-0:
          destination: MASTER
      kafka:
        streams:
          binder:
            functions:
              processLpMaster:
                applicationId: Location-MasterSync
            brokers: localhost:9092
            autoCreateTopics: false
            configuration:
              schema.registry.url: http://localhost:8081
              specific.avro.reader: true
              commit.interval.ms: 1000
          bindings:
            processMaster-in-0:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
10 replies
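Two things worth checking against this config (observations, not replies from the thread): the function is declared as processMaster but the applicationId is keyed under processLpMaster, and for the Kafka Streams binder the thread count comes from num.stream.threads rather than instanceCount. A sketch:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            functions:
              processMaster:          # must match the name in spring.cloud.stream.function.definition
                applicationId: Location-MasterSync
            configuration:
              num.stream.threads: 3   # one StreamThread per unit, spread across pods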
Ali Yeganeh
@AliYeganeh
Hi
How can I intercept Spring Cloud Stream messages?
2 replies
Andras Hatvani
@andrashatvani
hi, is there a way to automatically deserialize json/bytearray-serialized events into the corresponding type? @sobychacko
7 replies
Ali Yeganeh
@AliYeganeh
Is it possible to log inbound messages from stream function?
5 replies
AttitudeL
@AttitudeL

I have some questions regarding manual ack on consumer. I'm implementing a retry mechanism which pauses the consumer for a certain duration based on the consumed payload's retry timestamp. I'm trying to disable the consumer's auto commit offset and only acknowledge the offset before I send the payload to retry. So basically if the payload is not yet ready for the retry I would not acknowledge the offset and immediately pause the consumer. After the consumer gets resumed it will be able to consume the same payload (since the offset did not advance) and check the timestamp to determine if the payload is ready for retry.

In my app setting I have the following config: spring.cloud.stream.kafka.bindings.retryProcessor-in-0.consumer.autoCommitOffset=false.
In the java code I have the following:

MessageHeaders headers = message.getHeaders();
ApiPayload apiPayload = (ApiPayload) message.getPayload();
Consumer<?, ?> consumer = headers.get(KafkaHeaders.CONSUMER, Consumer.class);
String topic = headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
Integer partitionId = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
Acknowledgment acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (apiPayload.getRetryTime() > System.currentTimeMillis()) {
    consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
} else {
    if (acknowledgment != null) {
        acknowledgment.acknowledge();
    }
    // do the retry...
}

However, when I was testing the code, the offset seems to be committed automatically, because after the consumer is resumed it does not consume the message again.

13 replies
jay
@legendjaks

By default, SCSt Kafka uses ByteArraySerializer for both key and value. I am using the same for compatibility with existing code.

When I send a message via StreamBridge, the producer and consumer work fine.

e.g.,

val message = MessageBuilder.withPayload(product)
                .setHeader("partitionKey", product.name)
                .build()
streamBridge.send(postProductBinding, message)

But when I try to use KafkaSender (https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleProducer.java), I get:

Caused by: java.lang.ClassCastException: class [B cannot be cast to class c.p..dto.ProductDto ([B is in module java.base of loader 'bootstrap'; c.p..dto.ProductDto is in unnamed module of loader 'app')

Here is my producer configuration,

fun buildProducerProperties(): Map<String, Any> {
    val configs = mutableMapOf<String, Any>()
    configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = brokers
    configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
    configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
    return configs
}

@Bean
fun <T> kafkaSender(): KafkaSender<ByteArray, T> {

    val senderOptions: SenderOptions<ByteArray, T> = SenderOptions.create(buildProducerProperties())
    return KafkaSender.create(senderOptions)
}

val kafkaSender: KafkaSender<ByteArray, ByteArray>
val payload = toByteArray(product)
kafkaSender.send(Flux.just(payload).map { p -> SenderRecord.create(ProducerRecord<ByteArray, ByteArray>("create_product", p), null) })
        .timeout(Duration.ofMinutes(1))
        .doOnError { throwable -> log.error("Error Sending", throwable) }
        .subscribe { res ->
            val meta = res.recordMetadata()
            log.info(meta.topic() + " " + meta.offset())
        }
21 replies
jay
@legendjaks
Instead of configuring Spring Cloud Stream bindings in the properties file under spring.cloud.stream.bindings, is there any way to configure them programmatically, e.g. using a @Bean?
Multiple services will produce to one topic and only one service will consume, so if I can configure it in code, I can re-use the code.
jay
@legendjaks
In the imperative style, SeekToCurrentErrorHandler can be used for handling errors in Kafka. Is there anything available for reactive Kafka consumers?
jay
@legendjaks

In the imperative style, SeekToCurrentErrorHandler can be used for handling errors in Kafka. Is there anything available for reactive Kafka consumers?

Got it. It's out of scope for SCSt. spring-cloud/spring-cloud-stream#1922

Vladimir Kabanov
@vkabanorv
Hi all! Sorry if this is a noob question. How/where can I get spring-cloud-stream-binder-kafka-streams version 3.1.3-SNAPSHOT for use in my Java project? The most recent version on Maven Central is 3.1.2. I need 3.1.3 because I'm running into this issue: https://stackoverflow.com/questions/66881210/spring-cloud-stream-kafka-streams-binder-kafkaexception-could-not-start-stream
5 replies
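In case it helps: Spring snapshots are published to the Spring snapshot repository rather than Maven Central, so adding it to the pom is one way to pull a 3.1.3-SNAPSHOT build:

<repositories>
    <repository>
        <id>spring-snapshots</id>
        <url>https://repo.spring.io/snapshot</url>
    </repository>
</repositories>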
Sidney Lee
@sesidlee_twitter
hi, guys. does anyone know how to configure the internal repartition/changelog topics in spring?
https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#internal-topic-parameters
we are defining value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy to enable multiple event types within one of our topics, but this does not seem to be applied to the internal repartition/changelog topics associated with that topic... any suggestions?
6 replies
Ali Yeganeh
@AliYeganeh
This message was deleted
2 replies