Oleg Zhurakousky
@olegz
if you are referring to the old spring-cloud-stream-reactive module, it is no longer supported and in fact was removed from newer distributions over a year ago, since we provide reactive support natively through the spring-cloud-function programming model, which is at the moment the main programming model in spring-cloud-stream
Davide
@jesty
@olegz From spring-cloud-stream 2.1.4.RELEASE to 3.0.4.RELEASE and from Project Reactor 3.2.15.RELEASE to 3.3.5.RELEASE
Oleg Zhurakousky
@olegz
I am not sure what you are saying?
I am talking about the https://github.com/spring-cloud/spring-cloud-stream/tree/2.1.x/spring-cloud-stream-reactive module, which no longer exists, but I am assuming you are relying on it since you are using the annotation-based approach for configuration
with the functional approach and its native Reactor support, all you need is
@Bean
public Consumer<Flux<Message<A>>> receive() {/*your code*/}
Oleg Zhurakousky
@olegz
in your code you either have to explicitly subscribe to the Flux or write your consumer as a function: Function<Flux<?>, Mono<Void>> consumer()
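A minimal sketch of the two styles Oleg describes, assuming spring-cloud-stream 3.x with the functional model; the payload type and bean names are illustrative, not from the thread:

```java
// Style 1: a Consumer<Flux<...>> is invoked once with the whole stream,
// so the pipeline must be subscribed explicitly or nothing is consumed.
@Bean
public Consumer<Flux<Message<String>>> receive() {
    return flux -> flux
            .doOnNext(msg -> System.out.println(msg.getPayload()))
            .subscribe();
}

// Style 2: return the pipeline as Mono<Void> and let the framework
// subscribe to it for you.
@Bean
public Function<Flux<Message<String>>, Mono<Void>> consume() {
    return flux -> flux
            .doOnNext(msg -> System.out.println(msg.getPayload()))
            .then();
}
```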
Mani Jindal
@manijndl
@olegz I want multiple binders; my configuration looks like this:
##########Common Props

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN

################# Binders

spring.cloud.stream.binders.research.type=kafka
spring.cloud.stream.binders.research.environment.spring.cloud.stream.kafka.binder.brokers=kafka.servicebus.windows.net:9093
spring.cloud.stream.binders.research.environment.spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=

spring.cloud.stream.binders.research1.type=kafka
spring.cloud.stream.binders.research1.environment.spring.cloud.stream.kafka.binder.brokers=kafka1.servicebus.windows.net:9093
spring.cloud.stream.binders.research1.environment.spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=

##############INPUTS

spring.cloud.stream.kafka.bindings.input.binder=research
spring.cloud.stream.kafka.bindings.input.consumer.startOffset=latest
spring.cloud.stream.bindings.input.destination=invoice

spring.cloud.stream.kafka.bindings.input1.binder=research1
spring.cloud.stream.kafka.bindings.input1.consumer.startOffset=latest
spring.cloud.stream.bindings.input1.destination=warehouse

#

I am getting this exception: "A default binder has been requested, but there is more than one binder available". Am I missing something?

Soby Chacko
@sobychacko
@manijndl Your configuration looks right to me. Can't tell why you are getting that exception without knowing more details about the application.
Mani Jindal
@manijndl
only two classes @sobychacko
public interface Sink {

    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel INPUT();

    String INPUT1 = "input1";

    @Input(Sink.INPUT1)
    SubscribableChannel INPUT1();
}
Soby Chacko
@sobychacko
Did you look at this example?
Mani Jindal
@manijndl

@EnableBinding(Sink.class)
public class ConsumerApplication {

    @StreamListener(Sink.INPUT)
    public void handle(String str) {
        System.out.println(" invoice :  " + str);
    }

    @StreamListener(Sink.INPUT1)
    public void handle1(String str) {
        System.out.println(" invoice :  " + str);
    }
}

Soby Chacko
@sobychacko
what are your maven dependencies for the binder and what versions?
Mani Jindal
@manijndl
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.6.RELEASE</version>
</parent>

<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>

<properties>
    <java.version>1.8</java.version>
    <spring-cloud-stream.version>Fishtown.RELEASE</spring-cloud-stream.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>3.0.0.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        <version>3.0.0.RELEASE</version>
    </dependency>
</dependencies>
</project>
@sobychacko my pom file
Davide
@jesty
sorry for not being so clear, but I think you understood, @olegz. Now I have fixed another issue and my integration tests are running, but the function is not invoked.
The properties should be:
spring.cloud.stream.function.definition=receive
spring.cloud.stream.bindings.receive-in-0.destination=myevent
spring.cloud.stream.bindings.receive-in-0.contentType=application/json
(where receive is the bean name)
Soby Chacko
@sobychacko

@manijndl You have a small configuration error.

spring.cloud.stream.kafka.bindings.input.binder=research
spring.cloud.stream.kafka.bindings.input1.binder=research1

This needs to be changed to:

spring.cloud.stream.bindings.input.binder=research
spring.cloud.stream.bindings.input1.binder=research1

Note that the binder property needs to be set on the main bindings, not at the Kafka bindings.
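Putting that correction together with the earlier snippet, the binding section would read like this (a sketch reusing the same names from above):

```properties
# binder selection goes on the core bindings...
spring.cloud.stream.bindings.input.binder=research
spring.cloud.stream.bindings.input.destination=invoice
spring.cloud.stream.bindings.input1.binder=research1
spring.cloud.stream.bindings.input1.destination=warehouse

# ...while Kafka-specific consumer tuning stays under the kafka prefix
spring.cloud.stream.kafka.bindings.input.consumer.startOffset=latest
spring.cloud.stream.kafka.bindings.input1.consumer.startOffset=latest
```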

Mani Jindal
@manijndl
sorry @sobychacko for this silly mistake, it works fine now.
Thanks a lot.
Mani Jindal
@manijndl
just have one more question @sobychacko. Our services are built on spring cloud stream and deployed in two different countries. The first country needs input from topic1, and the second country needs input from topic1 and topic2. Is there a way to write a generic consumer, i.e. common code that works for both countries, with some condition we can pass to @StreamListener based on the country?
Mani Jindal
@manijndl

Hi, I have one question: which SubscribableChannel is created by this code snippet, e.g. DirectChannel, ExecutorChannel, or something else? And can we add or remove SubscribableChannels at runtime or through the application.properties file?

Code snippet:
public interface Sink {
    String INPUT = "input";
    @Input(Sink.INPUT)
    SubscribableChannel INPUT();
}

Marcello Teodori
@mteodori

in your code you either have to explicitly subscribe to the Flux or write your consumer as function Function<Flux<?>, Mono<Void>>consumer()

Thanks @olegz. The issue is in the Activiti Cloud code, which we are now upgrading to Spring Cloud Stream 3, and yes, we were using the old module, which we have just removed. So can you confirm that reactive works only with the function-based API and not the annotation-based one? We haven't started refactoring the code to move to the new API and were hoping we could do that in stages before the annotation-based API is deprecated/removed

Davide
@jesty
@olegz I found my problem: not all consumers/producers are reactive, so I have some @EnableBinding annotations in the same project, and the functions are not loaded because of this check: if (ObjectUtils.isEmpty(applicationContext.getBeanNamesForAnnotation(EnableBinding.class))) {. I was running a test suite and missed the log "Functional binding is disabled due to the presense of @EnableBinding annotation in your configuration" :(
Phillip Fleischer
@pcfleischer
+1, I had that same issue because I was playing around with different bindings
Oleg Zhurakousky
@olegz
@jesty yes, @EnableBinding and functional are mutually exclusive. But what I am also hearing is that you have a lot of messaging components in a single microservice. That is kind of an anti-pattern, so consider breaking it apart. But regardless, within a single microservice you can either use the functional (preferred) approach or the annotation-driven one, which is all but deprecated.
Phillip Fleischer
@pcfleischer
yeah, in my case it wasn't a conscious decision, rather a code organizational issue in an effort to demonstrate multiple functions without several applications
Oleg Zhurakousky
@olegz
fair enough; multiple functions is one thing, but mixing functions and @StreamListeners is another. It appears to be clear now, though
@pcfleischer ^^
sampathmende
@sampathmende
Hello, how do I integrate rabbitmq with the eureka service registry using spring boot microservices?
I am new to the rabbitmq tool and want to integrate it with the eureka server and zuul gateway
Mansingh Shitole
@msshitole
@sobychacko and @olegz I am back again with the same old issue. Let me refresh your memory: the spring-cloud-stream application is not running in Docker. This issue/error has persisted from Hoxton.SR2 and SR3 through the current SR4 version. The same application runs fine with Hoxton.SR1 and earlier versions.
The root cause of the error is that the application is failing to read application.properties, and this happens only when the spring-boot-starter-actuator dependency is present in the pom. Unfortunately, the actuator dependency is needed in all our production microservices, hence this issue is a showstopper now.
As suggested, I have raised this issue (spring-projects/spring-boot#21441) with the Spring Boot team. I am hoping for a positive response from them, but all the same it would be great if we could solve this issue together or bring it to some logical conclusion. Your help will be highly appreciated.
Here is my sample application: https://github.com/msshitole/reactive-avro-processor. Thank you in advance!
edwardsainsbury
@edwardsainsbury
Hi, I'm trying to use the functional approach with reactive Kafka using Flux. Is there a nice way to get the message key? I can't find any examples or documentation, and have resorted to getting the key header from the Message object. Just wondering if there is a nicer way to do it, à la the Kafka Streams implementation
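The header workaround mentioned here looks roughly like this (a sketch; it assumes spring-kafka's KafkaHeaders constants are on the classpath, and the bean name and payload type are illustrative):

```java
// Pull the record key out of the message headers in a reactive function.
// KafkaHeaders.RECEIVED_MESSAGE_KEY resolves to the
// "kafka_receivedMessageKey" header populated by the binder.
@Bean
public Consumer<Flux<Message<String>>> process() {
    return flux -> flux
            .doOnNext(msg -> {
                Object key = msg.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
                System.out.println("key=" + key + ", payload=" + msg.getPayload());
            })
            .subscribe();
}
```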
Jose A. Iñigo
@codependent
Hi @sobychacko When will Spring Cloud Stream be compatible with Spring Boot 2.3.0/KafkaStreams 2.5? At the moment there's a runtime exception after upgrading:
java.lang.NoSuchMethodError: 'boolean org.apache.kafka.streams.KafkaStreams$State.isRunning()'
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderHealthIndicator.doHealthCheck(KafkaStreamsBinderHealthIndicator.java:111) ~[spring-cloud-stream-binder-kafka-streams-3.0.4.RELEASE.jar!/:3.0.4.RELEASE]
I've set kafka.version to 2.3.0 but get another error during Spring Boot Kafka auto-configuration
Juan Antonio Hernando Labajo
@labajo

Hi, I'm using Spring Cloud Functions and I'm not being able to configure the DLQ Topics.

I have the following configuration:

spring:
  data:
    mongodb:
      uri: ${MONGO_URI:mongodb://127.0.0.1/plaform}
  cloud:
    stream:       
      function.definition: statusConsumer;
      bindings:
        statusConsumer-in-0:
          consumer:
            max-attempts: ${KAFKA_CONSUMER_MAX_ATTEMPS:3}
            backOffInitialInterval: ${KAFKA_CONSUMER_BACKOFFINITIALINTERVAL:7000}
            backOffMaxInterval: ${KAFKA_CONSUMER_BACKOFFMAXINTERVAL:600000}
            backOffMultiplier: ${KAFKA_CONSUMER_BACKOFFMULTIPLIER:6.0}
          group: com.platform.ServiceStatusConsumer
          destination: ${SERVICESTATUS_QUEUE:platform-admin-provision-status}
          binder: kafka
      kafka:
        bindings:
          statusConsumer-in-0:
            consumer:
              autoCommitOffset: true
              enableDlq: true
              dlqName: ${spring.cloud.stream.bindings.statusConsumer-in-0.destination}-dlq-admin

With this config, Eclipse shows me an error in kafka.bindings.statusConsumer-in-0.consumer.
Eclipse says that consumer is an Object, and that autoCommitOffset and the other properties are unknown because the consumer property has an object type.

This is the detail of consumer property:

spring.cloud.stream.kafka.bindings.internalConsumer-in-0.consumer
Object

KafkaConsumerProperties org.springframework.cloud.stream.binder.kafka.properties.KafkaBindingProperties.consumer
org.springframework.cloud.stream.binder.kafka.properties.KafkaBindingProperties

Consumer specific binding properties.

I don't know what is happening. Is it a yml configuration problem with Spring Cloud Functions, or do I have a dependency problem in the POM? Thanks

pradoshtnair
@pradoshtnair

Sometimes the first message of a new partition poll is not getting read. Can you give any pointers?

@Bean
    public Consumer<Flux<Message<String>>> receiver() {
        return (sink -> {
            sink
                    .onBackpressureBuffer()
                    .doOnEach((signal) -> AckHandler.register(Objects.requireNonNull(signal.get())))
                    .subscribe(
                            (r) -> MessageHandler.process(r), (e) -> {
                                log.fatal(FineemContextHandler.LOGFORMAT, () -> FinEEMException.getDescription(FinEEMException.UNKNOWN_ERROR), () -> MessageStates.ACTIONEXECUTIONERROR, () -> DataProvider.printError(e));
                            });

        });
    }

Application.yml

spring:
  cloud:
    stream:
      kafka:
        default:
          consumer:
            autoCommitOffset: false
            concurrency: 1
        bindings:
          receiver-in-0:
            consumer:
              autoCommitOffset: false
        binder:
          brokers: host:9092
          autoAddPartitions: true
          minPartitionCount: 10
          auto-offset-reset: earliest
      bindings:
        receiver-in-0:
          binder: kafka
          destination: BUSINESSEVENT,BUSINESSEVENT1
          content-type: text/plain;charset=UTF-8
          group: input-group-1
        emitter-out-0:
          binder: kafka
          producer:
            partition-count: 10
            partition-key-extractor-name: EmitterPartitionKey
        erroremitter-out-0:
          binder: kafka
          destination: error
        error:
          binder: kafka
          destination: error

spring.cloud.stream.function.definition: receiver;emitter;erroremitter
spring.cloud.stream.function.routing.enabled: true
Andras Hatvani
@andrashatvani
Is there a way to use the spring-cloud-stream-kafka-binder settings defined in application.yml in TopologyTestDriver when testing Kafka Streams?
Jeremy Bruns
@j3rrey
hey guys, our configuration for the number of threads in kafka streams bindings seems not to be working
any hints?
maybe the wrong place?
Soby Chacko
@sobychacko
@j3rrey That should work, wonder what's going on. You can also set the concurrency property at the individual binding level.
For e.g. spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.concurrency. This property gets internally mapped to num.stream.threads.
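For reference, the per-binding form Soby describes would look something like this (the binding name is taken from Jeremy's later config, purely for illustration):

```properties
# applies to this one binding only; mapped internally to num.stream.threads
spring.cloud.stream.kafka.streams.bindings.globalOffers-in-0.consumer.concurrency=3
```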
Jeremy Bruns
@j3rrey
I'll try that thx! @sobychacko
Jeremy Bruns
@j3rrey
putting concurrency to default seems to be working:

cloud:
  default:
    consumer:
      concurrency: 3
    configuration:
      spring.json.use.type.headers: false
  function:
    definition: globalOffers;globalRecovery;localOffers;localRecovery
Jeremy Bruns
@j3rrey
yup all working fine thx
thanks again @sobychacko <3
Rakesh
@ravening
Hello all, I may not be aware of it, but can you please clarify: does spring cloud stream support the reactive Kafka streams of Project Reactor?
alyxl
@alyxl
Hello, I stumbled upon a bug(?). I'm using Spring Boot 2.2.6 with spring-cloud-starter-stream-kafka 3.0.4.RELEASE for my project. To handle errors from the producer side I added spring.cloud.stream.bindings.<channelName>.producer.errorChannelEnabled: true, which is working as intended. Then I tried to update my project to spring-boot 2.2.7, and no ErrorMessages are sent to the errorChannel anymore. Spring Boot 2.2.7 upgraded the Spring Kafka dependency to 2.3.8. Trying to figure out the problem, I saw that when an error occurs from the producer with spring-kafka 2.3.8, the method call "doSend" in GenericMessagingTemplate is no longer invoked. Downgrading back to spring-kafka 2.3.7 resolves the issue. Any ideas about that? Will this be fixed in future releases? Has something major changed between spring-kafka 2.3.7 and 2.3.8?