Omkar Shetkar
@Omkar-Shetkar
This version does not work with the @EnableBinding annotation. If I just remove the annotation, it works!
@SpringBootApplication
@Controller
@EnableBinding
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    @Bean
    public Consumer<CloudEventImpl<Person>> consumer() {
        return p -> {
            System.out.println("consumer--Received: " + p);
        };
    }
}
Soby Chacko
@sobychacko
@Omkar-Shetkar You cannot mix functional binding and the annotation approach in the regular message channel-based binders (Kafka Streams binder allows that in the current versions).
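A hedged sketch of the functional-only setup Soby describes, using the consumer bean from the snippet above (the destination name is hypothetical):

```yaml
# Sketch: with the functional model you drop @EnableBinding entirely
# and declare which function beans to bind.
spring:
  cloud:
    function:
      definition: consumer          # the @Bean name from the snippet above
    stream:
      bindings:
        consumer-in-0:              # derived binding name: <bean>-in-<index>
          destination: person-events   # hypothetical destination
```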
sumit1912
@sumit1912

@sumit1912 Didn't you ask and get an answer on SO regarding this?

Thanks @sobychacko for the reply. Answer given at SO worked like a charm.

Omkar Shetkar
@Omkar-Shetkar

@Omkar-Shetkar You cannot mix functional binding and the annotation approach in the regular message channel-based binders (Kafka Streams binder allows that in the current versions).

OK... @sobychacko Thanks for the clarification...

boonware
@boonware
Hi. Is it possible to modify a message consumed via a @StreamListener method before sending it to the DLQ? My use case is to attach an error description to the message before routing it.
I've tried mutating the object parameter to the method, but that doesn't seem to have any effect.
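If this is the RabbitMQ binder (an assumption — the binder isn't stated above), one hedged option is republishing to the DLQ, which stamps exception details onto the republished message's headers (x-exception-message, x-exception-stacktrace) rather than mutating the payload:

```yaml
# Sketch, assuming the RabbitMQ binder and a binding named "input"
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
              republish-to-dlq: true   # republished message carries
                                       # x-exception-message / x-exception-stacktrace headers
```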
Gonzalo Borobio
@gborobio73

Hello! I'm new to Spring Cloud Stream and Kafka and could use some help. Starting from the test-embedded-kafka sample in spring-cloud-stream-samples (https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/testing-samples/test-embedded-kafka), I'm trying to modify the code so it uses KStream. The code inside EmbeddedKafkaApplication looks like this when using KStream:

    public Function<KStream<Object, byte[]>, KStream<Object, byte[]>> handle(){
        return in ->
            in.map((key, value) -> new KeyValue<Object, byte[]> (key, new String(value).toUpperCase().getBytes()));
    }

I have also modified the POM to include spring-cloud-stream-binder-kafka-streams:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

And also upgraded to newer Kafka as explained here: spring-cloud/spring-cloud-stream-samples#182
The issue I have is that when I run the test, I can see this error in the log:

2020-06-09 16:37:58.854 ERROR 6507 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [handle-applicationId-203b2c06-8fbf-4687-b8aa-71e231d46005-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance

org.apache.kafka.streams.errors.StreamsException: stream-thread [handle-applicationId-203b2c06-8fbf-4687-b8aa-71e231d46005-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216) ~[kafka-streams-2.3.1.jar:na]

And the test eventually fails because:

Expecting:
 <0>
to be equal to:
 <1>
but was not.

Any ideas on what I'm doing wrong? Thanks a lot!!

15 replies
natraj09
@natraj09
Is there an option to skip bad records with the Kafka binder? I see an option with Kafka Streams: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.2.1.RELEASE/spring-cloud-stream-binder-kafka.html#_handling_deserialization_exceptions . I am also using the reactive API; I'm wondering if it's something I have to handle on my own?
Soby Chacko
@sobychacko
You can set up a DLQ for storing the bad records and keep going with the processing of new records.
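A hedged sketch of that DLQ setup for the regular Kafka binder (binding and topic names are placeholders):

```yaml
# Sketch, assuming a binding named "input" on the Kafka binder
spring:
  cloud:
    stream:
      kafka:
        bindings:
          input:
            consumer:
              enableDlq: true
              dlqName: bad-records   # hypothetical DLQ topic name
```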
1 reply
ZhangXudong
@MLeft
(image attachments)
1 reply
I bound an input channel, but why is ChannelInterceptor.preSend invoked instead of the preReceive method?
dharezlak
@dharezlak

Hi, I have the following reactive consumer:

@Bean
public Function<Flux<String>, Mono<Void>> processor() {
    return messages -> messages
            .flatMap(storage::store)
            .then();
}

How is this going to behave with respect to Kafka message acknowledgement? I can't seem to find any documentation on this. Since storage::store is reactive as well I would assume the flatMap operator would parallelize message consumption here?

3 replies
shipra chaudhary
@ShipraShipra04_twitter

Hi Team, I am using reactive Spring Data R2DBC repositories to get POJOs from a Postgres DB:

@Table("document")
public class Document {
    @Id
    private String id;
    @Column(value = "authors_ids")
    private Set<Author> authors; // object
}

Author is a POJO; authors_ids is an array column in the Postgres DB.

While trying to run it I get the error below because of a custom-conversion failure. How can I define a Set<Author> field in the POJO to access the authors_ids (array of strings) column from the Postgres DB?

{
  "timestamp": "2020-06-15T13:20:50.491+0000",
  "status": 500,
  "error": "Internal Server Error",
  "message": "Could not read property @org.springframework.data.relational.core.mapping.Column(keyColumn=, value=authors_ids)private java.util.Set com.example.demo.Documents.authors from result set!",
  "path": "/api/document"
}
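Spring Data R2DBC cannot map an array column to Set<Author> out of the box; one route is a custom read converter. The conversion logic itself is plain Java — a hedged sketch (the Author shape and the idea that authors_ids arrives as a String[] are assumptions; in a real app you would wrap this in a Spring Data @ReadingConverter):

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: the conversion body you would register as a
// Spring Data @ReadingConverter (Converter<String[], Set<Author>>).
public class AuthorIdsConverter {

    // Minimal stand-in for the Author POJO from the question.
    public record Author(String id) {}

    // Turn the authors_ids array column into a Set<Author>,
    // preserving order and dropping duplicates.
    public static Set<Author> fromColumn(String[] authorIds) {
        return Arrays.stream(authorIds)
                .map(Author::new)
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }
}
```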

dvirgiln
@dvirgiln

Hello,

I am following the example from https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution

I am receiving an error:

Failed to invoke method; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"HelloWorld","namespace":"com.example.avro","fields":[{"name":"transactionId","type":"string"},{"name":"message","type":"string"},{"name":"field1","type":["null","int"]}]} (through reference chain: com.example.HelloWorld["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])

My Avro file is quite simple:

{
    "namespace": "com.hollandandbarrett.dns.kafkaworkshop.avro",
    "type": "record",
    "name": "HelloWorld",
    "fields": [
        {"name": "transactionId", "type": "string"},
        {"name": "message", "type": "string"},
        {"name": "field1", "type": ["null", "int"]}
    ]
}

The application.yml is:

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamic-schema-generation-enabled: false
      bindings:
        output:
          contentType: application/*+avro
          destination: HelloWorld
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:29092
          auto-create-topics: true

        bindings:
          output:

            producer:
              configuration:
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
server.port: 9010
Any idea why it is not working?
Why is it trying to use JSON serialization instead of Avro?
Soby Chacko
@sobychacko
@dvirgiln Did you get that example working properly, or are you getting that error while running it?
dvirgiln
@dvirgiln
It works perfectly if I do not include the union type:
1 reply
{"name": "field1", "type": ["null", "int"]}
somehow it is trying to use the Jackson serde
dvirgiln
@dvirgiln
The same error happens when I execute producer 2 from the example:
https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution
mvn clean compile package
java -jar schema-registry-confluent-avro-serializer-producer2/target/schema-registry-confluent-avro-serializer-producer2-0.0.1-SNAPSHOT.jar
dvirgiln
@dvirgiln
I have used the same Docker setup as described in the README file:
 docker-compose -f docker-compose-control-center.yaml up -d
Viacheslav Petriaiev
@Viacheslav-Petriaiev

Hello,
I've started the migration to Spring Cloud Hoxton.SR5.

I have the following code in Kotlin, which worked before:

    @StreamListener(CloudOperationsChannels.INPUT)
    @SendTo(CloudOperationsChannels.OUTPUT)
    fun operationsListener(input: Flux<Message<ByteArray>>): Flux<Message<ByteArray>>

Now the ByteArray payload (the bytes of a JSON string) is being deserialized into Message and I get:

Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

Could you help me: how can I use org.springframework.messaging.Message in this case?

Thank you

Angshuman Agarwal
@angshuman-agarwal

Hi,
I am trying to compose functions and give the composition an alias to map it to my Kafka topic, as per https://github.com/spring-cloud/spring-cloud-stream/blob/master/docs/src/main/asciidoc/spring-cloud-stream.adoc

spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes=quotedUpperCase

But the yml file fails to parse this. Is there any sample showing how to alias a piped function to a descriptive name that can be used to route to a custom topic?
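YAML treats a bare | specially, so Spring Boot map keys containing a pipe need the bracket notation. A hedged sketch (the -out-0 suffix and topic name are assumptions based on the reference docs):

```yaml
spring:
  cloud:
    stream:
      function:
        bindings:
          "[toUpperCase|wrapInQuotes-out-0]": quotedUpperCase
      bindings:
        quotedUpperCase:
          destination: my-custom-topic   # hypothetical topic name
```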

Angshuman Agarwal
@angshuman-agarwal

@sobychacko : I have a naive question related to one sample here

How do you know that process-out-0, process-out-1, and process-out-2 map to english, french, and spanish respectively, and not in some other order of the process-out-* index?

spring.cloud.stream.bindings.process-out-0:
  destination: english-counts
spring.cloud.stream.bindings.process-out-1:
  destination: french-counts
spring.cloud.stream.bindings.process-out-2:
  destination: spanish-counts
4 replies
Viacheslav Petriaiev
@Viacheslav-Petriaiev

@sobychacko: Could you give some suggestions on how I can use a Flux of org.springframework.messaging.Message?

Thank you

6 replies
Angshuman Agarwal
@angshuman-agarwal

Has anyone tried the piped-function aliasing I asked about above?

Andras Hatvani
@andrashatvani
hi, does spring-cloud-stream support deduping based on the message key?
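Spring Cloud Stream itself has no key-based dedup feature I'm aware of; as a hedged sketch, deduplication can be done in the application with a stateful filter applied to incoming messages (in-memory and bounded, so only suitable when a recent-key window is enough — everything here is hypothetical, not a binder feature):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical app-side deduper: remembers the last N keys seen and
// rejects repeats. Not a Spring Cloud Stream feature.
public class KeyDeduper {

    private final Set<String> seen;

    public KeyDeduper(int capacity) {
        // LRU-style set so memory stays bounded: the eldest key is
        // evicted once more than `capacity` keys are tracked.
        this.seen = Collections.newSetFromMap(new LinkedHashMap<>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        });
    }

    // Returns true the first time a key is seen, false on duplicates.
    public synchronized boolean firstTime(String key) {
        return seen.add(key);
    }
}
```

In a functional binding, the filter would sit at the top of the Consumer/Function lambda, dropping any message whose key has already been seen.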
Víctor Reyes Rodríguez
@vicrdguez

Hi!
I have a problem trying to configure the applicationId for my processors. This is my application.yml

spring.cloud:
    stream:
      function:
        definition: changePriority;addComment
      kafka:
        streams:
          binder:
            functions:
              changePriority.applicationId: proxy-sbn-changePriority
              addComment.applicationId: proxy-sbn-addComment

Anyway, when I run my app I still get anonymous consumers. Any hint?
Thanks!

5 replies
Wil Collins
@wilcollins
Hi, our team is using a RabbitMQ backend and we need message deduplication, so we are considering this community-provided plugin: https://github.com/noxdafox/rabbitmq-message-deduplication . However, it requires specific arguments to be provided to the exchange when it is created, and as far as I know there are currently no facilities to do this through the provided configuration class, so I have a few questions:
1) is there an existing solution that I am overlooking?
2) if no, is this functionality that already exists on any roadmaps for implementation?
3) if not or if that timeline is reasonably long, would this be a feature that would be a welcomed contribution from the community?
2 replies
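On question (1), a hedged partial workaround: pre-declare the exchange out-of-band (management UI/API) with the plugin's x-arguments, and configure the binder not to declare it, so it binds to the existing exchange as-is:

```yaml
# Sketch; the binding name is a placeholder
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          output:
            producer:
              declare-exchange: false   # binder uses the pre-declared exchange
```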
shipra chaudhary
@ShipraShipra04_twitter
Hi team, I am trying to create an SCDF task which uses the JDBC dependency (org.springframework.boot:spring-boot-starter-jdbc) to store metadata. In the Spring Boot app I am using the R2DBC dependency (io.r2dbc:r2dbc-postgresql:0.8.2.RELEASE) to get data from PostgreSQL and post it to a REST endpoint.
I am getting an error. I tried to follow this link: https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/multiple-datasources
but I am not sure what my first and second EmbeddedDatabaseType should be in this scenario. Please help.
Jon Harper
@jonenst
Hi, is it possible to use the new test binder ( https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#_test_binder_usage ) with "normal" annotation-based tests controlling the context? Something like this:
@SpringBootApplication
public class MyApp {
   @Bean
    public Supplier<Flux<Message<String>>> mypublisher() {
        ...
    }

   public void publishone() {
      //some function called by the app to send messages to the mypublisher flux
   }
}

@SpringBootTest(...)
public class MyTest {
  @Autowired
   OutputDestination output;

  @Test void mytest() {
     //context setup automatically
     app.publishone()
     assertThat(collector.receive().getPayload(), equalTo("hello".getBytes()));
  }

}
Oleg Zhurakousky
@olegz
yes, we have tests for that
what I don't understand from your code is the collector part
Jon Harper
@jonenst
Hi,
thanks for taking the time to answer
collector should have been "output"
Oleg Zhurakousky
@olegz
got it
Jon Harper
@jonenst
the mypublisher bean stores a FluxSink that the app uses (when an HTTP request comes in) to emit messages to the flux, which sends them to a broker queue
I just want my test of the HTTP request to also assert that a message has been sent to the queue
I don't use @EnableBinding, I use spring function definitions
Oleg Zhurakousky
@olegz
ok, then I misunderstood your question
Jon Harper
@jonenst
My test already exists, it uses regular spring annotations to make a test context; Ideally I would just get a bean using @Autowired and be able to assert that messages are send to the queue. Do you know if it's possible ?
you can type in here and I'll look later
Jon Harper
@jonenst
no problem, thank you for your time

I see the configuration, but it's still used in the @Test public void testGh1973() by creating the context manually

    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
                TestChannelBinderConfiguration.getCompleteConfiguration(SupplierAndProcessorConfiguration.class))
                        .web(WebApplicationType.NONE).run("--spring.jmx.enabled=false",
                                "--spring.cloud.function.definition=echo;supplier",
                                "--spring.cloud.stream.bindings.supplier-out-0.destination=output",
                                "--spring.cloud.stream.bindings.echo-out-0.destination=output")) {

I want to get rid of this, in particular the getCompleteConfiguration(...) call, and instead use the regular context and application.yml of my app. Do you know if it's possible?

using the @SpringRunner or @SpringExtension or @SpringBootTest
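Based on the reference docs, the test binder can be combined with a plain @SpringBootTest by importing its configuration rather than building the context by hand — a hedged sketch reusing the names from the question above (app and the "hello" payload are from that sketch):

```java
@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class MyTest {

    @Autowired
    OutputDestination output;

    @Test
    void mytest() {
        // trigger the application code that emits to the mypublisher flux
        app.publishone();
        // assert on what reached the test binder's output destination
        assertThat(output.receive().getPayload()).isEqualTo("hello".getBytes());
    }
}
```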