gibykthomas
@gibykthomas

Hi Folks,

I'm using AWS-managed Kafka (MSK) as the binder for my Spring Cloud Stream apps. How do I inject the jar aws-msk-iam-auth-1.0.0-all.jar into a Spring Cloud Data Flow stream to enable AWS Kafka security?
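
For context, a minimal sketch of the binder configuration this normally involves, assuming the aws-msk-iam-auth jar is already on the application's classpath (getting it onto an SCDF-deployed app's classpath is the open question here; the broker placeholder is hypothetical):

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${MSK_BOOTSTRAP_BROKERS}  # hypothetical placeholder
          configuration:
            security.protocol: SASL_SSL
            sasl.mechanism: AWS_MSK_IAM
            sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required;
            sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler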

Kannadasan S
@Kannadasan89

Hi,

I use Azure Event Hubs with Spring Boot and Spring Cloud Stream, with the dependencies below.

Right now I have a problem consuming records within 250 ms in the application. Sometimes records are randomly consumed only after about 50 seconds, from any of the configured topics and partitions,
but I don't know the exact reason. Could anyone help with this?

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Setup Details:

  1. Event Hubs with 4 topics, each configured with 15 partitions.
  2. 15 consumer applications are deployed; each application consumes from all the topics.
2 replies
Remberto Martinez
@remware
Is there a way to access a GlobalKTable from a simple topic Consumer?
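
One hedged angle, assuming the Kafka Streams binder is in play and the GlobalKTable is materialized with a named store: the binder's InteractiveQueryService can read that store from anywhere in the application, including a plain consumer. The store name "my-global-store" and the String serdes are hypothetical.

import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.stereotype.Component;

@Component
public class GlobalTableReader {

    private final InteractiveQueryService queryService;

    public GlobalTableReader(InteractiveQueryService queryService) {
        this.queryService = queryService;
    }

    public String lookup(String key) {
        // Must match the store name the GlobalKTable was materialized as.
        ReadOnlyKeyValueStore<String, String> store =
                queryService.getQueryableStore("my-global-store",
                        QueryableStoreTypes.keyValueStore());
        return store.get(key);
    }
}
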
gibykthomas
@gibykthomas

Hi,

I have to create a stream in SCDF with a Kafka topic as the source, a processor, and a sink. Since there is no Kafka-topic source application, is there a way to create the stream without writing a new source application for the Kafka topic? Per the SCDF documentation, a stream with only a source and a sink can be created like stream create --definition ":myDestination > log" --name ingest_from_broker --deploy. How can the same be created if we have to feed a Kafka topic into a processor?

2 replies
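
For what it's worth, the named-destination DSL quoted above should also accept a processor between the destination and the sink; a sketch, where my-processor is a hypothetical registered processor app:

stream create --definition ":myKafkaTopic > my-processor | log" --name ingest_from_topic --deploy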
Snofty
@snofty
Hi, I'm using Spring WebFlux with Cloud Stream and RabbitMQ. I would like to control/backpressure the number of messages consumed from RabbitMQ dynamically: say the CircuitBreaker is open, I would like to consume messages at a lower rate, and once it closes, continue with the normal consumption process.
1 reply
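
A hedged sketch of one way to slow a reactive consumer based on external state; the AtomicBoolean flag, the handle() body, and the 500 ms delay are all hypothetical, and with the RabbitMQ binder the effective throttle also depends on the prefetch setting:

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class ThrottledConsumerConfig {

    // Hypothetical flag, flipped by circuit-breaker state-transition listeners.
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);

    @Bean
    public Function<Flux<Message<String>>, Mono<Void>> consume() {
        return flux -> flux
                // Delay each message while the circuit is open; full speed otherwise.
                .concatMap(msg -> circuitOpen.get()
                        ? Mono.delay(Duration.ofMillis(500)).then(handle(msg))
                        : handle(msg))
                .then();
    }

    private Mono<Void> handle(Message<String> msg) {
        return Mono.fromRunnable(() -> System.out.println(msg.getPayload()));
    }
}
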
Kannadasan S
@Kannadasan89
Hi, I use Spring Cloud Stream to work with Kafka. I have 4 topics created, and I have noticed that by default 4 producers are created to send records. If I want to create more
producers for a single topic, how do I do that? Is it possible? If not, is there any impact?
1 reply
AttitudeL
@AttitudeL
Hi there, I have a quick question. In the spring-cloud-stream Kafka binding, I see that enable.auto.commit is set to false according to the consumer config printed in the log. In this case I would assume that the default ackMode would be RECORD, since batch-mode is false by default. Could someone please confirm? Also, is there a way to verify the ackMode by any chance?
12 replies
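
For reference, the message-channel Kafka binder exposes ackMode as a per-binding consumer property, which is also the way to set it explicitly; a sketch with a hypothetical binding name:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          myConsumer-in-0:
            consumer:
              ackMode: RECORD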
tomiasko
@tomiasko
Hi, I am using Spring Cloud Stream with Azure Service Bus. How do I make a function that is called on receiving a message transactional? So when there is an exception, database changes are rolled back and the message is not completed and stays in the queue to be processed later.
AttitudeL
@AttitudeL
Hi, another question about ackMode. I have a kafka-stream binding, and in order to set the ackMode for the consumer I would go like this, right?
spring.cloud.stream.kafka.streams.bindings.<FUNCTION_NAME>.consumer.ackMode=RECORD
6 replies
Arthur Kazemi
@bidadh
Hi, I've used spring-cloud-stream in a couple of projects in the past, and I was able to connect internal channels to an external broker like RabbitMQ using the @Input and @Output annotations, configuring the binder and destination in the .yaml file. However, those annotations are deprecated in favour of the new functional model. I'm wondering how I can connect Spring Integration channels to an external broker now? I'd appreciate it if someone could help.
7 replies
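
A minimal sketch of one functional-model option: route the Spring Integration channel into StreamBridge and configure the binding's destination in YAML as before. The channel name "internalChannel" and binding name "toBroker-out-0" are hypothetical.

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class ChannelToBrokerConfig {

    @Bean
    public IntegrationFlow outboundFlow(StreamBridge streamBridge) {
        // Everything sent to "internalChannel" is forwarded to the
        // "toBroker-out-0" binding (destination configured in application.yaml).
        return IntegrationFlows.from("internalChannel")
                .handle(message -> streamBridge.send("toBroker-out-0", message))
                .get();
    }
}
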
aditya parikh
@adityaparikh_twitter
I saw this example https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/partitioning-samples/rabbit-partitioning where we need to start multiple consumer instances to read off partitions.
Is there a way to pin a partition on to a thread in the same consumer instance with multiple threads?
Tiew Kee Hui
@tiewkeehui_twitter

Hi, I'm using Spring Cloud Stream 3.2.1 which is using Spring Cloud Function. I was notified that Spring Cloud Function has a vulnerability that was fixed: https://spring.io/blog/2022/03/29/cve-report-published-for-spring-cloud-function

Is Spring Cloud Stream affected by this vulnerability?

Oleg Zhurakousky
@olegz
No, Spring Cloud Stream is not affected. However, it is highly recommended that you manually override the version of spring-cloud-function-context to 3.2.3
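
For anyone following along, one way to apply that override in a Maven build (a sketch; adjust accordingly for Gradle or a BOM-managed setup):

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-context</artifactId>
    <version>3.2.3</version>
</dependency>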
Tiew Kee Hui
@tiewkeehui_twitter
Yup, that I have already done. Thank you!
marios sofocleous
@sofocleous2_twitter
@olegz Hi Oleg, I am using SCDF and getting the following error; could you please assist:
2022-04-01 22:16:07.305 ERROR [jdbc-sink,8c89b90d7e4a660c,c734edb6c619415b] 1 --- [.audit-stream-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@74a447c6]; nested exception is org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar [INSERT INTO msp_audit(transaction_slip, audit_trail) VALUES (?, ?)]; nested exception is org.h2.jdbc.JdbcSQLSyntaxErrorException: Table "MSP_AUDIT" not found; SQL statement:
INSERT INTO msp_audit(transaction_slip, audit_trail) VALUES (?, ?) [42102-200], failedMessage=GenericMessage [payload=byte[52], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=audit, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=audit.audit-stream, amqp_redelivered=false, amqp_receivedRoutingKey=audit, b3=8c89b90d7e4a660c-8c89b90d7e4a660c-0, nativeHeaders={}, amqp_timestamp=Fri Apr 01 22:16:04 GMT 2022, amqp_messageId=d69621aa-d47a-b756-f5e7-26322d7db5cd, id=84e938f1-c127-07ce-d748-783bc0ded30c, amqp_consumerTag=amq.ctag-JE5zCFVvysO1HNljnxz9YQ, sourceData=(Body:'[B@50fd7425(byte[52])' MessageProperties [headers={}, timestamp=Fri Apr 01 22:16:04 GMT 2022, messageId=d69621aa-d47a-b756-f5e7-26322d7db5cd, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=audit, receivedRoutingKey=audit, deliveryTag=2, consumerTag=amq.ctag-JE5zCFVvysO1HNljnxz9YQ, consumerQueue=audit.audit-stream]), contentType=application/json, timestamp=1648851367296}]
iguissouma
@iguissouma

Hello, I'm upgrading to the latest Spring Boot (2.6.5 from 2.3.3) and Spring Cloud (2021.0.1 from Hoxton.SR8) versions, and I started getting this error when sending a message to Kafka. I'm using the deprecated annotation style.

Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:904)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:861)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:993)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:649)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:513)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 81 more
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:901)
    ... 88 more

I'm not configuring any key or value serializer. Any idea what changed? Thanks in advance.

3 replies
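
A hedged observation: the Kafka binder normally hands the producer a ByteArraySerializer, so the StringSerializer is probably being picked up from somewhere else (for example spring.kafka.* properties or a custom producer factory). One way to pin it explicitly on the binder is sketched below; whether this addresses the root cause depends on where the StringSerializer is coming from:

spring:
  cloud:
    stream:
      kafka:
        binder:
          configuration:
            value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer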
Jon Harper
@jonenst

Hi, I just updated to Spring Cloud Stream 3.2.2 and I have a weird issue on one of my servers. I use a reactive consumer in several servers. It worked everywhere with 3.1.3, but fails on only one server after migrating to 3.2.2. I get an error that was mentioned in this channel earlier:

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: null or too small array, need between 2 and 8 values
Caused by: java.lang.IllegalArgumentException: null or too small array, need between 2 and 8 values
        at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.bindFunctionToDestinations(FunctionConfiguration.java:515)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.afterPropertiesSet(FunctionConfiguration.java:421)

The following diff (changing from a Consumer to a Function, as the docs advise) fixes the problem:

     @Bean
-    public Consumer<Flux<Message<String>>> consumeNotification() {
-        return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE).subscribe(this::consume);
+    public Function<Flux<Message<String>>, Mono<Void>> consumeNotification() {
+        return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE).doOnNext(this::consume).then();

But I don't understand why it works on all the servers but one. (There are some subtle differences in the Spring context because of different transitive dependencies, but I didn't look into it too much; I guess that's where the problem comes from.) This is all open-source code, so reproducing is possible. Does someone have an idea? Thanks in advance.

(in application.yaml, I have

spring:
  cloud:
    function:
      definition: consumeNotification
    stream:
      bindings:
        consumeNotification-in-0:
          destination: XYZ

spring boot 2.6.6, Spring v5.3.18
)

Any help would be greatly appreciated. Is this a regression in the 3.2.2 release?
Oleg Zhurakousky
@olegz
That is indeed weird. Unfortunately I can't look at it at the moment. Could you please raise an issue in the s-c-stream GitHub with all of these details and a possible way to reproduce it?
Jon Harper
@jonenst
Hi, thank you for your time. I'll try to make a small repro project and post it in a github issue.
Jon Harper
@jonenst
I investigated a bit more. It's easy to reproduce:
Jon Harper
@jonenst
import java.util.function.Consumer;

import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import org.springframework.test.context.junit4.SpringRunner;

import reactor.core.publisher.Flux;

import static org.junit.Assert.assertTrue;

@RunWith(SpringRunner.class)
@SpringBootTest(args = {"--spring.cloud.stream.bindings.toStream-out-0.destination=foo", "--spring.cloud.stream.source=toStream"})
public class AppTest {

    @Test
    public void shouldAnswerWithTrue() {
        assertTrue(true);
    }

    @SpringBootApplication
    public static class App {

        public static void main(String[] args) {
            SpringApplication.run(App.class, args);
        }

        @Bean
        public Consumer<Flux<Message<String>>> consumeNotification() {
            return f -> {};
        }

        @Service
        static class MyService {

            @Autowired
            private StreamBridge streamBridge;
        }
    }
}

fails with

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: null or too small array, need between 2 and 8 values
Caused by: java.lang.IllegalArgumentException: null or too small array, need between 2 and 8 values

Removing the unrelated consumer bean fixes it.
Removing the source: declaration also fixes it (but maybe it errors at runtime when you send the first message?).

Did I make a mistake?
Jon Harper
@jonenst
I can confirm that sending the message works if you remove the source declaration
Jon Harper
@jonenst
it works with spring-cloud.version 2021.0.0 but fails with 2021.0.1
asyncify
@asyncify
Is it possible to define a function to handle a message via local dispatch? That is, if the message producer is local and the handler is local, could I configure it to handle the message either via Kafka or via local dispatch? Is there such a thing as a 'local binder' that dispatches messages locally?
3 replies
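
Not a 'local binder' as such, but a hedged sketch of one in-process option with the functional model: compose the producer and handler into a single function definition, so the hop between them never touches the broker. The bean names producer and handler are hypothetical.

spring:
  cloud:
    function:
      definition: producer|handler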
AttitudeL
@AttitudeL

Hi all, I upgraded my Spring Boot version to 2.6.6 and then started having test failures due to the following issue (I'm using 2021.0.0 for the spring-cloud version; I tried 2021.0.1 and the error still persists):

Caused by: java.lang.NoSuchMethodError: java.util.List.of(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/util/List;
    at org.springframework.integration.support.json.JacksonJsonUtils.<clinit>(JacksonJsonUtils.java:58)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.<init>(KafkaMessageDrivenChannelAdapter.java:139)
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:735)
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:163)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:426)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:92)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:180)
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:137)
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118)
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58)
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
    ... 79 more

I suspected that it has something to do with having both the kafka and kafka-streams bindings in the topology, so I commented out the kafka binding and the error disappeared. Does anyone know if this is a known issue?
I've seen a similar error before that was related to having both the kafka and kafka-streams bindings in the application. Please advise, thanks!

3 replies
aditya parikh
@adityaparikh_twitter
How does a multitenant data source work with Spring Cloud Stream + Spring Cloud Function? I'm looking for something similar to
https://www.bytefish.de/blog/spring_boot_multitenancy.html and https://www.bytefish.de/blog/spring_boot_multitenancy_webflux.html
And also, could it work when we compose non-reactive and reactive functions together?
Gerald Leeb
@gerald24

Hi all,
I'm using Spring Cloud Stream functional (2021.0.1) and need to specify a StoreSupplier for materialized-as for a GlobalKTable (instead of just a custom store name).

The streams function signature looks like:

    @Bean
    fun myAggregator() =
        Function { stream1: KStream<String, Type1> ->
            Function { stream2: KStream<String, Type2> ->
                Function { stream3: KStream<String, Type3> ->
                    Function { myGlobalTable: GlobalKTable<String, MyType> ->

The KafkaStreamsFunctionProcessor (AbstractKafkaStreamsBinderProcessor) will instantiate a GlobalKTable when the parameter type of a function is assignable from GlobalKTable.class, optionally with a custom store name when one is specified in the extended consumer properties:

...
        return streamsBuilder.globalTable(
                this.bindingServiceProperties.getBindingDestination(destination),
                consumed,
                getMaterialized(storeName, k, v));
...
    private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(
            String storeName, Serde<K> k, Serde<V> v) {
        return Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
                .withKeySerde(k).withValueSerde(v);
    }

Since Materialized.as(storeSupplier) is not supported by the function processor, I tried to define a bean like:

    @Bean
    fun myBean(
        streamsBuilderFactoryBean: StreamsBuilderFactoryBean, myStoreSupplier: KeyValueBytesStoreSupplier
    ): GlobalKTable<String, MyType> =
        streamsBuilderFactoryBean.`object`.globalTable(MyTopic, Materialized.`as`(myStoreSupplier))

and inject it into my streams function instead of a parameter in the method signature, but the streamsBuilderFactoryBean bean is not available.

The reason for this need is that different StoreSuppliers are declared depending on the Spring profile, e.g.:

@Profile("prod || stage || test")
@Configuration
class PersistentStoreConfiguration {
    @Bean
    fun myStoreSupplier(): KeyValueBytesStoreSupplier =
        Stores.persistentTimestampedKeyValueStore(MyStore.NAME)
}

@Profile("it || dev")
@Configuration class InMemoryStoreConfiguration {
    @Bean
    fun myStoreSupplier(): KeyValueBytesStoreSupplier =
        Stores.inMemoryKeyValueStore(MyStore.NAME)
}

How should I declare the globalTable with my store supplier with spring cloud stream functional?

2 replies
Justin Doehling
@JustinCoded
I think I may be running into a bug with version 3.2.2 on Java 17. My scenario is that I have Kafka, SQS, and SNS all defined as binders, but I'm finding that specifying the binder explicitly in the app properties doesn't seem to work:
spring.cloud:
    aws:
      security:
        cognito:
          enabled: false
      ses:
        enabled: false
    stream:
      bindings:
        helloProcessor-in-0:
          binder: kafka
          destination: some-input-topic-name
          group: microservice-example
        helloProcessor-out-0:
          binder: kafka
          destination: some-output-topic-name
          group: microservice-example
        helloSink-in-0:
          binder: kafka
          destination: some-output-topic-name
          group: microservice-example
        helloDeadLetterSink-in-0:
          binder: kafka
          destination: some-output-topic-name.microservice-example.dlq
          group: microservice-example
        helloQueueSink-in-0:
          binder: sqs
          destination: LOCAL_some_sink_input_queue
        helloQueueProcessor-in-0:
          binder: sqs
          destination: LOCAL_some_processor_input_queue
        helloQueueProcessor-out-0:
          binder: sns
          destination: LOCAL_some_processor_output_topic
        helloQueueSupplier-out-0:
          binder: sns
          destination: LOCAL_some_processor_output_topic
      kafka:
        binder:
          brokers: ${KAFKA_BROKERS}
        bindings:
          helloSink-in-0:
            consumer:
              enableDlq: true
              dlqName: some-output-topic-name.microservice-example.dlq
              autoCommitOnError: true
              autoCommitOffset: true
      sqs:
        bindings:
          helloQueueSink-in-0:
            consumer:
              snsFanout: false
          helloQueueProcessor-in-0:
            consumer:
              snsFanout: false
      sns:
        bindings:
          helloQueueProcessor-out-0:

    function:
      definition: helloProcessor;helloSink;helloDeadLetterSink;helloQueueSink;helloQueueProcessor;helloQueueSupplier
org.springframework.cloud.stream.config.BindingServiceProperties#getBindingProperties seems to not be mapping the properties out correctly. Am I doing something wrong?
Justin Doehling
@JustinCoded
On line 297, in the debugger I can see the properties in this.bindings, but after the get() the binding part of the property goes missing. It's just a get() from a TreeMap.
aditya parikh
@adityaparikh_twitter
I had asked this question earlier but it probably got lost in the mix:
How does a multitenant data source work with Spring Cloud Stream + Spring Cloud Function? I'm looking for something similar to
https://www.bytefish.de/blog/spring_boot_multitenancy.html and https://www.bytefish.de/blog/spring_boot_multitenancy_webflux.html
And also, could it work when we compose non-reactive and reactive functions together?
@sobychacko @olegz or anyone, please advise.
1 reply
Salman khandu
@salman-khandu

I have an application.yml configuration, i.e.

cloud:
  stream:
    poller:
      # Cron for polling data.
      cron: 0 0/30 * * * *
      ........

I get an error like:

Description:

The following configuration properties are mutually exclusive:

    spring.integration.poller.cron
    spring.integration.poller.fixed-delay
    spring.integration.poller.fixed-rate

However, more than one of those properties has been configured at the same time:

    spring.integration.poller.cron
    spring.integration.poller.fixed-delay


Action:

Update your configuration so that only one of the mutually exclusive properties is configured.

I have looked at the PollerConfigEnvironmentPostProcessor code and I see that fixed-delay is added if absent:

// TODO Must remain after removal of deprecated code above in the future
streamPollerProperties.putIfAbsent(INTEGRATION_PROPERTY_PREFIX + "fixed-delay", "1s");
streamPollerProperties.putIfAbsent(INTEGRATION_PROPERTY_PREFIX + "max-messages-per-poll", "1");

Spring Cloud version 2021.0.1

blake-bauman
@blake-bauman
I've got a Function<Flux<Foo>, Flux<Bar>> reading from an input topic with 5 partitions. Since it's using Flux, I can't use input.consumer.concurrency=5, so what is the recommendation for handling messages from each of the 5 partitions in parallel when using Flux? Do I need to create 5 Functions which all have the same input destination, or is there a better way?
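
A hedged sketch of one alternative to duplicating the function (not an official recommendation): fan the single Flux out into parallel lanes with groupBy/flatMap. Foo, Bar, the key extractor, the Java 17 records, and the lane count are all hypothetical, and per-key ordering only holds within a lane:

import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ParallelLanesConfig {

    record Foo(String key, String value) {}  // hypothetical payload types
    record Bar(String result) {}

    @Bean
    public Function<Flux<Foo>, Flux<Bar>> process() {
        int lanes = 5; // mirrors the partition count
        return flux -> flux
                // The same key always lands in the same lane, preserving per-key order.
                .groupBy(foo -> Math.floorMod(foo.key().hashCode(), lanes))
                .flatMap(lane -> lane
                        .publishOn(Schedulers.boundedElastic())
                        .map(foo -> new Bar(foo.value().toUpperCase())), lanes);
    }
}
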
Salman khandu
@salman-khandu
There was an issue about the mutually exclusive poller properties, and a pull request was raised with the fix. When will it be available? spring-cloud/spring-cloud-stream#2366
2 replies
showreddy
@showreddy7_twitter
I'm using SCSt functions with the Kafka binder. My use case is to consume 50 records every 30 minutes. Is that a possibility, and if so, could you share some reference? Or is it possible to poll for more than 1 record using PollableMessageSource in SCSt? Thanks.
Ryan R
@rrileyca

I'm trying to bridge a MessageChannel from an IntegrationFlow to Spring Cloud Stream. I have an Integration Flow with a defined outputChannel, which I am wiring into the following Bean:

@Bean(name = PUBLISHER)
public Publisher<Message<Object>> myListener(@Qualifier(OUTPUT_CHANNEL_BEAN) MessageChannel outputChannel) {
    return IntegrationFlows.from(outputChannel)
            .toReactivePublisher();
}

And then separately, wiring that Publisher into another Bean to create a Supplier<Flux<Message<?>>>:

@Bean(name = SUPPLIER)
public Supplier<Flux<Message<Object>>> mySupplier(Publisher<Message<Object>> publisher) {
    return () -> Flux.from(publisher)
            .subscribeOn(Schedulers.boundedElastic())
            .share();
}

I will omit the other Function<> beans for now because I don't think they are the problem. I see these logs (note the timestamps). There is not much context around these:

   2022-05-03T10:36:08.57-0400 [APP/PROC/WEB/0] OUT 2022-05-03 14:36:08.574  INFO [myapp,,] 9 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'myapp-1.OutputChannelBean' has 1 subscriber(s).
...
...
...
   2022-05-03T10:36:36.16-0400 [APP/PROC/WEB/0] OUT 2022-05-03 14:36:36.165  INFO [myapp,,] 9 --- [a.Spring.bean-1] o.s.integration.channel.DirectChannel    : Channel 'myapp-1.OutputChannelBean' has 0 subscriber(s).

It seems like the Supplier is created but then some time later (in this case ~28 seconds) the channel loses its subscriber. I note that in the IntegrationReactiveUtils class there is this line:

.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));

Is there a special configuration needed to "activate" a reactive Supplier? What is strange to me is that the error message I get complains about the MessageChannel not having any subscribers:

ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'be-shard-splitter-source-1.prioritizedShardOutputChannelBean'.

Side question: For Supplier/Functions that return a Reactive payload, does SCSt subscribe to each of these beans individually and sink the message within the binder? Or does the SCSt framework compose one long reactive chain and subscribe at the end? I suspect the former, because the latter would not allow for EventRouting.

Ryan R
@rrileyca
As an add-on to my comment above: if I wire in the Supplier and just .subscribe(System.out::println) to it, the error goes away. It seems like the Supplier is just not getting subscribed to, but I don't know why.
22 replies
Ryan R
@rrileyca
Thanks!
I’ll keep testing on my end but curious to hear if you find a solution.
gibykthomas
@gibykthomas
I'm trying to use AWS Cognito for Spring Cloud Data Flow (SCDF) security authorization. Is anyone using Cognito with SCDF? If yes, could you please share the configuration?
ZHANG-GXJ
@ZHANG-GXJ
Hi there. When I try to write a unit test for a Spring Cloud Stream function-based method, I'm getting a NullPointerException from InputDestination. Can anyone help with this? I have posted a question on StackOverflow: https://stackoverflow.com/questions/72126026/how-to-write-unit-test-for-spring-cloud-stream-function-based-method
java.lang.NullPointerException: while trying to invoke the method org.springframework.messaging.SubscribableChannel.send(org.springframework.messaging.Message) of a null object returned from org.springframework.cloud.stream.binder.test.InputDestination.getChannelByName(java.lang.String)

    at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:89)
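
A hedged sketch of the test-binder setup that usually avoids this NPE: build the context from TestChannelBinderConfiguration.getCompleteConfiguration(...) and address the bindings by their full names. The function name myFunction is hypothetical.

import java.util.function.Function;

import org.junit.jupiter.api.Test;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import static org.assertj.core.api.Assertions.assertThat;

class FunctionBindingTest {

    @SpringBootApplication
    static class TestApp {
        @Bean
        public Function<String, String> myFunction() { // hypothetical function under test
            return String::toUpperCase;
        }
    }

    @Test
    void sendsThroughTheFunction() {
        try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
                TestChannelBinderConfiguration.getCompleteConfiguration(TestApp.class))
                .run("--spring.cloud.function.definition=myFunction")) {

            InputDestination input = context.getBean(InputDestination.class);
            OutputDestination output = context.getBean(OutputDestination.class);

            // Use the full binding name so InputDestination can resolve the channel.
            input.send(new GenericMessage<>("hello"), "myFunction-in-0");

            Message<byte[]> result = output.receive(1000, "myFunction-out-0");
            assertThat(new String(result.getPayload())).isEqualTo("HELLO");
        }
    }
}
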
LukaszKusmierczyk
@LukaszKusmierczyk
This message was deleted
1 reply
Oleg Zhurakousky
@olegz
@/all I'd like to thank everyone who has helped answer questions here, and to announce that the Spring Cloud Stream development team would prefer to concentrate its efforts on answering community questions on Stack Overflow instead of Gitter. We do our best to monitor the Stack Overflow tags and answer questions as diligently as we can. In the end, we must stress that these are community channels, and we encourage everyone to participate in answering questions. In about a month this Gitter channel will be deleted. Please post all questions on Stack Overflow using the spring-cloud-stream tag: https://stackoverflow.com/questions/tagged/spring-cloud-stream
Soby Chacko
@sobychacko
This forum is closed for new questions. See Oleg's note above. Please ask questions on Stack Overflow with the right tag attached.