by

Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Jul 31 17:57

    spring-buildmaster on 3.0.x

    Update SNAPSHOT to 3.0.7.RELEASE Going back to snapshots Bumping versions to 3.0.8.SNAPS… (compare)

  • Jul 31 17:55

    spring-buildmaster on v3.0.7.RELEASE

    Update SNAPSHOT to 3.0.7.RELEASE (compare)

  • Jul 31 17:14

    olegz on 3.0.x

    Update docs POM for publishing (compare)

  • Jul 31 14:22
    smonasco commented #1990
  • Jul 30 14:22
    Mrc0113 commented #2009
  • Jul 28 14:48
    sabbyanandan commented #2008
  • Jul 24 14:15

    olegz on 3.0.x

    Change to functions 3.0.9.BUILD… (compare)

  • Jul 24 12:57

    olegz on 3.0.x

    Add KafkaNull test (compare)

  • Jul 24 12:56

    olegz on master

    Add KafkaNull test (compare)

  • Jul 24 08:24

    olegz on 3.0.x

    Fix StreamBridge to honor desti… (compare)

  • Jul 24 08:21

    olegz on master

    Fix StreamBridge to honor desti… (compare)

  • Jul 24 07:53

    olegz on master

    Temporary StreamBridge fix for … (compare)

  • Jul 20 15:05

    spring-buildmaster on master

    Update SNAPSHOT to 3.1.0-M2 Going back to snapshots (compare)

  • Jul 20 15:03

    spring-buildmaster on v3.1.0-M2

    Update SNAPSHOT to 3.1.0-M2 (compare)

  • Jul 20 06:21

    olegz on 3.0.x

    GH-2006 Add test for KafkaNull … (compare)

  • Jul 20 06:19

    olegz on master

    GH-2006 Add test for KafkaNull … (compare)

  • Jul 19 16:44

    olegz on 3.0.x

    GH-2006 Add special handling of… (compare)

  • Jul 19 16:38

    olegz on master

    GH-2006 Add special handling of… (compare)

  • Jul 16 15:23
    garyrussell commented #2006
  • Jul 15 15:46
    garyrussell commented #2006
Jeremy Bruns
@j3rrey
I'll try that thx! @sobychacko
Jeremy Bruns
@j3rrey
putting concurrency to default seems to be working cloud:
      default:
        consumer:
          concurrency: 3
        configuration:
          spring.json.use.type.headers: false
      function:
        definition: globalOffers;globalRecovery;localOffers;localRecovery
Jeremy Bruns
@j3rrey
yup all working fine thx
thanks again @sobychacko <3
Rakesh
@ravening
Hello all, I may not be aware of it but can you please clarify my question? Does spring cloud stream support reactive kafka streams of project reactor?
alyxl
@alyxl
Hello, I stumbled upon a bug(?). I'm using Spring Boot 2.2.6 with spring-cloud-starter-stream-kafka 3.0.4.RELEASE for my project. To handle errors from the producer side I added spring.cloud.stream.bindings.<channelName>.producer.errorChannelEnabled: truewhich is working as intended. Now I tried to update my project to spring-boot 2.2.7 and no ErrorMessages are send to the errorChannel anymore. Spring Boot 2.2.7 upgraded the Spring Kafka dependency to 2.3.8. Trying to figure out the problem I saw that when an error occurs from the producer with spring-kafka 2.3.8 the method call "doSend" in the GenericMessagingTemplate.class is not called anymore. Downgrading back to spring-kafka 2.3.7 resolves the issue. Any ideas about that? Will this be fixed in future releases? Has something major changed between spring-kafka 2.3.7 and 2.3.8 ?
Andras Hatvani
@andrashatvani
How can a spring cloud stream function be scheduled ie only executed when the time has come and not always on new message arrival?
natraj09
@natraj09
Is there any circuit breaker pattern supported with latest version of spring cloud stream. For e.g If we want to stop listening to messages and write to a database . I saw a solution here https://stackoverflow.com/questions/39598943/stop-spring-cloud-stream-streamlistener-from-listening-when-target-system-is-do , this will stop the entire application.
1 reply
Juan Antonio Hernando Labajo
@labajo

Hi, I have te following error while consuming a Message from a Kafka Queue using stream functions:

2020-05-21 14:22:39.660 ERROR 19052 --- [container-0-C-1] onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@3da3dc99}

