Hi, I have the following error while consuming a Message from a Kafka topic using stream functions:
2020-05-21 14:22:39.660 ERROR 19052 --- [container-0-C-1] onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@3da3dc99}
org.springframework.transaction.reactive.TransactionContextManager$NoTransactionInContextException: No transaction in context
The problem occurs when I use Spring Cloud Stream functions with Mongo. The code to reproduce it is the following:
Repository
@Repository
public interface UsersRepo extends ReactiveMongoRepository<User, String> {
}
Consumer
@Bean
public Function<Flux<Message<String>>, Mono<Void>> toUpperCase() {
    return messageFlux -> messageFlux
            .map(msg -> {
                String msgStr = msg.getPayload();
                return new User().setId("1231").setName(msgStr);
            })
            .flatMap(usersRepo::save)
            .then();
}
If I replace usersRepo::save with Mono.just(something), the error does not occur. Please, could you help me? What am I doing wrong? Thanks
What could be the cause of Dispatcher has no subscribers for channel 'classification-group-provider-1.productMapping-in-0'? My function looks like
@Configuration
class ProductMapper(
private val mappingService: MappingService
) {
@Bean
fun mapClassificationGroup(): Function<Flux<Product>, Flux<Message<ProductOfSource>>> = Function { products ->
products
.concatMap { handleProduct(it) }
.onErrorStop()
}
}
and my configuration like
spring:
  cloud:
    function:
      definition: mapClassificationGroup
    stream:
      bindings:
        mapClassificationGroup-in-0:
          destination: product-updates
          group: ${spring.application.name}
        mapClassificationGroup-out-0:
          destination: products-of-source
          producer:
            partition-key-expression: headers['productId']
            partition-count: 32
      kafka:
        binder:
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
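Unrelated to the "no subscribers" error, but for reference, here is a minimal pure-Java sketch of what the partition-key-expression / partition-count pair does. This assumes the default partition selector semantics in Spring Cloud Stream (roughly, the key's hashCode modulo the partition count); the sample header value is hypothetical:

```java
import java.util.Map;

public class PartitionSketch {

    // Assumption: roughly what the default partition selector does with the
    // value produced by partition-key-expression: headers['productId']
    static int selectPartition(Object key, int partitionCount) {
        return Math.abs(key.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        // Hypothetical message headers
        Map<String, Object> headers = Map.of("productId", "SKU-12345");
        int partition = selectPartition(headers.get("productId"), 32);
        // The selected partition is always within [0, 32)
        System.out.println(partition >= 0 && partition < 32);
    }
}
```

All messages carrying the same productId header land on the same partition, which is what preserves per-product ordering.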
To me it looks the same as all my other services, and I cannot spot the error :-S
messageCollector.forChannel(myChannel.myDestination()).poll(1, TimeUnit.SECONDS);
KafkaListener is part of the Spring Kafka library, and you don't need Spring Cloud Stream to use it. You use StreamListener or the functional support from Spring Cloud Stream when you need the higher-level abstractions and programming model they provide. In addition, SCSt comes with a lot of opinionated choices you can rely upon. The SCSt Kafka binder is built upon the foundations of Spring Kafka, although it doesn't use KafkaListener directly internally.
main sourceset: https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/RoutingFunctionTests.java#L72;L77
You can use StreamBridge to send outbound messages. It is a relatively new feature, but it is available in the recent Horsham SR releases. The idea is that you don't deal with channels directly; the framework handles that internally for you. You interact with it in terms of a binding destination. OutputDestination can be used to test that scenario. See these tests: https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java
--spring.cloud.stream.bindings.output.destination=someTopic
yet the messages are published to the topic rabbitSupplier-out-0. Did I miss something in the implementation? Is there a way to say that the FluxMessageChannel is the default output so that the topic binding applies to it?
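One thing worth checking, assuming the app uses the functional model: with a supplier bean named rabbitSupplier, the generated output binding is named rabbitSupplier-out-0 (the `<beanName>-out-<index>` convention), so the destination override has to target that binding name rather than the legacy output binding. A sketch of the property under that assumption:

```properties
# Assumption: the supplier bean is named rabbitSupplier, so its output binding
# is rabbitSupplier-out-0 per the functional binding naming convention
spring.cloud.stream.bindings.rabbitSupplier-out-0.destination=someTopic
```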
Hi @olegz ,
I have created simple Spring Cloud Stream apps (source, processor and sink) where the processor and sink transfer reactive data.
https://github.com/sumit1912/springcloud.stream.pipeline
I deploy these apps in Spring Cloud Data Flow with RabbitMQ as the broker. My apps work fine and transfer data when I use springBootVersion = "2.1.9.RELEASE" and the org.springframework.cloud:spring-cloud-stream-reactive:2.2.0.RELEASE dependency.
When I update Spring Boot to v2.2.6.RELEASE and replace spring-cloud-stream-reactive with "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR3" and "org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR3", I get the error below:
ERROR 5206 — [ wer.http.wer-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binding.StreamListenerMessageHandler@39ce27f2]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `reactor.core.publisher.Flux` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (byte[])...
The error appears only after I replace spring-cloud-stream-reactive with spring-cloud-dependencies and spring-cloud-stream-dependencies and update Spring Boot to 2.2.x.
Is it because the spring-cloud-stream-reactive module is deprecated in spring-cloud-dependencies:Hoxton.SR3 and we need to use spring-cloud-function (Supplier/Function/Consumer model) for handling reactive messages?
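If the move to the functional model is indeed the issue, note that the Supplier/Function/Consumer types it binds are plain JDK interfaces. A minimal hand-wired sketch of the source/processor/sink shape, outside Spring (the bean names and payloads here are hypothetical; in a real app these would be @Bean methods and the binder would wire the destinations):

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionalModelSketch {

    // In Spring Cloud Stream these would be @Bean methods; the framework derives
    // bindings such as source-out-0, processor-in-0/processor-out-0, sink-in-0
    static Supplier<String> source() { return () -> "hello"; }
    static Function<String, String> processor() { return String::toUpperCase; }
    static Consumer<String> sink() { return System.out::println; }

    public static void main(String[] args) {
        // Wire the pipeline by hand to show the data flow the binder automates
        sink().accept(processor().apply(source().get()));
    }
}
```

Reactive variants use the same shapes with Flux/Mono in the signatures, which is why the separate reactive module is no longer needed.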
new String(payload).event.type matches 'MyEvent-Type'.
This works fine with local Kafka, and even with embedded Kafka in an integration test. But when deployed to another environment, for the same message I get "Cannot find a @StreamListener matching for message".
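For what it's worth, SpEL's matches operator performs a full regex match against the whole string, same as String.matches in Java. A small pure-Java sketch of that semantics (the sample event-type values are hypothetical):

```java
public class RoutingConditionSketch {

    public static void main(String[] args) {
        // Hypothetical event-type values extracted from the payload
        String exact = "MyEvent-Type";
        String withNoise = "prefix MyEvent-Type";

        // 'matches' requires the WHOLE string to match the pattern; '-' is a
        // literal here, but other regex metacharacters would need escaping
        System.out.println(exact.matches("MyEvent-Type"));      // true
        System.out.println(withNoise.matches("MyEvent-Type"));  // false
    }
}
```

So if the deployed environment delivers the payload in a different shape (for example with extra wrapping or a different content type), a full match that succeeds locally can silently fail there.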