Davide
@jesty
@olegz I found my problem: not all consumers/producers are reactive, so I have some @EnableBinding annotations in the same project, and the functions are not loaded because of this check: if (ObjectUtils.isEmpty(applicationContext.getBeanNamesForAnnotation(EnableBinding.class))) {. I was running a test suite and missed the log message "Functional binding is disabled due to the presense of @EnableBinding annotation in your configuration" :(
Phillip Fleischer
@pcfleischer
+1 i had that same issue cause i was playing around with different bindings
Oleg Zhurakousky
@olegz
@jesty yes, EnableBinding and functional are mutually exclusive. But what I am also hearing is that you have a lot of messaging components in a single microservice. That is kind of an anti-pattern, so consider breaking it apart. But regardless, within a single microservice you can either use functional (preferred) approach or annotation-driven which is all but deprecated.
Phillip Fleischer
@pcfleischer
yeah, in my case it wasn't a conscious decision but rather a code organization issue, in an effort to demonstrate multiple functions without several applications
Oleg Zhurakousky
@olegz
fair enough; multiple functions is one thing, but mixing functions and @StreamListeners is different. It appears to be clear now
@pcfleischer ^^
sampathmende
@sampathmende
Hello, how do I integrate RabbitMQ with a Eureka service registry using Spring Boot microservices?
I am new to RabbitMQ and want to integrate it with a Eureka server and Zuul gateway.
Mansingh Shitole
@msshitole
@sobychacko and @olegz I am back again with the same old issue. To refresh your memory: the spring-cloud-stream application does not run in Docker. The error has persisted through Hoxton.SR2, SR3, and the current SR4. The same application runs fine with Hoxton.SR1 and earlier versions.
The root cause is that the application fails to read application.properties, and this happens only when the spring-boot-starter-actuator dependency is present in the pom. Unfortunately, all our production microservices need the actuator dependency, so this issue is now a showstopper.
As suggested, I have raised this issue (spring-projects/spring-boot#21441) with the Spring Boot team. I am hoping for a positive response from them, but at the same time it would be great if we could solve this together or bring it to some logical conclusion. Your help will be highly appreciated.
Here is my sample application: (https://github.com/msshitole/reactive-avro-processor). Thank you in advance!
edwardsainsbury
@edwardsainsbury
Hi, I'm trying to use the functional approach with reactive Kafka using Flux. Is there a nice way to get the message key? I can't find any examples or documentation, and have resorted to getting the key header from the Message object. Just wondering if there is a nicer way to do it, à la the Kafka Streams implementation.
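For reference, a minimal sketch of the header-based workaround described above. It uses a hypothetical stand-in class `Msg` instead of Spring's `Message` so that it runs without any dependencies, and it assumes the key arrives as a String under the header name `kafka_receivedMessageKey` (the value of `KafkaHeaders.RECEIVED_MESSAGE_KEY` in spring-kafka 2.x). In a real function the same lookup would be applied inside a `map` step on the Flux.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class MessageKeySketch {

    // Header name exposed by spring-kafka 2.x as KafkaHeaders.RECEIVED_MESSAGE_KEY.
    static final String RECEIVED_MESSAGE_KEY = "kafka_receivedMessageKey";

    // Hypothetical stand-in for org.springframework.messaging.Message<T>.
    static final class Msg<T> {
        final T payload;
        final Map<String, ?> headers;

        Msg(T payload, Map<String, ?> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Returns the record key carried in the headers, or empty if the header is absent.
    // Assumes a String key; a byte[] key would need explicit decoding instead of toString().
    static Optional<String> keyOf(Msg<?> msg) {
        Object key = msg.headers.get(RECEIVED_MESSAGE_KEY);
        return Optional.ofNullable(key).map(Object::toString);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(RECEIVED_MESSAGE_KEY, "order-1");

        List<Msg<String>> batch = Arrays.asList(
                new Msg<>("created", headers),
                new Msg<>("no-key", Collections.<String, Object>emptyMap()));

        for (Msg<String> m : batch) {
            System.out.println(keyOf(m).orElse("<none>") + " -> " + m.payload);
        }
    }
}
```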
Jose A. Iñigo
@codependent
Hi @sobychacko When will Spring Cloud Stream be compatible with Spring Boot 2.3.0/KafkaStreams 2.5? At the moment there's a runtime exception after upgrading:
java.lang.NoSuchMethodError: 'boolean org.apache.kafka.streams.KafkaStreams$State.isRunning()'
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderHealthIndicator.doHealthCheck(KafkaStreamsBinderHealthIndicator.java:111) ~[spring-cloud-stream-binder-kafka-streams-3.0.4.RELEASE.jar!/:3.0.4.RELEASE]
I've set kafka.version 2.3.0 but get another error during Spring Boot Kafka Autoconfiguration
Juan Antonio Hernando Labajo
@labajo

Hi, I'm using Spring Cloud Function and I'm unable to configure the DLQ topics.

I have the following configuration:

spring:
  data:
    mongodb:
      uri: ${MONGO_URI:mongodb://127.0.0.1/plaform}
  cloud:
    stream:       
      function.definition: statusConsumer;
      bindings:
        statusConsumer-in-0:
          consumer:
            max-attempts: ${KAFKA_CONSUMER_MAX_ATTEMPS:3}
            backOffInitialInterval: ${KAFKA_CONSUMER_BACKOFFINITIALINTERVAL:7000}
            backOffMaxInterval: ${KAFKA_CONSUMER_BACKOFFMAXINTERVAL:600000}
            backOffMultiplier: ${KAFKA_CONSUMER_BACKOFFMULTIPLIER:6.0}
          group: com.platform.ServiceStatusConsumer
          destination: ${SERVICESTATUS_QUEUE:platform-admin-provision-status}
          binder: kafka
      kafka:
        bindings:
          statusConsumer-in-0:
            consumer:
              autoCommitOffset: true
              enableDlq: true
              dlqName: ${spring.cloud.stream.bindings.statusConsumer-in-0.destination}-dlq-admin

With this config, Eclipse shows me an error in kafka.bindings.statusConsumer-in-0.consumer.
Eclipse says that consumer is an Object and autoCommitOffset and the other properties are unknown because the consumer property has an object type.

This is the detail of consumer property:

spring.cloud.stream.kafka.bindings.internalConsumer-in-0.consumer
Object

KafkaConsumerProperties org.springframework.cloud.stream.binder.kafka.properties.KafkaBindingProperties.consumer
org.springframework.cloud.stream.binder.kafka.properties.KafkaBindingProperties

Consumer specific binding properties.

I don't know what is happening. Is it a YAML configuration problem with Spring Cloud Function, or do I have a dependency problem in the POM? Thanks
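For reference, a minimal sketch of what the retry settings in the configuration above amount to, assuming the binder retries with Spring Retry style exponential back-off (first delay equals the initial interval, each later delay is multiplied and capped at the maximum, and there is no delay after the final attempt). The class and method names are hypothetical; the numbers are the defaults from the configuration.

```java
import java.util.ArrayList;
import java.util.List;

public class BackOffSketch {

    // Delays in milliseconds between attempts under exponential back-off.
    // maxAttempts counts the first delivery too, so there are maxAttempts - 1 delays.
    static List<Long> delays(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        double next = initialInterval;
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add((long) Math.min(next, maxInterval));
            next = Math.min(next * multiplier, maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        // max-attempts 3, backOffInitialInterval 7000, backOffMultiplier 6.0, backOffMaxInterval 600000
        System.out.println(delays(3, 7000, 6.0, 600000)); // [7000, 42000]
        // With more attempts the cap of 600000 ms takes effect.
        System.out.println(delays(6, 7000, 6.0, 600000)); // [7000, 42000, 252000, 600000, 600000]
    }
}
```

Under these assumptions a failing message would be attempted three times over roughly 49 seconds before being handed to the DLQ.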

pradoshtnair
@pradoshtnair

Sometimes the first message of a new partition poll is not getting read. Can you give any pointers?

@Bean
public Consumer<Flux<Message<String>>> receiver() {
    return sink -> sink
            .onBackpressureBuffer()
            .doOnEach(signal -> AckHandler.register(Objects.requireNonNull(signal.get())))
            .subscribe(
                    r -> MessageHandler.process(r),
                    e -> log.fatal(FineemContextHandler.LOGFORMAT, () -> FinEEMException.getDescription(FinEEMException.UNKNOWN_ERROR), () -> MessageStates.ACTIONEXECUTIONERROR, () -> DataProvider.printError(e)));
}

Application.yml

spring:
  cloud:
    stream:
      kafka:
        default:
          consumer:
            autoCommitOffset: false
            concurrency: 1
        bindings:
          receiver-in-0:
            consumer:
              autoCommitOffset: false
        binder:
          brokers: host:9092
          autoAddPartitions: true
          minPartitionCount: 10
          auto-offset-reset: earliest
      bindings:
        receiver-in-0:
          binder: kafka
          destination: BUSINESSEVENT,BUSINESSEVENT1
          content-type: text/plain;charset=UTF-8
          group: input-group-1
        emitter-out-0:
          binder: kafka
          producer:
            partition-count: 10
            partition-key-extractor-name: EmitterPartitionKey
        erroremitter-out-0:
          binder: kafka
          destination: error
        error:
          binder: kafka
          destination: error

spring.cloud.stream.function.definition: receiver;emitter;erroremitter
spring.cloud.stream.function.routing.enabled: true
Andras Hatvani
@andrashatvani
Is there a way to use spring-cloud-stream-kafka-binder settings defined in application.yml in TopologyTestDriver when testing Kafka Streams?
Jeremy Bruns
@j3rrey
hey guys our configuration for num threads in kafka stream bindings seems to not be working
any hints ?
maybe the wrong place ?
Soby Chacko
@sobychacko
@j3rrey That should work, I wonder what's going on. You can also set the concurrency property at the individual binding level.
For example, spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.concurrency. This property gets internally mapped to num.stream.threads.
Jeremy Bruns
@j3rrey
I'll try that thx! @sobychacko
Jeremy Bruns
@j3rrey
putting concurrency to default seems to be working:
cloud:
      default:
        consumer:
          concurrency: 3
        configuration:
          spring.json.use.type.headers: false
      function:
        definition: globalOffers;globalRecovery;localOffers;localRecovery
Jeremy Bruns
@j3rrey
yup all working fine thx
thanks again @sobychacko <3
Rakesh
@ravening
Hello all, I may not be aware of it, but could you please clarify something for me? Does Spring Cloud Stream support reactive Kafka streams with Project Reactor?
alyxl
@alyxl
Hello, I stumbled upon a bug(?). I'm using Spring Boot 2.2.6 with spring-cloud-starter-stream-kafka 3.0.4.RELEASE for my project. To handle errors from the producer side I added spring.cloud.stream.bindings.<channelName>.producer.errorChannelEnabled: true, which is working as intended. Now I tried to update my project to Spring Boot 2.2.7 and no ErrorMessages are sent to the errorChannel anymore. Spring Boot 2.2.7 upgraded the Spring Kafka dependency to 2.3.8. Trying to figure out the problem, I saw that when an error occurs from the producer with spring-kafka 2.3.8, the method "doSend" in GenericMessagingTemplate.class is not called anymore. Downgrading back to spring-kafka 2.3.7 resolves the issue. Any ideas about that? Will this be fixed in future releases? Has something major changed between spring-kafka 2.3.7 and 2.3.8?
Andras Hatvani
@andrashatvani
How can a Spring Cloud Stream function be scheduled, i.e. only executed when the time has come and not always on new message arrival?
natraj09
@natraj09
Is there any circuit breaker pattern supported with the latest version of Spring Cloud Stream? For example, if we want to stop listening to messages and write to a database. I saw a solution here https://stackoverflow.com/questions/39598943/stop-spring-cloud-stream-streamlistener-from-listening-when-target-system-is-do , but this will stop the entire application.
Juan Antonio Hernando Labajo
@labajo

Hi, I have the following error while consuming a message from a Kafka queue using stream functions:

2020-05-21 14:22:39.660 ERROR 19052 --- [container-0-C-1] onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@3da3dc99}

org.springframework.transaction.reactive.TransactionContextManager$NoTransactionInContextException: No transaction in context

The problem occurs when I use Spring Cloud Stream functions with Mongo. The code to reproduce it is the following:

Repository

@Repository
public interface UsersRepo extends ReactiveMongoRepository<User, String> {
}

Consumer

@Bean
public Function<Flux<Message<String>>, Mono<Void>> toUpperCase() {
  return messageFlux -> messageFlux
      .map(msg -> {
        String msgStr = msg.getPayload();
        return new User().setId("1231").setName(msgStr);
      })
      .flatMap(usersRepo::save)
      .then();
}

If I replace usersRepo::save with a Mono.just(something), the error does not occur. Could you please help me figure out what I'm doing wrong? Thanks

Juan Antonio Hernando Labajo
@labajo
Sorry, I forgot to mention that I'm using:
<spring-cloud.version>Hoxton.SR3</spring-cloud.version>
<spring-data-mongo>2.2.6.RELEASE</spring-data-mongo>
khurramharoon
@khurramharoon
I would like to use Spring Cloud Stream for our upcoming project. We plan to use AWS SQS and SNS. However, it seems there are no binders available (https://spring.io/projects/spring-cloud-stream). Perhaps there is some alternative that I am not aware of at the moment. On GitHub there is some traction, but that is not production ready (e.g. https://github.com/maciejwalkowiak/spring-cloud-stream-binder-sqs). Please advise.
sampathmende
@sampathmende
Hi, I publish messages to multiple consumer instances using instance index and instance count, and on the publisher side I use partitionKeyExpression and a partition count of 2. But the message is not consumed by the consumer.
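For reference, a minimal sketch of how the producer-side partition and the consumer-side instance index relate. It assumes the documented default selection of key hash modulo partition count, and a binder where each instance only receives the partition matching its instance index (with Kafka, partitions are normally assigned by the consumer group instead). The class and method names are hypothetical.

```java
public class PartitionSketch {

    // Assumed default: hash of the extracted key modulo the partition count, kept non-negative.
    static int partitionFor(Object key, int partitionCount) {
        return Math.abs(key.hashCode() % partitionCount);
    }

    // With index-based partitioning, an instance only sees messages for its own index.
    static boolean receivedBy(int instanceIndex, Object key, int partitionCount) {
        return instanceIndex == partitionFor(key, partitionCount);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("a", 2));  // 1, since "a".hashCode() is 97
        System.out.println(partitionFor("b", 2));  // 0, since "b".hashCode() is 98
        System.out.println(receivedBy(0, "a", 2)); // false: key "a" belongs to instance 1
    }
}
```

Under these assumptions, a message is only seen if an instance with the matching index is running, and if the instance count on the consumers matches the partition count on the producer.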
pigeon256
@pigeon256
If my Cloud Stream consumer is dependent on an external API and, say, it goes down for 20 minutes, what is the best way to handle such a scenario automatically?
itsatrip
@itsatrip
how would you use the kinesis binder using functional approach to produce batches and also to consume in batches?
Knut Schleßelmann
@kschlesselmann

What could be the cause of "Dispatcher has no subscribers for channel 'classification-group-provider-1.productMapping-in-0'."? My function looks like

@Configuration
class ProductMapper(
        private val mappingService: MappingService
) {

    @Bean
    fun mapClassificationGroup(): Function<Flux<Product>, Flux<Message<ProductOfSource>>> = Function { products ->
        products
                .concatMap { handleProduct(it) }
                .onErrorStop()
    }
}

and my configuration like

spring:
  cloud:
    function:
      definition: mapClassificationGroup
    stream:
      bindings:
        mapClassificationGroup-in-0:
          destination: product-updates
          group: ${spring.application.name}
        mapClassificationGroup-out-0:
          destination: products-of-source
          producer:
            partition-key-expression: headers['productId']
            partition-count: 32
      kafka:
        binder:
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer

to me it looks like all my other services and I cannot spot the error :-S
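For reference, a minimal sketch of the functional binding naming convention that the configuration above relies on (`<functionName>-in-<index>` and `<functionName>-out-<index>`, with multiple functions separated by `;` in the definition). The class and helper names are hypothetical and this is not the framework's own code.

```java
import java.util.ArrayList;
import java.util.List;

public class BindingNameSketch {

    // Splits a function definition such as "a;b" into individual function names.
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        for (String name : functionNames("mapClassificationGroup")) {
            System.out.println(inputBinding(name, 0));  // mapClassificationGroup-in-0
            System.out.println(outputBinding(name, 0)); // mapClassificationGroup-out-0
        }
    }
}
```

The channel named in the error message (productMapping-in-0) matches neither of these binding names, which may be worth checking.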

Oleg Zhurakousky
@olegz
i am a little rusty on Kotlin but does your above function return a value?
Knut Schleßelmann
@kschlesselmann
@olegz yep. the products.concatMap
Oleg Zhurakousky
@olegz
i guess it does, but onErrorStop() worries me
Knut Schleßelmann
@kschlesselmann
@olegz That's because I use Spring Data Mongo, which otherwise bubbles up a lot of missing transaction exceptions (we talked about that a couple of weeks ago)
Oleg Zhurakousky
@olegz
ohh yeah
Oleg Zhurakousky
@olegz
sorry, as you can feel I am in the middle of some other weeds, i'll think about it later
Knut Schleßelmann
@kschlesselmann
Thanks … it has to be a really stupid error … but I can't spot it. I have 3 almost identical services here … maybe it's too late for me too today :-/
Jakub Kubryński
@jkubrynski
Hi! I'm trying to use the new approach for testing (with the test-binder) and I have no idea how to connect it with my production code? I have my component that sends the event and I want to test if the event was sent successfully. I can use the OutputDestination to receive the message, however, the InputDestination is not related to the MessageChannel object so it's not possible to inject it into the production component. Any hints?
Soby Chacko
@sobychacko
@jkubrynski When you have the new binder on the classpath, it creates both InputDestination and OutputDestination based on the bindings in your application. You can access them from the context.
Jakub Kubryński
@jkubrynski
But the InputDestination is totally not related to any classes you use in the production code
Soby Chacko
@sobychacko
Sorry, I may not be following the context. Are you trying to test a single processor with input and output?
Jakub Kubryński
@jkubrynski
it doesn't matter. In my production code, I'm attaching to MessageChannel - that's the main stream object I use to invoke the send method. Then I use, for example, MessageCollector to check if the proper message was sent. InputDestination and OutputDestination are useless, as I cannot link them with my production code.
Soby Chacko
@sobychacko
See if any of these tests are useful for what you are trying to test.
For the most part, they all use the new test binder.