Jakub Kubryński
@jkubrynski
then in the test I want to check if, after invoking the revokeAccess method, the proper message was sent to the proper destination. I don't mean testing the business logic - just the integration: whether the proper destination, message headers and a valid payload were set.
with the old test-support I use messageCollector.forChannel(myChannel.myDestination()).poll(1, TimeUnit.SECONDS);
then on the retrieved message I can verify the whole structure. The question is how a similar approach is possible with the new test-binder
if the above is not clear let me know -> I'll code the whole approach in a separate project
Knut Schleßelmann
@kschlesselmann
@olegz My error resolved itself … don't ask me why …
Knut Schleßelmann
@kschlesselmann
OK … now it's back. Let me have a look into our AWS setup here … could it be some network issue? If so could I get some better logs at some place?
Knut Schleßelmann
@kschlesselmann
@olegz Looks like that the infrastructure is fine … maybe it's really some weird behavior in our service
sakella1986
@sakella1986
@sobychacko basic question on consuming messages from Kafka. How do I choose between KafkaListener, StreamListener, Function, etc.? I may optionally write to a retry topic, but I can also live with the same topic by not committing the offset for failed messages. We use Kerberos auth and Kafka broker version 0.10.2 and have no plans on upgrading to 0.11 at least for the next year.
Soby Chacko
@sobychacko
@sakella1986 That is a very old Kafka version and it won't work with the latest versions of the Kafka binder in Spring Cloud Stream (You will likely see some Kafka wire protocol compatibility issues if you combine 0.10.2 and latest SCSt versions using later Kafka clients). We used to provide a separate 0.11 binder when the default version supported was 0.10.2/0.10.1. Those binders are no longer supported.
With that said, KafkaListener is part of the Spring Kafka library and you don't need Spring Cloud Stream to use that. You use StreamListener or functional support from Spring Cloud Stream when you need the higher-level abstractions and programming model provided. In addition, SCSt comes with a lot of opinionated choices you can rely upon. SCSt Kafka binder is built upon the foundations of Spring Kafka although it doesn't directly use KafkaListener internally.
1 reply
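To illustrate the functional programming model mentioned above: outside of the framework, these handlers are plain java.util.function types, which is also what makes them easy to unit test. A minimal plain-Java sketch (the names uppercase/log are illustrative, not from the conversation; in Spring Cloud Stream you would expose them as @Bean methods and the binder would bind them to destinations named by the <beanName>-in-0 / <beanName>-out-0 convention):

```java
import java.util.function.Consumer;
import java.util.function.Function;

public class FunctionalModelSketch {

    // In Spring Cloud Stream's functional model this would be a @Bean;
    // here it is just a plain function so it can run without a broker.
    static Function<String, String> uppercase() {
        return String::toUpperCase;
    }

    static Consumer<String> log() {
        return s -> System.out.println("Received: " + s);
    }

    public static void main(String[] args) {
        // Plain function composition, no messaging infrastructure required:
        String out = uppercase().apply("hello");
        System.out.println(out);   // prints HELLO
        log().accept(out);         // prints Received: HELLO
    }
}
```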
Soby Chacko
@sobychacko
@jkubrynski Are you accessing that service using StreamListener? or is that getting wrapped inside a Supplier function?
Jakub Kubryński
@jkubrynski
Currently I'm talking about testing the producer side - so the send method on the MessageChannel. For the client side it depends, but in the recent use case I use subscribe on the channel.
But you can focus on just the tests you've shared with me. Normally the selected lines would be in the production code. How can you move them from this test to the main source set: https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/RoutingFunctionTests.java#L72;L77
Soby Chacko
@sobychacko
Ya, I am sorry, I didn't mean StreamListener there; I was thinking about functional vs. non-functional on the producer side. It sounds like you are directly calling the send method.
sakella1986
@sakella1986
@sobychacko so I'd better use KafkaListener, not SCSt.
Soby Chacko
@sobychacko
@jkubrynski you can also try to use StreamBridge to send outbound messages. It is a relatively new feature, but available in the recent Horsham SR releases. The idea is that you don't directly deal with channels; the framework internally handles that for you. You interact with this in terms of a binding destination. OutputDestination can be used to test that scenario. See these tests: https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java
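For context, a StreamBridge test along the lines of those linked tests might look roughly like this. This is a sketch, not verbatim from the repo: the application class MyApplication and the binding name "myDestination-out-0" are made up for illustration, and it assumes the spring-cloud-stream test-binder is on the classpath.

```java
// Sketch only; MyApplication and "myDestination-out-0" are illustrative.
// See StreamBridgeTests in the spring-cloud-stream repo for real examples.
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
        TestChannelBinderConfiguration.getCompleteConfiguration(MyApplication.class))
        .run()) {

    // Send through the framework without touching channels directly.
    StreamBridge bridge = context.getBean(StreamBridge.class);
    bridge.send("myDestination-out-0", "hello");

    // OutputDestination captures what was sent, so you can assert on
    // the destination, headers and payload without a real broker.
    OutputDestination output = context.getBean(OutputDestination.class);
    Message<byte[]> message = output.receive(1000);
    // assert on message.getHeaders() and message.getPayload() here
}
```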
Jakub Kubryński
@jkubrynski
Looks good. Thanks @sobychacko
Soby Chacko
@sobychacko
@sakella1986 I spoke with @garyrussell and you should be fine using SCSt or Spring Kafka there. Based on this compatibility matrix, it should work with 0.10.2 brokers: https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix.
Sorry for the confusion. The modern clients can negotiate with the older wire protocols, and therefore you may use the later Kafka binders in Spring Cloud Stream (or Spring Kafka's KafkaListener).
2 replies
Knut Schleßelmann
@kschlesselmann
@olegz I've still got no clue why I get the "Dispatcher has no subscriber" in my service … to me it uses the same setup and the same versions as my other services … how would you debug such a thing?
Oleg Zhurakousky
@olegz
@kschlesselmann i am sorry, stuck with releases which are holding other people up. So really a question of priority. Will get to it hopefully later today
Knut Schleßelmann
@kschlesselmann
@olegz Never mind … I got my error. I just don't get how it killed my Flux. It was an error on my side though. No need to worry ;-)
Mani Jindal
@manijndl
Hi everyone, I just want to check: do applications emit any metrics for Kafka, like incoming, processed, and failed messages? If yes, can you please help with the endpoints to get those metrics?
rstpv
@rstpv
Hi, I'm trying to use this implementation https://github.com/spring-cloud/stream-applications/blob/master/functions/supplier/rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierConfiguration.java in a Kafka environment. I'm using Data Flow, and it is setting --spring.cloud.stream.bindings.output.destination=someTopic, yet the messages are published to the topic rabbitSupplier-out-0. Did I miss something in the implementation? Is there a way of saying that the FluxMessageChannel is the default output so that the topic bindings apply to it?
sumit1912
@sumit1912

Hi @olegz ,

I have created simple Spring Cloud Stream apps (source, processor and sink) where the processor and sink are transferring reactive data.
https://github.com/sumit1912/springcloud.stream.pipeline

I deploy these apps in Spring Cloud Data Flow with RabbitMQ as the broker. My apps work fine and transfer data when I use springBootVersion = "2.1.9.RELEASE" and the org.springframework.cloud:spring-cloud-stream-reactive:2.2.0.RELEASE dependency.

When I update Spring Boot to v2.2.6.RELEASE and replace spring-cloud-stream-reactive with "org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR3" and "org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR3", I get the below error:

ERROR 5206 — [ wer.http.wer-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binding.StreamListenerMessageHandler@39ce27f2]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `reactor.core.publisher.Flux` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (byte[])...

The error appears only after I replace spring-cloud-stream-reactive with spring-cloud-dependencies and spring-cloud-stream-dependencies and update Spring Boot to 2.2.x.

Is it because the spring-cloud-stream-reactive module is deprecated in spring-cloud-dependencies:Hoxton.SR3 and we need to use spring-cloud-function (the Supplier/Function/Consumer model) for handling reactive messages?

sagarikamraj
@sagarikamraj
Hello Team. I am facing a weird issue. We are using @StreamListener with a condition. We have the condition new String(payload).event.type matches 'MyEvent-Type'. This works fine in local Kafka, even in embedded Kafka (integration test). But when deployed to another environment, for the same message I get "Cannot find a @StreamListener matching for message".
Currently I do not have any more logs to provide more information, sorry. Does someone have any idea why this could have happened? Is there any sort of setting I need to check in Kafka, or is it some problem with serializers etc.?
2 replies
Moshe
@moshelav
When consumers are continuously revoked from a group due to timeouts, the group will have fewer and fewer consumers until 0 consumers are left. Obviously we need to increase the timeout and improve performance... but still, is there a possibility to rejoin the consumers to the group without restarting the microservice?
yand
@yand

Hello, I have the following setup for a consumer. It works for the first message, and then I get an exception:

INFO 47834 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application-1.myConsumer-in-0' has 0 subscriber(s).
ERROR 47834 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.myConsumer-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload={...}]
public interface Sink<I> extends Consumer<Flux<I>> {

  @Override
  default void accept(Flux<I> input) {
    consume(input);
  }

  void consume(Flux<I> input);
}
@Component("myConsumer")
public class MyConsumer implements Sink<MyMessage> {

  public MyConsumer() {
  }

  @Override
  public void consume(Flux<MyMessage> input) {
    input
      .transform(...)
      .transform(...)
      .subscribe();
  }
}
spring:
  cloud:
    stream:
      function:
        definition: myConsumer
      bindings:
        myConsumer-in-0:
          destination: ...
          group: myConsumerGroup
          consumer:
            autoRebalanceEnabled: true
            partitioned: true
            maxAttempts: 3
      kafka:
        bindings:
          myConsumer-in-0:
            consumer:
              configuration:
                value:
                  deserializer: ...

Not sure what I am doing wrong; any help is appreciated.

natraj09
@natraj09
Hi Team, I am reading through the docs here https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling . How does re-queueing of the message work in the case of Kafka? Does it require any special configuration? I have maxAttempts: 3. Here's my sample project: https://github.com/natraj09/spring-cloud-stream-playground/blob/master/reactive-processor-kafka/src/main/java/reactive/kafka/ReactiveProcessorApplication.java#L52 ; if the REST payload has "error:true" it prints an error and throws an exception. I only see the info log once instead of 3 times.
2 replies
GamiiisKth
@GamiiisKth
Hi, I can't set the producer property delivery.timeout.ms; I get "The configuration 'delivery.timeout.ms' was supplied but isn't a known config."
natraj09
@natraj09
Is it possible to map multiple destinations to the same function? Something like this (but it's not valid YAML, as the key is the same):
testProcess-in-0:
destination: input-1
testProcess-in-0:
destination:  input-2
Gary Russell
@garyrussell
destination=input-1,input-2
1 reply
By default, this will create 2 bindings (listener containers). If you set the multiplex consumer property to true, you'll get a single binding consuming from both destinations.
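In YAML form that could look something like this (a sketch reusing the binding name from the question above; multiplex is the consumer property Gary mentions):

```yaml
spring:
  cloud:
    stream:
      bindings:
        testProcess-in-0:
          # one comma-separated destination list instead of duplicate keys
          destination: input-1,input-2
          consumer:
            # true: a single binding (one listener container) consumes from
            # both destinations; the default (false) creates two bindings
            multiplex: true
```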
boonware
@boonware
Hi all. I'm trying to ensure that applications always use round-robin partitioning when sending to the DLQ. The docs say I should implement a DlqPartitionFunction bean, but looking at the parameters, it's not clear how I could implement round robin, since I must always return the actual partition number and there isn't any info on the number of partitions for the DLQ topic. Any advice? Reference: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka-dlq-processing
1 reply
JDGale8
@JDGale8

Hey, I have a question about the motivation behind the new test style for the test binders.
Unlike all other Spring tests, these no longer leverage @Autowired and instead use this context builder, like

context = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(Application.class))
            .run("--server.port=0",
                      "--spring.cloud.function.definition=listener");

This is a huge departure from the standard use of @SpringBootTest and @RunWith(SpringRunner.class). Does anyone know why this is happening? Is it out of necessity? Is this the direction Spring is moving in? I haven't been able to find any documentation or blog posts about the motivation behind this.
It's making rewriting existing tests really challenging, and it feels like a code smell to rewrite the existing tests in a far more verbose way.

1 reply
natraj09
@natraj09
@garyrussell I tried creating 2 bindings to the same function. The original topic and the DLQ topic are both bound to the same function (sample configuration below). I found the message is not being sent to the DLQ on error: https://github.com/natraj09/spring-cloud-stream-playground/blob/master/reactive-processor-kafka/src/main/java/reactive/kafka/ReactiveProcessorApplication.java#L54 . I came across this open issue spring-cloud/spring-cloud-stream#1922; it looks like DLQ is not supported for reactive functions. Instead of throwing an exception, can I use the sendto destination header to route it to the error channel: Mono.just(MessageBuilder.fromMessage(inbound).setHeader("spring.cloud.stream.sendto.destination", "rest-input-error").build())? What is the recommended pattern to handle errors in the case of a reactive function?
spring:
  cloud:
    stream:
      function.definition: testProcess;testSource;testSink
      bindings:
        testProcess-in-0:
          destination: rest-input,rest-input-error
          group: input-group
          maxAttempts: 1
        testProcess-out-0:
          destination: rest-output
        testSink-in-0:
          destination: rest-output
          group: sink-group
        testSource-out-0:
          destination: rest-input
      kafka:
        bindings:
          testProcess-in-0:
            consumer:
              enableDlq: true
              dlqName: rest-input-error
3 replies
Andrey Shirobokov
@ashirobokov
Hi!
I'm using Spring Cloud Stream 3.0.4.RELEASE (Hoxton.SR4). Is there any code sample of using bean validation (Function<@Valid Person, Person> uppercase()) with the functional approach?
Oleg Zhurakousky
@olegz
no, and if anything that would be on spring-cloud-function side
feel free to raise the issue there
Andrey Shirobokov
@ashirobokov
ok, thanks!
sakella1986
@sakella1986
@sobychacko and all, quick question on improving Kafka message consumer performance. We have 12 partitions in the topic and are publishing at a rate of 200 TPS. After receiving the message (we use 12 Spring Boot containers for reading from all partitions), we need to make an external vendor call which takes 600 ms, which means I can only support 20 TPS. What options do I have to achieve 200 TPS message consumption? Appreciate your help!
20 replies
Nadavbi87
@Nadavbi87
Hi,
Did the start-scs.cfapps.io site move to a new domain?
Soby Chacko
@sobychacko
@Nadavbi87 It's up and running now. There were some issues before.
Nadavbi87
@Nadavbi87
@sobychacko
Thanks!
sumit1912
@sumit1912

Hi team,

I have created 3 simple Spring Cloud Stream apps (Source/Processor/Sink) using the Spring Cloud Function approach, transferring Flux<String>.

Source-app:

@SpringBootApplication
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }

    @PollableBean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> {
            String v1 = String.valueOf("abc");
            String v2 = String.valueOf("pqr");
            String v3 = String.valueOf("xyz");
            return Flux.just(v1, v2, v3);
        };
    }
}

Processor-app:

@SpringBootApplication
public class ProcessorApplication {

    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase()).log();
    }

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class, args);
    }
}

Sink-app:

@SpringBootApplication
public class SinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }

    @Bean
    public Consumer<Flux<String>> log() {
        return flux -> {
            flux.subscribe(f -> System.out.println("Received data: " + f));
        };
    }
}

The dependencies that I have added are:

SpringBoot version = 2.2.6.RELEASE

implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit")
implementation("org.springframework.cloud:spring-cloud-starter-function-webflux:3.0.7.RELEASE")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-webflux")

I have registered these apps in Spring Cloud Data Flow and deployed them in a stream.

I am able to transmit data to these apps and receive output both via HTTP and via RabbitMQ individually. But the message is not communicated across the apps (Source->Processor->Sink).
Am I missing any dependency, annotation or application property?
I am new to Spring Cloud Stream using the function approach; any suggestion will be a great help :)

Complete code can be found here:https://github.com/sumit1912/springcloud.function.stream.pipeline

Soby Chacko
@sobychacko
@sumit1912 Didn't you ask and get an answer on SO regarding this?
Omkar Shetkar
@Omkar-Shetkar

Hi All,
Recently I started learning and using Spring Cloud Stream. I like the way Spring Cloud Functions simplify message publishing and consumption. But in one of my applications, the beans corresponding to the functions are not detected and hence not bound to a destination.
When I compared this with my sample working version, the difference is that the non-working version has a module dependency which internally uses Spring Cloud Stream. This module uses the @EnableBinding annotation.
If I put this annotation in my working version, I reproduce the same issue.
It seems there is a conflict between @EnableBinding and functional support.
Any hints or suggestions will be really helpful.
Thanks.


In unit test, when I try to receive the message using OutputDestination, I get following error:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at java.util.ArrayList.get(ArrayList.java:433) at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34) at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:37) at com.example.loggingconsumer.LoggingConsumerApplicationTests.contextLoads(LoggingConsumerApplicationTests.java:50) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) ...

Omkar Shetkar
@Omkar-Shetkar
This is the non-working version, with the @EnableBinding annotation. If I just remove this annotation, it works!
@SpringBootApplication
@Controller
@EnableBinding
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    @Bean
    public Consumer<CloudEventImpl<Person>> consumer() {
        return p -> {
            System.out.println("consumer--Received: " + p);
        };
    }
}
Soby Chacko
@sobychacko
@Omkar-Shetkar You cannot mix functional binding and the annotation approach in the regular message channel-based binders (Kafka Streams binder allows that in the current versions).
sumit1912
@sumit1912

@sumit1912 Didn't you ask and get an answer on SO regarding this?

Thanks @sobychacko for the reply. The answer given on SO worked like a charm.