org.springframework.transaction.reactive.TransactionContextManager$NoTransactionInContextException: No transaction in context

The problem is presented when I use Spring cloud stream functions with mongo. The code to reproduce is the following:

Repository

@Repository
public interface UsersRepo extends ReactiveMongoRepository<User, String> {
}

Consumer

 @Bean
  public Function<Flux<Message<String>>, Mono<Void>> toUpperCase() {
    return messageFlux -> { return messageFlux
        .map( msg -> {
          String msgStr = msg.getPayload();
          return new User().setId("1231").setName(msgStr);
        })
        .flatMap(usersRepo::save)
        .then();
    };
  }

If I replace usersRepo::save by a Mono.just(something) the error is not presented. Please, could you help me, what I'm doing wrong? Thanks

Juan Antonio Hernando Labajo
@labajo
Sorry I forgot I'm using:
<spring-cloud.version>Hoxton.SR3</spring-cloud.version>
<spring-data-mongo>2.2.6.RELEASE</spring-data-mongo>
khurramharoon
@khurramharoon
I would like to use Spring cloud stream for our upcoming project. We plan to use AWS SQS and SNS. However, it seems there are no binders available (https://spring.io/projects/spring-cloud-stream). Perhaps there is some alternative that I am not aware of at the moment. On Github there is some traction but that is not production ready(e.g. https://github.com/maciejwalkowiak/spring-cloud-stream-binder-sqs) Please advise.
sampathmende
@sampathmende
Hi when i publish message to multiple instances of consumers by using instace index and instance count and at publisher side used partitionKeyExpression and partition count 2 . But message is not consumed by consumer .
pigeon256
@pigeon256
if my cloud stream consumer is dependent on an external API and say it goes down for 20 minutes. What is the best way to handle such scenario automatically
itsatrip
@itsatrip
how would you use the kinesis binder using functional approach to produce batches and also to consume in batches?
Knut Schleßelmann
@kschlesselmann

What could be the cause for Dispatcher has no subscribers for channel 'classification-group-provider-1.productMapping-in-0'.? My function looks like

@Configuration
class ProductMapper(
        private val mappingService: MappingService
) {

    @Bean
    fun mapClassificationGroup(): Function<Flux<Product>, Flux<Message<ProductOfSource>>> = Function { products ->
        products
                .concatMap { handleProduct(it) }
                .onErrorStop()
    }
}

and my configuration like

spring:
  cloud:
    function:
      definition: mapClassificationGroup
    stream:
      bindings:
        mapClassificationGroup-in-0:
          destination: product-updates
          group: ${spring.application.name}
        mapClassificationGroup-out-0:
          destination: products-of-source
          producer:
            partition-key-expression: headers['productId']
            partition-count: 32
      kafka:
        binder:
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer

to me it looks like all my other services and I cannot spot the error :-S

Oleg Zhurakousky
@olegz
i am a little rusty on Kotlin but does your above function return a value?
Knut Schleßelmann
@kschlesselmann
@olegz yep. the products.concatMap
Oleg Zhurakousky
@olegz
i guess it does, but onErrorStop() worries me
Knut Schleßelmann
@kschlesselmann
@olegz That's because I use Spring Data Mongo which optherwise bubbles up a lot of missinsg transacition exceptions (we talked about that a couple of weeks ago)
Oleg Zhurakousky
@olegz
ohh yeah
Knut Schleßelmann
@kschlesselmann
Oleg Zhurakousky
@olegz
sorry, as you can feel I am in the middle of some other weeds, i'll think about it later
Knut Schleßelmann
@kschlesselmann
Thanks … it has to be a really stupid error … but I can't spot it. I have 3 almost identical services here … maybe it's too late for me too today :-/
Jakub Kubryński
@jkubrynski
Hi! I'm trying to use the new approach for testing (with the test-binder) and I have no idea how to connect it with my production code? I have my component that sends the event and I want to test if the event was sent successfully. I can use the OutputDestination to receive the message, however, the InputDestination is not related to the MessageChannel object so it's not possible to inject it into the production component. Any hints?
Soby Chacko
@sobychacko
@jkubrynski When you have the new binder on the classpath, it creates both InputDestination and OutputDestination based on the bindings in your application. You can access them from the context.
Jakub Kubryński
@jkubrynski
But the InputDestination is totally not related to any classes you use in the production code
Soby Chacko
@sobychacko
Sorry, I may not be following the context. Are you trying to test a single processor with input and output?
Jakub Kubryński
@jkubrynski
it doesn't matter. In my production code, I'm attaching to MessageChannel - that's the main stream object I use to invoke send method. Then I use for example MessageCollector to check if the proper message was send. InputDestination and OutputDestination are useless, as I cannot link them with my production code.
Soby Chacko
@sobychacko
See if any of these tests are useful for what you are trying to test.
For the most part, they all use the new test binder.
Jakub Kubryński
@jkubrynski
in those tests message is send from the test scope. Normally, the message is built and sent from the production code - the production code is the place where you construct the message, set valid headers, etc.
Soby Chacko
@sobychacko
Maybe you can provide some code examples of what you are trying to do? That way, we can investigate if there is anything lacking from the framework with the situation that you are seeing. /cc @olegz
Jakub Kubryński
@jkubrynski
It's not the whole codebase, but let's assume that in my production code I have something like this: https://github.com/jkubrynski/cloud-stream-sample/blob/master/src/main/java/com/kubrynski/streamdemo/permission/PermissionService.java
then in the test I want to check if after invoking revokeAccess method, the proper message was sent to the proper destination. I don't mean testing the business logic - just the integration. If the proper destination, message headers and a valid payload were set.
with the old test-support I use messageCollector.forChannel(myChannel.myDestination()).poll(1, TimeUnit.SECONDS);
then on the retrieved message I can verify the whole structure. The question is how similar approach is possible with the new test-binder
if the above is not clear let me know -> I'll code the whole approach in the separate project
Knut Schleßelmann
@kschlesselmann
@olegz My error resolved itself … don't ask me why …
Knut Schleßelmann
@kschlesselmann
OK … now it's back. Let me have a look into our AWS setup here … could it be some network issue? If so could I get some better logs at some place?
Knut Schleßelmann
@kschlesselmann
@olegz Looks like that the infrastructure is fine … maybe it's really some weird behavior in our service
sakella1986
@sakella1986
@sobychacko basic question on consuming messages from Kafka. How do I choose between KafkaListener, StreamListerner, Function etc. I may optionally write to a retry topic but I can also live with same topic by not committing the offset for failed messages. We use Kerberos Auth and Kafka Broker version 0.10.2 and have no plans on upgrading to 0.11 atleast for the next 1 year.
Soby Chacko
@sobychacko
@sakella1986 That is a very old Kafka version and it won't work with the latest versions of the Kafka binder in Spring Cloud Stream (You will likely see some Kafka wire protocol compatibility issues if you combine 0.10.2 and latest SCSt versions using later Kafka clients). We used to provide a separate 0.11 binder when the default version supported was 0.10.2/0.10.1. Those binders are no longer supported.
With that said, KafkaListener is part of the Spring Kafka library and you don't need Spring Cloud Stream to use that. You use StreamListener or functional support from Spring Cloud Stream when you need the higher-level abstractions and programming model provided. In addition, SCSt comes with a lot of opinionated choices you can rely upon. SCSt Kafka binder is built upon the foundations of Spring Kafka although it doesn't directly use KafkaListener internally.
1 reply
Soby Chacko
@sobychacko
@jkubrynski Are you accessing that service using StreamListener? or is that getting wrapped inside a Supplier function?
Jakub Kubryński
@jkubrynski
Currently I'm taking about testing the producer side - so the send method on the MessageChannel. For the client side it depends but in the recent usecase I use subscribe on the channel.
But you can focus on just the tests you've shared with me. Normally the selected lines would be in the production code. How can you move them from this test to main sourceset: https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/RoutingFunctionTests.java#L72;L77
Soby Chacko
@sobychacko
ya, i am sorry, i didn't mean StreamListener there but was thinking about functional vs non-functional on the producer side. It sounds like you are directly calling the send method.
sakella1986
@sakella1986
@sobychacko so I better use KafkaListener not SCSt.
Soby Chacko
@sobychacko
@jkubrynski you can also try to use StreamBridge to send outbound messages. It is relatively a new feature. But available on the recent Horsham SR releases. The idea is that you don't directly deal with channels, but the framework internally handles that for you. You interact with this in terms of a binding destination. OutputDestinaion can be used to test that scenario. See these tests: https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java
Jakub Kubryński
@jkubrynski
Looks good. Thanks @sobychacko