Hi,
I use Azure Event Hubs with Spring Boot and Spring Cloud Stream, with the dependencies below.
Right now, I have a problem consuming records in the application within 250 ms. Sometimes records are randomly consumed only after about 50 seconds, from any of the configured topics and partitions.
I don't know the exact reason. Could anyone help with this?
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Setup Details:
Hi,
I have to create a stream in SCDF with a Kafka topic as the source, a processor, and a sink. Since we don't have a source application for the Kafka topic, is there a way to create the stream without writing a new source application for it? Per the SCDF documentation, a stream can be created like stream create --definition ":myDestination > log" --name ingest_from_broker --deploy when there are only a source and a sink. How can the same be done if we have to feed a Kafka topic into the processor as the source?
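For illustration, the DSL form I would expect to try here (a sketch, assuming a processor app registered under the hypothetical name my-processor; the named destination maps onto the existing Kafka topic):

stream create --definition ":myKafkaTopic > my-processor | log" --name ingest_from_broker --deploy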
enable.auto.commit is set to false as per the consumer config printed in the log. In this case I would assume that the default ackMode would be RECORD, since batch-mode is false by default. Could someone please confirm? Also, is there a way to verify the ackMode by any chance?
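One way to check it at runtime (a sketch, untested in this exact setup): register a ListenerContainerCustomizer bean; the Kafka binder calls it for every consumer binding it creates, so the effective ack mode can be logged there.

import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> ackModeLogger() {
    // Invoked once per consumer binding; prints the AckMode the binder actually configured.
    return (container, destinationName, group) ->
            System.out.println(destinationName + " -> ackMode = "
                    + container.getContainerProperties().getAckMode());
}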
I'd like to use the @Input and @Output annotations and configure the binder and destination in the .yaml file. However, those annotations are deprecated in favour of the new functional model. I'm wondering how I can connect Spring Integration channels to an external broker under the functional model. I'd appreciate it if someone can help.
Hi, I'm using Spring Cloud Stream 3.2.1, which uses Spring Cloud Function. I was notified that Spring Cloud Function has a vulnerability that was fixed: https://spring.io/blog/2022/03/29/cve-report-published-for-spring-cloud-function
Is Spring Cloud Stream affected by this vulnerability?
Hello, I'm upgrading to the latest Spring Boot (2.6.5 from 2.3.3) and Spring Cloud (2021.0.1 from Hoxton.SR8) versions, and I started getting this error when sending a message to Kafka. I'm using the deprecated annotation style.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:904)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:861)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:993)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:649)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:513)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 81 more
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:901)
... 88 more
I'm not configuring any key or value serializer; any idea what changed? Thanks in advance.
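(For anyone hitting the same trace: the producer is receiving a byte[] payload while a StringSerializer is configured for values. One workaround sketch, assuming you want the binder's usual byte-array pass-through rather than native String encoding, is to pin the serializer through the binder's pass-through client configuration:)

spring:
  cloud:
    stream:
      kafka:
        binder:
          configuration:
            value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer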
Hi, I just updated to Spring Cloud Stream 3.2.2 and I have a weird issue on one of my servers. I use a reactive consumer on several servers. It worked everywhere with 3.1.3, but fails on only one server after migrating to 3.2.2. I get an error that was mentioned in this channel earlier:
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: null or too small array, need between 2 and 8 values
Caused by: java.lang.IllegalArgumentException: null or too small array, need between 2 and 8 values
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.bindFunctionToDestinations(FunctionConfiguration.java:515)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.afterPropertiesSet(FunctionConfiguration.java:421)
The following diff (changing from Consumer to Function, as the docs advise) fixes the problem:
@Bean
- public Consumer<Flux<Message<String>>> consumeNotification() {
- return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE).subscribe(this::consume);
+ public Function<Flux<Message<String>>, Mono<Void>> consumeNotification() {
+ return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE).doOnNext(this::consume).then();
But I don't understand why it works on all the servers but one. (There are some subtle differences in the Spring context because of different transitive dependencies, but I didn't look into it too much; I guess that's where the problem comes from.) This is all open-source code, so reproducing is possible. Does someone have an idea? Thanks in advance.
(In application.yaml, I have:

spring:
  cloud:
    function:
      definition: consumeNotification
    stream:
      bindings:
        consumeNotification-in-0:
          destination: XYZ

Spring Boot 2.6.6, Spring Framework 5.3.18.)
@RunWith(SpringRunner.class)
@SpringBootTest(args = {"--spring.cloud.stream.bindings.toStream-out-0.destination=foo", "--spring.cloud.stream.source=toStream"})
public class AppTest
{
@Test
public void shouldAnswerWithTrue()
{
assertTrue(true);
}
@SpringBootApplication
public static class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@Bean
public Consumer<Flux<Message<String>>> consumeNotification() {
return f -> {};
}
@Service
static class MyService {
@Autowired
private StreamBridge streamBridge;
}
}
}
fails with
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: null or too small array, need between 2 and 8 values
Caused by: java.lang.IllegalArgumentException: null or too small array, need between 2 and 8 values
Removing the unrelated consumer bean fixes it, or removing the source: declaration fixes it (though maybe it errors at runtime when you send the first message?).
Hi all, I upgraded my Spring Boot version to 2.6.6 and then started having test failures due to the following issue (I'm using 2021.0.0 for the spring-cloud version; I tried 2021.0.1 and the error still persists):
Caused by: java.lang.NoSuchMethodError: java.util.List.of(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/util/List;
at org.springframework.integration.support.json.JacksonJsonUtils.<clinit>(JacksonJsonUtils.java:58)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.<init>(KafkaMessageDrivenChannelAdapter.java:139)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:735)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:163)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:426)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:92)
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:180)
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:137)
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118)
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58)
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
... 79 more
I suspected that it has something to do with having both the kafka and kafka-streams bindings in the topology. So I commented out the kafka binding, and the error disappeared. Does anyone know if this is a known issue? I've seen a similar error before that was related to having both kafka and kafka-streams bindings in the application. Please advise, thanks!
Hi all,
I'm using Spring Cloud Stream functional (2021.0.1) and need to specify a StoreSupplier for materialized-as for a GlobalKTable (instead of just a custom store name).
The streams function signature looks like:
@Bean
fun myAggregator() =
    Function { stream1: KStream<String, Type1> ->
        Function { stream2: KStream<String, Type2> ->
            Function { stream3: KStream<String, Type3> ->
                Function { myGlobalTable: GlobalKTable<String, MyType> ->
                    // ... join/aggregation logic elided ...
                }
            }
        }
    }
The KafkaStreamsFunctionProcessor (AbstractKafkaStreamsBinderProcessor) will instantiate a GlobalKTable when the parameter type of a function is assignable from GlobalKTable.class, optionally with a custom store name when one is specified in the extended consumer properties:
...
return streamsBuilder.globalTable(
        this.bindingServiceProperties.getBindingDestination(destination),
        consumed,
        getMaterialized(storeName, k, v));
...
private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(
        String storeName, Serde<K> k, Serde<V> v) {
    return Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
            .withKeySerde(k).withValueSerde(v);
}
Since Materialized.as(storeSupplier) is not supported by the function processor, I tried to define a bean like:
@Bean
fun myBean(
    streamsBuilderFactoryBean: StreamsBuilderFactoryBean,
    myStoreSupplier: KeyValueBytesStoreSupplier
): GlobalKTable<String, MyType> =
    streamsBuilderFactoryBean.`object`.globalTable(MyTopic, Materialized.`as`(myStoreSupplier))
and inject it into my streams function instead of declaring it as a parameter in the method signature, but the streamsBuilderFactoryBean bean is not available.
The reason for this need is that a different StoreSupplier will be declared depending on the Spring profile, e.g.:
@Profile("prod || stage || test")
@Configuration
class PersistentStoreConfiguration {
@Bean
fun myStoreSupplier(): KeyValueBytesStoreSupplier =
Stores.persistentTimestampedKeyValueStore(MyStore.NAME)
}
@Profile("it || dev")
@Configuration class InMemoryStoreConfiguration {
@Bean
fun myStoreSupplier(): KeyValueBytesStoreSupplier =
Stores.inMemoryKeyValueStore(MyStore.NAME)
}
How should I declare the GlobalKTable with my store supplier when using the Spring Cloud Stream functional model?
spring.cloud:
  aws:
    security:
      cognito:
        enabled: false
    ses:
      enabled: false
  stream:
    bindings:
      helloProcessor-in-0:
        binder: kafka
        destination: some-input-topic-name
        group: microservice-example
      helloProcessor-out-0:
        binder: kafka
        destination: some-output-topic-name
        group: microservice-example
      helloSink-in-0:
        binder: kafka
        destination: some-output-topic-name
        group: microservice-example
      helloDeadLetterSink-in-0:
        binder: kafka
        destination: some-output-topic-name.microservice-example.dlq
        group: microservice-example
      helloQueueSink-in-0:
        binder: sqs
        destination: LOCAL_some_sink_input_queue
      helloQueueProcessor-in-0:
        binder: sqs
        destination: LOCAL_some_processor_input_queue
      helloQueueProcessor-out-0:
        binder: sns
        destination: LOCAL_some_processor_output_topic
      helloQueueSupplier-out-0:
        binder: sns
        destination: LOCAL_some_processor_output_topic
    kafka:
      binder:
        brokers: ${KAFKA_BROKERS}
      bindings:
        helloSink-in-0:
          consumer:
            enableDlq: true
            dlqName: some-output-topic-name.microservice-example.dlq
            autoCommitOnError: true
            autoCommitOffset: true
    sqs:
      bindings:
        helloQueueSink-in-0:
          consumer:
            snsFanout: false
        helloQueueProcessor-in-0:
          consumer:
            snsFanout: false
    sns:
      bindings:
        helloQueueProcessor-out-0:
  function:
    definition: helloProcessor;helloSink;helloDeadLetterSink;helloQueueSink;helloQueueProcessor;helloQueueSupplier
I have an application.yml configuration, i.e.:
cloud:
  stream:
    poller:
      # Cron for polling data.
      cron: 0 0/30 * * * *
........
I get an error like:
Description:
The following configuration properties are mutually exclusive:
spring.integration.poller.cron
spring.integration.poller.fixed-delay
spring.integration.poller.fixed-rate
However, more than one of those properties has been configured at the same time:
spring.integration.poller.cron
spring.integration.poller.fixed-delay
Action:
Update your configuration so that only one of the mutually exclusive properties is configured.
I have looked at the PollerConfigEnvironmentPostProcessor code, and I see fixed-delay is added if absent:
//TODO Must remain after removal of deprecated code above in the future
streamPollerProperties.putIfAbsent(INTEGRATION_PROPERTY_PREFIX + "fixed-delay", "1s");
streamPollerProperties.putIfAbsent(INTEGRATION_PROPERTY_PREFIX + "max-messages-per-poll", "1");
Spring cloud version 2021.0.1
I have a Function<Flux<Foo>, Flux<Bar>> reading from an input topic with 5 partitions. Since it's using Flux, I can't use input.consumer.concurrency=5, so what is the recommendation for handling messages from each of the 5 partitions in parallel when using Flux? Do I need to create 5 functions which all have the same input destination? Or is there a better way?
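One Reactor-level approach I have seen suggested (a sketch only, untested here; it assumes a Message<Foo> input so the partition header is visible, uses the spring-kafka 2.8 header constant, and Foo, Bar and handle() are placeholders from the question): group the flux by the received partition and run each group on its own worker, which keeps per-partition ordering.

import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Bean
public Function<Flux<Message<Foo>>, Flux<Bar>> process() {
    return flux -> flux
            // One GroupedFlux per partition; ordering is preserved within a group.
            .groupBy(msg -> msg.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
            // Process the (up to 5) groups concurrently, each on its own worker.
            .flatMap(group -> group
                    .publishOn(Schedulers.boundedElastic())
                    .map(msg -> handle(msg.getPayload())));
}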
I'm trying to bridge a MessageChannel from an IntegrationFlow to Spring Cloud Stream. I have an Integration Flow with a defined outputChannel, which I am wiring into the following Bean:
@Bean(name = PUBLISHER)
public Publisher<Message<Object>> myListener(@Qualifier(OUTPUT_CHANNEL_BEAN) MessageChannel outputChannel) {
return IntegrationFlows.from(outputChannel)
.toReactivePublisher();
}
And then separately, wiring that Publisher into another Bean to create a Supplier<Flux<Message<?>>>:
@Bean(name = SUPPLIER)
public Supplier<Flux<Message<Object>>> mySupplier(Publisher<Message<Object>> publisher) {
return () -> Flux.from(publisher)
.subscribeOn(Schedulers.boundedElastic())
.share();
}
I will omit the other Function beans for now because I don't think they are the problem. I see these logs (note the timestamps); there is not much context around them:
2022-05-03T10:36:08.57-0400 [APP/PROC/WEB/0] OUT 2022-05-03 14:36:08.574 INFO [myapp,,] 9 --- [ main] o.s.integration.channel.DirectChannel : Channel 'myapp-1.OutputChannelBean' has 1 subscriber(s).
...
...
...
2022-05-03T10:36:36.16-0400 [APP/PROC/WEB/0] OUT 2022-05-03 14:36:36.165 INFO [myapp,,] 9 --- [a.Spring.bean-1] o.s.integration.channel.DirectChannel : Channel 'myapp-1.OutputChannelBean' has 0 subscriber(s).
It seems like the Supplier is created, but then some time later (in this case ~28 seconds) the channel loses its subscriber. I note that in the IntegrationReactiveUtils class there is this line:
.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));
Is there a special configuration needed to "activate" a reactive Supplier? What is strange to me is that the error message I get complains about the MessageChannel not having any subscribers:
ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'be-shard-splitter-source-1.prioritizedShardOutputChannelBean'.
Side question: For Supplier/Functions that return a Reactive payload, does SCSt subscribe to each of these beans individually and sink the message within the binder? Or does the SCSt framework compose one long reactive chain and subscribe at the end? I suspect the former, because the latter would not allow for EventRouting.
java.lang.NullPointerException: while trying to invoke the method org.springframework.messaging.SubscribableChannel.send(org.springframework.messaging.Message) of a null object returned from org.springframework.cloud.stream.binder.test.InputDestination.getChannelByName(java.lang.String)
at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:89)