Hello Team, I might be missing something basic with the deserialization configuration; could you please advise?
Application 1
@Bean
public Function<KStream<String, TaskRequest>, KStream<String, TaskRequest>> process1() {
    return input -> input; // pass-through shown here; actual processing elided
}
The __TypeId__ header is set to the value "com.xxx.model.TaskRequest" for the data sent to the output topic.
Application 2
@Bean
public Function<KStream<String, ObjectNode>, KStream<String, ObjectNode>> process2() {
    return input -> input; // pass-through shown here; actual processing elided
}
Is it possible to receive the data from the output topic of Application 1 as a generic Jackson ObjectNode? Doing so fails with the deserialization error below:
"class":"org.springframework.messaging.converter.MessageConversionException","msg":"failed to resolve class name. Class not found [com.xxx.model.TaskRequest]; nested exception is java.lang.ClassNotFoundException: com.xxx.model.TaskRequest","stack":["org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
Hello, we are getting an error when using Spring Cloud Stream with the Kafka binder, and it happens only for one specific bean; not sure why.
The error stack is:
2021-11-25 11:48:44.777 ERROR 74215 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : java.lang.NoClassDefFoundError: kotlinx/coroutines/Dispatchers
at org.springframework.cloud.function.context.config.CoroutinesUtils.invokeSuspendingSupplier(CoroutinesUtils.kt:103)
at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.invoke(KotlinLambdaToFunctionAutoConfiguration.java:163)
at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.apply(KotlinLambdaToFunctionAutoConfiguration.java:127)
at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.get(KotlinLambdaToFunctionAutoConfiguration.java:179)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:657)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:506)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.get(SimpleFunctionRegistry.java:517)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.get(PartitionAwareFunctionWrapper.java:83)
at org.springframework.integration.dsl.IntegrationFlows$1.doReceive(IntegrationFlows.java:174)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:444)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Our Gradle build looks something like this:
plugins {
    id("org.springframework.boot") version "2.5.6"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    id("com.github.davidmc24.gradle.plugin.avro") version "1.2.1"
    kotlin("jvm") version "1.5.31"
    kotlin("plugin.spring") version "1.5.31"
    kotlin("plugin.jpa") version "1.5.31"
    kotlin("plugin.allopen") version "1.5.31"
}

version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
    mavenCentral()
    maven(url = "https://packages.confluent.io/maven/")
}

extra["springCloudVersion"] = "2020.0.4"
extra["testcontainersVersion"] = "1.16.0"
I am not able to figure out why it fails.
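One guess, going only by the stack trace: CoroutinesUtils is part of Spring Cloud Function's Kotlin support and needs kotlinx-coroutines on the runtime classpath, which nothing in the build above pulls in. A hedged sketch (the version is an assumption; pick one matching Kotlin 1.5.x):

dependencies {
    // kotlinx-coroutines provides the missing kotlinx.coroutines.Dispatchers
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2")
}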
Hello, I am pretty new to Spring Cloud Stream and I am having trouble migrating my tests to the APIs provided by the new spring-cloud-stream test-jar library and getting rid of the old one, spring-cloud-stream-test-support.
My setup includes the Kafka Streams binder and Apache Avro key and value serializers/deserializers for POJOs.
The MessageCollector (old library) was able, somehow, to return an Object payload that I could cast to the POJO class and easily compare my expected object with the received one. I am not sure whether it captured the message before the Serializer ran, or whether the message was serialized and deserialized back, but it worked for me (for the purpose of the test).
Now I should use the OutputDestination API, which gives me a Message<byte[]> payload that I cannot easily compare in a readable way. Is there a way to inject a Deserializer, or enable one automatically (as my service code does), and receive an object payload?
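A minimal sketch of a workaround, with AssertJ-style assertions and placeholder names (MyPojo, myAvroDeserializer, and "my-out-topic" are all hypothetical): run the raw bytes from OutputDestination through the same Deserializer the service uses before asserting.

// Receive the raw message from the test binder, then deserialize by hand.
Message<byte[]> received = outputDestination.receive(1000, "my-out-topic");
MyPojo actual = myAvroDeserializer.deserialize("my-out-topic", received.getPayload());
assertThat(actual).isEqualTo(expectedPojo);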
I'm facing an issue with the HTTP source app in a Spring Cloud Data Flow stream.
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:98) ~[spring-core-5.3.10.jar:5.3.10]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.cloud.sleuth.instrument.web.TraceWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP POST "/" [ExceptionHandlingWebHandler]
This happens when I try to post an HTTP request with a large body to the HTTP endpoint.
I have tried setting the following property in the deployment, which didn't help:
app.http.spring.codec.max-in-memory-size: 10MB
The HTTP source (Kafka) app version used is 3.1.1.
Does anyone have a clue about how to fix this?
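If the property route keeps failing, a hedged alternative is to raise the same WebFlux codec limit programmatically inside the app (CodecCustomizer is org.springframework.boot.web.codec.CodecCustomizer; whether the packaged http source can be extended with a custom bean like this is an assumption):

@Bean
public CodecCustomizer codecCustomizer() {
    // Raise the reactive codec in-memory buffer limit to 10 MB.
    return configurer -> configurer.defaultCodecs().maxInMemorySize(10 * 1024 * 1024);
}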
Hello. I am trying to use the Confluent registry with the Spring Cloud Stream Kafka Streams binder; my config looks like:
spring:
  application:
    name: "jira-metrics"
  cloud:
    stream:
      default:
        contentType: "application/json"
      function:
        definition: wasteProcessor
      bindings:
        wasteProcessor-in-0:
          destination: ${KAFKA_TOPIC_JIRA_ISSUES_SOURCE}
        wasteProcessor-out-0:
          destination: ${KAFKA_TOPIC_JIRA_ISSUES_WASTE}
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BOOTSTRAP_SERVER}
            configuration:
              schema.registry.url: localhost:8080/api
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
I am getting this error:
Exception in thread "jira-metrics-48381cfa-0217-4af3-812a-dcf8456b8918-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:82)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:957)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1009)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:907)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.util.Assert.state(Assert.java:76)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:532)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
... 9 more
My processor looks like:
@Bean
public Function<KStream<JiraKey, JiraIssue>, KStream<String, Waste>> wasteProcessor() {
    return input -> input
            .map((key, value) -> new KeyValue<>(key.getKey(), issueParser.parse(value)))
            .filter((key, value) -> value.getResolution() != null && (value.getResolution().getName().equals(RESOLUTION_CANCELLED)))
            .map((key, value) -> new KeyValue<String, Waste>(key, new Waste(MEASUREMENT, key, value.getStatus().getName(), value.getResolution().getName(), value.getUpdateDate().getMillis())));
}
Any ideas? Is this a supported setup or is only Avro supported?
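JSON should be supported; the error is the JsonDeserializer refusing to guess a target type. Since the incoming records evidently carry no __TypeId__ header, a hedged sketch is to give the JsonSerde default types via the binder configuration (the FQCNs below are placeholders for your actual classes):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              # Tell JsonSerde what to deserialize into when no type headers are present.
              spring.json.key.default.type: com.example.JiraKey
              spring.json.value.default.type: com.example.JiraIssue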
Hi all,
I am trying to build a Kafka Streams app (Spring Boot based) using the spring-cloud-stream functional approach (Kotlin).
I had a major issue when running the server; it failed to bind the producer and the consumer with the following message:
'o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
java.lang.ClassCastException: class com.sun.proxy.$Proxy72 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy72 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')'
I then took out all the domain logic and dependencies and left only these:
extra["springCloudVersion"] = "2020.0.4"

dependencies {
    implementation("org.apache.kafka:kafka-streams")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    implementation("org.springframework.kafka:spring-kafka")
}
but the issue persists.
My bean function looks like this:
@Bean
fun process(): Function<KStream<String, String>, KStream<String, String>> {
    return Function<KStream<String, String>, KStream<String, String>> { it } // pass-through; logic elided
}
Do you know the source of this class cast exception?
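A hedged guess: with both spring-cloud-stream-binder-kafka and spring-cloud-stream-binder-kafka-streams on the classpath, a binding can end up routed to the message-channel binder, which produces exactly this proxy-to-MessageChannel cast failure. A sketch of pinning the KStream function to the kstream binder:

spring:
  cloud:
    stream:
      function:
        definition: process
      bindings:
        process-in-0:
          binder: kstream
        process-out-0:
          binder: kstream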
Hi everyone, this is my first post here, so please have some patience :). I'm trying to deploy a Spring Cloud Stream application on Kubernetes, an application which starts fine on my machine. When deployed in a Docker container, I receive the following error message and the app fails to start:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed;
nested exception is java.lang.NoClassDefFoundError: sun.misc.Unsafe
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.AbstractBeanFactory$$Lambda$268/00000000A1693790.getObject(Unknown Source)
Java version is 11, with Spring Boot 2.6.1 and Spring Cloud 2021.0.0
Any help would be greatly appreciated.
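A hedged guess only: sun.misc.Unsafe lives in the jdk.unsupported module, so if the container image runs a jlink-trimmed runtime (unlike the full JDK on your machine), that module may simply be missing from the image. A sketch of including it when building the runtime:

# Include jdk.unsupported so sun.misc.Unsafe is available at runtime.
jlink --add-modules java.base,java.logging,java.naming,jdk.unsupported --output /opt/java-runtime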
Hi, I am using the latest stack: Java 17, with Spring Boot 2.6.1 and Spring Cloud 2021.0.0.
package micro.apps.service
import mu.KotlinLogging
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.*
import java.util.function.Consumer
import java.util.function.Function
import java.util.function.Supplier
private val logger = KotlinLogging.logger {}
@SpringBootApplication
class DemoApplication

fun main(args: Array<String>) {
    runApplication<DemoApplication>(*args)
}

data class MyModel(
    var name: String? = null,
    var city: String? = null,
    var state: String? = null
)

@Configuration
class KafkaConfiguration {
    @Bean
    fun generate(): Supplier<MyModel> = Supplier {
        MyModel(UUID.randomUUID().toString(), "Paradise", "CA")
    }

    @Bean
    fun city(): Function<KStream<String, MyModel>, KStream<String, String>> = Function {
        it.mapValues { v -> v.city }
    }

    @Bean
    fun state(): Function<KStream<String, MyModel>, KStream<String, String>> = Function {
        it.mapValues { v -> v.state }
    }

    @Bean
    fun print(): Consumer<KStream<String, String>> = Consumer {
        it.peek { k, v -> logger.info("Received: {}, {}", k, v) }
    }
}
spring:
  cloud:
    stream:
      bindings:
        generate-out-0.destination: all-in-topic
        city-in-0.destination: all-in-topic
        city-out-0.destination: city-out-topic
        state-in-0.destination: all-in-topic
        state-out-0.destination: state-out-topic
        print-in-0.destination: city-out-topic,state-out-topic
      function:
        definition: generate;city;state;print
      kafka:
        streams:
          binder:
            brokers: localhost:9092
Getting this error:
[2021-12-23 11:36:28.901] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Retrieving cached binder: kstream
[2021-12-23 11:36:28.901] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Creating binder: globalktable
[2021-12-23 11:36:28.919] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Caching the binder: globalktable
[2021-12-23 11:36:28.919] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Retrieving cached binder: globalktable
[2021-12-23 11:36:28.919] - 52714 WARNING [restartedMain] --- org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext: Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.endpoint.EventDrivenConsumer: Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.channel.PublishSubscribeChannel: Channel 'application.errorChannel' has 0 subscriber(s).
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.endpoint.EventDrivenConsumer: stopped bean '_org.springframework.integration.errorLogger'
If I remove the generate function, it starts fine. Here is that bean:
@Bean
fun generate(): Supplier<MyModel> = Supplier {
    MyModel(UUID.randomUUID().toString(), "Paradise", "CA")
}
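A hedged reading of the failure: the generate Supplier binds through a regular message channel (the DirectWithAttributesChannel in the error), which the Kafka Streams binders (kstream, ktable, globalktable) cannot serve. A sketch of a fix, assuming the plain Kafka binder dependency (org.springframework.cloud:spring-cloud-stream-binder-kafka) is added alongside the streams binder:

spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          binder: kafka          # channel-based binding for the Supplier
          destination: all-in-topic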
Hi Everybody!
@Bean
public Function<Flux<Message<OfferIncomingEvent>>, Flux<Message<OfferOutgoingEvent>>> offerValidatorAutomatic() {
    return this::validateOfferAutomatic;
}
I have this Function, which processes events from one Kafka topic to another. My question is the following:
Is there a way to set up the binding between these two topics so that Spring forwards ALL the headers automatically?
If I remove the Message<?> type from the Flux parameter, the headers from the input topic don't get forwarded.
Also, in the current setup I have to add them manually when building the outgoing message.
Thank you for your attention!
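For reference, a minimal sketch of the manual copying mentioned above (toOutgoing is a hypothetical stand-in for the validation/mapping step):

@Bean
public Function<Flux<Message<OfferIncomingEvent>>, Flux<Message<OfferOutgoingEvent>>> offerValidatorAutomatic() {
    return flux -> flux.map(in -> MessageBuilder
            .withPayload(toOutgoing(in.getPayload()))  // hypothetical mapping step
            .copyHeaders(in.getHeaders())              // forward every input header
            .build());
}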
What about the stream-applications project? The idea is for it to "be useful in the end-user applications as-is".
So to test, I just need to run the jar application like in the quick start, right?
Hi there,
I'm using Spring Boot (2.6.2) + Spring Cloud Stream (2021.0.0).
Using both Kafka and Kafka Streams channels.
Here is my application.yaml:
spring:
  cloud:
    stream:
      function:
        definition: test;test2
      kafka:
        streams:
          binder:
            configuration:
              commit.interval.ms: 1000
              application.server: ${SERVER_HOST:localhost}:${SERVER_PORT:8080}
            replication-factor: ${KAFKA_BROKERS_REPLICATION_FACTOR:1}
            auto-add-partitions: true
        binder:
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          brokers: ${KAFKA_BROKERS:localhost:9092}
          replication-factor: ${KAFKA_BROKERS_REPLICATION_FACTOR:1}
          auto-add-partitions: true
      bindings:
        test-in-0:
          destination: test-in
        test-out-0:
          destination: test-out
          producer:
            partition-count: 2
        test2-out-0:
          binder: kafka
          destination: test2-out
          producer:
            partition-count: 2
Is there a possibility to configure retention time (ms) for the topics I use? I was not able to find anything in the docs...
Big thanks in advance for any help!
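A hedged pointer: the Kafka binder exposes arbitrary topic-level settings through topic.properties on a binding's producer/consumer, so something like the following may work for the channel-based binding (it only applies when the binder itself creates or is allowed to alter the topic):

spring:
  cloud:
    stream:
      kafka:
        bindings:
          test2-out-0:
            producer:
              topic:
                properties:
                  retention.ms: "604800000"   # 7 days, as an example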
Hi all, I'm using Spring Cloud Stream with the Kinesis binder (version 2.2.0). I'm racking my brains trying to find a fix for the following scenario; here are the steps:
Here's the complete error:
{"@timestamp":"2022-02-01T09:05:08.814Z","@version":"1","message":"Got an exception com.amazonaws.services.kinesis.model.InvalidArgumentException: Invalid SequenceNumber for ShardIteratorType AFTER_SEQUENCE_NUMBER, SequenceNumber has reached max possible value for the shard. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ea337009-4622-e862-b041-8a0d394b017a; Proxy: null) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49623586295030033695175315773036414839057851295876513794', timestamp=null, stream='gd-emeartf-recommdata-http-source-domainfactory-stream', shard='shardId-000000000000', reset=false}, state=NEW}] task invocation.\nProcess will be retried on the next iteration.","logger_name":"org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter","log_level":"INFO"}
Many thanks for your help
spring-cloud-function-compiler was removed quite some time ago, and the sample referenced in this doc had it removed, too. Is Dynamic Compilation still supported? If so, which module is it in?

ERROR 205156 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@28eb2a42]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.apache.kafka.streams.kstream.KStream` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (byte[])"{"id":null,"price":null}"; line: 1, column: 1], failedMessage=GenericMessage [payload=byte[24], headers={kafka_offset=5, event_type=ORDER_EVENT, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5f0694ae, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=functionRouter-in-0, kafka_receivedTimestamp=1644475340419, contentType=application/json, __TypeId__=[B@31090fda, kafka_groupId=anonymous.3c479c96-098e-4fd0-8f83-429a3ba25ce7}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:398)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:80)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:457)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:123)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:117)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:41)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2319)
at org.springframework.kafka.listener.KafkaMessage
package demo;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
@Slf4j
@SpringBootApplication
public class MessageRoutingApplication {

    public static void main(String[] args) {
        final ConfigurableApplicationContext run = SpringApplication.run(MessageRoutingApplication.class, args);
        final KafkaTemplate kafkaTemplate = run.getBean(KafkaTemplate.class);
        kafkaTemplate.setDefaultTopic("functionRouter-in-0");

        Message<Menu> menuMessage =
                MessageBuilder.withPayload(new Menu())
                        .setHeader("event_type", "MENU_EVENT").build();
        kafkaTemplate.send(menuMessage);
        System.out.println("Published menu message");

        Message<Order> orderMessage =
                MessageBuilder.withPayload(new Order())
                        .setHeader("event_type", "ORDER_EVENT").build();
        kafkaTemplate.send(orderMessage);
        System.out.println("Published order message");
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public MessageRoutingCallback messageRoutingCallback() {
        return new CustomMessageRoutingCallback();
    }

    @Bean
    public Consumer<KStream<Object, Order>> orderConsumer() {
        return order -> log.info("Received order {}", order.toString());
    }

    @Bean
    public Consumer<Menu> menuConsumer() {
        return menu -> log.info("Received Menu {}", menu.toString());
    }

    @Bean
    public Supplier<Message<Menu>> supply() {
        System.out.println("Published menu event");
        return () -> MessageBuilder.withPayload(new Menu()).setHeader("event_type", "MENU_EVENT").build();
    }
}

@Data
class Order {
    private String id;
    private Double price;
}

@Data
class Menu {
    private String id;
    private String name;
}
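Judging from the error above (Jackson trying to construct a KStream from the routed payload), one hedged fix is to make the routed consumer take the payload type rather than a KStream, since the function router dispatches individual messages over the channel-based binder:

@Bean
public Consumer<Order> orderConsumer() {
    // Receives the deserialized Order directly instead of a KStream wrapper.
    return order -> log.info("Received order {}", order);
}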
Hi,
I need to create a test case for the Function KStream Handler:
@Bean
public Function<KStream<String, String>, KStream<String, String>[]> handleResult() {
    return transactionStream -> transactionStream
            .map((key, value) -> extractStatus(key, value))
            .branch((key, value) -> true);
}
I created this unit test:
@Test
public void handleScpiResult() throws Exception {
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    final Serde<String> stringSerdes = Serdes.String();
    final KStream<String, String> inputStream = streamsBuilder.stream(INPUT_TOPIC, Consumed.with(stringSerdes, stringSerdes));
    inputStream.to(OUTPUT_TOPIC, Produced.with(stringSerdes, stringSerdes));
    TopologyTestDriver testDriver = new TopologyTestDriver(streamsBuilder.build(), getStreamsConfiguration());
    var inputTopic = testDriver.createInputTopic(INPUT_TOPIC, stringSerdes.serializer(), stringSerdes.serializer());
    var outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerdes.deserializer(), stringSerdes.deserializer());
    inputTopic.pipeInput("dummy-key0", SampleData.getScpiTransactionResults()[0]);
    inputTopic.pipeInput("dummy-key1", SampleData.getScpiTransactionResults()[1]);
    final var scpi = new SCPITransactionStreamService().handleScpiResult();
    KStream<String, String>[] outputMaterialize = scpi.apply(inputStream);
}
I have an issue: I need to receive the results through the outputMaterialize object, but my results only show up in the outputTopic object.
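A hedged sketch of the wiring I would try instead: apply the function under test to the stream before building the TopologyTestDriver, and route its branch to the output topic, so the driver actually executes the logic (the bean above is handleResult() while the test calls handleScpiResult(); use whichever is the real method name):

final StreamsBuilder builder = new StreamsBuilder();
final Serde<String> serde = Serdes.String();
final KStream<String, String> input = builder.stream(INPUT_TOPIC, Consumed.with(serde, serde));
// Wire the function under test into the topology, then expose its branch.
final KStream<String, String>[] branches = new SCPITransactionStreamService().handleScpiResult().apply(input);
branches[0].to(OUTPUT_TOPIC, Produced.with(serde, serde));
try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), getStreamsConfiguration())) {
    var in = driver.createInputTopic(INPUT_TOPIC, serde.serializer(), serde.serializer());
    var out = driver.createOutputTopic(OUTPUT_TOPIC, serde.deserializer(), serde.deserializer());
    in.pipeInput("dummy-key0", SampleData.getScpiTransactionResults()[0]);
    // assert on out.readKeyValue() / out.readKeyValuesToList() here
}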
I am setting spring.cloud.stream.kafka.streams.binder.configuration.default.deserialization.exception.handler to a custom handler and using a StreamsBuilderFactoryBeanConfigurer, but every time the handler is called in my test, the streamBridge attribute is null because of the static instantiation. How could I make it work?
Good day everyone,
I have a spring-cloud-stream application (Spring Boot is 2.6.2 and Spring Cloud is 2021.0.0). Working with Kafka and Kafka Streams there.
Here is my configuration:
spring:
  cloud:
    stream:
      function:
        definition: supplier;streamFunction;function;consumer1;consumer2;consumer3
      kafka:
        binder:
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      bindings:
        supplier-out-0:
          binder: kafka
          destination: scs-cc-supplier
        streamFunction-in-0:
          destination: scs-cc-supplier
        streamFunction-out-0:
          destination: scs-cc-streamFunction
        function-in-0:
          binder: kafka
          destination: scs-cc-streamFunction
        function-out-0:
          binder: kafka
          destination: scs-cc-consumer
        consumer1-in-0:
          destination: scs-cc-consumer
        consumer2-in-0:
          destination: scs-cc-consumer
        consumer3-in-0:
          binder: kafka
          destination: scs-cc-consumer
And here is my application class:
@SpringBootApplication
public class ScsClassCastApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScsClassCastApplication.class, args);
    }

    private final Random random = new Random();

    @Bean
    public Supplier<Message<ClassA>> supplier() {
        return () -> MessageBuilder.createMessage(
                new ClassA(random.ints(0, 10).findFirst().getAsInt()),
                new MessageHeaders(
                        Map.of(KafkaHeaders.MESSAGE_KEY, String.valueOf(random.ints(0, 10).findFirst().getAsInt())))
        );
    }

    @Bean
    public Function<KStream<String, ClassA>, KStream<String, ClassB>> streamFunction() {
        return stream -> stream.mapValues(classA -> new ClassB(String.valueOf(classA.a)));
    }

    @Bean
    public Function<ClassB, Message<ClassC>> function() {
        return classB -> MessageBuilder.createMessage(
                new ClassC(Integer.parseInt(classB.b)),
                new MessageHeaders(Map.of(KafkaHeaders.MESSAGE_KEY, classB.b))
        );
    }

    @Bean
    public Consumer<KStream<String, ClassC>> consumer1() {
        return stream -> stream.peek((k, v) -> System.out.println("No class cast: " + k + " " + v));
    }

    @Bean
    public Consumer<KStream<String, String>> consumer2() {
        return stream -> stream.peek((k, v) -> System.out.println("No class cast - consumer2: " + k + " " + v));
    }

    @Bean
    public Consumer<ClassC> consumer3() {
        return classC -> System.out.println("No class cast - consumer3: " + classC);
    }

    @AllArgsConstructor
    @NoArgsConstructor
    public static class ClassA {
        public int a;
    }

    @NoArgsConstructor
    @AllArgsConstructor
    public static class ClassB {
        public String b;
    }

    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    public static class ClassC {
        public int c;
    }
}
And I'm getting the following issue for consumer1:
...
Caused by: java.lang.ClassCastException: class com.scs.classcast.ScsClassCastApplication$ClassB cannot be cast to class com.scs.classcast.ScsClassCastApplication$ClassC (com.scs.classcast.ScsClassCastApplication$ClassB and com.scs.classcast.ScsClassCastApplication$ClassC are in unnamed module of loader 'app')
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:41) ~[kafka-streams-3.0.0.jar:na]
...
At the same time, consumer2 and consumer3 are working fine:
No class cast - consumer2: 5 {"c":5}
No class cast - consumer3: ScsClassCastApplication.ClassC(c=5)
Why does it try to work with ClassB in consumer1? A ClassC object was explicitly passed in function, and ClassC is what the consumer1 KStream expects.
And how can I make it work, finally?
Thanks in advance for any help!
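A hedged theory: the __TypeId__ header written for the ClassB records on scs-cc-streamFunction appears to survive through function's output (input headers are propagated to the outgoing message), so consumer1's JsonSerde trusts the stale header and deserializes a ClassB, while the non-KStream consumers convert by declared type and are unaffected. If that is the case, one sketch of a fix is telling the Kafka Streams binder's JSON deserialization to ignore type headers, so it falls back to the target type inferred from each function signature:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              # Ignore __TypeId__; use the per-binding inferred target type instead.
              spring.json.use.type.headers: "false"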