Gary Russell
@garyrussell
destination=input-1,input-2
1 reply
By default, this will create 2 bindings (listener containers). If you set the multiplex consumer property to true, you'll get a single binding consuming from both destinations.
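A minimal sketch of that multiplex setup in YAML (the binding name and group below are hypothetical):

```yaml
spring:
  cloud:
    stream:
      bindings:
        process-in-0:                  # hypothetical function binding name
          destination: input-1,input-2
          group: my-group
          consumer:
            multiplex: true            # one listener container consuming from both destinations
```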
boonware
@boonware
Hi all. I'm trying to ensure that applications always use round-robin partitioning when sending to the DLQ. The docs say I should implement a DlqPartitionFunction bean, but looking at the parameters, it's not clear how I could implement round robin, since I must always return the actual partition number and there isn't any info on the number of partitions for the DLQ topic. Any advice? Reference: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka-dlq-processing
1 reply
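The reply thread is collapsed above, so here is only a general sketch of the round-robin part: keep your own counter and supply the DLQ partition count from configuration yourself, since (per the docs linked above) the function's parameters do not expose it. All names below are hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: cycles through DLQ partitions in round-robin order.
// The DLQ partition count is assumed to be known ahead of time (e.g. from
// your own configuration property).
public class RoundRobinDlqPartitioner {

    private final int dlqPartitionCount;
    private final AtomicInteger counter = new AtomicInteger();

    public RoundRobinDlqPartitioner(int dlqPartitionCount) {
        this.dlqPartitionCount = dlqPartitionCount;
    }

    // Returns 0, 1, ..., n-1, 0, 1, ... on successive calls.
    // Math.floorMod keeps the result non-negative if the counter overflows.
    public int nextPartition() {
        return Math.floorMod(counter.getAndIncrement(), dlqPartitionCount);
    }
}
```

You would then return `nextPartition()` from your DlqPartitionFunction bean, ignoring the failed record's original partition.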
JDGale8
@JDGale8

Hey, I have a question about some of the motivation for the new test style for the test binders.
Unlike all other Spring tests, these no longer leverage @Autowired and instead use this context builder, like

context = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(Application.class))
            .run("--server.port=0",
                      "--spring.cloud.function.definition=listener");

This is a huge departure from the standard use of @SpringBootTest and @RunWith(SpringRunner.class). Does anyone know why this is happening? Is it out of necessity, or is this the direction Spring is moving in? I haven't been able to find any documentation or blog posts about the motivation behind this.
It's making rewriting existing tests really challenging, and it feels like a code smell to rewrite the existing tests in a far more verbose way.

1 reply
natraj09
@natraj09
@garyrussell I tried creating 2 bindings to the same function. The original topic and the DLQ topic are both bound to the same function (sample configuration below). I found the message is not being sent to the DLQ on error: https://github.com/natraj09/spring-cloud-stream-playground/blob/master/reactive-processor-kafka/src/main/java/reactive/kafka/ReactiveProcessorApplication.java#L54 . I came across the open issue spring-cloud/spring-cloud-stream#1922; it looks like DLQ is not supported for reactive functions. Instead of throwing an exception, can I use the sendto destination to route it to the error channel: Mono.just(MessageBuilder.fromMessage(inbound).setHeader("spring.cloud.stream.sendto.destination", "rest-input-error").build())? What is the recommended pattern for handling errors with a reactive function?
spring:
  cloud:
    stream:
      function.definition: testProcess;testSource;testSink
      bindings:
        testProcess-in-0:
          destination: rest-input,rest-input-error
          group: input-group
          maxAttempts: 1
        testProcess-out-0:
          destination: rest-output
        testSink-in-0:
          destination: rest-output
          group: sink-group
        testSource-out-0:
          destination: rest-input
      kafka:
        bindings:
          testProcess-in-0:
            consumer:
              enableDlq: true
              dlqName: rest-input-error
3 replies
Andrey Shirobokov
@ashirobokov
Hi!
I'm using Spring Cloud Stream 3.0.4.RELEASE (Hoxton.SR4). Is there any code sample of using bean validation (Function<@Valid Person, Person> uppercase()) with the functional approach?
Oleg Zhurakousky
@olegz
no, and if anything that would be on spring-cloud-function side
feel free to raise the issue there
Andrey Shirobokov
@ashirobokov
ok, thanks!
sakella1986
@sakella1986
@sobychacko and all, quick question on improving Kafka message consumer performance. We have 12 partitions in the topic and are publishing at a rate of 200 TPS. After receiving a message (we use 12 Spring Boot containers for reading from all partitions), we need to make an external vendor call which takes 600 ms, which means I can only support 20 TPS. What options do I have to achieve 200 TPS message consumption? Appreciate your help!
20 replies
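The 20 replies are collapsed, but the arithmetic behind the question is worth spelling out: sustaining 200 TPS when each call blocks for 600 ms requires roughly 200 × 0.6 = 120 calls in flight at once, while one blocking thread per partition (12 total) caps throughput at 12 / 0.6 = 20 TPS. A generic sketch of the capacity math and of fanning records out to a worker pool follows (all names hypothetical; note that per-partition ordering and offset management need care once you do this):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThroughputSketch {

    // Little's law: concurrent calls needed = target TPS * call latency in seconds.
    static int requiredConcurrency(double targetTps, double callSeconds) {
        return (int) Math.ceil(targetTps * callSeconds);
    }

    public static void main(String[] args) throws Exception {
        // 200 TPS with 600 ms calls -> 120 calls must be in flight.
        System.out.println(requiredConcurrency(200, 0.6));

        // Instead of blocking the listener thread on the vendor call,
        // hand each consumed record to a bounded worker pool.
        ExecutorService pool = Executors.newFixedThreadPool(10);
        Future<String> result = pool.submit(() -> "vendor-response"); // stand-in for the 600 ms call
        System.out.println(result.get());
        pool.shutdown();
    }
}
```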
Nadavbi87
@Nadavbi87
Hi,
Did the start-scs.cfapps.io site move to a new domain?
Soby Chacko
@sobychacko
@Nadavbi87 It's up and running now. There were some issues before.
Nadavbi87
@Nadavbi87
@sobychacko
Thanks!
sumit1912
@sumit1912

Hi team,

I have created 3 simple Spring Cloud Stream apps (Source/Processor/Sink) using the Spring Cloud Function approach, transferring Flux<String>.

Source-app:

@SpringBootApplication
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }

    @PollableBean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> {
            String v1 = String.valueOf("abc");
            String v2 = String.valueOf("pqr");
            String v3 = String.valueOf("xyz");
            return Flux.just(v1, v2, v3);
        };
    }
}

Processor-app:

@SpringBootApplication
public class ProcessorApplication {

    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase()).log();
    }

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class, args);
    }
}

Sink-app:

@SpringBootApplication
public class SinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }

    @Bean
    public Consumer<Flux<String>> log() {
        return flux -> {
            flux.subscribe(f -> System.out.println("Received data: " + f));
        };
    }
}

The dependencies that I have added are:

SpringBoot version = 2.2.6.RELEASE

implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit")
implementation("org.springframework.cloud:spring-cloud-starter-function-webflux:3.0.7.RELEASE")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-webflux")

I have registered these apps in Spring Cloud Data Flow and deployed them in a stream.

I am able to transmit data to these apps and receive output both via HTTP and via RabbitMQ individually, but the message is not communicated across the apps (Source -> Processor -> Sink).
Am I missing any dependency, annotation, or application property?
I am new to Spring Cloud Stream using the function approach; any suggestion will be a great help :)

Complete code can be found here: https://github.com/sumit1912/springcloud.function.stream.pipeline

Soby Chacko
@sobychacko
@sumit1912 Didn't you ask and get an answer on SO regarding this?
Omkar Shetkar
@Omkar-Shetkar

Hi All,
Recently I started learning and using Spring Cloud Stream. I like the way Spring Cloud Functions simplify message publishing and consumption. But in one of my applications, the beans corresponding to the functions are not detected and hence not bound to a destination.
When I compared this with my sample working version, the difference is that in the not-working version I have a module dependency which internally uses Spring Cloud Stream. This module uses the @EnableBinding annotation.
If I put this annotation in my working version, I reproduce the same issue.
It seems there is a conflict between @EnableBinding and functional support.
Any hints or suggestions will be really helpful.
Thanks.


In unit test, when I try to receive the message using OutputDestination, I get following error:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34)
    at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:37)
    at com.example.loggingconsumer.LoggingConsumerApplicationTests.contextLoads(LoggingConsumerApplicationTests.java:50)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    ...

Omkar Shetkar
@Omkar-Shetkar
This is the not-working version, with the @EnableBinding annotation. If I just remove this annotation, it works!
@SpringBootApplication
@Controller
@EnableBinding
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    @Bean
    public Consumer<CloudEventImpl<Person>> consumer() {
        return p -> {
            System.out.println("consumer--Received: " + p);

        };
    }
}
Soby Chacko
@sobychacko
@Omkar-Shetkar You cannot mix functional binding and the annotation approach in the regular message channel-based binders (Kafka Streams binder allows that in the current versions).
sumit1912
@sumit1912

@sumit1912 Didn't you ask and get an answer on SO regarding this?

Thanks @sobychacko for the reply. The answer given on SO worked like a charm.

Omkar Shetkar
@Omkar-Shetkar

@Omkar-Shetkar You cannot mix functional binding and the annotation approach in the regular message channel-based binders (Kafka Streams binder allows that in the current versions).

OK... @sobychacko Thanks for the clarification...

boonware
@boonware
Hi. Is it possible to modify a message consumed via a @StreamListener method before sending it to the DLQ? My use case is to attach an error description to the message before routing it.
I've tried mutating the object parameter of the method, but that doesn't seem to have any effect.
Gonzalo Borobio
@gborobio73

Hello! I'm new to Spring Cloud Stream and Kafka and I could use some help. Starting from the Spring Cloud sample test-embedded-kafka (https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/testing-samples/test-embedded-kafka), I'm trying to modify the code so it uses KStream. The code inside EmbeddedKafkaApplication looks like this when using KStream:

    public Function<KStream<Object, byte[]>, KStream<Object, byte[]>> handle(){
        return in ->
            in.map((key, value) -> new KeyValue<Object, byte[]> (key, new String(value).toUpperCase().getBytes()));
    }

I have also modified the POM to include spring-cloud-stream-binder-kafka-streams

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

And also upgraded to newer Kafka as explained here: spring-cloud/spring-cloud-stream-samples#182
The issue I have is that when I run the test, I can see this error in the log:

2020-06-09 16:37:58.854 ERROR 6507 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [handle-applicationId-203b2c06-8fbf-4687-b8aa-71e231d46005-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance

org.apache.kafka.streams.errors.StreamsException: stream-thread [handle-applicationId-203b2c06-8fbf-4687-b8aa-71e231d46005-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216) ~[kafka-streams-2.3.1.jar:na]

And the test eventually fails because:

Expecting:
 <0>
to be equal to:
 <1>
but was not.

Any ideas on what I'm doing wrong? Thanks a lot!!

15 replies
natraj09
@natraj09
Is there an option to skip bad records with the Kafka binder? I see an option with Kafka Streams: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.2.1.RELEASE/spring-cloud-stream-binder-kafka.html#_handling_deserialization_exceptions . I am also using the reactive API; I'm wondering if it's something I have to handle on my own?
Soby Chacko
@sobychacko
You can set up a DLQ for storing the bad records and carry on with the processing of new records.
1 reply
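In YAML, that is the same pair of properties used in the configuration natraj09 posted above, a minimal sketch (the binding name and DLQ topic below are hypothetical):

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          process-in-0:            # hypothetical binding name
            consumer:
              enableDlq: true      # send failed records to a DLQ...
              dlqName: input-dlq   # ...named here; processing of new records continues
```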
ZhangXudong
@MLeft
image.png
1 reply
I bind an input channel, but why is ChannelInterceptor.preSend invoked and not the preReceive method?
dharezlak
@dharezlak

Hi, I have the following reactive consumer:

@Bean
public Function<Flux<String>, Mono<Void>> processor() {
    return messages -> messages
            .flatMap(storage::store)
            .then();
}

How is this going to behave with respect to Kafka message acknowledgement? I can't seem to find any documentation on this. Since storage::store is reactive as well, I would assume the flatMap operator parallelizes message consumption here?

3 replies
shipra chaudhary
@ShipraShipra04_twitter

Hi Team, I am using reactive Spring Data R2DBC repositories to get POJOs from a Postgres DB

@Table("document")
public class Document {
@Id
private String id;
@Column(value = "authors_ids")
private Set<Author> authors;// object
}

Author is a POJO; authors_ids is an array column in the Postgres DB.

While trying to run it I get the error below because of custom conversion. How can I define a Set<Author> field in the POJO to map the authors_ids (array of strings) column from the Postgres DB?

{
  "timestamp": "2020-06-15T13:20:50.491+0000",
  "status": 500,
  "error": "Internal Server Error",
  "message": "Could not read property @org.springframework.data.relational.core.mapping.Column(keyColumn=, value=authors_ids)private java.util.Set com.example.demo.Documents.authors from result set!",
  "path": "/api/document"
}

dvirgiln
@dvirgiln

Hello,

I am following the example from https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution

I am receiving an error:

Failed to invoke method; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"HelloWorld","namespace":"com.example.avro","fields":[{"name":"transactionId","type":"string"},{"name":"message","type":"string"},{"name":"field1","type":["null","int"]}]} (through reference chain: com.example.HelloWorld["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])

My Avro file is quite simple:

{
    "namespace": "com.hollandandbarrett.dns.kafkaworkshop.avro",
    "type": "record",
    "name": "HelloWorld",
    "fields": [
        {"name": "transactionId", "type": "string"},
        {"name": "message", "type": "string"},
        {"name": "field1", "type": ["null", "int"]}
    ]
}

The application.yml is:

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamic-schema-generation-enabled: false
      bindings:
        output:
          contentType: application/*+avro
          destination: HelloWorld
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:29092
          auto-create-topics: true

        bindings:
          output:

            producer:
              configuration:
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
server.port: 9010
Any idea why it is not working?
Why is it trying to use the JSON serialization instead of the Avro one?
Soby Chacko
@sobychacko
@dvirgiln Did you get that example working properly? Or getting that error while running that?
dvirgiln
@dvirgiln
It works perfectly if I do not include the union operator
1 reply
{"name": "field1", "type": ["null", "int"]}
Somehow it is trying to use the Jackson serde.
dvirgiln
@dvirgiln
The same error happens when I execute the producer 2 on the example
https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution
mvn clean compile package
java -jar schema-registry-confluent-avro-serializer-producer2/target/schema-registry-confluent-avro-serializer-producer2-0.0.1-SNAPSHOT.jar
dvirgiln
@dvirgiln
I have used the same docker as it appears in the README file:
 docker-compose -f docker-compose-control-center.yaml up -d
Viacheslav Petriaiev
@Viacheslav-Petriaiev

Hello,
I've started the migration to spring cloud Hoxton.SR5

I have the next code in kotlin, which has worked before:

    @StreamListener(CloudOperationsChannels.INPUT)
    @SendTo(CloudOperationsChannels.OUTPUT)
    fun operationsListener(input: Flux<Message<ByteArray>>): Flux<Message<ByteArray>>

Now the ByteArray payload (which is the bytes of a JSON string) is being deserialized into Message and I get

Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

Could you help me: how can I use org.springframework.messaging.Message in this case?

Thank you

Angshuman Agarwal
@angshuman-agarwal

Hi,
I am trying to compose functions and give the composition an alias to map to my Kafka topic, as per https://github.com/spring-cloud/spring-cloud-stream/blob/master/docs/src/main/asciidoc/spring-cloud-stream.adoc

spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes=quotedUpperCase

But the yml file fails to parse this. Is there any sample you can point me to showing how to alias a piped function to a descriptive name that can be used to route to a custom topic?

Angshuman Agarwal
@angshuman-agarwal

@sobychacko : I have a naive question related to one sample here

How do you know that process-out-0, process-out-1, process-out-2 map to english, french and spanish respectively, and why not any other order of the process-out-* indexes?

spring.cloud.stream.bindings.process-out-0:
  destination: english-counts
spring.cloud.stream.bindings.process-out-1:
  destination: french-counts
spring.cloud.stream.bindings.process-out-2:
  destination: spanish-counts
4 replies
Viacheslav Petriaiev
@Viacheslav-Petriaiev

@sobychacko : Could you give some suggestions on how I can use a Flux of org.springframework.messaging.Message?

Thank you

6 replies
Angshuman Agarwal
@angshuman-agarwal

Has anyone tried this, please?

Andras Hatvani
@andrashatvani
hi, does spring-cloud-stream support deduping based on the message key?
Víctor Reyes Rodríguez
@vicrdguez

Hi!
I have a problem trying to configure the applicationId for my processors. This is my application.yml

spring.cloud:
    stream:
      function:
        definition: changePriority;addComment
      kafka:
        streams:
          binder:
            functions:
              changePriority.applicationId: proxy-sbn-changePriority
              addComment.applicationId: proxy-sbn-addComment

Anyway, when I run my app I still get anonymous consumers. Any hint?
Thanks!

5 replies
Wil Collins
@wilcollins
Hi, our team is using a RabbitMQ backend and we need message deduplication, so we are considering this community-provided plugin: https://github.com/noxdafox/rabbitmq-message-deduplication . However, it requires specific arguments to be provided to the exchange when it is created, and as far as I know there are currently no facilities to do this through the provided configuration class, so I have a few questions:
1) is there an existing solution that I am overlooking?
2) if no, is this functionality that already exists on any roadmaps for implementation?
3) if not or if that timeline is reasonably long, would this be a feature that would be a welcomed contribution from the community?
2 replies