KafkaListener is part of the Spring Kafka library, and you don't need Spring Cloud Stream to use it. You use StreamListener or the functional support from Spring Cloud Stream when you need the higher-level abstractions and programming model it provides. In addition, SCSt comes with a lot of opinionated choices you can rely upon. The SCSt Kafka binder is built on the foundations of Spring Kafka, although it doesn't use KafkaListener internally.
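To make the contrast concrete, here is a minimal sketch of a plain Spring Kafka listener next to an SCSt functional consumer (topic, group, and bean names are hypothetical, not from the messages above):

import java.util.function.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class ListenerSketch {

    // Spring Kafka: addresses the broker directly, no SCSt involved.
    @KafkaListener(topics = "orders", groupId = "order-group")
    public void onOrder(String payload) {
        System.out.println("Kafka listener received: " + payload);
    }

    // Spring Cloud Stream functional model: the binder maps the
    // orders-in-0 binding to a destination via configuration.
    @Bean
    public Consumer<String> orders() {
        return payload -> System.out.println("SCSt consumer received: " + payload);
    }
}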
See these tests: https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/RoutingFunctionTests.java#L72-L77
You can use StreamBridge to send outbound messages. It is a relatively new feature, but it is available in the recent Horsham SR releases. The idea is that you don't deal with channels directly; the framework handles that internally for you. You interact with it in terms of a binding destination. OutputDestination can be used to test that scenario. See these tests: https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java
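A minimal sketch of sending with StreamBridge (the destination name is hypothetical; the framework creates the output binding on first use):

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

@Component
public class EventPublisher {

    private final StreamBridge streamBridge;

    public EventPublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    public void publish(String payload) {
        // "someTopic" is resolved as a binding destination; no channel wiring needed.
        streamBridge.send("someTopic", payload);
    }
}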
I set --spring.cloud.stream.bindings.output.destination=someTopic, yet the messages are published to the topic rabbitSupplier-out-0. Did I miss something in the implementation? Is there a way of saying that the FluxMessageChannel is the default output so that the topic bindings apply to it?
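In case it helps: with the functional model the binding name is derived from the function bean name (here apparently rabbitSupplier), not the legacy name output, so the destination has to be configured against rabbitSupplier-out-0. A sketch of the property, assuming that bean name:

--spring.cloud.stream.bindings.rabbitSupplier-out-0.destination=someTopic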
Hi @olegz ,
I have created simple Spring Cloud Stream apps (source, processor, and sink) where the processor and sink transfer reactive data.
https://github.com/sumit1912/springcloud.stream.pipeline
I deploy these apps in Spring Cloud Data Flow with RabbitMQ as the broker. My apps work fine and transfer data when I use springBootVersion = "2.1.9.RELEASE" and the org.springframework.cloud:spring-cloud-stream-reactive:2.2.0.RELEASE dependency.
When I update Spring Boot to v2.2.6.RELEASE and replace spring-cloud-stream-reactive with "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR3" and "org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR3", I get the error below:
ERROR 5206 — [ wer.http.wer-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binding.StreamListenerMessageHandler@39ce27f2]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `reactor.core.publisher.Flux` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (byte[])...
The error appears only after I replace spring-cloud-stream-reactive with spring-cloud-dependencies and spring-cloud-stream-dependencies and update Spring Boot to 2.2.x.
Is it because the spring-cloud-stream-reactive module is deprecated in spring-cloud-dependencies:Hoxton.SR3, so we need to use the spring-cloud-function (Supplier/Function/Consumer) model for handling reactive messages?
I use the condition new String(payload).event.type matches 'MyEvent-Type'.
This works fine with local Kafka, and even with embedded Kafka in an integration test. But when deployed to another environment, for the same message I get "Cannot find a @StreamListener matching for message".
Hello, I have the following setup for a consumer. It works for the first message, and then I get an exception:
INFO 47834 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application-1.myConsumer-in-0' has 0 subscriber(s).
ERROR 47834 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.myConsumer-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload={...}]
public interface Sink<I> extends Consumer<Flux<I>> {

    @Override
    default void accept(Flux<I> input) {
        consume(input);
    }

    void consume(Flux<I> input);
}

@Component("myConsumer")
public class MyConsumer implements Sink<MyMessage> {

    public MyConsumer() {
    }

    @Override
    public void consume(Flux<MyMessage> input) {
        input
            .transform(...)
            .transform(...)
            .subscribe();
    }
}
spring:
  cloud:
    stream:
      function:
        definition: myConsumer
      bindings:
        myConsumer-in-0:
          destination: ...
          group: myConsumerGroup
          consumer:
            autoRebalanceEnabled: true
            partitioned: true
            maxAttempts: 3
      kafka:
        bindings:
          myConsumer-in-0:
            consumer:
              configuration:
                value:
                  deserializer: ...
Not sure what I am doing wrong; any help is appreciated.
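In case it's useful for the question above: a sketch of the same consumer written as a plain @Bean lambda, which is the style the reference docs use for reactive functions (MyMessage as above; the map step is a hypothetical stand-in for the elided transform calls):

import java.util.function.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;

@Configuration
public class MyConsumerConfig {

    // The framework subscribes the binding's Flux to this function once,
    // at startup, so the subscription stays alive across messages.
    @Bean
    public Consumer<Flux<MyMessage>> myConsumer() {
        return flux -> flux
                .map(msg -> msg) // placeholder for the .transform(...) steps
                .subscribe();
    }
}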
I set maxAttempts: 3. Here's my sample project: https://github.com/natraj09/spring-cloud-stream-playground/blob/master/reactive-processor-kafka/src/main/java/reactive/kafka/ReactiveProcessorApplication.java#L52. If the REST payload has "error": true, it prints an error and throws an exception. I only see the info log once instead of 3 times.
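One thing worth noting for reactive functions: binder-level retry (maxAttempts) wraps the invocation of the function, not each element of the Flux, so an error thrown inside the stream isn't retried that way. A hedged sketch of retrying per element with Reactor's own operators (process is a hypothetical stand-in for the failing step):

import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PerElementRetry {

    // Hypothetical per-element processing that may fail.
    static Mono<String> process(String payload) {
        return Mono.fromCallable(() -> {
            if (payload.contains("error")) {
                throw new IllegalStateException("processing failed");
            }
            return payload.toUpperCase();
        });
    }

    // Each element's Mono is retried up to 3 times; exhausted elements are
    // dropped so the outer Flux (and the binding) keeps running.
    public static Function<Flux<String>, Flux<String>> testProcess() {
        return flux -> flux.flatMap(p ->
                process(p).retry(3).onErrorResume(e -> Mono.empty()));
    }
}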
If you set the multiplex consumer property to true, you'll get a single binding consuming from both destinations.
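For example, a sketch with a hypothetical binding name:

spring:
  cloud:
    stream:
      bindings:
        myConsumer-in-0:
          destination: destination1,destination2
          consumer:
            multiplex: true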
I can provide a DlqPartitionFunction bean, but looking at the parameters, it's not clear how I could implement round robin, since I must always return the actual partition number and there isn't any info on the number of partitions of the DLQ topic. Any advice? Reference: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka-dlq-processing
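One workaround, since the function only sees the failed record: supply the DLQ partition count yourself and round-robin over it. A sketch assuming the three-argument DlqPartitionFunction shape from the binder docs and a hypothetical app.dlq-partitions property:

import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.binder.kafka.DlqPartitionFunction;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DlqConfig {

    // Cycles 0..partitions-1 across failed records; the count must match
    // the actual DLQ topic and is injected from configuration.
    @Bean
    public DlqPartitionFunction roundRobinDlq(
            @Value("${app.dlq-partitions}") int partitions) {
        AtomicInteger counter = new AtomicInteger();
        return (group, record, throwable) ->
                Math.floorMod(counter.getAndIncrement(), partitions);
    }
}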
Hey, I have a question about the motivation for the new test style for the test binder.
Unlike other Spring tests, these no longer leverage @Autowired and instead use a context builder, like:
context = new SpringApplicationBuilder(
        TestChannelBinderConfiguration.getCompleteConfiguration(Application.class))
    .run("--server.port=0",
        "--spring.cloud.function.definition=listener");
This is a huge departure from the standard use of @SpringBootTest and @RunWith(SpringRunner.class). Does anyone know why this is happening? Is it out of necessity, or is it the direction Spring is moving in? I haven't been able to find any documentation or blog posts about the motivation behind it.
It's making rewriting existing tests really challenging, and it feels like a code smell to rewrite the existing tests in a far more verbose way.
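For what it's worth, the test binder doesn't force the builder style; it can also be pulled into a regular annotated test by importing its configuration. A sketch, assuming a function bean named listener (class and payload are hypothetical):

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootTest(properties = "spring.cloud.function.definition=listener")
@Import(TestChannelBinderConfiguration.class)
class ListenerTests {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    @Test
    void routesMessage() {
        input.send(MessageBuilder.withPayload("hello".getBytes()).build());
        // assert on output.receive(...) as appropriate
    }
}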
spring:
  cloud:
    stream:
      function.definition: testProcess;testSource;testSink
      bindings:
        testProcess-in-0:
          destination: rest-input,rest-input-error
          group: input-group
          maxAttempts: 1
        testProcess-out-0:
          destination: rest-output
        testSink-in-0:
          destination: rest-output
          group: sink-group
        testSource-out-0:
          destination: rest-input
      kafka:
        bindings:
          testProcess-in-0:
            consumer:
              enableDlq: true
              dlqName: rest-input-error
Hi team,
I have created 3 simple Spring Cloud Stream apps (Source/Processor/Sink) using the Spring Cloud Function approach, transferring Flux<String>.
Source-app:
@SpringBootApplication
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }

    @PollableBean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> {
            String v1 = String.valueOf("abc");
            String v2 = String.valueOf("pqr");
            String v3 = String.valueOf("xyz");
            return Flux.just(v1, v2, v3);
        };
    }
}
Processor-app:
@SpringBootApplication
public class ProcessorApplication {

    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase()).log();
    }

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class, args);
    }
}
Sink-app:
@SpringBootApplication
public class SinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }

    @Bean
    public Consumer<Flux<String>> log() {
        return flux -> flux.subscribe(f -> System.out.println("Received data: " + f));
    }
}
The dependencies that I have added are:
SpringBoot version = 2.2.6.RELEASE
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit")
implementation("org.springframework.cloud:spring-cloud-starter-function-webflux:3.0.7.RELEASE")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-webflux")
I have registered these apps in Spring Cloud Data Flow and deployed them in a stream.
I am able to transmit data to each app and receive output, both via HTTP and via RabbitMQ, individually. But the message is not communicated across the apps (Source -> Processor -> Sink).
Am I missing any dependency, annotation, or application property?
I am new to Spring Cloud Stream with the function approach; any suggestion would be a great help :)
Complete code can be found here: https://github.com/sumit1912/springcloud.function.stream.pipeline
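One thing to check for a pipeline like this: each app's functional binding must point at a shared destination, since the default destinations are derived from the function bean names and won't match across separate apps. A sketch of the properties, using the bean names above and hypothetical destination names:

# source app
spring.cloud.stream.bindings.stringSupplier-out-0.destination=step1
# processor app
spring.cloud.stream.bindings.uppercase-in-0.destination=step1
spring.cloud.stream.bindings.uppercase-out-0.destination=step2
# sink app
spring.cloud.stream.bindings.log-in-0.destination=step2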
Hi All,
Recently I started learning and using Spring Cloud Stream. I like the way Spring Cloud Function simplifies message publishing and consumption. But in one of my applications, the beans corresponding to the functions are not detected and hence not bound to a destination.
When I compared this with my sample working version, the difference is that the non-working version has a module dependency which internally uses Spring Cloud Stream. This module uses the @EnableBinding annotation.
If I put this annotation in my working version, I reproduce the same issue.
It seems there is a conflict between @EnableBinding and the functional support.
Any hints or suggestions will be really helpful.
Thanks.
In a unit test, when I try to receive the message using OutputDestination, I get the following error:
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34)
at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:37)
at com.example.loggingconsumer.LoggingConsumerApplicationTests.contextLoads(LoggingConsumerApplicationTests.java:50)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
...
@SpringBootApplication
@Controller
@EnableBinding
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    @Bean
    public Consumer<CloudEventImpl<Person>> consumer() {
        return p -> System.out.println("consumer--Received: " + p);
    }
}