Soby Chacko
@sobychacko
@Nadavbi87 It's up and running now. There were some issues before.
Nadavbi87
@Nadavbi87
@sobychacko
Thanks!
sumit1912
@sumit1912

Hi team,

I have created three simple Spring Cloud Stream apps (Source/Processor/Sink) using the Spring Cloud Function approach, transferring Flux<String>.

Source-app:

@SpringBootApplication
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }

    @PollableBean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.just("abc", "pqr", "xyz");
    }
}

Processor-app:

@SpringBootApplication
public class ProcessorApplication {

    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(String::toUpperCase).log();
    }

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class, args);
    }
}

Sink-app:

@SpringBootApplication
public class SinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }

    @Bean
    public Consumer<Flux<String>> log() {
        return flux -> flux.subscribe(f -> System.out.println("Received data: " + f));
    }
}

The dependencies that I have added are:

Spring Boot version = 2.2.6.RELEASE

implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit")
implementation("org.springframework.cloud:spring-cloud-starter-function-webflux:3.0.7.RELEASE")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-webflux")

I have registered these apps in Spring Cloud Data Flow and deployed them in a stream.

I am able to transmit data to these apps and receive output both via HTTP and via RabbitMQ individually. But the message is not communicated across the apps (Source -> Processor -> Sink).
Am I missing any dependency, annotation, or application property?
I am new to Spring Cloud Stream using the function approach; any suggestion would be a great help :)

Complete code can be found here: https://github.com/sumit1912/springcloud.function.stream.pipeline
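
For reference, with the functional model each app's bindings are derived from the bean name as <bean>-in-<index> / <bean>-out-<index>, and the apps only form a pipeline if each output destination matches the next app's input destination. A minimal sketch of that wiring, with hypothetical topic names (when SCDF deploys a stream it normally sets these properties itself):

# Source app: stringSupplier publishes via the stringSupplier-out-0 binding
spring.cloud.stream.bindings.stringSupplier-out-0.destination=uppercase-in

# Processor app: uppercase consumes and produces
spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase-in
spring.cloud.stream.bindings.uppercase-out-0.destination=uppercase-out

# Sink app: the consumer bean is named "log"
spring.cloud.stream.bindings.log-in-0.destination=uppercase-out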

Soby Chacko
@sobychacko
@sumit1912 Didn't you ask and get an answer on SO regarding this?
Omkar Shetkar
@Omkar-Shetkar

Hi All,
Recently I started learning and using Spring Cloud Stream. I like the way Spring Cloud Functions simplify message publishing and consumption. But in one of my applications, the beans corresponding to the functions are not detected and hence not bound to a destination.
When I compared this with my sample working version, the difference is that the non-working version has a module dependency which internally uses Spring Cloud Stream, and that module uses the @EnableBinding annotation.
If I put this annotation into my working version, I reproduce the same issue.
It seems there is a conflict between @EnableBinding and the functional support.
Any hints or suggestions would be really helpful.
Thanks.


In a unit test, when I try to receive the message using OutputDestination, I get the following error:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34)
    at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:37)
    at com.example.loggingconsumer.LoggingConsumerApplicationTests.contextLoads(LoggingConsumerApplicationTests.java:50)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    ...

Omkar Shetkar
@Omkar-Shetkar
Non-working version, with the @EnableBinding annotation. If I just remove this annotation, it works!
@SpringBootApplication
@Controller
@EnableBinding
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    @Bean
    public Consumer<CloudEventImpl<Person>> consumer() {
        return p -> {
            System.out.println("consumer--Received: " + p);

        };
    }
}
Soby Chacko
@sobychacko
@Omkar-Shetkar You cannot mix functional binding and the annotation approach in the regular message channel-based binders (Kafka Streams binder allows that in the current versions).
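
For anyone hitting the same thing: with functional binding the function beans are picked up automatically, so the @EnableBinding annotation (even one pulled in transitively from another module) must go. A minimal functional-only sketch, assuming a channel-based binder and a plain Person payload defined elsewhere (the CloudEventImpl wrapper is dropped here for brevity):

import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    // Bound automatically as consumer-in-0; no @EnableBinding anywhere on the classpath
    @Bean
    public Consumer<Person> consumer() {
        return p -> System.out.println("consumer--Received: " + p);
    }
}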
sumit1912
@sumit1912

@sumit1912 Didn't you ask and get an answer on SO regarding this?

Thanks @sobychacko for the reply. The answer given on SO worked like a charm.

Omkar Shetkar
@Omkar-Shetkar

@Omkar-Shetkar You cannot mix functional binding and the annotation approach in the regular message channel-based binders (Kafka Streams binder allows that in the current versions).

OK... @sobychacko Thanks for the clarification...

boonware
@boonware
Hi. Is it possible to modify a message consumed via a @StreamListener method before sending it to the DLQ? My use case is to attach an error description to the message before routing it.
I've tried mutating the object parameter to the method, but that doesn't seem to have any effect.
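
One documented option, assuming the Rabbit binder is in play here: republishToDlq makes the binder itself re-publish the failed message to the DLQ with exception details attached as headers (x-exception-message, x-exception-stacktrace), instead of relying on broker-side dead-lettering. A sketch with a hypothetical binding named "input":

# Binder re-publishes failures to the DLQ, adding exception headers
spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republishToDlq=true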
Gonzalo Borobio
@gborobio73

Hello! I'm new to Spring Cloud Stream and Kafka and I could use some help. Starting from the Spring Cloud samples' test-embedded-kafka (https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/testing-samples/test-embedded-kafka), I'm trying to modify the code so it uses KStream. The code inside EmbeddedKafkaApplication looks like this when using KStream:

    @Bean
    public Function<KStream<Object, byte[]>, KStream<Object, byte[]>> handle() {
        return in ->
            in.map((key, value) -> new KeyValue<Object, byte[]>(key, new String(value).toUpperCase().getBytes()));
    }

I have also modified the POM to include spring-cloud-stream-binder-kafka-streams:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

And also upgraded to newer Kafka as explained here: spring-cloud/spring-cloud-stream-samples#182
The issue I have is that when I run the test, I can see this error in the log:

2020-06-09 16:37:58.854 ERROR 6507 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [handle-applicationId-203b2c06-8fbf-4687-b8aa-71e231d46005-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance

org.apache.kafka.streams.errors.StreamsException: stream-thread [handle-applicationId-203b2c06-8fbf-4687-b8aa-71e231d46005-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216) ~[kafka-streams-2.3.1.jar:na]

And the test eventually fails because:

Expecting:
 <0>
to be equal to:
 <1>
but was not.

Any ideas on what I'm doing wrong? Thanks a lot!!

natraj09
@natraj09
Is there an option to skip bad records with the Kafka binder? I see an option with Kafka Streams: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.2.1.RELEASE/spring-cloud-stream-binder-kafka.html#_handling_deserialization_exceptions . I am also using the reactive API; I'm wondering if it's something I have to handle on my own?
Soby Chacko
@sobychacko
You can set up a DLQ for storing the bad records and keep going with the processing of new records.
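
For reference, with the regular Kafka binder this is driven by consumer properties; a minimal sketch with a hypothetical binding named process-in-0 and a hypothetical DLQ topic name:

# Route records that fail processing to a dead-letter topic
spring.cloud.stream.kafka.bindings.process-in-0.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.process-in-0.consumer.dlqName=bad-records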
ZhangXudong
@MLeft
I bind an input channel, but why is ChannelInterceptor.preSend invoked and not the preReceive method?
dharezlak
@dharezlak

Hi, I have the following reactive consumer:

@Bean
public Function<Flux<String>, Mono<Void>> processor() {
    return messages -> messages
            .flatMap(storage::store)
            .then();
}

How is this going to behave with respect to Kafka message acknowledgement? I can't seem to find any documentation on this. Since storage::store is reactive as well, I would assume the flatMap operator would parallelize message consumption here?
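
A side note on the parallelism assumption, which is plain Reactor behavior rather than anything binder-specific: flatMap subscribes to the inner publishers eagerly (default concurrency 256) and does not preserve order, while concatMap runs them strictly one at a time. A self-contained sketch, with a hypothetical store method standing in for storage::store:

import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AckOrderingSketch {

    // Hypothetical reactive store call standing in for storage::store
    static Mono<Void> store(String msg) {
        return Mono.fromRunnable(() -> System.out.println("stored " + msg));
    }

    public static Function<Flux<String>, Mono<Void>> processor() {
        // flatMap: many store() calls may be in flight at once, completion order not guaranteed.
        // concatMap would instead complete each store() before processing the next message.
        return messages -> messages.flatMap(AckOrderingSketch::store).then();
    }
}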

shipra chaudhary
@ShipraShipra04_twitter

Hi Team, I am using reactive Spring Data R2DBC repositories to get POJOs from a Postgres DB:

@Table("document")
public class Document {
@Id
private String id;
@Column(value = "authors_ids")
private Set<Author> authors;// object
}

Author is a POJO; authors_ids is an array column in the Postgres DB.

While trying to run this, I get the error below because of custom conversion. How can I define a Set<Author> field in the POJO to access authors_ids (an array of strings) from the Postgres DB?

{
  "timestamp": "2020-06-15T13:20:50.491+0000",
  "status": 500,
  "error": "Internal Server Error",
  "message": "Could not read property @org.springframework.data.relational.core.mapping.Column(keyColumn=, value=authors_ids)private java.util.Set com.example.demo.Documents.authors from result set!",
  "path": "/api/document"
}
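
Not a Spring Cloud Stream question as such, but for reference: Spring Data R2DBC will not map an array column to a Set of custom objects on its own. One possible direction, sketched under the assumption that the driver returns authors_ids as a String[] of author ids and that Author has a single-argument constructor (both hypothetical here), is a custom reading converter registered through R2dbcCustomConversions:

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

import org.springframework.core.convert.converter.Converter;
import org.springframework.data.convert.ReadingConverter;

// Converts the String[] read for authors_ids into the Set<Author> field on Document
@ReadingConverter
public class AuthorsReadingConverter implements Converter<String[], Set<Author>> {

    @Override
    public Set<Author> convert(String[] source) {
        return Arrays.stream(source).map(Author::new).collect(Collectors.toSet());
    }
}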

dvirgiln
@dvirgiln

Hello,

I am following the example from https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution

I am receiving an error:

Failed to invoke method; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"HelloWorld","namespace":"com.example.avro","fields":[{"name":"transactionId","type":"string"},{"name":"message","type":"string"},{"name":"field1","type":["null","int"]}]} (through reference chain: com.example.HelloWorld["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])

My Avro file is quite simple:

{
    "namespace": "com.hollandandbarrett.dns.kafkaworkshop.avro",
    "type": "record",
    "name": "HelloWorld",
    "fields": [
        {"name": "transactionId", "type": "string"},
        {"name": "message", "type": "string"},
        {"name": "field1", "type": ["null", "int"]}
    ]
}

The application.yml is:

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamic-schema-generation-enabled: false
      bindings:
        output:
          contentType: application/*+avro
          destination: HelloWorld
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:29092
          auto-create-topics: true

        bindings:
          output:

            producer:
              configuration:
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
server.port: 9010
Any idea why it is not working?
Why is it trying to use JSON serialization instead of the Avro one?
Soby Chacko
@sobychacko
@dvirgiln Did you get that example working properly? Or getting that error while running that?
dvirgiln
@dvirgiln
It works perfectly if I do not include the union type:
{"name": "field1", "type": ["null", "int"]}
somehow it is trying to use the Jackson serde
dvirgiln
@dvirgiln
The same error happens when I execute producer 2 from the example:
https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution
mvn clean compile package
java -jar schema-registry-confluent-avro-serializer-producer2/target/schema-registry-confluent-avro-serializer-producer2-0.0.1-SNAPSHOT.jar
dvirgiln
@dvirgiln
I have used the same Docker setup as in the README file:
 docker-compose -f docker-compose-control-center.yaml up -d
Viacheslav Petriaiev
@Viacheslav-Petriaiev

Hello,
I've started the migration to spring cloud Hoxton.SR5

I have the following code in Kotlin, which worked before:

    @StreamListener(CloudOperationsChannels.INPUT)
    @SendTo(CloudOperationsChannels.OUTPUT)
    fun operationsListener(input: Flux<Message<ByteArray>>): Flux<Message<ByteArray>>

Now the ByteArray payload (which is the bytes of a JSON string) is being deserialized into Message and I get:

Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

Could you help me: how can I use org.springframework.messaging.Message in such a case?

Thank you

Angshuman Agarwal
@angshuman-agarwal

Hi,
I am trying to compose functions and give the composition an alias mapped to my Kafka topic, as per https://github.com/spring-cloud/spring-cloud-stream/blob/master/docs/src/main/asciidoc/spring-cloud-stream.adoc

spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes=quotedUpperCase

But the yml file fails to parse this. Is there any sample you can point me to showing the aliasing of a piped function to a descriptive name that can be used to route to a custom topic, please?
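
For what it's worth, the parse failure is a YAML issue rather than a Spring Cloud Stream one: | starts a block scalar in YAML, so a map key containing a pipe has to be escaped with Spring Boot's bracket notation for map keys. A sketch of what should parse, assuming the composed definition toUpperCase|wrapInQuotes and the standard -out-0 suffix on the composed binding name:

spring:
  cloud:
    stream:
      function:
        definition: toUpperCase|wrapInQuotes
        bindings:
          "[toUpperCase|wrapInQuotes-out-0]": quotedUpperCase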

Angshuman Agarwal
@angshuman-agarwal

@sobychacko : I have a naive question related to one sample here

How do you know that process-out-0, process-out-1 and process-out-2 map to english, french and spanish respectively, and why not any other order of the process-out-* indices?

spring.cloud.stream.bindings.process-out-0:
  destination: english-counts
spring.cloud.stream.bindings.process-out-1:
  destination: french-counts
spring.cloud.stream.bindings.process-out-2:
  destination: spanish-counts
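
For context, in that sample the out-<n> index is fixed by the code, not the configuration: the function returns a KStream[] and the binder binds array position 0 to process-out-0, position 1 to process-out-1, and so on. A minimal sketch with stand-in predicates:

import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

import org.springframework.context.annotation.Bean;

public class BranchingSketch {

    @Bean
    public Function<KStream<String, String>, KStream<String, String>[]> process() {
        Predicate<String, String> isEnglish = (k, v) -> v.startsWith("en:");
        Predicate<String, String> isFrench  = (k, v) -> v.startsWith("fr:");
        Predicate<String, String> isSpanish = (k, v) -> v.startsWith("es:");

        // Array position determines the binding: index 0 -> process-out-0 (english-counts),
        // index 1 -> process-out-1 (french-counts), index 2 -> process-out-2 (spanish-counts)
        return input -> input.branch(isEnglish, isFrench, isSpanish);
    }
}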
Viacheslav Petriaiev
@Viacheslav-Petriaiev

(re-posting my earlier question, above, about using Flux<Message<ByteArray>> with @StreamListener after migrating to Hoxton.SR5)

@sobychacko : Could you give some suggestions on how I can use a Flux of org.springframework.messaging.Message?

Thank you
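
A possible direction, sketched rather than a confirmed fix: in the functional model the @StreamListener/@SendTo pair becomes a Function bean, and Message is supported as the parameter type, so the payload bytes are left alone:

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;

public class OperationsConfig {

    // Functional equivalent of the @StreamListener/@SendTo pair; bound as
    // operationsListener-in-0 / operationsListener-out-0
    @Bean
    public Function<Flux<Message<byte[]>>, Flux<Message<byte[]>>> operationsListener() {
        return input -> input; // transform the messages here
    }
}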

Angshuman Agarwal
@angshuman-agarwal

(re-posting my question above about aliasing the composed function toUpperCase|wrapInQuotes in YAML)

Has anyone tried this, please?

Andras Hatvani
@andrashatvani
hi, does spring-cloud-stream support deduping based on the message key?
Víctor Reyes Rodríguez
@vicrdguez

Hi!
I have a problem trying to configure the applicationId for my processors. This is my application.yml

spring.cloud:
    stream:
      function:
        definition: changePriority;addComment
      kafka:
        streams:
          binder:
            functions:
              changePriority.applicationId: proxy-sbn-changePriority
              addComment.applicationId: proxy-sbn-addComment

Anyway, when I run my app I am still getting anonymous consumers. Any hint?
Thanks!

5 replies
Wil Collins
@wilcollins
Hi, our team is using a RabbitMQ backend and we need message deduplication, so we are considering the community-provided plugin https://github.com/noxdafox/rabbitmq-message-deduplication . However, it requires specific arguments to be provided to the exchange when it is created, and as far as I know there are currently no facilities to do this through the provided configuration class. So I have a few questions:
1) Is there an existing solution that I am overlooking?
2) If not, is this functionality already on any roadmap for implementation?
3) If not, or if that timeline is reasonably long, would this be a feature that would be a welcome contribution from the community?
shipra chaudhary
@ShipraShipra04_twitter
Hi team, I am trying to create an SCDF task which uses the JDBC dependency (org.springframework.boot:spring-boot-starter-jdbc) to store metadata. In the Spring Boot app I am using the R2DBC dependency (io.r2dbc:r2dbc-postgresql:0.8.2.RELEASE) to get data from PostgreSQL and post it to a REST endpoint.
I am getting an error. I tried to follow this link: https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/multiple-datasources
but I am not sure what my first and second EmbeddedDatabaseType should be in this scenario. Please help.
Jon Harper
@jonenst
Hi, is it possible to use the new test binder ( https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#_test_binder_usage ) with "normal" annotation-based tests controlling the context? Something like this:
@SpringBootApplication
public class MyApp {
    @Bean
    public Supplier<Flux<Message<String>>> mypublisher() {
        ...
    }

    public void publishone() {
        // some function called by the app to send messages to the mypublisher flux
    }
}

@SpringBootTest(...)
public class MyTest {
    @Autowired
    OutputDestination output;

    @Test
    void mytest() {
        // context setup automatically
        app.publishone();
        assertThat(collector.receive().getPayload(), equalTo("hello".getBytes()));
    }
}
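
For reference, the documented way to combine the test binder with an annotation-driven test is to import TestChannelBinderConfiguration into the test context. A sketch, assuming the MyApp above (and using output rather than collector, as clarified just below):

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;

@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class MyAppTests {

    @Autowired
    MyApp app;

    @Autowired
    OutputDestination output;

    @Test
    void publishesToOutput() {
        app.publishone();
        assertThat(output.receive().getPayload()).isEqualTo("hello".getBytes());
    }
}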
Oleg Zhurakousky
@olegz
yes, we have tests for that
what I don't understand from your code is the collector part
Jon Harper
@jonenst
Hi,
thanks for taking the time to answer
collector should have been "output"
Oleg Zhurakousky
@olegz
got it
Jon Harper
@jonenst
the mypublisher bean stores a FluxSink used by the app (when an HTTP request comes in) to emit messages to the flux, which sends them to a broker queue
I just want my test of the HTTP request to also assert that a message has been sent to the queue
I don't use @EnableBinding, I use Spring function definitions
Oleg Zhurakousky
@olegz
ok, then I misunderstood your question