@garyrussell I have a question regarding the Acknowledgment.nack() lifecycle. The documentation says nack will negatively acknowledge the current record and, while it's sleeping, it will discard the remaining records from the poll and redeliver the current record after it wakes up.

My question is: suppose that while nack is sleeping, say for 30 seconds, I restart the application. Would the consumer thread's sleep get interrupted, followed by an immediate re-seek of the partition and redelivery of the current record?

I checked the source code and it looks like doSeeks is invoked before Thread.sleep. If this is the case, I guess it's OK to interrupt the thread sleep, since the record has already been put back at the front of the message queue for redelivery. Please correct me if I'm wrong, thanks!
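The ordering argument above can be illustrated without Kafka. The following is a minimal sketch in plain Java, not the Spring Kafka source: a hypothetical worker records a "seek" step and then sleeps, and the calling thread interrupts it the way a shutdown might. The class name, method name, and step labels are all made up for illustration, and whether the real container seeks before sleeping is the assumption stated in the question.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class SeekThenSleepDemo {

    // Runs a worker that "seeks" first and then sleeps, interrupts it during
    // the sleep, and returns the ordered list of steps the worker recorded.
    static List<String> run(long sleepMillis) {
        List<String> steps = new CopyOnWriteArrayList<>();
        CountDownLatch seekDone = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            steps.add("seek");              // stand-in for the seek back to the nacked record
            seekDone.countDown();
            try {
                Thread.sleep(sleepMillis);  // stand-in for the nack sleep
                steps.add("woke-normally");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                steps.add("interrupted");
            }
        });

        worker.start();
        try {
            seekDone.await();   // wait until the seek step has been recorded
            worker.interrupt(); // simulates the application stopping mid-sleep
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(run(30_000)); // prints [seek, interrupted]
    }
}
```

In this sketch the seek step is always recorded before the interrupt arrives, so cutting the sleep short loses nothing. That only carries over to the real container if the seek does precede the sleep there as well.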

@olegz - How can I use Spring Cloud Stream in a functional way to create a single exchange with multiple queues and route messages dynamically at runtime to different queues based on a header (or any other attribute)? I tried StreamBridge but could not produce an end-to-end working example. The application will contain both the producer and the consumer. Any working example would be of great help.
Is anyone aware of the compatibility issue with the latest spring-cloud version?

I upgraded my app from 2020.0.3 to 2020.0.4 and one of the binding functions is throwing an NPE.

Caused by: java.lang.NullPointerException
    at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.<init>(KafkaStreamsBindableProxyFactory.java:83)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211)
The strange thing is that spring-cloud is trying to bind one of the functions as a Kafka Streams application, but it isn't one, which results in the NPE...

Luis Copetti

Hey all, I am currently trying to write a functional style binder using batchMode but I am getting the error below. If I set batchMode to false and remove the List<> parameters my sample project works. Is there an additional setting I should use to be able to work with a List of Message<?> ?

batchMode: true
public Consumer<List<Message<?>>> handler()

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
K solo

Hi Everyone, I’m collecting a series of metrics for a Kafka Streams application; it is a Spring Boot application using the micrometer-core library. The issue I have is that I’d like a consolidated value for the meters of a specific name. To make this a little clearer, these metrics are presented as an array of n items, with n being the number of threads configured for the Kafka Streams application. I must add that I can consolidate the values by summing the ‘FunctionCounter’ totals. However, I don’t have a mechanism to trigger the ‘aggregator method’ when the meters I’m interested in are updated. The aggregator method is copied below:

private Double aggregateValues(String idName){
    return meterRegistry.getMeters().stream()
        .filter(meter -> meter.getId().getName().startsWith(idName))

I created a configuration bean to attempt the same, no joy.

public class Metrics {

    public FunctionCounter getAggregateCounter(MeterRegistry registry) {
        List<FunctionCounter> counters = registry.getMeters().stream().
            filter(meter -> meter.getId().getName().
            .filter( FunctionCounter.class::isInstance )

        FunctionCounter counter = FunctionCounter
                .builder("Combined_Output_Message_Count", counters, state -> state.stream().mapToDouble(FunctionCounter::count).sum())
                .description("a description of what this counter does")
                .tags("region", "test")
        return counter;

An example of the raw data output from actuator/prometheus endpoint is listed below

# HELP kafka_stream_thread_task_created_total The total number of newly created tasks
# TYPE kafka_stream_thread_task_created_total counter
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-4",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-3",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-2",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-1",} 5.0

The end goal is to have a meter that consolidates the totals; for the raw data shown above, that would be 20.
There has been talk of this work being done in a proposal, but it hasn’t progressed beyond the initial proposal stage.
The proposal can be viewed at the following URL:
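To make the target value concrete, here is a minimal sketch in plain Java, independent of Micrometer, that sums the per-thread samples of one metric name from Prometheus-style text. The class and method names are hypothetical, and the thread_id labels are shortened stand-ins for the ones in the output above; it does not solve the "trigger on update" part of the question.

```java
import java.util.List;

class PrometheusSum {

    // Sums the sample values of every line whose metric name equals `name`.
    // Comment lines (# HELP / # TYPE) and blank lines are skipped.
    static double sum(List<String> lines, String name) {
        double total = 0.0;
        for (String line : lines) {
            if (line.isBlank() || line.startsWith("#")) {
                continue;
            }
            int space = line.lastIndexOf(' ');   // the value follows the last space
            if (space < 0) {
                continue;
            }
            int brace = line.indexOf('{');       // labels, if any, start here
            String metric = line.substring(0, brace >= 0 ? brace : space);
            if (metric.equals(name)) {
                total += Double.parseDouble(line.substring(space + 1));
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> scrape = List.of(
            "# TYPE kafka_stream_thread_task_created_total counter",
            "kafka_stream_thread_task_created_total{thread_id=\"StreamThread-4\",} 5.0",
            "kafka_stream_thread_task_created_total{thread_id=\"StreamThread-3\",} 5.0",
            "kafka_stream_thread_task_created_total{thread_id=\"StreamThread-2\",} 5.0",
            "kafka_stream_thread_task_created_total{thread_id=\"StreamThread-1\",} 5.0");
        System.out.println(sum(scrape, "kafka_stream_thread_task_created_total")); // prints 20.0
    }
}
```

Computing the sum at read time, as here, avoids needing an update trigger at all; the same total can also be obtained on the Prometheus side by aggregating over the thread_id label.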

Andreas Evers

Hi folks. I am running into a ClassCastException when using the Kafka binder together with the Kafka Streams binder, and I'm not using the spring-cloud-stream-test-support dependency (this is happening while running the app normally):

class jdk.proxy2.$Proxy134 cannot be cast to class org.springframework.messaging.MessageChannel (jdk.proxy2.$Proxy134 is in module jdk.proxy2 of loader 'app'; org.springframework.messaging.MessageChannel is in unnamed module of loader 'app')

I've got a regular Consumer bean consuming a certain KTable, but I also have a REST endpoint that can post messages to another topic. For that I am using the StreamBridge. Wiring it all up gave me the ClassCastException.

Salman khandu
I want to do performance testing of Spring Cloud Stream with the RabbitMQ binder. Is there any way to do this with Spring Cloud Stream?
Hello, I have an application using multiple binders (RabbitMQ and Kafka). I'm reading a message from a RabbitMQ queue, constructing a new message with a new payload and some headers, and sending it to a different queue and to a Kafka topic. The partner consuming from the Kafka topic is complaining about the content of the headers amqp_*, spring_json_headers_types, source_data, .... Is there any reason why these RabbitMQ/Spring-related headers are sent to the Kafka topic? Is it the expected behaviour? Is there anything I can do to send only the minimal required headers? I'm using the old annotation style @StreamListener and injecting multiple @Output channels with Spring Cloud Stream version Hoxton.SR8.
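One way to think about "minimal required headers" is an allow-list applied to the header map before the outgoing message is built. Below is a minimal sketch in plain Java, assuming headers are available as a Map<String, Object>; the class name, method name, and the sample header values are hypothetical, and how the filtered map is attached to the outgoing message is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class HeaderFilter {

    // Returns a copy of `headers` containing only the keys in `allowed`,
    // preserving the original iteration order.
    static Map<String, Object> keepAllowed(Map<String, Object> headers, Set<String> allowed) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (allowed.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> inbound = new LinkedHashMap<>();
        inbound.put("amqp_receivedRoutingKey", "orders");   // hypothetical sample values
        inbound.put("source_data", "legacy");
        inbound.put("contentType", "application/json");
        inbound.put("correlationId", "abc-123");

        Map<String, Object> outbound = keepAllowed(inbound, Set.of("contentType", "correlationId"));
        System.out.println(outbound.keySet());
    }
}
```

An allow-list is used rather than stripping by prefix (for example everything starting with amqp_) so that headers added by future library versions do not leak through by default.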
Salman khandu
I have posted an issue on stack overflow regarding concurrency not working with Artemis binder https://stackoverflow.com/questions/69753400/spring-cloud-stream-artemis-binder-concurrency-not-working can anyone please help?

I have this stream binding function as the following:

public BiFunction<KStream<String, String>, KTable<String, String>, KStream<String, String>> process() {

As you can see, I chose to materialize the second input param as a KTable. My question is: is it possible to set the first input binding's consumer to use latest for auto.offset.reset and the second input binding, which is the KTable consumer, to use earliest?

I have a question regarding the num.stream.threads setting for the stream app. Let's say that I have a stream app consuming from 2 input topics and I have 1 partition per topic and also have num.stream.threads set to 1. In this case do I get this single stream thread assigned to the partition for both topics?
Andras Hatvani
Hi, when can the integration of kafka 3.0.0 with Apple Silicon support be expected?
I have an SCS app that works with Kafka Streams and should also be able to consume messages from RabbitMQ (Spring Boot 2.5.5 and Spring Cloud 2020.0.3).
These are my dependencies:

    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'

This is my code:

    public BiConsumer<KStream<String, Event1>, KTable<String, Event2>> processEvents() {
        // this one is working if I remove spring-cloud-stream-binder-rabbit from dependencies and configuration
    public Consumer<Event1> gpsEvents() {
        // this one should consume events from Rabbit
        return System.out::println;

This is my application.yaml:

    addresses: amqps://rabbitmq-url
        definition: processEvents;gpsEvents
          binder: kafka
          destination: kafka-topic1
          binder: kafka
          destination: kafka-topic2
          binder: rabbit
          destination: rabbit-queue

When I run this I'm getting
No bean named 'kafka-KafkaStreamsBinderConfigurationProperties' available
If I set default-binder: kafka and remove binder: kafka from processEvents configuration I'm getting
java.lang.ClassCastException: class com.sun.proxy.$Proxy110 cannot be cast to class org.springframework.messaging.MessageChannel

Any advice on how to make the kafka-streams and rabbit binders work together? Thanks in advance

Hi! I'm trying to implement a delayed retry queue with Kafka + latest Spring Cloud Stream. The basic idea is: create a function which receives a message, waits until a timestamp which is contained in the message header, then forward it. I have two questions:

1) How do I avoid Kafka consumer timeout/rebalance?
2) How do I skip the deserialization/serialization configuration so I can just "shovel" the content regardless of its type?

Thanks in advance!
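For the waiting part, here is a minimal sketch in plain Java of the arithmetic involved, assuming the header carries the due time as an Instant. It relates to question 1 in that a Kafka consumer that blocks longer than max.poll.interval.ms between polls is removed from the group, so a long delay is usually split into bounded waits. The class name, method names, and the cap parameter are hypothetical, not binder settings.

```java
import java.time.Duration;
import java.time.Instant;

class RetryDelay {

    // Time left until the message is due; zero if the due time has already passed.
    static Duration remaining(Instant dueAt, Instant now) {
        Duration left = Duration.between(now, dueAt);
        return left.isNegative() ? Duration.ZERO : left;
    }

    // Length of the next single wait: the remaining time, capped at maxSingleWait
    // so the consumer thread is never blocked for longer than the cap.
    static Duration nextWait(Duration remaining, Duration maxSingleWait) {
        return remaining.compareTo(maxSingleWait) > 0 ? maxSingleWait : remaining;
    }

    public static void main(String[] args) {
        Instant now = Instant.ofEpochSecond(40);
        Instant dueAt = Instant.ofEpochSecond(100);   // hypothetical header value

        Duration left = remaining(dueAt, now);
        System.out.println(left);                                  // prints PT1M
        System.out.println(nextWait(left, Duration.ofSeconds(10))); // prints PT10S
    }
}
```

How the capped wait is then realized (sleeping in the function, pausing the consumer, or re-publishing to a retry topic) is a separate design decision that this sketch does not settle.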

Hello Team, I'm looking to merge two Kafka topics into one using Spring Cloud Kafka Stream. Both the input topics contain similar entities - very similar to the example provided at https://youtu.be/5NoU7D4OGA0?t=167 Could you please advise how to achieve this using Spring Cloud Kafka Stream ? Please help redirect me if there is an example available already ?
Is there a way to get the source topic name from the Message object in a functional Kafka consumer/processor?
Quick question. I currently use logAndContinue to handle deserialization errors; however, I want to log more information. Is it possible to write a custom error handler and assign it to deserializationExceptionHandler?
Salman khandu
Has anyone used the SQS binder? I am facing an issue where the consumer application doesn't get the model object in the Spring function Consumer<T> function(). I have raised an issue: idealo/spring-cloud-stream-binder-sqs#37.
Hello Team, I might be missing something basic with deserialization configuration, could you please advise ?

Application 1
public Function<KStream<String, TaskRequest>, KStream<String, TaskRequest>> process1() {

Header __TypeId__ is set to value “com.xxx.model.TaskRequest” for the data sent to the output topic

Application 2
public Function<KStream<String, ObjectNode>, KStream<String, ObjectNode>> process2() {

Is it possible to receive the data from output topic of Application 1 as generic Jackson ObjectNode ? Doing so fails with below deserialization error :

"class":"org.springframework.messaging.converter.MessageConversionException","msg":"failed to resolve class name. Class not found [com.xxx.model.TaskRequest]; nested exception is java.lang.ClassNotFoundException: com.xxx.model.TaskRequest","stack":["org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)

Sherry Ummen

Hello, we are getting an error when using cloud stream + kafka binder. And it happens only for 1 specific Bean. Not sure why.

The error stack is :

021-11-25 11:48:44.777 ERROR 74215 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : java.lang.NoClassDefFoundError: kotlinx/coroutines/Dispatchers
    at org.springframework.cloud.function.context.config.CoroutinesUtils.invokeSuspendingSupplier(CoroutinesUtils.kt:103)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.invoke(KotlinLambdaToFunctionAutoConfiguration.java:163)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.apply(KotlinLambdaToFunctionAutoConfiguration.java:127)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.get(KotlinLambdaToFunctionAutoConfiguration.java:179)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:657)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:506)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.get(SimpleFunctionRegistry.java:517)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.get(PartitionAwareFunctionWrapper.java:83)
    at org.springframework.integration.dsl.IntegrationFlows$1.doReceive(IntegrationFlows.java:174)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:444)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Our gradle dep looks something like this

plugins {
    id("org.springframework.boot") version "2.5.6"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    id("com.github.davidmc24.gradle.plugin.avro") version "1.2.1"
    kotlin("jvm") version "1.5.31"
    kotlin("plugin.spring") version "1.5.31"
    kotlin("plugin.jpa") version "1.5.31"
    kotlin("plugin.allopen") version "1.5.31"

version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
    maven(url = "https://packages.confluent.io/maven/")

extra["springCloudVersion"] = "2020.0.4"
extra["testcontainersVersion"] = "1.16.0"

not able to figure out why it fails

Salman khandu
I am using Spring Cloud Stream with the Rabbit binder. I want to send a manual acknowledgment from a consumer. How can I do this with Spring Cloud Function?
    public Consumer<Model> consumer() {
        return model -> {
            // send manual ack
Miguel González
Hello there. Can you tell me, or point me to a class that shows, how stopping/starting a Kafka Streams application is implemented? I'm referring to this: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.5/reference/html/spring-cloud-stream-binder-kafka.html#_binding_visualization_and_control_in_kafka_streams_binder. I have just tried calling the actuator endpoint. I'm assuming it's calling kafkastreams.close() and kafkastreams.start(), is that correct? I would like to implement something similar for the regular Kafka Streams library...
Aleydin Karaimin

Hello, I am pretty new to Spring Cloud Stream and I am having trouble trying to migrate my tests to use the APIs provided by the new spring-cloud-stream test-jar library and getting rid of the old one - spring-cloud-stream-test-support.

My setup includes Kafka stream binder and Apache Avro key and value serializer/deserializer for Pojos.
The MessageCollector (old library) was somehow able to return an Object payload that I could cast to the Pojo class and easily compare my expected object with the received one. I am not sure whether it captures the message argument before the Serializer is activated, or whether the message is serialized and then deserialized back... but it works for me (for the purpose of the test).

Now I should use the OutputDestination API, which gives me a Message<byte[]> payload that I cannot easily compare in a readable way. Is there a way to inject a Deserializer, or enable it automatically (like my service code does), and receive an object payload?

Bipin Yadav
Hi @Team
I am getting an exception when a message arrives at the listener from GCP Pub/Sub.
It throws the exception before the message reaches the application code:

“Sending Spring message [3465236180236956] failed; message nacked automatically.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binding.StreamListenerMessageHandler@23504729]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
at [Source: (byte[])“{”traversalPathPayload”:{“id”:“d8bb2614-3dd8-4146-8e6b-af8952d648ce”,“available”:true,“originNodeId”:“773727cb-0405-4b7a-8a44-99b66ec8998a”,“destinationNodeId”:“f9ad5260-aceb-4b97-8d40-7c15c8ac3bf4”,“fulfillmentChannel”:“FBF”,“fulfillmentMode”:“DIRECTSHIPPING”,“originNodeType”:“WAREHOUSE”,“destinationNodeType”:“ZONE”,“pickupType”:“SCHEDULED_PICKUP”,“lastmileLegType”:“HOME_DELIVERY”,“createdAt”:“2021-11-29T13:35:29.281071Z”,“updatedAt”:“2021-11-29T13:35:29.281083Z”},“triggeredBy”:“COMPUTE_ITINERA”[truncated 137 bytes]; line: 1, column: 25] (through reference chain: java.util.LinkedHashMap[“traversalPathPayload”])
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:130)
at com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:178)
at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:396)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
at [Source: (byte[])“{”traversalPathPayload”:{“id”:“d8bb2614-3dd8-4146-8e6b-af8952d648ce”,“available”:true,“originNodeId”:“773727cb-0405-4b7a-8a44-99b66ec8998a”,“destinationNodeId”:“f9ad5260-aceb-4b97-8d40-
Hi Team
Can any one please help me resolve:

I'm facing an issue with the HTTP source app in a spring cloud data flow stream.

org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
    at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:98) ~[spring-core-5.3.10.jar:5.3.10]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
    |_ checkpoint ? org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.cloud.sleuth.instrument.web.TraceWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? HTTP POST "/" [ExceptionHandlingWebHandler]

This happens when I try to post an HTTP request with huge body size to the HTTP endpoint.

I have tried setting the following property in the deployment which didn't help me:

app.http.spring.codec.max-in-memory-size: 10MB

spring cloud starter HTTP Kafka source app version used is 3.1.1

Does anyone have a clue about how to fix this?

Jose Badeau

Hello. I am trying to use the confluent registry with spring cloud kafka stream binder:
my config looks like:

    name: "jira-metrics"
        contentType: "application/json"
        definition: wasteProcessor
            destination: ${KAFKA_TOPIC_JIRA_ISSUES_SOURCE}
            destination: ${KAFKA_TOPIC_JIRA_ISSUES_WASTE}
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BOOTSTRAP_SERVER}
              schema.registry.url: localhost:8080/api
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.springframework.kafka.support.serializer.JsonSerde

I am getting error:

Exception in thread "jira-metrics-48381cfa-0217-4af3-812a-dcf8456b8918-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:82)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:957)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1009)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:907)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:76)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:532)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    ... 9 more

My processor looks like:

    public Function<KStream<JiraKey, JiraIssue>, KStream<String, Waste>> wasteProcessor() {
        return input -> input
                .map((key, value) -> new KeyValue<>(key.getKey(), issueParser.parse(value)))
                .filter((key, value) -> value.getResolution() != null && (value.getResolution().getName().equals(RESOLUTION_CANCELLED)))
                .map((key, value) -> new KeyValue<String, Waste>(key, new Waste(MEASUREMENT, key, value.getStatus().getName(), value.getResolution().getName(), value.getUpdateDate().getMillis())));

Any ideas? Is this a supported setup or is only Avro supported?

Piotr Mińkowski
Hi. I see that you are referencing my article on the site https://spring.io/projects/spring-cloud-stream. I just wanted to say that the links for my article and Anshul Mishra's article are swapped, so the link for my article forwards to Anshul's article, and the link for his article forwards to mine :) Maybe you can fix that? I would create a PR myself, but I don't know whether the main Spring Cloud Stream site (https://spring.io/projects/spring-cloud-stream) is kept in a Git repository
Hi everyone
Please check, I can't access the link
Hi everyone!
This is probably a basic question (I'm new to Spring Cloud Stream), and it is just a use case: is there any reason for getting a 500 error response when we send a POST request with a null body?
Do you have any ideas for the case where we need to make an initial POST request with a null body? (Or do I need to build a custom HTTP source?)

Hi all,
I am trying to build a Kafka Streams app (Spring Boot based) using the spring-cloud-stream functional approach (Kotlin).
I had a major issue when running the server: it failed to bind the producer and the consumer with the following message:
'o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds

java.lang.ClassCastException: class com.sun.proxy.$Proxy72 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy72 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')'

I then took out all the domain logic and dependencies and left only these:

extra["springCloudVersion"] = "2020.0.4"

dependencies {

but the issue persists.
My bean function looks like this
@Bean fun process(): Function<KStream<String, String>, KStream<String, String>> { return Function<KStream<String, String>, KStream<String, String>> {
Do you know the source of this class cast exception?

Hi! Can someone tell me which (lower level?) class could I use as a pointcut to capture any inbound or outbound Kafka transmissions using Spring Cloud Stream (function based plus StreamBridge)? I'd like to put MDC context variables back and forth based on Kafka Headers.

Hi everyone, this is my first post here, so please have some patience :). I'm trying to deploy a Spring Cloud Stream application on Kubernetes; the application starts fine on my machine. When deployed in a Docker container, I receive the following error message and the app fails to start:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; 
nested exception is java.lang.NoClassDefFoundError: sun.misc.Unsafe
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.AbstractBeanFactory$$Lambda$268/00000000A1693790.getObject(Unknown Source)

Java version is 11, with Spring Boot 2.6.1 and Spring Cloud 2021.0.0

Any help would be greatly appreciated.

Sumanth Chinthagunta

Hi, I am using the latest stack: Java 17, with Spring Boot 2.6.1 and Spring Cloud 2021.0.0

package micro.apps.service

import mu.KotlinLogging
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.*
import java.util.function.Consumer
import java.util.function.Function
import java.util.function.Supplier

private val logger = KotlinLogging.logger {}

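@SpringBootApplication
class DemoApplication

fun main(args: Array<String>) {
    runApplication<DemoApplication>(*args)
}

data class MyModel(
    var name: String? = null,
    var city: String? = null,
    var state: String? = null
)

@Configuration
class KafkaConfiguration {

    // Produces a new MyModel on each poll; bound to generate-out-0
    @Bean
    fun generate(): Supplier<MyModel> = Supplier {
        MyModel(UUID.randomUUID().toString(), "Paradise", "CA")
    }

    // Extracts the city from each record; bound to city-in-0 / city-out-0
    @Bean
    fun city(): Function<KStream<String, MyModel>, KStream<String, String>> = Function {
        it.mapValues { v -> v.city }
    }

    // Extracts the state from each record; bound to state-in-0 / state-out-0
    @Bean
    fun state(): Function<KStream<String, MyModel>, KStream<String, String>> = Function {
        it.mapValues { v -> v.state }
    }

    // Logs every record from the city and state output topics; bound to print-in-0
    @Bean
    fun print(): Consumer<KStream<String, String>> = Consumer {
        it.peek { k, v -> logger.info("Received: {}, {}", k, v) }
    }
}

spring:
  cloud:
    stream:
      bindings:
        generate-out-0.destination: all-in-topic
        city-in-0.destination: all-in-topic
        city-out-0.destination: city-out-topic
        state-in-0.destination: all-in-topic
        state-out-0.destination: state-out-topic
        print-in-0.destination: city-out-topic,state-out-topic
      function:
        definition: generate;city;state;print
      kafka:
        streams:
          binder:
            brokers: localhost:9092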

Getting error:

[2021-12-23 11:36:28.901] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Retrieving cached binder: kstream
[2021-12-23 11:36:28.901] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Creating binder: globalktable
[2021-12-23 11:36:28.919] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Caching the binder: globalktable
[2021-12-23 11:36:28.919] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Retrieving cached binder: globalktable
[2021-12-23 11:36:28.919] - 52714 WARNING [restartedMain] --- org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext: Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.endpoint.EventDrivenConsumer: Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.channel.PublishSubscribeChannel: Channel 'application.errorChannel' has 0 subscriber(s).
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.endpoint.EventDrivenConsumer: stopped bean '_org.springframework.integration.errorLogger'
If I comment out the generate function, it starts fine.
    @Bean
    fun generate(): Supplier<MyModel> = Supplier {
        MyModel(UUID.randomUUID().toString(), "Paradise", "CA")
    }
Sumanth Chinthagunta
Can I use Supplier functions with the spring-cloud-stream-binder-kafka-streams binder?
Sumanth Chinthagunta
To make the Supplier function work as well, I have to add both binders. Is this expected or a bug?
Hi, I have a use case where I want to control message acknowledgement. I do not see a way to do it with the functional model (Kafka Streams binder); however, the "Apache Kafka Binder" model seems to support it (@StreamListener). I have also read that @StreamListener is being deprecated. I am a bit confused; please advise which approach I should take if I need manual acknowledgement.
Hi! Is there a Spring Cloud Stream metric which I could use for monitoring incoming and outgoing Kafka message counts in the application, or do I need to create a custom one?
Andras Hatvani
Hi, is there a way to retrieve and inject the default value serde in a Kafka Streams app/test?
Salman khandu
Is there any way to test (with JUnit) consumer functionality?
public Consumer<Model> consumer(final Handler handler) {
    return model -> {
        if (model != null) {
            // code to process model
        }
    };
}
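One possible approach, as a minimal sketch: since the bean is a plain java.util.function.Consumer, it can be called directly in a unit test with a stub handler, with no broker involved. Model and Handler below are hypothetical stand-ins for the types in the question, and the check in main is a plain assertion rather than a JUnit test so the sketch has no dependencies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ConsumerSketch {

    // Hypothetical stand-in for the Model type in the question.
    public static final class Model {
        final String name;

        public Model(String name) {
            this.name = name;
        }
    }

    // Hypothetical stand-in for the Handler collaborator in the question.
    public interface Handler {
        void handle(Model model);
    }

    // Same shape as the consumer in the question: skip nulls, delegate the rest.
    public static Consumer<Model> consumer(final Handler handler) {
        return model -> {
            if (model != null) {
                handler.handle(model);
            }
        };
    }

    // Feeds the models through the consumer and returns the names the stub handler saw.
    public static List<String> handledNames(Model... models) {
        List<String> seen = new ArrayList<>();
        Consumer<Model> consumer = consumer(m -> seen.add(m.name));
        for (Model m : models) {
            consumer.accept(m);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> seen = handledNames(new Model("a"), null, new Model("b"));
        if (!seen.equals(Arrays.asList("a", "b"))) {
            throw new AssertionError("unexpected: " + seen);
        }
        System.out.println("handled: " + seen);
    }
}
```

In a real project the body of main would become a JUnit test method, and the stub lambda could be replaced by a mock of the real Handler. A test that goes through the binding layer would need the Spring Cloud Stream test support instead.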
Ivan Vercinsky

Hi Everybody!

    public Function<Flux<Message<OfferIncomingEvent>>, Flux<Message<OfferOutgoingEvent>>> offerValidatorAutomatic() {
        return this::validateOfferAutomatic;
    }

I have this Function, which processes events from one Kafka topic to another. My question is the following:

Is there a way to set up the binding between these two topics so that Spring forwards ALL the headers automatically?

If I remove the Message<?> type from the Flux parameter, the headers from the input topic don't get forwarded.
Also, in the current setup I have to add them manually when building the outgoing message.

Thank you for your attention!

Choubani Amir
Hello. I am exploring the Spring Cloud Stream project samples. I started with source-samples/dynamic-destination-source-kafka.
I am confused about this property:
spring.cloud.stream.function.definition: supplier;receive1;receive2;
Is it for Spring Cloud Function?
If so, I think we use a pipe, not a semicolon. Am I wrong?
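As far as I understand it, both delimiters are valid in that property but mean different things: a semicolon lists several independent functions that each get their own bindings, while a pipe composes functions into a single one. Here is a plain-Java sketch of the difference. The function names upper and exclaim are hypothetical, and andThen only stands in conceptually for what the pipe does.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.function.Function;

public class DefinitionSketch {

    // Two hypothetical functions, standing in for beans named "upper" and "exclaim".
    static final Function<String, String> upper = s -> s.toUpperCase(Locale.ROOT);
    static final Function<String, String> exclaim = s -> s + "!";

    // Conceptually "upper|exclaim": one composed function, the output of upper feeds exclaim.
    public static String composed(String in) {
        return upper.andThen(exclaim).apply(in);
    }

    // Conceptually "upper;exclaim": two separate functions, each sees the input on its own.
    public static String[] independent(String in) {
        return new String[] { upper.apply(in), exclaim.apply(in) };
    }

    public static void main(String[] args) {
        System.out.println(composed("hi"));                     // HI!
        System.out.println(Arrays.toString(independent("hi"))); // [HI, hi!]
    }
}
```

So in the sample, supplier;receive1;receive2 would declare three separately bound functions rather than one pipeline.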
Choubani Amir

What about the stream-applications project? The idea is for the applications to

be useful in the end-user applications as-is

So to test, I just need to run the application jar as in the quick start, right?

