Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • 22:17
    garyrussell commented #2036
  • 21:27
    0x006EA1E5 commented #2036
  • 20:59
    garyrussell commented #2036
  • 20:59
    garyrussell commented #2036
  • 20:58
    garyrussell commented #2036
  • 20:58
    garyrussell labeled #2036
  • 20:58
    garyrussell labeled #2036
  • 20:11
    0x006EA1E5 commented #2036
  • 20:09
    0x006EA1E5 commented #2036
  • 20:02
    0x006EA1E5 commented #2036
  • 20:01
    0x006EA1E5 opened #2036
  • 15:10
    Chr3is commented #2035
  • 14:48
    garyrussell commented #2035
  • 14:47
    garyrussell commented #2035
  • 14:47
    garyrussell commented #2035
  • 14:40
    Chr3is commented #2035
  • 13:57
    olegz commented #2034
  • 13:53
    olegz commented #2035
  • 13:43

    olegz on master

    Change function to snapshot Cleanup and refactor to accomod… (compare)

  • 11:26
    Chr3is commented #2035
dvirgiln
@dvirgiln
I have used the same docker as it appears in the README file:
 docker-compose -f docker-compose-control-center.yaml up -d
Viacheslav Petriaiev
@Viacheslav-Petriaiev

Hello,
I've started the migration to spring cloud Hoxton.SR5

I have the next code in kotlin, which has worked before:

    @StreamListener(CloudOperationsChannels.INPUT)
    @SendTo(CloudOperationsChannels.OUTPUT)
    fun operationsListener(input: Flux<Message<ByteArray>>): Flux<Message<ByteArray>>

Now ByteArray payload (which are bytes of json string) is trying to deserialize into Message and I've got

Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

Could you help me, how can I use org.springframework.messaging.Message in such case?

Thank you

Angshuman Agarwal
@angshuman-agarwal

Hi,
I am trying to compose function and giving alias to map to my KAFKA topic as per here - https://github.com/spring-cloud/spring-cloud-stream/blob/master/docs/src/main/asciidoc/spring-cloud-stream.adoc

spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes=quotedUpperCase

But, yml file fails to parse this, any sample where you can point me the usage of aliasing piped function to a descriptive name which can be used to route to a custom topic please ?

Angshuman Agarwal
@angshuman-agarwal

@sobychacko : I have a naive question related to one sample here

How do you know process-out-0, process-out-1, process-out-2 mappings to english, french and spanish respectively and why not to any other order of process-out-* index ?

spring.cloud.stream.bindings.process-out-0:
  destination: english-counts
spring.cloud.stream.bindings.process-out-1:
  destination: french-counts
spring.cloud.stream.bindings.process-out-2:
  destination: spanish-counts
4 replies
Viacheslav Petriaiev
@Viacheslav-Petriaiev

Hello,
I've started the migration to spring cloud Hoxton.SR5

I have the next code in kotlin, which has worked before:

    @StreamListener(CloudOperationsChannels.INPUT)
    @SendTo(CloudOperationsChannels.OUTPUT)
    fun operationsListener(input: Flux<Message<ByteArray>>): Flux<Message<ByteArray>>

Now ByteArray payload (which are bytes of json string) is trying to deserialize into Message and I've got

Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

Could you help me, how can I use org.springframework.messaging.Message in such case?

Thank you

@sobychacko : Could you give some suggestions on how can I use Flux of org.springframework.messaging.Message?

Thank you

6 replies
Angshuman Agarwal
@angshuman-agarwal

Hi,
I am trying to compose function and giving alias to map to my KAFKA topic as per here - https://github.com/spring-cloud/spring-cloud-stream/blob/master/docs/src/main/asciidoc/spring-cloud-stream.adoc

spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes=quotedUpperCase

But, yml file fails to parse this, any sample where you can point me the usage of aliasing piped function to a descriptive name which can be used to route to a custom topic please ?

Has anyone tried this please ?

Andras Hatvani
@andrashatvani
hi, does spring-cloud-stream support deduping based on the message key?
Víctor Reyes Rodríguez
@vicrdguez

Hi!
I have a problem trying to configure the applicationId for my processors. This is my application.yml

spring.cloud:
    stream:
      function:
        definition: changePriority;addComment
      kafka:
        streams:
          binder:
            functions:
              changePriority.applicationId: proxy-sbn-changePriority
              addComment.applicationId: proxy-sbn-addComment

