rstpv
@rstpv
Hi, I'm trying to use this implementation https://github.com/spring-cloud/stream-applications/blob/master/functions/supplier/rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierConfiguration.java in a Kafka environment. I'm using Data Flow, and it sets --spring.cloud.stream.bindings.output.destination=someTopic, yet the messages are published to the topic rabbitSupplier-out-0. Did I miss something in the implementation? Is there a way to say that the FluxMessageChannel is the default output so that the topic bindings apply to it?
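One option worth checking (a sketch, assuming Spring Cloud Stream 3.x functional bindings; not a confirmed fix for this case) is to alias the functional binding name to the legacy output name that Data Flow configures, so that --spring.cloud.stream.bindings.output.destination applies to it:

spring:
  cloud:
    stream:
      function:
        bindings:
          rabbitSupplier-out-0: output   # map the functional binding onto the 'output' name Data Flow targets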
sumit1912
@sumit1912

Hi @olegz ,

I have created simple Spring Cloud Stream apps (source, processor, and sink) where the processor and sink transfer reactive data.
https://github.com/sumit1912/springcloud.stream.pipeline

I deploy these apps in Spring Cloud Data Flow with RabbitMQ as the broker. My apps work fine and transfer data when I use springBootVersion = "2.1.9.RELEASE" and the org.springframework.cloud:spring-cloud-stream-reactive:2.2.0.RELEASE dependency.

When I update Spring Boot to v2.2.6.RELEASE and replace spring-cloud-stream-reactive with "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR3" and "org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR3", I get the error below:

ERROR 5206 — [ wer.http.wer-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binding.StreamListenerMessageHandler@39ce27f2]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `reactor.core.publisher.Flux` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (byte[])...

The error appears only after I replace spring-cloud-stream-reactive with spring-cloud-dependencies and spring-cloud-stream-dependencies and update Spring Boot to 2.2.x.

Is it because the spring-cloud-stream-reactive module is deprecated in spring-cloud-dependencies:Hoxton.SR3 and we need to use spring-cloud-function (the Supplier/Function/Consumer model) for handling reactive messages?

sagarikamraj
@sagarikamraj
Hello Team. I am facing a weird issue. We are using @StreamListener with a condition. The condition is new String(payload).event.type matches 'MyEvent-Type'. This works fine with local Kafka, and even with embedded Kafka in an integration test. But when deployed to another environment, for the same message I get "Cannot find a @StreamListener matching for message".
Currently I do not have any more logs to provide more information, sorry. Does anyone have any idea why this could have happened? Is there any setting I need to check in Kafka, or is it some problem with serializers, etc.?
2 replies
Moshe
@moshelav
When consumers keep being revoked from a group because of timeouts, the group counts fewer and fewer consumers until it is left with 0. Obviously we need to increase the timeout and improve performance... but still, is there a way to rejoin the consumers to the group without restarting the microservice?
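As a starting point for the timeout side of this (a sketch, assuming the Kafka binder and that the revocations come from processing taking longer than the poll interval; the values are illustrative), arbitrary consumer properties can be passed through the binder configuration:

spring:
  cloud:
    stream:
      kafka:
        binder:
          consumerProperties:
            max.poll.interval.ms: 600000   # allow more time between polls before the consumer is kicked out
            max.poll.records: 100          # fetch fewer records per poll so each poll finishes sooner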
yand
@yand

Hello, I have the following setup for a consumer. It works for the first message, and then I get an exception:

INFO 47834 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application-1.myConsumer-in-0' has 0 subscriber(s).
ERROR 47834 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.myConsumer-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload={...}]
public interface Sink<I> extends Consumer<Flux<I>> {

  @Override
  default void accept(Flux<I> input) {
    consume(input);
  }

  void consume(Flux<I> input);
}
@Component("myConsumer")
public class MyConsumer implements Sink<MyMessage> {

  public MyConsumer() {
  }

  @Override
  public void consume(Flux<MyMessage> input) {
    input
      .transform(...)
      .transform(...)
      .subscribe();
  }
}
spring:
  cloud:
    stream:
      function:
        definition: myConsumer
      bindings:
        myConsumer-in-0:
          destination: ...
          group: myConsumerGroup
          consumer:
            autoRebalanceEnabled: true
            partitioned: true
            maxAttempts: 3
      kafka:
        bindings:
          myConsumer-in-0:
            consumer:
              configuration:
                value:
                  deserializer: ...

Not sure what I am doing wrong; any help is appreciated

natraj09
@natraj09
Hi Team, I am reading through the docs here (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling). How does re-queueing of the message work in the case of Kafka? Does it require any special configuration? I have maxAttempts: 3. Here's my sample project (https://github.com/natraj09/spring-cloud-stream-playground/blob/master/reactive-processor-kafka/src/main/java/reactive/kafka/ReactiveProcessorApplication.java#L52): if the REST payload has "error:true" it prints an error and throws an exception. I only see the info log once instead of 3 times.
2 replies
GamiiisKth
@GamiiisKth
Hi, I can't set the producer property delivery.timeout.ms; I get "The configuration 'delivery.timeout.ms' was supplied but isn't a known config."
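For reference, arbitrary Kafka producer properties can usually be supplied per binding through the Kafka binder (a sketch; the binding name is illustrative, and other clients created by the binder may still log the "isn't a known config" warning for properties they don't use):

spring:
  cloud:
    stream:
      kafka:
        bindings:
          output:                          # illustrative binding name
            producer:
              configuration:
                delivery.timeout.ms: 120000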
natraj09
@natraj09
Is it possible to map multiple destinations to the same function? Something like this (but it's not valid YAML, as the key is the same):
testProcess-in-0:
  destination: input-1
testProcess-in-0:
  destination: input-2
Gary Russell
@garyrussell
destination=input-1,input-2
1 reply
By default, this will create 2 bindings (listener containers). If you set the multiplex consumer property to true, you'll get a single binding consuming from both destinations.
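In YAML form that answer would look roughly like this (a sketch, reusing the binding name from the question above):

spring:
  cloud:
    stream:
      bindings:
        testProcess-in-0:
          destination: input-1,input-2   # comma-separated destinations
          consumer:
            multiplex: true              # single binding consuming from both destinations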
boonware
@boonware
Hi all. I'm trying to ensure that applications always use round-robin partitioning when sending to the DLQ. The docs say I should implement a DlqPartitionFunction bean, but looking at the parameters it's not clear how I could implement round robin, since I must always return the actual partition number and there isn't any info on the number of partitions of the DLQ topic. Any advice? Reference: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka-dlq-processing
1 reply
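For illustration, a round-robin DlqPartitionFunction bean could look roughly like the sketch below; it assumes the application itself knows the DLQ's partition count (hard-coded here as 6), since the function is not told how many partitions the DLQ topic has:

@Bean
public DlqPartitionFunction roundRobinDlqPartitioner() {
    AtomicInteger counter = new AtomicInteger();
    int dlqPartitions = 6; // assumption: known partition count of the DLQ topic
    return (group, record, exception) -> counter.getAndUpdate(i -> (i + 1) % dlqPartitions);
}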
JDGale8
@JDGale8

Hey, I have a question about the motivation for the new test style for the test binders.
Unlike all other Spring tests, these no longer leverage @Autowired and instead use a context builder, like:

context = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(Application.class))
            .run("--server.port=0",
                      "--spring.cloud.function.definition=listener");

This is a huge departure from the standard use of @SpringBootTest and @RunWith(SpringRunner.class). Does anyone know why this is happening? Is it out of necessity? Is this the direction Spring is moving in? I haven't been able to find any documentation or blog posts about the motivation behind this.
It's making rewriting existing tests really challenging, and it feels like a code smell to rewrite the existing tests in a far more verbose way.

1 reply
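For anyone comparing the two styles, a complete test against the test binder typically looks something like the sketch below (class, binding, and payload names are illustrative):

@Test
public void listenerProducesOutput() {
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(Application.class))
            .run("--server.port=0", "--spring.cloud.function.definition=listener")) {
        InputDestination input = context.getBean(InputDestination.class);
        OutputDestination output = context.getBean(OutputDestination.class);
        input.send(MessageBuilder.withPayload("hello").build());
        Message<byte[]> result = output.receive(1000);
        // assert on result.getPayload() here
    }
}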
natraj09
@natraj09
@garyrussell I tried creating 2 bindings to the same function. The original topic and the DLQ topic are both bound to the same function (sample configuration below). I found that the message is not sent to the DLQ on error: https://github.com/natraj09/spring-cloud-stream-playground/blob/master/reactive-processor-kafka/src/main/java/reactive/kafka/ReactiveProcessorApplication.java#L54. I came across the open issue spring-cloud/spring-cloud-stream#1922; it looks like DLQ is not supported for reactive functions. Instead of throwing an exception, can I use the sendto destination to route it to the error channel, e.g. Mono.just(MessageBuilder.fromMessage(inbound).setHeader("spring.cloud.stream.sendto.destination", "rest-input-error").build())? What is the recommended pattern for handling errors in the case of a reactive function?
spring:
  cloud:
    stream:
      function.definition: testProcess;testSource;testSink
      bindings:
        testProcess-in-0:
          destination: rest-input,rest-input-error
          group: input-group
          maxAttempts: 1
        testProcess-out-0:
          destination: rest-output
        testSink-in-0:
          destination: rest-output
          group: sink-group
        testSource-out-0:
          destination: rest-input
      kafka:
        bindings:
          testProcess-in-0:
            consumer:
              enableDlq: true
              dlqName: rest-input-error
3 replies
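A sketch of the routing idea mentioned in the question (handling the failure inside the reactive pipeline and sending the failed message to another destination via the sendto header, instead of relying on the binder-managed DLQ); process() and the destination name are illustrative:

@Bean
public Function<Flux<Message<String>>, Flux<Message<String>>> testProcess() {
    return flux -> flux.flatMap(message ->
            process(message)   // illustrative business call returning Mono<Message<String>>
                .onErrorResume(ex -> Mono.just(MessageBuilder.fromMessage(message)
                        .setHeader("spring.cloud.stream.sendto.destination", "rest-input-error")
                        .build())));
}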
Andrey Shirobokov
@ashirobokov
Hi!
I'm using Spring Cloud Stream 3.0.4.RELEASE (Hoxton.SR4). Is there any code sample of using bean validation (Function<@Valid Person, Person> uppercase()) with the functional approach?
Oleg Zhurakousky
@olegz
no, and if anything that would be on spring-cloud-function side
feel free to raise the issue there
Andrey Shirobokov
@ashirobokov
ok, thanks!
sakella1986
@sakella1986
@sobychacko and all, a quick question on improving Kafka message consumer performance. We have 12 partitions in the topic and are publishing at a rate of 200 TPS. After receiving a message (we use 12 Spring Boot containers to read from all partitions), we need to make an external vendor call which takes 600 ms, which means I can only support about 20 TPS. What options do I have to achieve 200 TPS message consumption? Appreciate your help!
20 replies
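One direction that is often suggested for this kind of bottleneck (a sketch, not from the thread's replies; it assumes the vendor call can run concurrently and that strict per-partition ordering is not required) is to overlap the blocking calls within each consumer instance:

@Bean
public Function<Flux<Message<String>>, Flux<Message<String>>> enrich() {
    return flux -> flux.flatMap(message ->
            Mono.fromCallable(() -> callVendor(message))       // illustrative blocking ~600 ms vendor call
                .subscribeOn(Schedulers.boundedElastic()),
            10);                                               // up to 10 vendor calls in flight per instance
}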
Nadavbi87
@Nadavbi87
Hi,
Did the start-scs.cfapps.io site move to a new domain?
Soby Chacko
@sobychacko
@Nadavbi87 It's up and running now. There were some issues before.
Nadavbi87
@Nadavbi87
@sobychacko
Thanks!
sumit1912
@sumit1912

Hi team,

I have created 3 simple Spring Cloud Stream apps (Source/Processor/Sink) using the Spring Cloud Function approach, transferring Flux<String>.

Source-app:

@SpringBootApplication
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }

    @PollableBean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> {
            String v1 = String.valueOf("abc");
            String v2 = String.valueOf("pqr");
            String v3 = String.valueOf("xyz");
            return Flux.just(v1, v2, v3);
        };
    }
}

Processor-app:

@SpringBootApplication
public class ProcessorApplication {

    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase()).log();
    }

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class, args);
    }
}

Sink-app:

@SpringBootApplication
public class SinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }

    @Bean
    public Consumer<Flux<String>> log() {
        return flux -> {
            flux.subscribe(f -> System.out.println("Received data: " + f));
        };
    }
}

The dependencies that I have added are:

SpringBoot version = 2.2.6.RELEASE

implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit")
implementation("org.springframework.cloud:spring-cloud-starter-function-webflux:3.0.7.RELEASE")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-webflux")

I have registered these apps in Spring Cloud Data Flow and deployed them in a stream.

I am able to transmit data to these apps and receive output both via HTTP and via RabbitMQ individually. But the messages are not communicated across the apps (Source -> Processor -> Sink).
Am I missing any dependency, annotation, or application property?
I am new to Spring Cloud Stream with the function approach; any suggestion will be a great help :)

Complete code can be found here: https://github.com/sumit1912/springcloud.function.stream.pipeline

Soby Chacko
@sobychacko
@sumit1912 Didn't you ask and get an answer on SO regarding this?
Omkar Shetkar
@Omkar-Shetkar

Hi All,
Recently I started learning and using Spring Cloud Stream. I like the way Spring Cloud Function simplifies message publishing and consumption. But in one of my applications, the beans corresponding to the functions are not detected and hence not bound to a destination.
When I compared this with my sample working version, the difference is that the not-working version has a module dependency which internally uses Spring Cloud Stream. This module uses the @EnableBinding annotation.
If I add this annotation to my working version, I reproduce the same issue.
It seems there is a conflict between @EnableBinding and the functional support.
Any hints or suggestions will be really helpful.
Thanks.


In a unit test, when I try to receive the message using OutputDestination, I get the following error:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34)
    at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:37)
    at com.example.loggingconsumer.LoggingConsumerApplicationTests.contextLoads(LoggingConsumerApplicationTests.java:50)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    ...

Omkar Shetkar
@Omkar-Shetkar
Here is the not-working version, with the @EnableBinding annotation. If I just remove this annotation, it works!
@SpringBootApplication
@Controller
@EnableBinding
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    @Bean
    public Consumer<CloudEventImpl<Person>> consumer() {
        return p -> {
            System.out.println("consumer--Received: " + p);

        };
    }
}
Soby Chacko
@sobychacko
@Omkar-Shetkar You cannot mix functional binding and the annotation approach in the regular message channel-based binders (Kafka Streams binder allows that in the current versions).
sumit1912
@sumit1912

@sumit1912 Didn't you ask and get an answer on SO regarding this?

Thanks @sobychacko for the reply. Answer given at SO worked like a charm.

Omkar Shetkar
@Omkar-Shetkar

@Omkar-Shetkar You cannot mix functional binding and the annotation approach in the regular message channel-based binders (Kafka Streams binder allows that in the current versions).

OK... @sobychacko Thanks for the clarification...

boonware
@boonware
Hi. Is it possible to modify a message consumed via a @StreamListener method before sending it to the DLQ? My use case is to attach an error description to the message before routing it.
I've tried mutating the object parameter of the method, but that doesn't seem to have any effect.
Gonzalo Borobio
@gborobio73

Hello! I'm new to Spring Cloud Stream and Kafka and I could use some help. Starting from the Spring Cloud samples test-embedded-kafka project (https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/testing-samples/test-embedded-kafka), I'm trying to modify the code so it uses KStream. The code inside EmbeddedKafkaApplication looks like this when using KStream:

    public Function<KStream<Object, byte[]>, KStream<Object, byte[]>> handle(){
        return in ->
            in.map((key, value) -> new KeyValue<Object, byte[]> (key, new String(value).toUpperCase().getBytes()));
    }

I have also modified the POM to include spring-cloud-stream-binder-kafka-stream

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

And also upgraded to newer Kafka as explained here: spring-cloud/spring-cloud-stream-samples#182
The issue I have is that when I run the test, I can see this error in the log:

2020-06-09 16:37:58.854 ERROR 6507 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [handle-applicationId-203b2c06-8fbf-4687-b8aa-71e231d46005-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance

org.apache.kafka.streams.errors.StreamsException: stream-thread [handle-applicationId-203b2c06-8fbf-4687-b8aa-71e231d46005-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216) ~[kafka-streams-2.3.1.jar:na]

And the test eventually fails because:

Expecting:
 <0>
to be equal to:
 <1>
but was not.

Any ideas on what I'm doing wrong? Thanks a lot!!

15 replies
natraj09
@natraj09
Is there an option to skip bad records with the Kafka binder? I see an option with Kafka Streams (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.2.1.RELEASE/spring-cloud-stream-binder-kafka.html#_handling_deserialization_exceptions). I am also using the reactive API; wondering if it's something I have to handle on my own?
Soby Chacko
@sobychacko
You can set up a DLQ for storing the bad records and get going with the processing of new records.
1 reply
ZhangXudong
@MLeft
1 reply
I bound an input channel, but why is ChannelInterceptor.preSend invoked and not the preReceive method?
dharezlak
@dharezlak

Hi, I have the following reactive consumer:

@Bean
public Function<Flux<String>, Mono<Void>> processor() {
    return messages -> messages
            .flatMap(storage::store)
            .then();
}

How is this going to behave with respect to Kafka message acknowledgement? I can't seem to find any documentation on this. Since storage::store is reactive as well, I would assume the flatMap operator parallelizes message consumption here?

3 replies
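On the ordering point (a side note, not from the replies): flatMap subscribes to several inner publishers at once, so if messages must be processed strictly in order, a sequential operator can be used instead, e.g.:

@Bean
public Function<Flux<String>, Mono<Void>> processor() {
    return messages -> messages
            .concatMap(storage::store)   // one message at a time, preserving order
            .then();
}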
shipra chaudhary
@ShipraShipra04_twitter

Hi Team, I am using reactive Spring Data R2DBC repositories to get POJOs from a Postgres DB:

@Table("document")
public class Document {
@Id
private String id;
@Column(value = "authors_ids")
private Set<Author> authors;// object
}

Author is a POJO; authors_ids is an array column in the Postgres DB.

While trying to run it, I get the error below because of custom conversion. How can I define a Set<Object> type in the POJO to access the authors_ids (array of strings) field from the Postgres DB?

{
"timestamp": "2020-06-15T13:20:50.491+0000",
"status": 500,
"error": "Internal Server Error",
"message": "Could not read property @org.springframework.data.relational.core.mapping.Column(keyColumn=, value=authors_ids)private java.util.Set com.example.demo.Documents.authors from result set!",
"path": "/api/document"
}

dvirgiln
@dvirgiln

Hello,

I am following the example from https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution

I am receiving an error:

Failed to invoke method; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"HelloWorld","namespace":"com.example.avro","fields":[{"name":"transactionId","type":"string"},{"name":"message","type":"string"},{"name":"field1","type":["null","int"]}]} (through reference chain: com.example.HelloWorld["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])

My Avro file is quite simple:

{
    "namespace": "com.hollandandbarrett.dns.kafkaworkshop.avro",
    "type": "record",
    "name": "HelloWorld",
    "fields": [
        {"name": "transactionId", "type": "string"},
        {"name": "message", "type": "string"},
        {"name": "field1", "type": ["null", "int"]}
    ]
}

The application.yml is:

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamic-schema-generation-enabled: false
      bindings:
        output:
          contentType: application/*+avro
          destination: HelloWorld
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:29092
          auto-create-topics: true

        bindings:
          output:

            producer:
              configuration:
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
server.port: 9010
Any idea why it is not working?
Why is it trying to use the JSON serialization instead of the Avro one?
Soby Chacko
@sobychacko
@dvirgiln Did you get that example working properly? Or getting that error while running that?
dvirgiln
@dvirgiln
It works perfectly if I do not include the union type:
1 reply
{"name": "field1", "type": ["null", "int"]}
somehow it is trying to use the Jackson serde
dvirgiln
@dvirgiln
The same error happens when I execute the producer 2 on the example
https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution
mvn clean compile package
java -jar schema-registry-confluent-avro-serializer-producer2/target/schema-registry-confluent-avro-serializer-producer2-0.0.1-SNAPSHOT.jar
dvirgiln
@dvirgiln
I have used the same Docker setup as in the README file:
 docker-compose -f docker-compose-control-center.yaml up -d