dvirgiln
@dvirgiln

Hello,

I am following the example from https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution

I am receiving an error:

Failed to invoke method; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"HelloWorld","namespace":"com.example.avro","fields":[{"name":"transactionId","type":"string"},{"name":"message","type":"string"},{"name":"field1","type":["null","int"]}]} (through reference chain: com.example.HelloWorld["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])

My Avro file is quite simple:

{
    "namespace": "com.hollandandbarrett.dns.kafkaworkshop.avro",
    "type": "record",
    "name": "HelloWorld",
    "fields": [
        {"name": "transactionId", "type": "string"},
        {"name": "message", "type": "string"},
        {"name": "field1", "type": ["null", "int"]}
    ]
}

The application.yml is:

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamic-schema-generation-enabled: false
      bindings:
        output:
          contentType: application/*+avro
          destination: HelloWorld
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:29092
          auto-create-topics: true
        bindings:
          output:
            producer:
              configuration:
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
server.port: 9010
Any idea why it is not working?
Why is it trying to use JSON serialization instead of Avro?
Soby Chacko
@sobychacko
@dvirgiln Did you get that example working properly, or are you getting that error while running it?
dvirgiln
@dvirgiln
It works perfectly if I do not include the union type
1 reply
{"name": "field1", "type": ["null", "int"]}
Somehow it is trying to use the Jackson serde.
dvirgiln
@dvirgiln
The same error happens when I run producer 2 from the example:
https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution
mvn clean compile package
java -jar schema-registry-confluent-avro-serializer-producer2/target/schema-registry-confluent-avro-serializer-producer2-0.0.1-SNAPSHOT.jar
dvirgiln
@dvirgiln
I have used the same Docker setup as it appears in the README file:
 docker-compose -f docker-compose-control-center.yaml up -d
Viacheslav Petriaiev
@Viacheslav-Petriaiev

Hello,
I've started the migration to Spring Cloud Hoxton.SR5.

I have the following Kotlin code, which worked before:

    @StreamListener(CloudOperationsChannels.INPUT)
    @SendTo(CloudOperationsChannels.OUTPUT)
    fun operationsListener(input: Flux<Message<ByteArray>>): Flux<Message<ByteArray>>

Now the ByteArray payload (which is the bytes of a JSON string) is being deserialized into Message, and I get:

Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

Could you help me with how I can use org.springframework.messaging.Message in this case?

Thank you
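
For reference, a minimal sketch of the equivalent functional-style binding in Java (an illustrative translation under the assumption that the functional model replacing @StreamListener is in play; class and bean names are made up):

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import reactor.core.publisher.Flux;

@Configuration
public class OperationsConfig {

    // The functional model binds the Flux<Message<byte[]>> parameter
    // directly, so the raw payload bytes are handed over without Jackson
    // trying to construct a Message instance.
    @Bean
    public Function<Flux<Message<byte[]>>, Flux<Message<byte[]>>> operationsListener() {
        return flux -> flux.map(msg -> MessageBuilder
                .withPayload(msg.getPayload())
                .copyHeaders(msg.getHeaders())
                .build());
    }
}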

Angshuman Agarwal
@angshuman-agarwal

Hi,
I am trying to compose functions and give the composition an alias that maps to my Kafka topic, as per here - https://github.com/spring-cloud/spring-cloud-stream/blob/master/docs/src/main/asciidoc/spring-cloud-stream.adoc

spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes=quotedUpperCase

But the YML file fails to parse this. Is there any sample you can point me to showing how to alias a piped function to a descriptive name that can be used to route to a custom topic, please?
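
One likely culprit, offered as an assumption: Spring Boot's relaxed binding requires bracket notation for map keys containing special characters such as |, so the alias key may need to be escaped in YAML, e.g.:

spring:
  cloud:
    stream:
      function:
        bindings:
          "[toUpperCase|wrapInQuotes]": quotedUpperCase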

Angshuman Agarwal
@angshuman-agarwal

@sobychacko : I have a naive question related to one sample here

How do you know that process-out-0, process-out-1, and process-out-2 map to english, french, and spanish respectively, and why not some other order of the process-out-* indices?

spring.cloud.stream.bindings.process-out-0:
  destination: english-counts
spring.cloud.stream.bindings.process-out-1:
  destination: french-counts
spring.cloud.stream.bindings.process-out-2:
  destination: spanish-counts
4 replies
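
For context, a minimal sketch of the branching pattern that sample family uses (assuming the KStream-array style; predicates and names below are illustrative): the function returns a KStream[], and the binder assigns process-out-0, process-out-1, process-out-2 to the array elements in order, so the index follows the order of the branches in the returned array.

import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BranchingConfig {

    @Bean
    @SuppressWarnings("unchecked")
    public Function<KStream<String, String>, KStream<String, String>[]> process() {
        // branch() splits the stream; the order of the predicates is what
        // pins each branch to process-out-0, process-out-1, process-out-2.
        return input -> input.branch(
                (key, value) -> value.startsWith("en"),  // -> process-out-0 (english-counts)
                (key, value) -> value.startsWith("fr"),  // -> process-out-1 (french-counts)
                (key, value) -> value.startsWith("es")); // -> process-out-2 (spanish-counts)
    }
}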
Viacheslav Petriaiev
@Viacheslav-Petriaiev

@sobychacko : Following up on my question above, could you give some suggestions on how I can use a Flux of org.springframework.messaging.Message?

Thank you

6 replies
Angshuman Agarwal
@angshuman-agarwal

Has anyone tried this (my function-aliasing question above), please?

Andras Hatvani
@andrashatvani
Hi, does spring-cloud-stream support deduplication based on the message key?
Víctor Reyes Rodríguez
@vicrdguez

Hi!
I have a problem trying to configure the applicationId for my processors. This is my application.yml:

spring.cloud:
  stream:
    function:
      definition: changePriority;addComment
    kafka:
      streams:
        binder:
          functions:
            changePriority.applicationId: proxy-sbn-changePriority
            addComment.applicationId: proxy-sbn-addComment

Anyway, when I run my app I still get anonymous consumers. Any hint?
Thanks!

5 replies
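
One alternative worth checking, offered as an assumption rather than a confirmed fix: the Kafka Streams binder also accepts applicationId as a consumer property on each input binding, e.g.:

spring.cloud.stream:
  kafka:
    streams:
      bindings:
        changePriority-in-0:
          consumer:
            applicationId: proxy-sbn-changePriority
        addComment-in-0:
          consumer:
            applicationId: proxy-sbn-addComment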
Wil Collins
@wilcollins
Hi, our team is using a RabbitMQ backend and we need message deduplication, so we are considering this community-provided plugin: https://github.com/noxdafox/rabbitmq-message-deduplication. However, it requires specific arguments to be provided to the exchange when it is created, and as far as I know there are currently no facilities to do this through the provided configuration class. So I have a few questions:
1) Is there an existing solution that I am overlooking? (One possible workaround is sketched after this list.)
2) If not, is this functionality already on any roadmap?
3) If not, or if the timeline is reasonably long, would this be a welcome contribution from the community?
2 replies
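
The workaround referenced above, as a sketch under the assumption that a pre-declared exchange satisfies the binder (exchange name and argument values are illustrative, following the plugin's README; whether the binder's own declaration then conflicts is worth verifying):

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.CustomExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DedupExchangeConfig {

    // Declares an exchange of the plugin's custom type with its arguments;
    // the AMQP admin creates it on the broker at startup, before the binder
    // tries to declare the same name.
    @Bean
    public CustomExchange dedupExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-cache-size", 10000);
        args.put("x-cache-ttl", 60000);
        return new CustomExchange("my-dedup-exchange",
                "x-message-deduplication", true, false, args);
    }
}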
shipra chaudhary
@ShipraShipra04_twitter
Hi team, I am trying to create an SCDF task which uses the JDBC dependency (org.springframework.boot:spring-boot-starter-jdbc) to store metadata. In the Spring Boot app I am using the R2DBC dependency (io.r2dbc:r2dbc-postgresql:0.8.2.RELEASE) to get data from PostgreSQL and post it to a REST endpoint.
I am getting an error. I tried to follow this link: https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/multiple-datasources
but I am not sure what my first and second EmbeddedDatabaseType should be in this scenario. Please help.
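
For what it's worth, a minimal sketch of pointing Spring Cloud Task at a dedicated JDBC DataSource via a TaskConfigurer, assuming the task metadata should live in its own PostgreSQL database while the business data stays on R2DBC (URLs and credentials are placeholders):

import javax.sql.DataSource;

import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.cloud.task.configuration.DefaultTaskConfigurer;
import org.springframework.cloud.task.configuration.TaskConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class TaskMetadataConfig {

    // JDBC DataSource dedicated to Spring Cloud Task's metadata tables.
    @Bean
    @Primary
    public DataSource taskDataSource() {
        return DataSourceBuilder.create()
                .driverClassName("org.postgresql.Driver")
                .url("jdbc:postgresql://localhost:5432/task_metadata")
                .username("user")
                .password("secret")
                .build();
    }

    // Tells Spring Cloud Task explicitly which DataSource to use, keeping
    // the R2DBC connection for business data out of the picture.
    @Bean
    public TaskConfigurer taskConfigurer(DataSource taskDataSource) {
        return new DefaultTaskConfigurer(taskDataSource);
    }
}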
Jon Harper
@jonenst
Hi, is it possible to use the new test binder ( https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#_test_binder_usage ) with "normal" annotation-based tests controlling the context? Something like this:
@SpringBootApplication
public class MyApp {
    @Bean
    public Supplier<Flux<Message<String>>> mypublisher() {
        ...
    }

    public void publishone() {
        // some function called by the app to send messages to the mypublisher flux
    }
}

@SpringBootTest(...)
public class MyTest {
    @Autowired
    OutputDestination output;

    @Test
    public void mytest() {
        // context setup automatically
        app.publishone();
        assertThat(collector.receive().getPayload(), equalTo("hello".getBytes()));
    }
}
Oleg Zhurakousky
@olegz
yes, we have tests for that
what I don't understand from your code is the collector part
Jon Harper
@jonenst
Hi,
thanks for taking the time to answer
collector should have been "output"
Oleg Zhurakousky
@olegz
got it
Jon Harper
@jonenst
the mypublisher bean stores a FluxSink used by the app (when an HTTP request comes in) to push messages into the flux, which sends them to a broker queue
I just want my test of the HTTP request to also assert that a message has been sent to the queue
I don't use @EnableBinding, I use spring function definitions
Oleg Zhurakousky
@olegz
ok, then I misunderstood your question
Jon Harper
@jonenst
My test already exists; it uses regular Spring annotations to make a test context. Ideally I would just get a bean using @Autowired and be able to assert that messages are sent to the queue. Do you know if it's possible?
Oleg Zhurakousky
@olegz
you can type in here and I'll look later
Jon Harper
@jonenst
no problem, thank you for your time

I see the configuration, but it's still used in @Test public void testGh1973() by creating the context manually:

try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
        TestChannelBinderConfiguration.getCompleteConfiguration(SupplierAndProcessorConfiguration.class))
                .web(WebApplicationType.NONE).run("--spring.jmx.enabled=false",
                        "--spring.cloud.function.definition=echo;supplier",
                        "--spring.cloud.stream.bindings.supplier-out-0.destination=output",
                        "--spring.cloud.stream.bindings.echo-out-0.destination=output")) {
    // ...
}

I want to get rid of this, in particular the "getCompleteConfiguration(...)" call, and instead use the regular context and application.yml of my app. Do you know if it's possible

using @SpringRunner, @SpringExtension, or @SpringBootTest?
Jon Harper
@jonenst
Thanks a lot! You solved my problem. I hadn't looked at the pre-release docs. Adding @Import(TestChannelBinderConfiguration.class) did the trick. Also, instead of using Function<String, String> I'm using Supplier<Flux<String>>, and in this case I had to use receive(timeout) because the plain receive() returned null.
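
For readers landing here, a minimal sketch of the resulting test shape, assembled from the details above (assertion style and names are illustrative, not the poster's actual code):

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.Message;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class MyAppTest {

    @Autowired
    OutputDestination output;

    @Test
    void publishesToTheQueue() {
        // trigger the app code that pushes into the supplier's FluxSink here

        // with a Supplier<Flux<String>>, the plain receive() can return
        // null, so poll with a timeout instead
        Message<byte[]> message = output.receive(5000);
        assertThat(message.getPayload()).isEqualTo("hello".getBytes());
    }
}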
Jon Harper
@jonenst
By the way, it was a very good idea to add this to the docs in the next version; I think that's what most people look for when writing this kind of test. Good job!
Angshuman Agarwal
@angshuman-agarwal

Hi,
In Spring Cloud Function (Kafka), how do I control the lifetime of the producer, please? I want to control it via a REST endpoint on a per-request basis (i.e. a new producer for every request).

Currently, as soon as I start my app, my Supplier function gets called. I want to disable this automatic polling by the framework and control it myself.

It is mapped like this:

spring.cloud.stream.kafka.bindings.producer-out-0 : myTopic
spring.cloud.function.definition : producer (where producer is the name of my Supplier<String> function)

Please can someone help with how to achieve that?
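
One common approach, offered as a suggestion rather than a confirmed answer: drop the polled Supplier entirely and use StreamBridge (available in Spring Cloud Stream 3.x) to send on demand from the REST handler (endpoint and binding names below are illustrative):

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProduceController {

    private final StreamBridge streamBridge;

    public ProduceController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // Sends one record per HTTP request; nothing is polled in the background.
    @PostMapping("/produce")
    public void produce(@RequestBody String payload) {
        streamBridge.send("producer-out-0", payload);
    }
}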

boonware
@boonware
Hi there. Is it possible to write to (and delete from) a Kafka Streams state store with Spring Cloud Stream? Typically I access state stores with InteractiveQueryService; however, that only provides read-only state stores. Is there a way to get a reference (preferably via an injected bean) to a state store with write access?
3 replies
Víctor Reyes Rodríguez
@vicrdguez

Hi folks!
I have 2 instances of the same Kafka Streams application deployed. One of the processors is a Supplier that queries a state store every second and sends a record from it to a topic.
For some reason I don't know, one of the instances returns this error all the time:
java.lang.IllegalStateException: Error when retrieving state store: j event-state
I'm using these configs to try to fix it, with no success:

spring.cloud.stream:
  kafka:
    streams:
      binder:
        stateStoreRetry:
          maxAttempts: 5
          backOffInterval: 3000

Maybe you can give me some ideas. Thanks!

4 replies
See-no-evil
@See-no-evil
Hey all, does anyone know if it's possible to implement a non-blocking retry like this https://eng.uber.com/reliable-reprocessing/ with Spring Cloud Stream (Kafka)? Is there a way without doing everything manually? For Spring Kafka I found spring-projects/spring-kafka#1229
madhurjain12
@madhurjain12
Hi everyone, I am trying a simple Spring Cloud functional app with the schema registry and having some problems. I have pasted my YAML file, function, and the error below.
madhurjain12
@madhurjain12
spring:
  application:
    name: app-name
  profiles:
    active: ${env:local}
  cloud:
   stream:
    function:
      definition: update
    bindings:
      update-in-0:
        contentType: avro/bytes
        destination: inputTopic
      update-out-0:
        contentType: avro/bytes
        destination: outputTopic
    kafka:
      streams:
       binder:
        brokers:  broker1:9092, broker2:9093, broker3:9094
        configuration:
          bootstrap-servers: broker1:9092, broker2:9093, broker3:9094
          commit.interval.ms: 1000
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          schema.registry.url: http://avro-schema-registry (not pasting the actual url)
          auto.offset.reset: latest
          specific.avro.reader: true
        application-id: appConsumerGroup
       bindings:
         update-in-0:
           consumer:
             useNativeDecoding: true
             keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
             ValueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
         update-out-0:
           producer:
             useNativeEncoding: true
             keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
             ValueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    schema-registry-client:
      endpoint: http://avro-schema-registry (not pasting the actual url)

The function is:

@Component
@Slf4j
public class StreamProcessor {

    @Autowired
    SomeService someService;

    @Bean
    public Function<KStream<String, Employee>, KStream<String, Employee>> update() {
        log.info("Processing...");
        return input -> {
            try {
                return input.map((key,value)-> new KeyValue<>(key, someService.someMethod(value)));
            } catch (Exception e) {
                log.error("Caught exception: {}", e.getMessage());
                return null;
            }
        };
    }

}

The error is:

java.lang.ClassCastException: class com.sun.proxy.$Proxy114 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy114 is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @5040c109; org.springframework.messaging.MessageChannel is in unnamed module of loader 'app')
3 replies
madhurjain12
@madhurjain12
I have this figured out.
Taras Danylchuk
@tdanylchuk

Hello guys, I have an interesting case with Spring Cloud Stream functions + Reactor + DLQ.

spring.cloud.stream:
  kafka.bindings:
    rewardable:
      consumer:
        enableDlq: true
        dlqName: rewardable_dql
  bindings:
    rewardable:
      group: ${spring.application.name}
      content-type: application/json
      consumer.max-attempts: 1
  function:
    bindings:
      rewardableEventsConsumer-in-0: rewardable
    definition: rewardableEventsConsumer

and the Java consumer:

@Override
public void accept(Flux<RewardRequest> rewardsFlux) {
    rewardsFlux
            .flatMap(rewardableService::reward)
            .subscribe();
}

So on an exception in rewardableService::reward I expect the message to be sent to the DLQ and processing of further messages from the rewards topic to continue.

But in fact, when an exception is thrown, the message is indeed sent to the DLQ, but in IntegrationReactiveUtils.adaptSubscribableChannelToPublisher:

return publisher
                    .doOnCancel(() -> inputChannel.unsubscribe(messageHandler))

the unsubscribe is performed and my flux no longer receives messages.

Do you have any idea how I should configure my reactive function so that it continues processing and sends error messages to the DLQ?

4 replies
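
One Reactor-level mitigation, sketched as an assumption rather than a verified fix for the DLQ routing (it assumes rewardableService.reward returns a Mono; the DLQ publication for failed records may still need explicit handling): contain the error inside the flatMap so the outer subscription never terminates.

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RewardableEventsConsumer implements Consumer<Flux<RewardRequest>> {

    private static final Logger log = LoggerFactory.getLogger(RewardableEventsConsumer.class);

    private final RewardableService rewardableService;

    public RewardableEventsConsumer(RewardableService rewardableService) {
        this.rewardableService = rewardableService;
    }

    @Override
    public void accept(Flux<RewardRequest> rewardsFlux) {
        rewardsFlux
                .flatMap(request -> rewardableService.reward(request)
                        // contain the failure to this element so the outer
                        // subscription keeps consuming subsequent records
                        .onErrorResume(e -> {
                            log.error("Reward failed for {}", request, e);
                            return Mono.empty();
                        }))
                .subscribe();
    }
}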
kenn-chen
@kenn-chen
Hi guys, is it possible to set rabbitmq policies with Spring Cloud Stream?
2 replies
boonware
@boonware
Hi all. With the Kafka Streams binder, is it possible to write to a state store outside of a processor context? I have a use case where a scheduled thread reads from a state store that is populated by a KTable binding and removes entries from it. The write operation seems to work, but I get IllegalStateException: This should not happen as timestamp() should only be called while a record is processed, because I'm accessing the store outside of a processing context. Just swallowing the exception seems ugly. Is there a better way to do this?
1 reply
iguissouma
@iguissouma
I have a question: in the case where we are manually acknowledging Kafka messages and using condition with @StreamListener, what happens if no condition is matched? Does the offset get committed or not?
2 replies
@EnableBinding(Sink.class)
public class ManuallyAcknowledgingConsumer {

    public static void main(String[] args) {
        SpringApplication.run(ManuallyAcknowledgingConsumer.class, args);
    }

    @StreamListener(Sink.INPUT, condition = "new String(headers['type']) == 'type1'")
    public void processt1(Message<?> message) {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    }

    @StreamListener(Sink.INPUT, condition = "new String(headers['type']) == 'type2'")
    public void processt2(Message<?> message) {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    }
}