Andras Hatvani
@andrashatvani
Hi, when can the integration of kafka 3.0.0 with Apple Silicon support be expected?
6 replies
aliaksandr-pleski
@aliaksandr-pleski

Hi,
I have an SCS app that works with Kafka Streams and should also be able to consume messages from RabbitMQ (Spring Boot 2.5.5 and Spring Cloud 2020.0.3).
These are my dependencies:

    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'

This is my code:

@Bean
public BiConsumer<KStream<String, Event1>, KTable<String, Event2>> processEvents() {
    // this one is working if I remove spring-cloud-stream-binder-rabbit from dependencies and configuration
}

@Bean
public Consumer<Event1> gpsEvents() {
    // this one should consume events from Rabbit
    return System.out::println;
}

This is my application.yaml:

spring:
  rabbitmq:
    addresses: amqps://rabbitmq-url
  cloud:
    stream:
      function:
        definition: processEvents;gpsEvents
      bindings:
        processEvents-in-0:
          binder: kafka
          destination: kafka-topic1
        processEvents-in-1:
          binder: kafka
          destination: kafka-topic2
        gpsEvents-in-0:
          binder: rabbit
          destination: rabbit-queue

When I run this I'm getting
No bean named 'kafka-KafkaStreamsBinderConfigurationProperties' available
If I set default-binder: kafka and remove binder: kafka from processEvents configuration I'm getting
java.lang.ClassCastException: class com.sun.proxy.$Proxy110 cannot be cast to class org.springframework.messaging.MessageChannel

Any advice on how to make the kafka-streams and rabbit binders work together? Thanks in advance

16 replies
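A plausible direction, sketched here as an assumption rather than a confirmed fix: the Kafka Streams binder registers under the binder names `kstream`/`ktable`/`globalktable`, not `kafka`, so the KStream/KTable bindings may need that binder name while the Rabbit binding keeps `rabbit`:

```yaml
# Hypothetical sketch: KStream/KTable bindings use the kstream binder name;
# the message-channel binding keeps the rabbit binder.
spring:
  cloud:
    stream:
      bindings:
        processEvents-in-0:
          binder: kstream
          destination: kafka-topic1
        processEvents-in-1:
          binder: kstream
          destination: kafka-topic2
        gpsEvents-in-0:
          binder: rabbit
          destination: rabbit-queue
```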
enterprisesoftwaresolutions
@enterprisesoftwaresolutions

Hi! I'm trying to implement a delayed retry queue with Kafka + the latest Spring Cloud Stream. The basic idea: create a function which receives a message, waits until a timestamp contained in the message header, then forwards it. I have two questions:

1) How do I avoid Kafka consumer timeout/rebalance?
2) How do I skip the deserialization/serialization configuration so I can just "shovel" the content regardless of its type?

Thanks in advance!

3 replies
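On question 2, one common approach, sketched under the assumption that an opaque relay is acceptable: declare the function over `byte[]` so the payload is never converted at all. For question 1, the broker-side `max.poll.interval.ms` consumer property is the usual knob for tolerating long pauses between polls.

```java
import java.util.Arrays;
import java.util.function.Function;

// Minimal sketch: a function typed over byte[] relays the record content
// without any (de)serialization of the payload itself.
public class RelaySketch {
    public static Function<byte[], byte[]> relay() {
        return payload -> payload; // forward the bytes untouched
    }

    public static void main(String[] args) {
        byte[] in = "opaque payload".getBytes();
        byte[] out = relay().apply(in);
        System.out.println(Arrays.equals(in, out)); // true
        System.out.println(new String(out));        // opaque payload
    }
}
```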
rvnhere
@rvnhere
Hello Team, I'm looking to merge two Kafka topics into one using Spring Cloud Kafka Stream. Both input topics contain similar entities, very similar to the example provided at https://youtu.be/5NoU7D4OGA0?t=167. Could you please advise how to achieve this using Spring Cloud Kafka Stream? Please redirect me if there is an example available already.
2 replies
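One possibility, assuming the Kafka Streams binder's multi-topic destination support applies here (topic names below are placeholders): a single input binding can list both topics comma-separated, and the binder hands the function one merged KStream:

```yaml
spring:
  cloud:
    stream:
      function:
        definition: merge
      bindings:
        merge-in-0:
          # both source topics on one binding; the binder merges them
          destination: topic-a,topic-b
        merge-out-0:
          destination: merged-topic
```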
enterprisesoftwaresolutions
@enterprisesoftwaresolutions
Is there a way to get the source topic name from the Message object in a functional Kafka consumer/processor?
2 replies
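The received topic is carried in the `kafka_receivedTopic` message header (the constant `KafkaHeaders.RECEIVED_TOPIC` in spring-kafka), so a function taking `Message<?>` can read it from `message.getHeaders()`. A self-contained sketch, with a plain `Map` standing in for `MessageHeaders` so it runs without Spring:

```java
import java.util.Map;
import java.util.function.Function;

// Sketch: extract the source-topic header; in a real consumer the input
// would be a Spring Message<?> and the Map would be message.getHeaders().
public class TopicHeaderSketch {
    static final String RECEIVED_TOPIC = "kafka_receivedTopic"; // KafkaHeaders.RECEIVED_TOPIC

    public static Function<Map<String, Object>, String> sourceTopic() {
        return headers -> (String) headers.get(RECEIVED_TOPIC);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of(RECEIVED_TOPIC, "orders-topic");
        System.out.println(sourceTopic().apply(headers)); // orders-topic
    }
}
```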
AttitudeL
@AttitudeL
Quick question. I currently use logAndContinue to handle deserialization errors, but I want to log more information. Is it possible to write a custom error handler and assign it to deserializationExceptionHandler?
2 replies
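Kafka Streams accepts any class implementing `org.apache.kafka.streams.errors.DeserializationExceptionHandler` via the `default.deserialization.exception.handler` streams property, so a custom handler can take the place of logAndContinue. A hedged sketch (the handler class name is hypothetical):

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              # hypothetical custom class implementing
              # org.apache.kafka.streams.errors.DeserializationExceptionHandler
              default.deserialization.exception.handler: com.example.LoggingDeserializationHandler
```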
Salman khandu
@salman-khandu
Has anyone used the SQS binder? I am facing an issue where the consumer application doesn't receive the model object in the Spring function Consumer<T> function(). I have raised an issue: idealo/spring-cloud-stream-binder-sqs#37.
3 replies
rvnhere
@rvnhere

Hello Team, I might be missing something basic with deserialization configuration, could you please advise?

Application 1
@Bean
public Function<KStream<String, TaskRequest>, KStream<String, TaskRequest>> process1() {
}

The __TypeId__ header is set to "com.xxx.model.TaskRequest" for the data sent to the output topic

Application 2
@Bean
public Function<KStream<String, ObjectNode>, KStream<String, ObjectNode>> process2() {
}

Is it possible to receive the data from the output topic of Application 1 as a generic Jackson ObjectNode? Doing so fails with the deserialization error below:

"class":"org.springframework.messaging.converter.MessageConversionException","msg":"failed to resolve class name. Class not found [com.xxx.model.TaskRequest]; nested exception is java.lang.ClassNotFoundException: com.xxx.model.TaskRequest","stack":["org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)

2 replies
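A direction worth trying, sketched as an assumption: spring-kafka's `JsonDeserializer` can be told to ignore the `__TypeId__` header and fall back to a default target type via the `spring.json.*` consumer properties, which would let Application 2 read the records as a generic Jackson tree:

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              # ignore the __TypeId__ header written by Application 1
              spring.json.use.type.headers: false
              # deserialize into a generic Jackson node instead
              spring.json.value.default.type: com.fasterxml.jackson.databind.node.ObjectNode
```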
Sherry Ummen
@sherry-ummen

Hello, we are getting an error when using Spring Cloud Stream + the Kafka binder, and it happens only for one specific bean. Not sure why.

The error stack is :

2021-11-25 11:48:44.777 ERROR 74215 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : java.lang.NoClassDefFoundError: kotlinx/coroutines/Dispatchers
    at org.springframework.cloud.function.context.config.CoroutinesUtils.invokeSuspendingSupplier(CoroutinesUtils.kt:103)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.invoke(KotlinLambdaToFunctionAutoConfiguration.java:163)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.apply(KotlinLambdaToFunctionAutoConfiguration.java:127)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.get(KotlinLambdaToFunctionAutoConfiguration.java:179)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:657)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:506)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.get(SimpleFunctionRegistry.java:517)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.get(PartitionAwareFunctionWrapper.java:83)
    at org.springframework.integration.dsl.IntegrationFlows$1.doReceive(IntegrationFlows.java:174)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:444)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Our Gradle dependencies look something like this:

plugins {
    id("org.springframework.boot") version "2.5.6"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    id("com.github.davidmc24.gradle.plugin.avro") version "1.2.1"
    kotlin("jvm") version "1.5.31"
    kotlin("plugin.spring") version "1.5.31"
    kotlin("plugin.jpa") version "1.5.31"
    kotlin("plugin.allopen") version "1.5.31"
}

version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
    mavenCentral()
    maven(url = "https://packages.confluent.io/maven/")
}

extra["springCloudVersion"] = "2020.0.4"
extra["testcontainersVersion"] = "1.16.0"

I am not able to figure out why it fails.

2 replies
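The missing class in the stack trace lives in `kotlinx-coroutines-core`, which spring-cloud-function's Kotlin lambda wrapper calls into but which is not on the classpath here; adding it explicitly is a plausible fix (the version below is an assumption chosen to match Kotlin 1.5.x):

```kotlin
// build.gradle.kts — sketch: add the coroutines runtime the NoClassDefFoundError asks for
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2")
}
```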
Salman khandu
@salman-khandu
I have used Spring Cloud Stream with a Rabbit binder. I want to send a manual acknowledgment from a consumer. How can I do that with a Spring Cloud function?
    public Consumer<Model> consumer() {
        return model -> {
            // send manual ack
        };
    }
2 replies
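With the Rabbit binder, manual acks usually mean two things, sketched here as an assumption: switch the binding's acknowledge mode to MANUAL, and consume `Message<Model>` instead of the bare payload so the AMQP channel and delivery tag can be read from the headers (`AmqpHeaders.CHANNEL`, `AmqpHeaders.DELIVERY_TAG`) and passed to `basicAck`. The config side (binding name assumed):

```yaml
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          consumer-in-0:   # assumed binding name
            consumer:
              acknowledge-mode: MANUAL
```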
Miguel González
@magg
hello there. Can you tell me how, or point me to a class showing how, stopping/starting a Kafka Streams binding is implemented? I'm referring to this: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.5/reference/html/spring-cloud-stream-binder-kafka.html#_binding_visualization_and_control_in_kafka_streams_binder. I have just tried calling the actuator endpoint. I'm assuming it's calling kafkaStreams.close() and kafkaStreams.start(), is that correct? I would like to implement something similar for the regular Kafka Streams library...
2 replies
Aleydin Karaimin
@karaimin

Hello, I am pretty new to Spring Cloud Stream and I am having trouble migrating my tests to use the APIs provided by the new spring-cloud-stream test-jar library and getting rid of the old one, spring-cloud-stream-test-support.

My setup includes Kafka stream binder and Apache Avro key and value serializer/deserializer for Pojos.
The MessageCollector (old library) was somehow able to return an Object payload that I can cast to the Pojo class and easily compare my expected object with the received one. I am not sure whether it captures the message argument before the Serializer is activated, or whether the message is serialized and deserialized back... but it works for me (for the purpose of the test).

Now, I should use the OutputDestination API, which gives me a Message<byte[]> payload that I cannot compare easily in a readable way. Is there a way to inject a Deserializer, or enable it automatically (like my service code does), and receive an object payload?

8 replies
Bipin Yadav
@bipin-yadav
Hi @Team,
I am getting an exception when a message comes to the listener from GCP Pub/Sub; it throws before reaching the application code.

Sending Spring message [3465236180236956] failed; message nacked automatically.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binding.StreamListenerMessageHandler@23504729]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
at [Source: (byte[])"{"traversalPathPayload":{"id":"d8bb2614-3dd8-4146-8e6b-af8952d648ce","available":true,"originNodeId":"773727cb-0405-4b7a-8a44-99b66ec8998a","destinationNodeId":"f9ad5260-aceb-4b97-8d40-7c15c8ac3bf4","fulfillmentChannel":"FBF","fulfillmentMode":"DIRECTSHIPPING","originNodeType":"WAREHOUSE","destinationNodeType":"ZONE","pickupType":"SCHEDULED_PICKUP","lastmileLegType":"HOME_DELIVERY","createdAt":"2021-11-29T13:35:29.281071Z","updatedAt":"2021-11-29T13:35:29.281083Z"},"triggeredBy":"COMPUTE_ITINERA"[truncated 137 bytes]; line: 1, column: 25] (through reference chain: java.util.LinkedHashMap["traversalPathPayload"])
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:130)
at com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:178)
at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:396)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
at [Source: (byte[])"{"traversalPathPayload":{"id":"d8bb2614-3dd8-4146-8e6b-af8952d648ce","available":true,"originNodeId":"773727cb-0405-4b7a-8a44-99b66ec8998a","destinationNodeId":"f9ad5260-aceb-4b97-8d40-
2 replies
Minase
@MinHab
Hi Team
Can anyone please help me resolve:
Harish
@harishanchu

I'm facing an issue with the HTTP source app in a spring cloud data flow stream.

org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
    at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:98) ~[spring-core-5.3.10.jar:5.3.10]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
    |_ checkpoint ? org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.cloud.sleuth.instrument.web.TraceWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
    |_ checkpoint ? HTTP POST "/" [ExceptionHandlingWebHandler]

This happens when I try to post an HTTP request with a large body to the HTTP endpoint.

I have tried setting the following property in the deployment, which didn't help:

app.http.spring.codec.max-in-memory-size: 10MB

The Spring Cloud Stream starter HTTP (Kafka) source app version used is 3.1.1.

Does anyone have a clue about how to fix this?

1 reply
Jose Badeau
@jbadeau

Hello. I am trying to use the Confluent registry with the Spring Cloud Stream Kafka Streams binder.
my config looks like:

spring:
  application:
    name: "jira-metrics"
  cloud:
    stream:
      default:
        contentType: "application/json"
      function:
        definition: wasteProcessor
        bindings:
          wasteProcessor-in-0:
            destination: ${KAFKA_TOPIC_JIRA_ISSUES_SOURCE}
          wasteProcessor-out-0:
            destination: ${KAFKA_TOPIC_JIRA_ISSUES_WASTE}
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BOOTSTRAP_SERVER}
            configuration:
              schema.registry.url: localhost:8080/api
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.springframework.kafka.support.serializer.JsonSerde

I am getting error:

Exception in thread "jira-metrics-48381cfa-0217-4af3-812a-dcf8456b8918-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:82)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:957)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1009)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:907)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:76)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:532)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    ... 9 more

My processor looks like:

    @Bean
    public Function<KStream<JiraKey, JiraIssue>, KStream<String, Waste>> wasteProcessor() {
        return input -> input
                .map((key, value) -> new KeyValue<>(key.getKey(), issueParser.parse(value)))
                .filter((key, value) -> value.getResolution() != null && (value.getResolution().getName().equals(RESOLUTION_CANCELLED)))
                .map((key, value) -> new KeyValue<String, Waste>(key, new Waste(MEASUREMENT, key, value.getStatus().getName(), value.getResolution().getName(), value.getUpdateDate().getMillis())));
    }

Any ideas? Is this a supported setup or is only Avro supported?

3 replies
Piotr Mińkowski
@piomin
Hi. I see that you are referencing my article on the site https://spring.io/projects/spring-cloud-stream. I just wanted to say that the links to my article and Anshul Mishra's article are swapped: the link for my article forwards to Anshul's article, and the link for his article forwards to mine :) Maybe you can fix that? I would create a PR myself, but I don't know if the main Spring Cloud Stream site (https://spring.io/projects/spring-cloud-stream) is managed in a Git repository.
2 replies
yendauvinh9
@yendauvinh9
Hi everyone,
Please check, I can't access the link
Sebastian
@Sebastianhk6_twitter
Hi everyone!
This is probably a basic question (I'm new to Spring Cloud Stream) about a use case: is there any reason we get a 500 error response when we send a POST request with a null body?
Do you have any ideas in case we need to make an initial POST request with a null body? (Or do I need to build a custom HTTP source?)
erezfirt
@erezfirt

Hi all,
I am trying to build a Kafka Streams app (Spring Boot based) using the spring-cloud-stream functional approach (Kotlin).
I had a major issue when running the server: it failed to bind the producer and the consumer with the following message:
'o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds

java.lang.ClassCastException: class com.sun.proxy.$Proxy72 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy72 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')'

I then took out all the domain logic and dependencies and left only these:

extra["springCloudVersion"] = "2020.0.4"

dependencies {
    implementation("org.apache.kafka:kafka-streams")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    implementation("org.springframework.kafka:spring-kafka")
}

but the issue persists.
My bean function looks like this:

@Bean fun process(): Function<KStream<String, String>, KStream<String, String>> {
    return Function<KStream<String, String>, KStream<String, String>> {
Do you know the source of this class cast exception?

8 replies
enterprisesoftwaresolutions
@enterprisesoftwaresolutions
Hi! Can someone tell me which (lower-level?) class I could use as a pointcut to capture any inbound or outbound Kafka transmissions using Spring Cloud Stream (function-based plus StreamBridge)? I'd like to put MDC context variables back and forth based on Kafka headers.
meehighd
@meehighd

Hi everyone, this is my first post here, so please have some patience :). I'm trying to deploy a Spring Cloud Stream application on Kubernetes, an application which starts fine on my machine. When deployed in a Docker container, I receive the following error message and the app fails to start:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; 
nested exception is java.lang.NoClassDefFoundError: sun.misc.Unsafe
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.AbstractBeanFactory$$Lambda$268/00000000A1693790.getObject(Unknown Source)

Java version is 11, with Spring Boot 2.6.1 and Spring Cloud 2021.0.0

Any help would be greatly appreciated.

2 replies
Sumanth Chinthagunta
@xmlking

Hi, I am using the latest stack: Java 17, with Spring Boot 2.6.1 and Spring Cloud 2021.0.0.

package micro.apps.service

import mu.KotlinLogging
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.*
import java.util.function.Consumer
import java.util.function.Function
import java.util.function.Supplier


private val logger = KotlinLogging.logger {}

@SpringBootApplication
class DemoApplication

fun main(args: Array<String>) {
    runApplication<DemoApplication>(*args)
}

data class MyModel(
    var name: String? = null,
    var city: String? = null,
    var state: String? = null
)

@Configuration
class KafkaConfiguration {

    @Bean
    fun generate(): Supplier<MyModel> = Supplier {
        MyModel(UUID.randomUUID().toString(), "Paradise", "CA")
    }

    @Bean
    fun city(): Function<KStream<String, MyModel>, KStream<String, String>> = Function {
        it.mapValues { v -> v.city }
    }


    @Bean
    fun state(): Function<KStream<String, MyModel>, KStream<String, String>> = Function {
        it.mapValues { v -> v.state }
    }


    @Bean
    fun print(): Consumer<KStream<String, String>> = Consumer {
        it.peek { k, v -> logger.info("Received: {}, {}", k, v) }
    }
}

My application.yaml:

spring:
  cloud:
    stream:
      bindings:
        generate-out-0.destination: all-in-topic
        city-in-0.destination: all-in-topic
        city-out-0.destination: city-out-topic
        state-in-0.destination: all-in-topic
        state-out-0.destination: state-out-topic
        print-in-0.destination: city-out-topic,state-out-topic
      function:
        definition: generate;city;state;print
      kafka:
        streams:
          binder:
            brokers: localhost:9092

Getting this error:

[2021-12-23 11:36:28.901] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Retrieving cached binder: kstream
[2021-12-23 11:36:28.901] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Creating binder: globalktable
[2021-12-23 11:36:28.919] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Caching the binder: globalktable
[2021-12-23 11:36:28.919] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Retrieving cached binder: globalktable
[2021-12-23 11:36:28.919] - 52714 WARNING [restartedMain] --- org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext: Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.endpoint.EventDrivenConsumer: Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.channel.PublishSubscribeChannel: Channel 'application.errorChannel' has 0 subscriber(s).
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.endpoint.EventDrivenConsumer: stopped bean '_org.springframework.integration.errorLogger'
If I comment out the generate function, it starts fine.
    @Bean
    fun generate(): Supplier<MyModel> = Supplier {
        MyModel(UUID.randomUUID().toString(), "Paradise", "CA")
    }
Sumanth Chinthagunta
@xmlking
Can I use Supplier functions with the spring-cloud-stream-binder-kafka-streams binder?
Sumanth Chinthagunta
@xmlking
To make the Supplier function also work, I have to add both binders. Is this expected, or a bug?
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
9 replies
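When both binders are on the classpath there is no default binder, so each binding can name its binder explicitly; the Supplier would go through the message-channel `kafka` binder (the Kafka Streams binder has no Supplier support) and the KStream functions through `kstream`. A sketch, using the binding names from the config above:

```yaml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          binder: kafka     # Suppliers need the message-channel binder
        city-in-0:
          binder: kstream
        city-out-0:
          binder: kstream
        state-in-0:
          binder: kstream
        state-out-0:
          binder: kstream
```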
hnazmatrix
@hnazmatrix:matrix.org
[m]
Hi, I have a use case where I want to control message acknowledgement. I do not see a way to do it with the functional model (Kafka Streams binder); however, the "Apache Kafka Binder" model seems to support it (@StreamListener). I am also reading that @StreamListener is being deprecated? I am kind of confused, please advise which approach I should take if I need manual acknowledgement.
1 reply
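With the message-channel Kafka binder (not the Kafka Streams binder), manual acknowledgement works without @StreamListener: consume `Message<?>` and pull the `Acknowledgment` from the `kafka_acknowledgment` header (`KafkaHeaders.ACKNOWLEDGMENT`), after switching the binding to manual ack mode. A config sketch (binding name assumed; the exact consumer property has shifted across versions, e.g. `autoCommitOffset: false` in older ones, `ackMode: MANUAL` in newer):

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          consume-in-0:     # assumed binding name
            consumer:
              ackMode: MANUAL
```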
enterprisesoftwaresolutions
@enterprisesoftwaresolutions
Hi! Is there a Spring Cloud Stream metric which I could use for monitoring incoming and outgoing Kafka message counts in the application, or do I need to create a custom one?
Andras Hatvani
@andrashatvani
Hi, is there a way to retrieve and inject the default value serde in a kafka streams app/test?
4 replies
Salman khandu
@salman-khandu
Is there any way to test (JUnit) the consumer functionality?
public Consumer<Model> consumer(final Handler handler) {
    return model -> {
        if (model != null) {
            // code to process model
        }
    };
}
3 replies
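Since the bean is a plain `java.util.function.Consumer`, it can be unit-tested without any Spring context by calling `accept` directly and asserting on a stubbed handler. A self-contained sketch (the `Model` class and recording handler are hypothetical stand-ins for the real ones):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Sketch: exercise a Consumer bean as a plain function, no Spring required.
public class ConsumerFunctionTestSketch {
    public static class Model {
        public final String value;
        public Model(String value) { this.value = value; }
    }

    // stands in for the bean under test; records what it processed
    public static Consumer<Model> consumer(AtomicReference<Model> processed) {
        return model -> {
            if (model != null) {
                processed.set(model); // code to process model
            }
        };
    }

    public static void main(String[] args) {
        AtomicReference<Model> processed = new AtomicReference<>();
        Consumer<Model> underTest = consumer(processed);

        underTest.accept(null);                    // null input is ignored
        System.out.println(processed.get());       // null
        underTest.accept(new Model("payload"));
        System.out.println(processed.get().value); // payload
    }
}
```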
Ivan Vercinsky
@ivercinsky

Hi Everybody!

@Bean
    public Function<Flux<Message<OfferIncomingEvent>>, Flux<Message<OfferOutgoingEvent>>> offerValidatorAutomatic() {
        return this::validateOfferAutomatic;
    }

I have that Function, which is processing events from one Kafka topic to another. My question is the following:

Is there a way to set up the binding between these two topics so that Spring would forward all the headers automatically?

Because if I remove the Message<?> type of the Flux parameter, the headers from the input topic don't get forwarded.
Also, in the current setting I have to add them manually when building the outgoing message.

Thank you for your attention!
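One way to keep the Message<?> signature without copying headers one by one is MessageBuilder.copyHeaders. A sketch under the question's types; OfferIncomingEvent/OfferOutgoingEvent and the validate(...) helper are placeholders, not real implementations:

```java
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import reactor.core.publisher.Flux;

public class OfferValidatorConfig {

    @Bean
    public Function<Flux<Message<OfferIncomingEvent>>, Flux<Message<OfferOutgoingEvent>>> offerValidatorAutomatic() {
        return flux -> flux.map(in -> MessageBuilder
                .withPayload(validate(in.getPayload()))
                .copyHeaders(in.getHeaders()) // forward all incoming headers
                .build());
    }

    private OfferOutgoingEvent validate(OfferIncomingEvent in) {
        // placeholder for the real validation logic
        return new OfferOutgoingEvent();
    }
}
```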

Choubani Amir
@amirensit
Hello. I am discovering the Spring Cloud Stream project samples. I started with source-samples/dynamic-destination-source-kafka.
I am confused by this property:
spring.cloud.stream.function.definition: supplier;receive1;receive2;
2 replies
Is it for Spring Cloud Function?
If so, I think we use a pipe, not a semicolon; am I wrong?
1 reply
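For contrast, the two separators do different, documented things: a semicolon declares several independent functions (each with its own bindings), while a pipe composes functions into a single one. A minimal sketch:

```yaml
spring:
  cloud:
    stream:
      function:
        definition: supplier;receive1;receive2   # three separate functions/bindings
        # definition: supplier|receive1          # one composed function/binding
```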
Choubani Amir
@amirensit

What about stream-applications project, the idea is to

be useful in the end-user applications as-is

So to test, I just need to run the jar application like in the quick start, right ?

3 replies
Matthias
@Mnantier
Hi everybody !
I'm using the kafka-streams binder and I wanted to know whether it is possible to use StreamBridge inside a DeserializationExceptionHandler (as well as a ProductionExceptionHandler and UncaughtExceptionHandler) to send the raised exceptions to a DLQ. My goal is to have an almost completely fault-tolerant application that sends every exception to a DLQ to be processed later on. I've been trying to make it work for several days without success. My problems were that, first, I couldn't access a StreamBridge bean from the ProductionExceptionHandler instance, and then I encountered some MessageDispatchingExceptions, but maybe I'm using the wrong tools.
Thank you for your help !
2 replies
aliaksandr-pleski
@aliaksandr-pleski

Hi there,
I'm using spring boot (2.6.2) + spring cloud stream (2021.0.0).
Using both Kafka and Kafka Streams channels.
Here is my application.yaml:

  cloud:
    stream:
      function:
        definition: test;test2
      kafka:
        streams:
          binder:
            configuration:
              commit.interval.ms: 1000
              application.server: ${SERVER_HOST:localhost}:${SERVER_PORT:8080}
            replication-factor: ${KAFKA_BROKERS_REPLICATION_FACTOR:1}
            auto-add-partitions: true
        binder:
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          brokers: ${KAFKA_BROKERS:localhost:9092}
          replication-factor: ${KAFKA_BROKERS_REPLICATION_FACTOR:1}
          auto-add-partitions: true
      bindings:
        test-in-0:
          destination: test-in
        test-out-0:
          destination: test-out
          producer:
            partition-count: 2
        test2-out-0:
          binder: kafka
          destination: test2-out
          producer:
            partition-count: 2

Is there a way to configure the retention time (ms) for the topics I use? I was not able to find anything in the docs...

Big thanks in advance for any help!

3 replies
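For the record, the Kafka binder can apply arbitrary topic-level properties per binding when it provisions the topic. A sketch against the binding names above; it only takes effect when the binder is allowed to create/modify topics:

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          test-out-0:
            producer:
              topic:
                properties:
                  retention.ms: "604800000"   # 7 days
```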
iguissouma
@iguissouma
In a multi-binder (Kafka, Rabbit) project, can I set a default property for partitionKeyExpression for Kafka only? It's failing when I set spring.cloud.stream.default.producer.partitionKeyExpression=headers['myId'] for Rabbit because of the use of a fanout exchange. I don't want to set this property on each Kafka producer channel.
2 replies
constantinpopa10
@constantinpopa10

Hi all, I'm using Spring Cloud Stream with the Kinesis binder (version 2.2.0). I'm racking my brains trying to find a fix for the following scenario; here are the steps:

  1. Kinesis stream shards number bumped up , say from 2 to 4. A couple of short-lived Kinesis shards are created behind the scenes by AWS which have StartingSequenceNumber=EndingSequenceNumber
  2. Spring Cloud Stream service started, all good.
  3. Spring Cloud Stream service stopped.
  4. Spring Cloud Stream service started, bunch of errors started popping up: "SequenceNumber has reached max possible value for the shard."
  5. Spring Cloud Stream service stopped, and I deleted all records from the CheckpointStore DynamoDB table
  6. Spring Cloud Stream service started, all good.
  7. Spring Cloud Stream stop and start, the errors show up again.

Here's the complete error:

{"@timestamp":"2022-02-01T09:05:08.814Z","@version":"1","message":"Got an exception com.amazonaws.services.kinesis.model.InvalidArgumentException: Invalid SequenceNumber for ShardIteratorType AFTER_SEQUENCE_NUMBER, SequenceNumber has reached max possible value for the shard. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ea337009-4622-e862-b041-8a0d394b017a; Proxy: null) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49623586295030033695175315773036414839057851295876513794', timestamp=null, stream='gd-emeartf-recommdata-http-source-domainfactory-stream', shard='shardId-000000000000', reset=false}, state=NEW}] task invocation.\nProcess will be retried on the next iteration.","logger_name":"org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter","log_level":"INFO"}

Many thanks for your help

2 replies
blake-bauman
@blake-bauman
I was looking at the "Dynamic Compilation" section of the Spring Cloud Function reference, but I noticed that spring-cloud-function-compiler was removed quite some time ago, and the sample referenced in this doc had it removed, too. Is Dynamic Compilation still supported? If so, which module is it in?
https://docs.spring.io/spring-cloud-function/docs/3.2.1/reference/html/spring-cloud-function.html#_dynamic_compilation
Oleg Zhurakousky
@olegz
@blake-bauman no, it is no longer supported. The reason is that in 5 years you are the first person to ask a question about it. We can certainly resurrect it as an optional module, though. I would suggest taking one from the previous versions, doing some POM updates to make sure it depends on the current artifacts, and seeing if you can get some value out of it
1 reply
I personally don't see a reasonable production use case for it, but I am also always open to having my mind changed ;)
Matthias
@Mnantier
Hi everyone! What would be an easy way to simulate a production exception to test a custom ProductionExceptionHandler's behavior?
Thanks for your help
4 replies
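One trick (a sketch, not necessarily what the replies suggested): make every send fail as too large by shrinking the producer's max.request.size in a test profile; a too-large record is one of the classic cases the ProductionExceptionHandler is invoked for. The keys below are standard Kafka Streams configs passed through the binder; com.example.MyProductionExceptionHandler is a placeholder class name:

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              max.request.size: 100   # deliberately tiny, test profile only
              default.production.exception.handler: com.example.MyProductionExceptionHandler
```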
xitangseng
@xitangseng
Hi everyone, does routing support KStream conversion?
I was using the routing-samples project. Instead of public Consumer<Order> orderConsumer() {}, I am using public Consumer<KStream<Object, Order>> orderConsumer() {};
however, I am getting the error below:
xitangseng
@xitangseng
ERROR 205156 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@28eb2a42]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.apache.kafka.streams.kstream.KStream` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (byte[])"{"id":null,"price":null}"; line: 1, column: 1], failedMessage=GenericMessage [payload=byte[24], headers={kafka_offset=5, event_type=ORDER_EVENT, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5f0694ae, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=functionRouter-in-0, kafka_receivedTimestamp=1644475340419, contentType=application/json, __TypeId__=[B@31090fda, kafka_groupId=anonymous.3c479c96-098e-4fd0-8f83-429a3ba25ce7}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:398)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:80)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:457)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:123)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:117)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:41)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2319)
    at org.springframework.kafka.listener.KafkaMessage