Hey all, I am currently trying to write a functional-style consumer using batch mode, but I am getting the error below. If I set batchMode to false and remove the List<> parameter, my sample project works. Is there an additional setting I should use to be able to work with a List of Message<?>?
batchMode: true
public Consumer<List<Message<?>>> handler()
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
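For comparison, a minimal batch-mode sketch; the detail worth checking (an assumption, not a confirmed fix) is that batch-mode is a core consumer binding property, i.e. it lives under spring.cloud.stream.bindings.<binding>.consumer:

spring:
  cloud:
    stream:
      bindings:
        handler-in-0:
          destination: my-topic   # assumption: your input topic
          consumer:
            batch-mode: true

With that in place, the binder should hand the Consumer<List<Message<?>>> a populated list instead of the JSON converter trying to construct Message instances.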
Hi everyone, I'm collecting a series of metrics for a Kafka Streams application; it is a Spring Boot application using the micrometer-core library. The issue is that I'd like a consolidated value for the meters of a specific name. To make this a little clearer: these metrics are presented as an array of n items, with n being the number of threads configured for the Kafka Streams application. I can consolidate the values by summing the FunctionCounter totals, but I don't have a mechanism to trigger the 'aggregator method' when the meters I'm interested in are updated. The aggregator method is copied below:
private Double aggregateValues(String idName) {
    return meterRegistry.getMeters().stream()
            .filter(meter -> meter.getId().getName().startsWith(idName))
            .filter(FunctionCounter.class::isInstance)
            .map(FunctionCounter.class::cast)
            .mapToDouble(FunctionCounter::count)
            .sum();
}
I created a configuration bean to attempt the same, with no joy:
@Configuration
@Slf4j
public class Metrics {

    @Bean
    public FunctionCounter getAggregateCounter(MeterRegistry registry) {
        List<FunctionCounter> counters = registry.getMeters().stream()
                .filter(meter -> meter.getId().getName().startsWith("Output_Message_Count"))
                .filter(FunctionCounter.class::isInstance)
                .map(FunctionCounter.class::cast)
                .collect(Collectors.toList());
        return FunctionCounter
                .builder("Combined_Output_Message_Count", counters,
                        state -> state.stream().mapToDouble(FunctionCounter::count).sum())
                .description("a description of what this counter does")
                .tags("region", "test")
                .register(registry);
    }
}
An example of the raw data output from the actuator/prometheus endpoint is listed below:
# HELP kafka_stream_thread_task_created_total The total number of newly created tasks
# TYPE kafka_stream_thread_task_created_total counter
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-4",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-3",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-2",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-1",} 5.0
The end goal is to have a meter that consolidates the totals; in the case of the raw data shown above, 20.
There has been talk of this being addressed in a KIP, but it hasn't progressed beyond the initial proposal stage.
The proposal can be viewed at the following URL:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-674%3A+Metric+Reporter+to+Aggregate+Metrics+in+Kafka+Streams
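One likely reason the bean above shows no joy: the counters list is captured once, when the bean is created, so FunctionCounters registered later by the Kafka Streams threads never enter the sum. A sketch (an assumption, not a verified fix) that re-queries the registry on every scrape by passing the registry itself as the counter's state object:

@Bean
public FunctionCounter combinedOutputMessageCount(MeterRegistry registry) {
    // The ToDoubleFunction runs each time the meter is sampled, so meters
    // registered after startup are picked up automatically.
    return FunctionCounter
            .builder("Combined_Output_Message_Count", registry,
                    r -> r.getMeters().stream()
                            .filter(m -> m.getId().getName().startsWith("Output_Message_Count"))
                            .filter(FunctionCounter.class::isInstance)
                            .map(FunctionCounter.class::cast)
                            .mapToDouble(FunctionCounter::count)
                            .sum())
            .description("Sum of Output_Message_Count across all stream threads")
            .tags("region", "test")
            .register(registry);
}

Because the aggregation runs at sampling time, nothing needs to trigger it when the underlying meters update.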
Hi folks. I am running into a ClassCastException when using the Kafka binder together with the Kafka Streams binder, and I'm not using the spring-cloud-stream-test-support dependency (this is happening while running the app normally):
class jdk.proxy2.$Proxy134 cannot be cast to class org.springframework.messaging.MessageChannel (jdk.proxy2.$Proxy134 is in module jdk.proxy2 of loader 'app'; org.springframework.messaging.MessageChannel is in unnamed module of loader 'app')
I've got a regular Consumer bean consuming a certain KTable, but I also have a REST endpoint that can post messages to another topic. For that I am using the StreamBridge. Wiring it all up gave me the ClassCastException.
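A hedged guess based on similar multi-binder reports: with both binders on the classpath, each binding may need its binder named explicitly, so the KTable consumer resolves to the Kafka Streams binder (which registers as ktable for KTable inputs) and the StreamBridge output resolves to the channel-based kafka binder. Binding and topic names below are illustrative:

spring:
  cloud:
    stream:
      bindings:
        myConsumer-in-0:      # assumption: your KTable consumer binding
          binder: ktable
          destination: input-topic
        output-topic:         # the binding StreamBridge.send("output-topic", ...) creates
          binder: kafka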
I have a stream binding function like the following:
public BiFunction<KStream<String, String>, KTable<String, String>, KStream<String, String>> process() {
...
}
As you can see, I chose to materialize the second input parameter as a KTable. My question: is it possible to set the first input binding's consumer to use latest for auto.offset.reset, and the second input binding (the KTable consumer) to use earliest?
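With the Kafka Streams binder, startOffset is a per-binding consumer property, so a sketch like the following may do it (binding names assumed from the process-in-n convention):

spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            process-in-0:
              consumer:
                startOffset: latest     # the KStream input
            process-in-1:
              consumer:
                startOffset: earliest   # the KTable input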
A related question about the num.stream.threads setting for the stream app. Let's say I have a stream app consuming from 2 input topics, with 1 partition per topic, and num.stream.threads set to 1. In this case, does the single stream thread get assigned to the partition of both topics?
Hi,
I have an SCS app that works with Kafka Streams and should also be able to consume messages from RabbitMQ (Spring Boot 2.5.5 and Spring Cloud 2020.0.3).
These are my dependencies:
implementation 'org.apache.kafka:kafka-streams'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
This is my code:
@Bean
public BiConsumer<KStream<String, Event1>, KTable<String, Event2>> processEvents() {
// this one is working if I remove spring-cloud-stream-binder-rabbit from dependencies and configuration
}
@Bean
public Consumer<Event1> gpsEvents() {
// this one should consume events from Rabbit
return System.out::println;
}
This is my application.yaml:
spring:
rabbitmq:
addresses: amqps://rabbitmq-url
cloud:
stream:
function:
definition: processEvents;gpsEvents
bindings:
processEvents-in-0:
binder: kafka
destination: kafka-topic1
processEvents-in-1:
binder: kafka
destination: kafka-topic2
gpsEvents-in-0:
binder: rabbit
destination: rabbit-queue
When I run this I'm getting: No bean named 'kafka-KafkaStreamsBinderConfigurationProperties' available.
If I set default-binder: kafka and remove binder: kafka from the processEvents configuration, I'm getting: java.lang.ClassCastException: class com.sun.proxy.$Proxy110 cannot be cast to class org.springframework.messaging.MessageChannel.
Any advice on how to get the kafka-streams and rabbit binders to work together? Thanks in advance
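One direction worth trying (an assumption based on how the binders register themselves): the Kafka Streams binder does not answer to the name kafka; it registers the binder types kstream, ktable and globalktable. So the processEvents bindings may need those names while gpsEvents keeps rabbit:

spring:
  cloud:
    stream:
      bindings:
        processEvents-in-0:
          binder: kstream   # KStream input
          destination: kafka-topic1
        processEvents-in-1:
          binder: ktable    # KTable input
          destination: kafka-topic2
        gpsEvents-in-0:
          binder: rabbit
          destination: rabbit-queue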
Hi! I'm trying to implement a delayed retry queue with Kafka + the latest Spring Cloud Stream. The basic idea: create a function that receives a message, waits until a timestamp contained in a message header, then forwards it. I have two questions:
1) How do I avoid a Kafka consumer timeout/rebalance?
2) How do I skip the deserialization/serialization configuration so I can just "shovel" the content regardless of its type?
Thanks in advance!
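On question 2, a minimal pass-through sketch: declaring the function over Message<byte[]> keeps the payload opaque, so the body is never (de)serialized. The header name and the blocking wait are assumptions; for question 1 the wait must stay well under the consumer's max.poll.interval.ms or a rebalance will be triggered:

@Bean
public Function<Message<byte[]>, Message<byte[]>> delayRelay() {
    return msg -> {
        // "x-release-at" is a hypothetical header carrying the release timestamp (epoch millis)
        Long releaseAt = (Long) msg.getHeaders().get("x-release-at");
        long wait = (releaseAt == null) ? 0 : releaseAt - System.currentTimeMillis();
        if (wait > 0) {
            try {
                Thread.sleep(wait); // blocks the listener thread; keep well under max.poll.interval.ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return msg;
    };
}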
Hello team, I might be missing something basic in my deserialization configuration; could you please advise?
Application 1
@Bean
public Function<KStream<String, TaskRequest>, KStream<String, TaskRequest>> process1() {
    // ...
}
The __TypeId__ header is set to the value “com.xxx.model.TaskRequest” for the data sent to the output topic.
Application 2
@Bean
public Function<KStream<String, ObjectNode>, KStream<String, ObjectNode>> process2() {
    // ...
}
Is it possible for Application 2 to receive the data from Application 1's output topic as a generic Jackson ObjectNode? Doing so fails with the deserialization error below:
"class":"org.springframework.messaging.converter.MessageConversionException","msg":"failed to resolve class name. Class not found [com.xxx.model.TaskRequest]; nested exception is java.lang.ClassNotFoundException: com.xxx.model.TaskRequest","stack":["org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
Hello, we are getting an error when using Spring Cloud Stream with the Kafka binder, and it happens only for one specific bean; we are not sure why.
The error stack is:
2021-11-25 11:48:44.777 ERROR 74215 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : java.lang.NoClassDefFoundError: kotlinx/coroutines/Dispatchers
at org.springframework.cloud.function.context.config.CoroutinesUtils.invokeSuspendingSupplier(CoroutinesUtils.kt:103)
at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.invoke(KotlinLambdaToFunctionAutoConfiguration.java:163)
at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.apply(KotlinLambdaToFunctionAutoConfiguration.java:127)
at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.get(KotlinLambdaToFunctionAutoConfiguration.java:179)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:657)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:506)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.get(SimpleFunctionRegistry.java:517)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.get(PartitionAwareFunctionWrapper.java:83)
at org.springframework.integration.dsl.IntegrationFlows$1.doReceive(IntegrationFlows.java:174)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:444)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Our Gradle dependencies look something like this:
plugins {
id("org.springframework.boot") version "2.5.6"
id("io.spring.dependency-management") version "1.0.11.RELEASE"
id("com.github.davidmc24.gradle.plugin.avro") version "1.2.1"
kotlin("jvm") version "1.5.31"
kotlin("plugin.spring") version "1.5.31"
kotlin("plugin.jpa") version "1.5.31"
kotlin("plugin.allopen") version "1.5.31"
}
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11
repositories {
mavenCentral()
maven(url = "https://packages.confluent.io/maven/")
}
extra["springCloudVersion"] = "2020.0.4"
extra["testcontainersVersion"] = "1.16.0"
We are not able to figure out why it fails.
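For what it's worth, NoClassDefFoundError: kotlinx/coroutines/Dispatchers usually just means kotlinx-coroutines-core is missing from the runtime classpath; the stack trace shows Spring Cloud Function's Kotlin lambda wrapper (CoroutinesUtils) dispatching through it even when the application code never uses coroutines. A hedged guess at the fix:

dependencies {
    // needed by org.springframework.cloud.function.context.config.CoroutinesUtils at runtime
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
}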
Hello, I am pretty new to Spring Cloud Stream and I am having trouble migrating my tests to the APIs provided by the new spring-cloud-stream test-jar library and getting rid of the old one, spring-cloud-stream-test-support.
My setup includes the Kafka Streams binder and Apache Avro key and value serializers/deserializers for POJOs.
The MessageCollector (old library) was somehow able to return an Object payload that I could cast to the POJO class and easily compare my expected object with the received one. I am not sure whether it captures the message before the serializer runs, or whether the message is serialized and deserialized back... but it worked for the purpose of the test.
Now I should use the OutputDestination API, which gives me a Message<byte[]> payload that I cannot compare easily in a readable way. Is there a way to inject a Deserializer, or enable one automatically (like my service code does), and receive an object payload?
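A hedged workaround: deserialize the byte[] payload inside the test with the same Avro deserializer the application uses. MyPojo and MyAvroValueDeserializer are placeholders for your generated type and configured deserializer; the topic name is illustrative:

Message<byte[]> received = outputDestination.receive(1000, "my-topic");
// Kafka's Deserializer is Closeable, so try-with-resources works here
try (Deserializer<MyPojo> deserializer = new MyAvroValueDeserializer()) {
    MyPojo actual = deserializer.deserialize("my-topic", received.getPayload());
    assertThat(actual).isEqualTo(expected); // AssertJ, as one option
}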
I'm facing an issue with the HTTP source app in a Spring Cloud Data Flow stream.
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:98) ~[spring-core-5.3.10.jar:5.3.10]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.cloud.sleuth.instrument.web.TraceWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP POST "/" [ExceptionHandlingWebHandler]
This happens when I try to post an HTTP request with a large body to the HTTP endpoint.
I have tried setting the following property in the deployment, which didn't help:
app.http.spring.codec.max-in-memory-size: 10MB
The Spring Cloud Stream HTTP (Kafka) source app version used is 3.1.1.
Does anyone have a clue about how to fix this?
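For reference, the underlying Spring property is spring.codec.max-in-memory-size; a hedged sketch of setting it in the source app's own configuration, in case the app.http. deployment prefix isn't reaching the right context:

spring:
  codec:
    max-in-memory-size: 10MB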
Hello. I am trying to use the Confluent registry with the Spring Cloud Kafka Streams binder.
My config looks like:
spring:
application:
name: "jira-metrics"
cloud:
stream:
default:
contentType: "application/json"
function:
definition: wasteProcessor
bindings:
wasteProcessor-in-0:
destination: ${KAFKA_TOPIC_JIRA_ISSUES_SOURCE}
wasteProcessor-out-0:
destination: ${KAFKA_TOPIC_JIRA_ISSUES_WASTE}
kafka:
streams:
binder:
applicationId: ${spring.application.name}
brokers: ${KAFKA_BOOTSTRAP_SERVER}
configuration:
schema.registry.url: localhost:8080/api
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
I am getting this error:
Exception in thread "jira-metrics-48381cfa-0217-4af3-812a-dcf8456b8918-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:82)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:957)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1009)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:907)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.util.Assert.state(Assert.java:76)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:532)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
... 9 more
My processor looks like:
@Bean
public Function<KStream<JiraKey, JiraIssue>, KStream<String, Waste>> wasteProcessor() {
return input -> input
.map((key, value) -> new KeyValue<>(key.getKey(), issueParser.parse(value)))
.filter((key, value) -> value.getResolution() != null && (value.getResolution().getName().equals(RESOLUTION_CANCELLED)))
.map((key, value) -> new KeyValue<String, Waste>(key, new Waste(MEASUREMENT, key, value.getStatus().getName(), value.getResolution().getName(), value.getUpdateDate().getMillis())));
}
Any ideas? Is this a supported setup or is only Avro supported?
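The stack trace points at spring-kafka's JsonDeserializer rather than the schema registry: with no type information in the record headers, it needs default types. A sketch under that assumption (the package names are placeholders for your own key and value classes); note that schema.registry.url is only consulted by Confluent serdes, not by JsonSerde:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              spring.json.key.default.type: com.example.JiraKey      # assumption: your key class
              spring.json.value.default.type: com.example.JiraIssue  # assumption: your value class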
Hi all,
I am trying to build a Kafka Streams app (Spring Boot based) using the spring-cloud-stream functional approach in Kotlin.
I had a major issue when running the server: it failed to bind the producer and the consumer with the following message:
o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
java.lang.ClassCastException: class com.sun.proxy.$Proxy72 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy72 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
I then took out all the domain logic and dependencies and left only these:
extra["springCloudVersion"] = "2020.0.4"
dependencies {
implementation("org.apache.kafka:kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.cloud:spring-cloud-stream")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
implementation("org.springframework.kafka:spring-kafka")
but the issue persists.
My bean function looks like this:
@Bean
fun process(): Function<KStream<String, String>, KStream<String, String>> {
    return Function { input ->
        // domain logic removed for this repro
        input
    }
}
Do you know the source of this class cast exception?
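One hedged thing to rule out: this error shows up when a binding is routed through the message-channel (kafka) binder instead of the Kafka Streams one. Declaring the function explicitly and pinning its bindings to the kstream binder narrows it down (topic names are placeholders):

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-in-0:
          binder: kstream
          destination: input-topic
        process-out-0:
          binder: kstream
          destination: output-topic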
Hi everyone, this is my first post here, so please have some patience :). I'm trying to deploy a Spring Cloud Stream application on Kubernetes; it starts fine on my machine, but when deployed in a Docker container I receive the following error message and the app fails to start:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed;
nested exception is java.lang.NoClassDefFoundError: sun.misc.Unsafe
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.AbstractBeanFactory$$Lambda$268/00000000A1693790.getObject(Unknown Source)
Java version is 11, with Spring Boot 2.6.1 and Spring Cloud 2021.0.0
Any help would be greatly appreciated.
Hi, I am using the latest stack: Java 17, Spring Boot 2.6.1, and Spring Cloud 2021.0.0.
package micro.apps.service
import mu.KotlinLogging
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.*
import java.util.function.Consumer
import java.util.function.Function
import java.util.function.Supplier
private val logger = KotlinLogging.logger {}
@SpringBootApplication
class DemoApplication
fun main(args: Array<String>) {
runApplication<DemoApplication>(*args)
}
data class MyModel(
var name: String? = null,
var city: String? = null,
var state: String? = null
)
@Configuration
class KafkaConfiguration {
@Bean
fun generate(): Supplier<MyModel> = Supplier {
MyModel(UUID.randomUUID().toString(), "Paradise", "CA")
}
@Bean
fun city(): Function<KStream<String, MyModel>, KStream<String, String>> = Function {
it.mapValues { v -> v.city }
}
@Bean
fun state(): Function<KStream<String, MyModel>, KStream<String, String>> = Function {
it.mapValues { v -> v.state }
}
@Bean
fun print(): Consumer<KStream<String, String>> = Consumer {
it.peek { k, v -> logger.info("Received: {}, {}", k, v) }
}
}
spring:
cloud:
stream:
bindings:
generate-out-0.destination: all-in-topic
city-in-0.destination: all-in-topic
city-out-0.destination: city-out-topic
state-in-0.destination: all-in-topic
state-out-0.destination: state-out-topic
print-in-0.destination: city-out-topic,state-out-topic
function:
definition: generate;city;state;print
kafka:
streams:
binder:
brokers: localhost:9092
Getting this error:
[2021-12-23 11:36:28.901] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Retrieving cached binder: kstream
[2021-12-23 11:36:28.901] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Creating binder: globalktable
[2021-12-23 11:36:28.919] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Caching the binder: globalktable
[2021-12-23 11:36:28.919] - 52714 INFO [restartedMain] --- org.springframework.cloud.stream.binder.DefaultBinderFactory: Retrieving cached binder: globalktable
[2021-12-23 11:36:28.919] - 52714 WARNING [restartedMain] --- org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext: Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.endpoint.EventDrivenConsumer: Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.channel.PublishSubscribeChannel: Channel 'application.errorChannel' has 0 subscriber(s).
[2021-12-23 11:36:28.920] - 52714 INFO [restartedMain] --- org.springframework.integration.endpoint.EventDrivenConsumer: stopped bean '_org.springframework.integration.errorLogger'
If I keep only the generate function, it starts fine:
@Bean
fun generate(): Supplier<MyModel> = Supplier {
    MyModel(UUID.randomUUID().toString(), "Paradise", "CA")
}
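The failing channel is a DirectWithAttributesChannel, i.e. a message-channel binding: the Supplier<MyModel> (generate-out-0) goes through the regular channel binder, while the Kafka Streams binder only registers kstream/ktable/globalktable. A hedged guess, assuming a Gradle Kotlin build, is that adding the plain Kafka binder supplies the missing channel binder:

dependencies {
    // supplies the message-channel binder that generate-out-0 needs
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
}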
Hi Everybody!
@Bean
public Function<Flux<Message<OfferIncomingEvent>>, Flux<Message<OfferOutgoingEvent>>> offerValidatorAutomatic() {
return this::validateOfferAutomatic;
}
I have that Function, which processes events from one Kafka topic to another. My question is the following:
Is there a way to set up the binding between these two topics so that Spring forwards ALL the headers automatically?
If I remove the Message<?> type from the Flux parameter, the headers from the input topic don't get forwarded.
Also, with the current setup I have to add them manually when building the outgoing message.
Thank you for your attention!
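Short of a binding-level switch, a per-message sketch of carrying the headers forward explicitly; validateOffer(...) stands in for your domain logic, and MessageBuilder is org.springframework.messaging.support.MessageBuilder:

private Message<OfferOutgoingEvent> toOutgoing(Message<OfferIncomingEvent> in) {
    OfferOutgoingEvent out = validateOffer(in.getPayload()); // hypothetical domain call
    return MessageBuilder.withPayload(out)
            .copyHeaders(in.getHeaders()) // read-only headers (id, timestamp) are skipped and regenerated
            .build();
}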
What about the stream-applications project? The idea is to "be useful in the end-user applications as-is", so to test, I just need to run the jar application like in the quick start, right?
Hi there,
I'm using Spring Boot (2.6.2) + Spring Cloud (2021.0.0), with both Kafka and Kafka Streams channels.
Here is my application.yaml:
cloud:
stream:
function:
definition: test;test2
kafka:
streams:
binder:
configuration:
commit.interval.ms: 1000
application.server: ${SERVER_HOST:localhost}:${SERVER_PORT:8080}
replication-factor: ${KAFKA_BROKERS_REPLICATION_FACTOR:1}
auto-add-partitions: true
binder:
producer-properties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
consumer-properties:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
brokers: ${KAFKA_BROKERS:localhost:9092}
replication-factor: ${KAFKA_BROKERS_REPLICATION_FACTOR:1}
auto-add-partitions: true
bindings:
test-in-0:
destination: test-in
test-out-0:
destination: test-out
producer:
partition-count: 2
test2-out-0:
binder: kafka
destination: test2-out
producer:
partition-count: 2
Is there a way to configure the retention time (ms) for the topics I use? I was not able to find anything in the docs...
Big thanks in advance for any help!
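If the binder is the one provisioning the topics, Kafka topic-level settings can be passed per binding via topic.properties; a sketch for one of the bindings above (the same pattern should apply to the others, and it only takes effect when the binder creates, or is allowed to alter, the topic):

spring:
  cloud:
    stream:
      kafka:
        bindings:
          test2-out-0:
            producer:
              topic:
                properties:
                  retention.ms: "172800000"   # 2 days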