Troy Taillefer
Hi, does anyone have a good example of fanning out from a single input to multiple outputs with different binders? Guidance and hints on how to do this would also be greatly appreciated.
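Not an official recipe, just a sketch of the property side: each binding can select its own binder via the `binder` property, so a function with multiple outputs (or `StreamBridge` sends to several bindings) can fan out across middlewares. All binder and binding names below are assumptions:

```yaml
spring:
  cloud:
    stream:
      binders:
        rabbitA:
          type: rabbit
        kafkaB:
          type: kafka
      bindings:
        process-in-0:
          destination: input
          binder: kafkaB
        process-out-0:
          destination: outputA
          binder: rabbitA
        process-out-1:
          destination: outputB
          binder: kafkaB
```

The function itself would then need multiple output bindings (e.g. a function returning a tuple of fluxes, or imperative `StreamBridge.send(...)` calls to each output binding).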
Hi, I'm using Spring Cloud Stream to connect to Kafka. I have not found any example of setting the JAAS config with "OAuthBearerLoginModule" to authenticate a producer/consumer using a service principal. Any references or suggestions, please?
3 replies
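For reference, plain Kafka client properties can be passed through the binder's `configuration` map; a sketch of the OAUTHBEARER pieces (the callback handler class name is an assumption you must supply yourself, since OAUTHBEARER requires a custom login callback handler for anything beyond the unsecured default):

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          configuration:
            security.protocol: SASL_SSL
            sasl.mechanism: OAUTHBEARER
            sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
            sasl.login.callback.handler.class: com.example.ServicePrincipalLoginCallbackHandler
```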
K solo

Hello everyone, I’m having a particularly ‘nigglesome’ problem with interceptors I desperately need help with.
I’m using ProducerInterceptor and ConsumerInterceptor within a Spring Cloud Stream application and have configured them via a StreamsBuilderFactoryConfig bean. They are firing as expected and I can retrieve some attributes of the producer and consumer records; there is, however, a problem retrieving the actual ‘key’ and ‘value’ of these records. I’ve tried to assign/cast them to the expected type but I get a ClassCastException. My initial instinct suggests this might be a serialisation/deserialisation issue, but there isn’t any indication of this elsewhere, as the application otherwise works fine. The consumer binder has default key/value serdes set as:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde

The producer binder, meanwhile, has key/value serdes set as:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Example output I get when printing out the key/value from a producer record is:

ProducerRecord(topic=.supplier.batch.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=[B@1089613d, value=[B@4a4e10fd, timestamp=1623572275792)

Received Message timestamp =1623571603723, partition =3, offset = 92, key = [B@49e41727, value = [B

The following exception is thrown when trying to assign the key to a String instance:
Error executing interceptor onConsume callback
java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap’)

I’ve copied the ‘onConsume’ method of the consumer interceptor to provide a little more detail:

public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> records) {
    log.info("---<< ProducerRecord being received {} >>---", records);
    for (ConsumerRecord<String, Object> record : records) {
        if (record.topic().equals(LOCATION_SUPPLIER_SOURCE)) {
            log.info("---<< setting timestamp for  >>---");
            String key = record.key();
            System.out.printf("Received Message: timestamp =%s, partition =%s, offset = %d, key = %s, value = %s%n",
                    record.timestamp(), record.partition(), record.offset(), record.key(), record.value().getClass().getName());
        }
    }
    return records;
}
26 replies
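A likely cause (my reading, not confirmed in the thread): in a Kafka Streams app the underlying clients are configured with byte-array (de)serializers and the configured serdes are applied inside the topology, so interceptors see raw `byte[]` keys and values; the `[B@1089613d` in the output above is just `byte[].toString()`. A minimal, self-contained sketch of decoding the raw key yourself (the UTF-8 assumption only holds for string keys):

```java
import java.nio.charset.StandardCharsets;

public class InterceptorKeyDecoding {

    // Decode a raw record key the way an interceptor actually receives it.
    // Casting byte[] to String throws the ClassCastException seen above;
    // constructing a String from the bytes works for string-serialized keys.
    static String decodeKey(Object rawKey) {
        if (rawKey instanceof byte[]) {
            return new String((byte[]) rawKey, StandardCharsets.UTF_8);
        }
        return String.valueOf(rawKey);
    }

    public static void main(String[] args) {
        byte[] raw = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeKey(raw)); // prints "order-42"
    }
}
```

For Avro/JSON values the same idea applies, but you would need to run the bytes through the corresponding deserializer rather than `new String(...)`.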
Soumendra Daas

Hi, I am trying to figure out how to start my Kafka consumer (using Spring Cloud Stream Kafka, of course) at a custom offset value. I will be using MirrorMaker 2.0 to replicate the data to a secondary cluster. To start the application I will need to figure out the translated offset and then make the consumer "seek" to that offset. Is there any way in Spring Cloud Stream Kafka to make the consumer "seek" to a particular offset at startup?

I have two alternative approaches in mind: (a) determine the offset translation outside the application, then use kafka-consumer-groups to set the offset, or (b) do this in KafkaRebalanceListener::onPartitionsAssigned (will need to add some logic to ensure that it does not happen during every rebalance, etc.).

3 replies
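For what it's worth, the binder's declarative start-position properties only cover earliest/latest, so seeking to an arbitrary translated offset still needs approach (b); a sketch of the declarative part, with `input-in-0` as a hypothetical binding name:

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          input-in-0:
            consumer:
              # force the consumer to the configured startOffset on startup
              resetOffsets: true
              # only earliest or latest; arbitrary offsets are not supported here
              startOffset: earliest
```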
Is it possible for the middleware to do something after the application Consumer successfully consumes the message? It looks like Spring Cloud Stream allows middleware to provide an error handler. Is there a way to provide a success handler?
6 replies
ashish ranjan
This message was deleted
1 reply
David Riccitelli
hello, I have a processor backed by a Function. The function succeeds, but its output is too large, yielding org.apache.kafka.common.errors.RecordTooLargeException: The message is 1292103 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
from the business perspective, it's fine for the message not to be delivered (I can't change the max size)
however, the Function stops working after the above error
future calls yield: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
is there a way to recover from this error?
2 replies
David Riccitelli
Would something like this help?
    Function<Flux<PojoIn>, Flux<PojoOut>> myProcessor(MyService myService) {

        return flux -> flux.concatMap(myService::doSomething)
                .onErrorResume(t -> {
                    if (log.isErrorEnabled()) log.error("An error occurred: {}", t.getMessage(), t);
                    return Mono.empty();
                });
    }
David Riccitelli
nope it doesn't
the problem is that it looks like the function is then disconnected from the input topic: Failure was detected during execution of the reactive function

maybe this is relevant, is there a way to catch this?

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@6af5820a]; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1348377 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration., failedMessage=GenericMessage [payload=byte[1348200], headers={contentType=application/json, id=42341146-3252-2754-9d39-2d45572cfafa, timestamp=1624377510439}]

David Riccitelli
is @garyrussell around? 😇
Daniel Hall
Does anyone know offhand when support for compression was added to spring-cloud-stream-binder-kafka? I'm seeing documentation for it in 3.0.8 but not 3.0.7. Was it added in 3.0.8?
3 replies

I have a stream application with 2 consumers. Is there a way to configure "auto.offset.reset" to be "earliest" for one of the consumers and "latest" for the other? I'm using functional-style binding.

Currently I configure "auto.offset.reset" as follows:

    definition: foo
    brokers: localhost:9092
    autoCreateTopics: false
      auto.offset.reset: earliest

However, this config affects all consumers instead of just the one I wanted in the binding function foo.

Can anyone please help? Thanks a lot.

27 replies
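A sketch of a per-binding override (binding names `foo-in-0` and `bar-in-0` are assumptions): the Kafka binder exposes `startOffset` under `spring.cloud.stream.kafka.bindings.<binding>.consumer`, separately from the binder-wide `configuration` map, and it applies when the consumer group has no committed offset:

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          foo-in-0:
            consumer:
              startOffset: earliest
          bar-in-0:
            consumer:
              startOffset: latest
```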

Hello, to use some of the new features in Kafka Streams 2.4 (specifically KIP-307: Allow to define custom processor names with KStreams DSL), I changed the 'org.springframework.boot' version to 2.3.0.M2 and everything seems to work fine. I would like to name the source and sink processors so I don't have names like 'KSTREAM-SOURCE-0000000002' and 'KSTREAM-SINK-0000000013', but the Kafka Streams binder doesn't support it and it won't be available until Hoxton.SR5. Is there any way I can name the source and sink processors so I don't get these autogenerated names?

I'm looking for the same configuration options. Is it possible to configure the function bindings so that we can actually name the source/sink processors?

8 replies
Moid Mohammad
@sobychacko: Hello, I am getting this ClassCastException while working with Spring cloud stream
stack trace:
java.lang.ClassCastException: class com.sun.proxy.$Proxy83 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy83 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:91) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:202) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.8.jar:5.3.8]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
1 reply
Ankita Luthra
Hi, can someone point to an example or sample code using a custom/single DLQ for multiple input/output binders? In our use case we have multiple sources and sinks (IBM MQ, Google Pub/Sub and Kafka); if some exception occurs during processing or deserialisation, after the configured retries the error message for all binders should go to a single Kafka topic (DLQ), not to the DLQ of the source binder. I am using the functional style of Spring Cloud Stream.
5 replies
Roussi Abdelghani
Hi, Is there an example of producing a message (received from an endpoint) using the functional style ?
6 replies
Jeon ChangMin
Hi, Is there any reason the hdfs sink was deprecated in pre-packaged applications from https://dataflow.spring.io/docs/applications/pre-packaged/.
5 replies
Can someone help?
I used AvroSchemaRegistryClientMessageConverter (without Confluent)
with content-type: application/*+avro
and consumer batch-mode: true,
but AvroSchemaRegistryClientMessageConverter is not applied for a payload that is an ArrayList of byte[].
Does any default mechanism exist for using Avro (without Confluent) + batch mode?
1 reply
Roussi Abdelghani

Hi all,
I have multiple consumers that consume from the same topic based on a particular header, event_type. I've tried many configs but I couldn't get it to work. I'm trying to avoid duplicating the binding configs since they share the same one.

This is the configuration I'm using:

        definition: consumerA;consumerB;consumerC
          destination: same_topic
          group: same_group
            ackEachRecord: true
            autoCommitOffset: true
            autoCommitOnError: false
          brokers: localhost
          defaultBrokerPort: 9092
            auto.offset.reset: earliest
          headerMapperBeanName: kafkaHeaderMapper

With a custom KafkaHeaderMapper:

public class KafkaHeaderMapperConfig {

    @Bean(name = "kafkaHeaderMapper")
    public KafkaHeaderMapper customHeaderMapper() {
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        Map<String, Boolean> conf = new HashMap<>();
        conf.put("event_type", true);
        // note: the original paste never applied conf to the mapper;
        // presumably the intent was something like:
        mapper.setRawMappedHeaders(conf);
        return mapper;
    }
}


And this is my custom MessageRoutingCallback (which doesn't trigger once a message is pushed to the topic):

public class CustomMessageRoutingCallback implements MessageRoutingCallback {

    public static final String EVENT_TYPE = "event_type";

    @Override
    public String functionDefinition(Message<?> message) {
        return Optional.of(message.getHeaders())
            .filter(headers -> headers.containsKey(EVENT_TYPE))
            .map(messageHeaders -> messageHeaders.get(EVENT_TYPE))
            .map(eventType -> EventTypeToBinding.valueOf((String) eventType))
            .map(EventTypeToBinding::getBinding)
            .orElseThrow(() -> new IllegalStateException("event_type was not recognized !! supported values are " + Arrays.toString(EventTypeToBinding.values())));
    }
}

public enum EventTypeToBinding {
    // enum constants elided in the original paste

    private final String binding;
    // constructor and getBinding() elided in the original paste
}
And finally the declaration of my beans:

/**
 * Configuration class for all bindings used in the application
 */
public class MessageConfiguration {

    /**
     * {@link MessageRoutingCallback} for mapping to the appropriate
     * binding by checking the {@link CustomMessageRoutingCallback#EVENT_TYPE}
     * @return the routing callback
     */
    @Bean
    public MessageRoutingCallback messageRoutingCallback() {
        return new CustomMessageRoutingCallback();
    }

    @Bean
    public Consumer<A> consumerA() {
        return a -> doSomething();
    }

    @Bean
    public Consumer<B> consumerB() {
        return b -> doSomething();
    }

    @Bean
    public Consumer<C> consumerC() {
        return c -> doSomething();
    }
}
I got confused by the configuration properties, for example: should I use spring.cloud.stream.bindings.<channelName>.* (generic binding properties) or spring.cloud.stream.kafka.bindings.<channelName>.* (Kafka-specific ones)?

what am I missing ?

31 replies

Hello guys :)

I added spring-cloud-starter-config, spring-cloud-starter-bootstrap to encrypt the key like this.

management.metrics.export.datadog.apiKey: '{cipher}2dc3840...'

But after adding the dependency and applying {cipher} to my properties, the
spring.cloud.stream.kafka.streams.binder.functions.{input}.application-id property is not bound to KafkaStreamsConsumerProperties.

The application-id property binding works perfectly if I don't use the encryption supported by the spring-cloud-starter-config and spring-cloud-starter-bootstrap dependencies.

Any help would be appreciated.

4 replies

Hello all,

I am using a spring cloud reactive consumer for receiving messages from rabbitmq using the following function (just a sample)

    public Consumer<Flux<Message<SampleMessageDto>>> consumer() {
        return payload -> payload.map(Message::getPayload).log().subscribe();
    }

But the problem is, if I send any data which does not match SampleMessageDto, I still get the data as byte[] in the payload. How can I filter these messages out?

1 reply
Jeon ChangMin
Hi, I have experienced "Connection Reset" issues in cdc-source-debezium.
So I posted the issue in the Debezium project.
They told me that the issue was resolved in the latest version of Debezium.
Currently, cdc-source-supplier depends on version "1.3.1.Final", which was released in 2020-12,
but the latest version is "1.6.0.Final".
Is there any plan to upgrade this version?
2 replies

Is there any way to get a reference to the Serdes that SCS creates? We generally use String keys and Avro values, but we have an aggregation that needs a complex Avro key... for our Materialized.as we need to specify the serde, so I was wondering if we can just inject it as a bean. I tried to add it as a param to my function bean but got an exception:

Parameter 1 of method processRevenue in com.axispoint.rytebox.revenue.ingestion.RevenueIngestionApplication required a single bean, but 2 were found:

- messageConverterDelegateSerde: defined by method 'messageConverterDelegateSerde' in class path resource [org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.class]
- compositeNonNativeSerde: defined by method 'compositeNonNativeSerde' in class path resource [org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.class]
3 replies
Igor Rudenko
Hi there. I've got a problem: I need to pause the consumer if an S3 bucket contains any file.
So I use @PostConstruct in my service, where I check whether the S3 bucket contains something; I inject BindingsEndpoint and set the state to PAUSED (I log this file and it is present), but in reality, when I start the application, the method with @StreamListener starts consuming as if I hadn't paused anything.
4 replies
Hi there, I have a few topics in Kafka and I read data from them. If Spring cannot handle a message and throws an exception, Spring records it to the dlqName topic, because spring.cloud.stream.kafka.bindings.input.consumer.enable-dlq=true. I need to record the message in the same topic from which it was read. I want to do it with a Spring expression, for example (which doesn't work): spring.cloud.stream.kafka.bindings.input.consumer.dlqName=${spring.cloud.stream.bindings.input.destination}.
Can I do it with a Spring expression?
Keir Bailey

Hello! Thanks in advance for the help, it's great that there is so much support for SCS!

My query is around DLQs. I am using the functional style of Spring Cloud Stream w/ the Kafka Streams binder. I am consuming data from Kafka as a String (where the string is valid JSON). Once the stream is consumed I .mapValues(jsonString -> convertToAvroSpecificRecord). During the conversion from String to Avro I validate the data and ensure it fits the schema.

Is there a way that I can utilise the spring cloud DLQ infrastructure to send stream events to the DLQ topic defined in the application.yml configuration? Currently what I'm doing is returning an KStream<Key, Either<ValidationFailure<String>, A>> where A is the desired SpecificRecord object. After that I'm branching the Either stream and manually sending the ValidationFailure<String> records to a manually configured DLQ topic.

Any help on this would be great! Would love to clean up this code! Thanks


Is it possible in Spring Cloud Stream, with the Kafka binder, to use batch mode and get access to the MessageHeaders?
I'm trying to do something like

spring.cloud.stream.bindings.<input>.consumer.batch-mode: true

public Consumer<List<Message<String>>> process() {
    return messages -> {
        // do something with headers...
    };
}

This is throwing -

ERROR LoggingHandler org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@71f46378]; nested exception is java.lang.ClassCastException: class org.springframework.messaging.support.GenericMessage cannot be cast to class java.util.List (org.springframework.messaging.support.GenericMessage is in unnamed module of loader 'app'; java.util.List is in module java.base of loader 'bootstrap'), failedMessage=GenericMessage ...

Thanks in advance!

3 replies
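That ClassCastException (a single GenericMessage where a List was expected) is typically what appears when batch mode isn't actually in effect for the binding. A sketch worth double-checking against your config, assuming the conventional functional binding name `process-in-0`:

```yaml
spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          consumer:
            # per-binding property under the generic bindings tree,
            # not under the kafka-specific tree
            batch-mode: true
```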

Hello, using the latest version of Spring Cloud Stream (3.1.3), the content type referenced in the Message headers is not honoured on the output wire.

I am using the functional programming model with the following method signature:

    public Message<?> apply(Message<?> originalMessage) {
        return MessageBuilder.fromMessage(originalMessage)
                .setHeader(MessageHeaders.CONTENT_TYPE, someCondition ? MimeTypeUtils.APPLICATION_XML_VALUE : "application/edifact")
                .build();
    }

The message payload received is of type byte[] and the output message payload is also of type byte[].

The contentType header is overridden to application/json which is not expected.

1 reply
Jeon ChangMin
Hello, does jdbc sink application support "UPDATE" statement ?
3 replies


Is there a way to requeue the message instead of sending it to the DLQ in Kafka while using a reactive stream? For example, in the case below, if the DB is down I would like to requeue the message. Is that possible?

public Consumer<Flux<Message<SampleDto>>> consumer() {
    // db operation.
}

Taulant Mehmeti
Hi, is there any resource or example of a cloud stream Kafka producer that produces only once, when called.
Hi All
New to Cloud Stream, but I know Integration quite well. My issue is simple: returning a list of messages from a Supplier function that's a PollableBean is supposed to split into multiple messages according to the docs, but it always ends up as 1 message with an array of Messages in the payload in Rabbit.
private SftpFolderScannerController sftpFolderScannerController;

@PollableBean(splittable = true)
public Supplier<List<Message<String>>> filenames() {
    return () -> sftpFolderScannerController.scanAll();
}
2 replies
I have tried returning a list of Strings as well, with the same result: it ends up as an array in Rabbit instead of multiple messages.
Bill Bauernschmidt
Hello, I'm evaluating the use of Spring Cloud Stream and SCDF to basically bridge different messaging systems. For example: receive any message posted to an external RabbitMQ, transform it, and publish it to a Kafka topic, IBM MQ or AWS SNS. I was very surprised that there were no prebuilt sinks for any of those that I could find. Am I not looking in the right place, or would those need to be custom written? We are also looking into Apache NiFi, and it does provide all of those, but we are a Spring shop and the preference would be to go with the Spring solution if possible. Thanks for any guidance!
3 replies
Marcus Lyth

Hello, is it possible to return a list that will publish each event as separate messages?

I tried returning a list of Message<?> and expected that to work but it still published the event as a serialized list of Message<?>, do I have to use batch consumer to do batch producing?

8 replies
Knut Schleßelmann
Hi! If I have a simple Processor like Function<Flux<In>, Flux<Out>>, when does SCSt acknowledge the offset if I use the kafka binder? Currently we experience lost messages and try to find the cause.
22 replies
How can I disable exchange/queue auto-creation with the RabbitMQ configuration? I want my application to throw an error if the exchange/queue is not found on the broker. With the current Spring Cloud Stream configuration, a not-found exchange/queue gets auto-created.
6 replies
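A sketch of the RabbitMQ binder consumer properties involved (the binding name `input-in-0` is an assumption); with declaration disabled, a missing queue should fail the listener container rather than be created:

```yaml
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input-in-0:
            consumer:
              # do not declare the exchange, and do not declare/bind the queue
              declareExchange: false
              bindQueue: false
              # fail if the queue is missing instead of retrying silently
              missingQueuesFatal: true
```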

Hi there, I have a quick question. Let's say I have the following binding to materialize stream into a global ktable.
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store-1

Can I use interactive query for this "incoming-store-1" from a different binding later on?

2 replies

Sorry, another silly question. I have the following code that I use to pause the consumer based on some timing information from the payload. I have the binding take a Message, which gives access to the consumer, topic, partition and acknowledgment as well as the payload. I then check the payload timing info and either pause the consumer or process the payload.

public java.util.function.Consumer<Message<Mock>> processor() {
        return message -> {
            MessageHeaders headers = message.getHeaders();
            Mock payload = message.getPayload();
            Consumer<?, ?> consumer = headers.get(KafkaHeaders.CONSUMER, Consumer.class);
            String topic = headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
            Integer partitionId = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
            Acknowledgment acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            // ... pause the consumer or process the payload based on the timing info ...
        };
    }

My question is that I would need to join on a ktable to update some information from the payload. However since I don't have the KStream from this binding I cannot do the leftJoin directly with the payload. I wonder if I can somehow do the leftJoin inside this processor binding or I would need to produce the payload to another topic and then create a new topology to do the leftJoin there?

2 replies
Hi, I have a simple consumer using the Spring Cloud Stream binder, and when scaling up and increasing the number of threads, a few messages are getting processed by two consumers of the same consumer group. We have 64 partitions and 16 instances of the app with 4 threads each. The configuration that we have is below; any idea what setting could be going wrong?
    name: my-consumer
    active: local
    bootstrap-servers: localhost:9092
        definition: process
          destination: myTopic
            brokers: localhost:9092
                  ms: 1000
                  threads: 4
            deserializationExceptionHandler: sendToDlq
4 replies

Hi, I upgraded spring-boot to the latest 2.5.4 and I noticed some weird behaviour. I have a bunch of Kafka Streams integration tests using embeddedKafka. Some tests will fail on the second run after the version upgrade. It seems that the state doesn't get cleaned up after the first run or before the second run.

I didn't change anything in the code and I have @DirtiesContext at the test class level. Is anyone else facing the same issue?

NOTE: after manually cleaning up the /tmp folder on my local machine, the tests will pass again, but still fail on the second run.

9 replies