Anyways when I run my app I still getting anonymous consumers. Any hint?
Thanks!

5 replies
Wil Collins
@wilcollins
Hi, our team is using a RabbitMQ backend and we need message deduplication so we are considering using this community-provided plugin https://github.com/noxdafox/rabbitmq-message-deduplication , however, it requires specific arguments to be provided to the Exchange when it is created and, as far as I know, there are currently no existing facilities to do this through the provided configuration class so I have a few questions about that:
1) is there an existing solution that I am overlooking?
2) if no, is this functionality that already exists on any roadmaps for implementation?
3) if not or if that timeline is reasonably long, would this be a feature that would be a welcomed contribution from the community?
2 replies
shipra chaudhary
@ShipraShipra04_twitter
Hi team, I am trying to create SCDF task which use jdbc dependancy (org.springframework.boot:spring-boot-starter-jdbc)to store metadata. In spring boot app I am using r2dbc dependency(io.r2dbc:r2dbc-postgresql:0.8.2.RELEASE) to get data from postgresql and post it to some rest point.
I am getting error I tried to follow below link https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/multiple-datasources
but i am not sure what will be my first and second EmbeddedDatabaseType in this Scenario please help.
Jon Harper
@jonenst
Hi, is it possible to use the new test binder ( https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#_test_binder_usage ) with "normal" annotation based Tests controlling the context ? something like this
@SpringBootApplication
public class MyApp() {
   @Bean
    public Supplier<Flux<Message<String>>> mypublisher() {
        ...
    }

   public void publishone() {
      //some function called by the app to send messages to the mypublisher flux
   }
}

@SpringBootTest(...)
public class MyTest {
  @AutoWired
   OutputDestination output;

  @Test mytest() {
     //context setup automatically
     app.publishone()
     assertThat(collector.receive().getPayload(), equalTo("hello".getBytes()));
  }

}
Oleg Zhurakousky
@olegz
yes, we have tests for that
what I don't understand from your code is the collector part
Jon Harper
@jonenst
Hi,
thanks for taking the time to answer
collector should have been "output"
Oleg Zhurakousky
@olegz
got it
Jon Harper
@jonenst
the mypublisher bean stores a fluxsink used by the app (when an http request comes in) to send messages to the flux to send a message to a broker queue
I just want my test of the http request to also assert that a message has been sent to to the queue
I don't use @EnableBinding, I use spring function definitions
Oleg Zhurakousky
@olegz
ok, then I missunderstod yoru question
Jon Harper
@jonenst
My test already exists, it uses regular spring annotations to make a test context; Ideally I would just get a bean using @Autowired and be able to assert that messages are send to the queue. Do you know if it's possible ?
you can type in here and I'll look later
Jon Harper
@jonenst
no problem, thank you for your time

I see the configuration, but it's still used in the @Test public void testGh1973() by creating the context manually

    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
                TestChannelBinderConfiguration.getCompleteConfiguration(SupplierAndProcessorConfiguration.class))
                        .web(WebApplicationType.NONE).run("--spring.jmx.enabled=false",
                                "--spring.cloud.function.definition=echo;supplier",
                                "--spring.cloud.stream.bindings.supplier-out-0.destination=output",
                                "--spring.cloud.stream.bindings.echo-out-0.destination=output")) {

I want to get rid of this, in particular the "getCompleteConfiguration(...)" call; and instead use the regular context and application.yml of my app. Do you know if it's possible ?

using the @SpringRunner or @SpringExtension or @SpringBootTest
Jon Harper
@jonenst
Thanks a lot ! You solved my problem. I hadn't looked at the prerelease docs. Adding @Import(TestChannelBinderConfiguration.class) did the trick. Also, instead of using Function<String,String>, I'm using Supplier<Flux<String>> , and in this case I had to use receive(timeout) because the plain receive() returned null.
Jon Harper
@jonenst
by the way it was a very good idea to add it to the docs in the next version, I think that's what most people look for when writing this kind of test. Good job
Angshuman Agarwal
@angshuman-agarwal

Hi,
In Spring Cloud Function (KAFKA), how do I control the lifetime of Producer please ? I want to control it via a REST endpoint on a per request basis (like new Producer for every request)

Currently, I see, as soon as I start my app, my Supplier function gets called. I want to disable this abstract polling from framework and control it myself please.

It is mapped like this :

stream.cloud.stream.kafka.bindings.producer-out-0 : myTopic
stream.cloud.function.definition : producer (where producer is the name of my Supplier<String> function).

Please can someone help on how to achieve that ?

boonware
@boonware
Hi there. Is it possible to write (delete) from a Kafka Streams state store with Spring Cloud Stream? Typically I access state store with InteractiveQueryService, however, that only provides read-only state stores. Is there a way to get a reference (preferably via an injected bean) to a state store with write access?
3 replies
Víctor Reyes Rodríguez
@vicrdguez

Hi folks!
I have 2 instances of the same kafka streams application deployed, One of the processors is a Supplier that queries an state store to send a record to a topic from it each second.
For some reason I don't know, one of the instances is returning this error all the time:
java.lang.IllegalStateException: Error when retrieving state store: j event-state
I'm using this configs to trying to fix this with no success:

spring.cloud.stream:
  kafka:
    streams:
      binder:
        stateStoreRetry:
          maxAttempts: 5
          backOffInterval: 3000

Maybe you can give me some Ideas. Thanks!

4 replies
See-no-evil
@See-no-evil
Hey all, does anyone know if it's possible to implement a non blocking retry like this https://eng.uber.com/reliable-reprocessing/ with Spring Cloud Stream (Kafka). Is there a way without doing everything manually? For Spring Kafka I found spring-projects/spring-kafka#1229
madhurjain12
@madhurjain12
Hi everyone I am trying a simple Spring Cloud functional App with schema registry and having some problems . I have pasted my yaml file , Function and the error below
```
madhurjain12
@madhurjain12
spring:
  application:
    name: app-name
  profiles:
    active: ${env:local}
  cloud:
   stream:
    function:
      definition: update
    bindings:
      update-in-0:
        contentType: avro/bytes
        destination: inputTopic
      update-out-0:
        contentType: avro/bytes
        destination: outputTopic
    kafka:
      streams:
       binder:
        brokers:  broker1:9092, broker2:9093, broker3:9094
        configuration:
          bootstrap-servers: broker1:9092, broker2:9093, broker3:9094
          commit.interval.ms: 1000
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          schema.registry.url: http://avro-schema-registry (not pasting the actual url)
          auto.offset.reset: latest
          specific.avro.reader: true
        application-id: appConsumerGroup
       bindings:
         update-in-0:
           consumer:
             useNativeDecoding: true
             keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
             ValueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
         update-out-0:
           producer:
             useNativeEncoding: true
             keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
             ValueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    schema-registry-client:
      endpoint: http://avro-schema-registry (not pasting the actual url)

Function is

@Component
@Slf4j
public class StreamProcessor {

    @Autowired
    SomeService someService;

    @Bean
    public Function<KStream<String, Employee>, KStream<String, Employee>> update() {
        log.info("Processing...");
        return input -> {
            try {
                return input.map((key,value)-> new KeyValue<>(key, someService.someMethod(value)));
            } catch (Exception e) {
                log.error("Caught exception: {}", e.getMessage());
                return null;
            }
        };
    }

}

error is

java.lang.ClassCastException: class com.sun.proxy.$Proxy114 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy114 is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @5040c109; org.springframework.messaging.MessageChannel is in unnamed module of loader 'app')
3 replies
madhurjain12
@madhurjain12
i have this figured out
Taras Danylchuk
@tdanylchuk

Hello guys, having interesting case with spring cloud stream function + reactor + dlq.

spring.cloud.stream:
  kafka.bindings:
    rewardable:
      consumer:
        enableDlq: true
        dlqName: rewardable_dql
  bindings:
    rewardable:
      group: ${spring.application.name}
      content-type: application/json
      consumer.max-attempts: 1
  function:
    bindings:
      rewardableEventsConsumer-in-0: rewardable
    definition: rewardableEventsConsumer

and java consumer

  @Override
    public void accept(Flux<RewardRequest> rewardsFlux) {
        rewardsFlux
                .flatMap(rewardableService::reward)
                .subscribe();
    }

So on exception in flux rewardableService::reward I expect my message to be sent to DLQ, and porcess further messages from rewards topic.

But in fact if exception is thrown message is actually sending to DLQ, but : IntegrationReactiveUtils.adaptSubscribableChannelToPublisher

return publisher
                    .doOnCancel(() -> inputChannel.unsubscribe(messageHandler))

unsubscribe is being performed and my flux no longer receive messages.

Do you guys have any idea how should I configure my reactive function in order to continues processing and send error messages to DLQ?

4 replies
kenn-chen
@kenn-chen
Hi guys, is it possible to set rabbitmq policies with Spring Cloud Stream?
2 replies
boonware
@boonware
Hi all. With the Kafka Streams binder, is it possible to write to a state store outside of a Processor context? I have a use case where a scheduled thread reads out from a state store that is populated by a KTable binding and removes entries from the state store. The write operation seems to work, but I get IllegalStateException: This should not happen as timestamp() should only be called while a record is processed because I'm accessing the store outside of a processing context. Just swallowing the exception seems ugly. Is there a better way to do this?
1 reply
iguissouma
@iguissouma
I have a question, In case we are manually acknowledging kafka messages and using condition with the StreamListener, what happens if no condition is matched. does the offset get commited or no?
2 replies
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT, condition = "new String(headers['type']) == 'type1'")
 public void processt1(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }



 @StreamListener(Sink.INPUT, condition = "new String(headers['type']) == 'type2'")
 public void processt2(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}
boonware
@boonware
Hi. I've recently switched an application from the Kafka binder 2.X line to 3.X. I've noticed now that there seems to be some delay in messages being processed in the @StreamListener method (KTable). It typically takes around 10 seconds after a message is sent to the topic before the code kicks in. I haven't seen this behaviour/delay with previous versions of the library. Just on the offchange, is there any newly introduced setting you can think of that might be at play here?
1 reply
winkidzz
@winkidzz
Hello Everyone, I am trying out multi binder kerberos with scs. spring-cloud/spring-cloud-stream#1404 issue seems to happen. only one broker or first broker gets picked up.
9 replies
Sergey
@Fonexn
Hi guys, anyone know why I get this WARN when I use SSL? All works fine, but I get WARN's...
2020-06-25 13:11:58.978 WARN 25471 --- [ main] o.a.k.clients.admin.AdminClientConfig : The configuration 'ssl.protocol' was supplied but isn't a known config. 2020-06-25 13:11:58.978 WARN 25471 --- [ main] o.a.k.clients.admin.AdminClientConfig : The configuration 'ssl.enabled.protocols' was supplied but isn't a known config. 2020-06-25 13:11:58.978 WARN 25471 --- [ main] o.a.k.clients.admin.AdminClientConfig : The configuration 'ssl.keystore.location' was supplied but isn't a known config. 2020-06-25 13:11:58.978 WARN 25471 --- [ main] o.a.k.clients.admin.AdminClientConfig : The configuration 'ssl.truststore.type' was supplied but isn't a known config. 2020-06-25 13:11:58.978 WARN 25471 --- [ main] o.a.k.clients.admin.AdminClientConfig : The configuration 'ssl.keystore.type' was supplied but isn't a known config. 2020-06-25 13:11:58.978 WARN 25471 --- [ main] o.a.k.clients.admin.AdminClientConfig : The configuration 'ssl.truststore.location' was supplied but isn't a known config. 2020-06-25 13:11:58.978 WARN 25471 --- [ main] o.a.k.clients.admin.AdminClientConfig : The configuration 'ssl.keystore.password' was supplied but isn't a known config. 2020-06-25 13:11:58.978 WARN 25471 --- [ main] o.a.k.clients.admin.AdminClientConfig : The configuration 'ssl.key.password' was supplied but isn't a known config. 2020-06-25 13:11:58.978 WARN 25471 --- [ main] o.a.k.clients.admin.AdminClientConfig : The configuration 'ssl.truststore.password' was supplied but isn't a known config.
winkidzz
@winkidzz
i have seen this warning in my kerberos setup too. everything works fine.
NeslihanKolukisa
@NeslihanKolukisa
Hello, I have a problem. I am developing a project using spring cloud stream. All employee services in the project are running on the same jvm. Therefore, I opened all the channels I used with the Input type. It is due to this that when I send a message it falls into the list before it falls into the queue. Listener is also changing a different message. How can I solve this.
Andras Hatvani
@andrashatvani
hi, since the outputs can only be kstreams, how can ktables be inputs? @sobychacko @garyrussell
2 replies
madhurjain12
@madhurjain12
Hi all in spring cloud streams Kafka streams I see there is a Dlq setting is there a retry setting before sending the message to dlq
1 reply