boonware
@boonware
I've tried mutating the object parameter to the method, but that doesn't seem to have any effect
Gonzalo Borobio
@gborobio73

Hello! I'm new to Spring Cloud Stream and Kafka and I could use some help. Starting from the test-embedded-kafka sample in the spring-cloud samples (https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/testing-samples/test-embedded-kafka), I'm trying to modify the code so it uses KStream. The code inside EmbeddedKafkaApplication looks like this when using KStream:

    @Bean
    public Function<KStream<Object, byte[]>, KStream<Object, byte[]>> handle() {
        return in ->
            in.map((key, value) -> new KeyValue<>(key, new String(value).toUpperCase().getBytes()));
    }

I have also modified the POM to include spring-cloud-stream-binder-kafka-streams:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

And I also upgraded to a newer Kafka version, as explained here: spring-cloud/spring-cloud-stream-samples#182
The issue I have is that when I run the test, I can see this error in the log:

2020-06-09 16:37:58.854 ERROR 6507 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [handle-applicationId-203b2c06-8fbf-4687-b8aa-71e231d46005-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance

org.apache.kafka.streams.errors.StreamsException: stream-thread [handle-applicationId-203b2c06-8fbf-4687-b8aa-71e231d46005-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216) ~[kafka-streams-2.3.1.jar:na]

And the test eventually fails because:

Expecting:
 <0>
to be equal to:
 <1>
but was not.

Any ideas on what I'm doing wrong? Thanks a lot!!

15 replies
natraj09
@natraj09
Is there an option to skip bad records with the Kafka binder? I see an option with Kafka Streams: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.2.1.RELEASE/spring-cloud-stream-binder-kafka.html#_handling_deserialization_exceptions . I am also using the reactive API, and I'm wondering if it's something I have to handle on my own?
Soby Chacko
@sobychacko
You can set up a DLQ to store the bad records and keep processing new records.
1 reply
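A minimal sketch of that DLQ route for the Kafka binder, assuming a binding named process-in-0 (the binding name is made up; enableDlq and dlqName are the Kafka binder's consumer properties):

    spring:
      cloud:
        stream:
          kafka:
            bindings:
              process-in-0:            # hypothetical binding name
                consumer:
                  enableDlq: true      # route failed records to a DLQ instead of failing the binding
                  dlqName: bad-records # optional; defaults to error.<destination>.<group>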
ZhangXudong
@MLeft
(three screenshots attached)
1 reply
I bind an input channel, but why is ChannelInterceptor.preSend invoked and not the preReceive method?
dharezlak
@dharezlak

Hi, I have the following reactive consumer:

@Bean
public Function<Flux<String>, Mono<Void>> processor() {
    return messages -> messages
            .flatMap(storage::store)
            .then();
}

How is this going to behave with respect to Kafka message acknowledgement? I can't seem to find any documentation on this. Since storage::store is reactive as well, I would assume the flatMap operator parallelizes message consumption here?

3 replies
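On the parallelism half of the question, a sketch under the assumption that storage::store returns a Publisher (as the original code implies): Reactor's flatMap does subscribe to many inner publishers concurrently (256 by default), and it takes an explicit concurrency argument if that needs limiting.

    @Bean
    public Function<Flux<String>, Mono<Void>> processor() {
        return messages -> messages
                // flatMap subscribes to inner publishers eagerly; the second
                // argument caps how many store calls are in flight at once
                .flatMap(storage::store, 16)
                .then();
    }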
shipra chaudhary
@ShipraShipra04_twitter

Hi Team, I am using reactive Spring Data R2DBC repositories to get POJOs from a Postgres DB:

@Table("document")
public class Document {
@Id
private String id;
@Column(value = "authors_ids")
private Set<Author> authors;// object
}

Author is a POJO; authors_ids is an array column in the Postgres DB.

While trying to run it, I get the error below because of the custom conversion. How can I define a Set<Author> field in the POJO to read the authors_ids (array of strings) column from the Postgres DB?

{
  "timestamp": "2020-06-15T13:20:50.491+0000",
  "status": 500,
  "error": "Internal Server Error",
  "message": "Could not read property @org.springframework.data.relational.core.mapping.Column(keyColumn=, value=authors_ids)private java.util.Set com.example.demo.Documents.authors from result set!",
  "path": "/api/document"
}
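No answer was posted in the channel; for reference, a hedged sketch of the kind of @ReadingConverter Spring Data R2DBC needs for a column like this. Class and field names follow the question; building an Author from an id string is an assumption, and whether the driver hands the column back as String[] is version-dependent. The converter would then be registered through an R2dbcCustomConversions bean.

    import java.util.Arrays;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.springframework.core.convert.converter.Converter;
    import org.springframework.data.convert.ReadingConverter;

    // Reads the Postgres text[] column into the Set<Author> field.
    @ReadingConverter
    public class AuthorsReadingConverter implements Converter<String[], Set<Author>> {

        @Override
        public Set<Author> convert(String[] authorIds) {
            // Assumption: Author has a constructor taking its id
            return Arrays.stream(authorIds)
                    .map(Author::new)
                    .collect(Collectors.toSet());
        }
    }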

dvirgiln
@dvirgiln

Hello,

I am following the example from https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution

I am receiving an error:

Failed to invoke method; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"HelloWorld","namespace":"com.example.avro","fields":[{"name":"transactionId","type":"string"},{"name":"message","type":"string"},{"name":"field1","type":["null","int"]}]} (through reference chain: com.example.HelloWorld["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])

My Avro file is quite simple:

{
    "namespace": "com.hollandandbarrett.dns.kafkaworkshop.avro",
    "type": "record",
    "name": "HelloWorld",
    "fields": [
        {"name": "transactionId", "type": "string"},
        {"name": "message", "type": "string"},
        {"name": "field1", "type": ["null", "int"]}
    ]
}

The application.yml is:

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamic-schema-generation-enabled: false
      bindings:
        output:
          contentType: application/*+avro
          destination: HelloWorld
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:29092
          auto-create-topics: true

        bindings:
          output:

            producer:
              configuration:
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
server.port: 9010
Any idea why it's not working?
Why is it trying to use the JSON serialization instead of the Avro one?
Soby Chacko
@sobychacko
@dvirgiln Did you get that example working properly? Or are you getting that error while running it?
dvirgiln
@dvirgiln
It works perfectly if I do not include the union type
1 reply
{"name": "field1", "type": ["null", "int"]}
somehow it is trying to use the Jackson serde
dvirgiln
@dvirgiln
The same error happens when I execute producer 2 from the example:
https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples/kafka-streams-schema-evolution
mvn clean compile package
java -jar schema-registry-confluent-avro-serializer-producer2/target/schema-registry-confluent-avro-serializer-producer2-0.0.1-SNAPSHOT.jar
dvirgiln
@dvirgiln
I have used the same Docker setup as it appears in the README file:
 docker-compose -f docker-compose-control-center.yaml up -d
Viacheslav Petriaiev
@Viacheslav-Petriaiev

Hello,
I've started the migration to spring cloud Hoxton.SR5

I have the following Kotlin code, which worked before:

    @StreamListener(CloudOperationsChannels.INPUT)
    @SendTo(CloudOperationsChannels.OUTPUT)
    fun operationsListener(input: Flux<Message<ByteArray>>): Flux<Message<ByteArray>>

Now the ByteArray payload (the bytes of a JSON string) is being deserialized into Message, and I get:

Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

Could you help me: how can I use org.springframework.messaging.Message in this case?

Thank you
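For context, the functional programming model is the replacement route for @StreamListener here; a minimal Java sketch of the same listener as a Function (the pass-through body and bean name are illustrative, not the asker's code). Parameterizing with Message<byte[]> keeps the raw payload instead of asking the framework to deserialize into Message itself:

    @Bean
    public Function<Flux<Message<byte[]>>, Flux<Message<byte[]>>> operationsListener() {
        // illustrative pass-through: copy payload and headers to the output
        return input -> input.map(msg -> MessageBuilder
                .withPayload(msg.getPayload())
                .copyHeaders(msg.getHeaders())
                .build());
    }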

Angshuman Agarwal
@angshuman-agarwal

Hi,
I am trying to compose functions and give the composition an alias that maps to my Kafka topic, as per here - https://github.com/spring-cloud/spring-cloud-stream/blob/master/docs/src/main/asciidoc/spring-cloud-stream.adoc

spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes=quotedUpperCase

But the yml file fails to parse this. Is there any sample you can point me to showing the usage of aliasing a piped function to a descriptive name which can be used to route to a custom topic?
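A hedged guess at why the parse fails: | starts a block scalar in YAML, so a key containing it has to be quoted, and Spring Boot's bracket notation escapes special characters in map keys. A sketch with the function names from the question (the -out-0 suffix on the composed binding name and the topic name are assumptions):

    spring:
      cloud:
        function:
          definition: toUpperCase|wrapInQuotes
        stream:
          function:
            bindings:
              # quote keys containing '|'; "[...]" is Spring Boot's escape for special characters
              "[toUpperCase|wrapInQuotes-out-0]": quotedUpperCase
          bindings:
            quotedUpperCase:
              destination: my-custom-topic   # hypothetical topic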

Angshuman Agarwal
@angshuman-agarwal

@sobychacko : I have a naive question related to one sample here

How do you know that process-out-0, process-out-1, and process-out-2 map to english, french, and spanish respectively, and why not any other order of the process-out-* indices?

spring.cloud.stream.bindings.process-out-0:
  destination: english-counts
spring.cloud.stream.bindings.process-out-1:
  destination: french-counts
spring.cloud.stream.bindings.process-out-2:
  destination: spanish-counts
4 replies
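For context, in the Kafka Streams binder the out-N index follows the position of each stream in the array the function returns; a hedged sketch of the shape such a processor takes (the predicates here are made up):

    @Bean
    public Function<KStream<Object, String>, KStream<Object, String>[]> process() {
        // branch() returns an array; position 0 becomes process-out-0,
        // position 1 becomes process-out-1, and so on
        return input -> input.branch(
                (key, value) -> value.contains("english"),   // -> process-out-0 -> english-counts
                (key, value) -> value.contains("french"),    // -> process-out-1 -> french-counts
                (key, value) -> value.contains("spanish"));  // -> process-out-2 -> spanish-counts
    }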
Viacheslav Petriaiev
@Viacheslav-Petriaiev

@sobychacko : Could you give some suggestions on how I can use a Flux of org.springframework.messaging.Message? (Re: my migration question above.)

Thank you

6 replies
Angshuman Agarwal
@angshuman-agarwal

Has anyone tried the piped-function aliasing above, please?

Andras Hatvani
@andrashatvani
hi, does spring-cloud-stream support deduping based on the message key?
Víctor Reyes Rodríguez
@vicrdguez

Hi!
I have a problem trying to configure the applicationId for my processors. This is my application.yml

spring.cloud:
    stream:
      function:
        definition: changePriority;addComment
      kafka:
        streams:
          binder:
            functions:
              changePriority.applicationId: proxy-sbn-changePriority
              addComment.applicationId: proxy-sbn-addComment

Anyway, when I run my app I'm still getting anonymous consumers. Any hint?
Thanks!

5 replies
Wil Collins
@wilcollins
Hi, our team is using a RabbitMQ backend and we need message deduplication, so we are considering this community-provided plugin: https://github.com/noxdafox/rabbitmq-message-deduplication . However, it requires specific arguments to be provided to the Exchange when it is created and, as far as I know, there are currently no existing facilities to do this through the provided configuration class, so I have a few questions:
1) is there an existing solution that I am overlooking?
2) if no, is this functionality that already exists on any roadmaps for implementation?
3) if not, or if that timeline is reasonably long, would this be a feature that would be a welcome contribution from the community?
2 replies
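One workaround sketch, under the assumption that pre-declaring the exchange yourself (via Spring AMQP's CustomExchange, using the plugin's x-message-deduplication exchange type) lets the binder find an existing exchange instead of creating its own; whether the binder's own declaration then conflicts with these arguments is worth verifying. The exchange name and argument values are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.context.annotation.Bean;

    @Bean
    public CustomExchange dedupExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-cache-size", 10_000);  // plugin-specific arguments (names assumed from the plugin docs)
        args.put("x-cache-ttl", 60_000);
        // "my-destination" is hypothetical: it must match the binding's destination
        return new CustomExchange("my-destination", "x-message-deduplication", true, false, args);
    }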
shipra chaudhary
@ShipraShipra04_twitter
Hi team, I am trying to create an SCDF task which uses the JDBC dependency (org.springframework.boot:spring-boot-starter-jdbc) to store metadata. In the Spring Boot app I am using the R2DBC dependency (io.r2dbc:r2dbc-postgresql:0.8.2.RELEASE) to get data from PostgreSQL and post it to a REST endpoint.
I am getting an error. I tried to follow this link: https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/multiple-datasources
but I am not sure what my first and second EmbeddedDatabaseType should be in this scenario. Please help.
Jon Harper
@jonenst
Hi, is it possible to use the new test binder (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#_test_binder_usage) with "normal" annotation-based tests controlling the context? Something like this:
@SpringBootApplication
public class MyApp {
   @Bean
   public Supplier<Flux<Message<String>>> mypublisher() {
       ...
   }

   public void publishone() {
      // some function called by the app to send messages to the mypublisher flux
   }
}

@SpringBootTest(...)
public class MyTest {
   @Autowired
   OutputDestination output;

   @Test
   public void mytest() {
      // context setup automatically
      app.publishone();
      assertThat(collector.receive().getPayload(), equalTo("hello".getBytes()));
   }
}
Oleg Zhurakousky
@olegz
yes, we have tests for that
what I don't understand from your code is the collector part
Jon Harper
@jonenst
Hi,
thanks for taking the time to answer
collector should have been "output"
Oleg Zhurakousky
@olegz
got it
Jon Harper
@jonenst
the mypublisher bean stores a fluxsink used by the app (when an http request comes in) to send messages to the flux to send a message to a broker queue
I just want my test of the http request to also assert that a message has been sent to the queue
I don't use @EnableBinding, I use spring function definitions
Oleg Zhurakousky
@olegz
ok, then I misunderstood your question
Jon Harper
@jonenst
My test already exists; it uses regular Spring annotations to make a test context. Ideally I would just get a bean using @Autowired and be able to assert that messages are sent to the queue. Do you know if it's possible?
you can type in here and I'll look later
Jon Harper
@jonenst
no problem, thank you for your time

I see the configuration, but it's still used in the @Test public void testGh1973() by creating the context manually:

    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
                TestChannelBinderConfiguration.getCompleteConfiguration(SupplierAndProcessorConfiguration.class))
                        .web(WebApplicationType.NONE).run("--spring.jmx.enabled=false",
                                "--spring.cloud.function.definition=echo;supplier",
                                "--spring.cloud.stream.bindings.supplier-out-0.destination=output",
                                "--spring.cloud.stream.bindings.echo-out-0.destination=output")) {

I want to get rid of this, in particular the getCompleteConfiguration(...) call, and instead use the regular context and application.yml of my app. Do you know if it's possible?

using the @SpringRunner or @SpringExtension or @SpringBootTest
Jon Harper
@jonenst
Thanks a lot! You solved my problem. I hadn't looked at the prerelease docs. Adding @Import(TestChannelBinderConfiguration.class) did the trick. Also, instead of using Function<String,String>, I'm using Supplier<Flux<String>>, and in this case I had to use receive(timeout) because the plain receive() returned null.
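Putting that resolution together, a minimal sketch of the resulting test shape (test and method names are illustrative; the trigger for the Supplier's Flux is whatever the app exposes, e.g. publishone() above):

    @SpringBootTest
    @Import(TestChannelBinderConfiguration.class)
    class MyTest {

        @Autowired
        private OutputDestination output;

        @Test
        void publishesToTheQueue() {
            // trigger whatever makes the Supplier's Flux emit, e.g. app.publishone()
            // receive(timeout) rather than receive(): with Supplier<Flux<String>>
            // the plain no-arg receive() returned null, as noted above
            Message<byte[]> message = output.receive(5_000);
            assertThat(message.getPayload()).isEqualTo("hello".getBytes());
        }
    }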
Jon Harper
@jonenst
By the way, it was a very good idea to add it to the docs in the next version; I think that's what most people look for when writing this kind of test. Good job!
Angshuman Agarwal
@angshuman-agarwal

Hi,
In Spring Cloud Function (Kafka), how do I control the lifetime of the producer, please? I want to control it via a REST endpoint on a per-request basis (like a new producer for every request).

Currently, as soon as I start my app, my Supplier function gets called. I want to disable this automatic polling by the framework and control it myself.

It is mapped like this:

spring.cloud.stream.kafka.bindings.producer-out-0 : myTopic
spring.cloud.function.definition : producer (where producer is the name of my Supplier<String> function).

Please can someone help with how to achieve that?
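No answer was recorded here; for reference, one hedged sketch of the on-demand pattern is StreamBridge (in Spring Cloud Stream 3.x), which sends to a binding only when called, instead of a framework-polled Supplier. The controller shape is illustrative; the binding name is taken from the question:

    @RestController
    public class ProducerController {

        private final StreamBridge streamBridge;

        public ProducerController(StreamBridge streamBridge) {
            this.streamBridge = streamBridge;
        }

        @PostMapping("/send")
        public void send(@RequestBody String body) {
            // one send per HTTP request; nothing is produced until this is called
            streamBridge.send("producer-out-0", body);
        }
    }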

boonware
@boonware
Hi there. Is it possible to write to (or delete from) a Kafka Streams state store with Spring Cloud Stream? Typically I access state stores with InteractiveQueryService; however, that only provides read-only state stores. Is there a way to get a reference (preferably via an injected bean) to a state store with write access?
3 replies
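For reference, the usual route to a writable store in Kafka Streams is from inside the topology rather than via an injected bean; a hedged sketch using transform() (the store name and types are made up, and the store itself still has to be registered separately, e.g. through a StoreBuilder the binder picks up):

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process() {
        return input -> input.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {

            private KeyValueStore<String, String> store;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                // unlike InteractiveQueryService, the store handed to a processor is writable
                this.store = (KeyValueStore<String, String>) context.getStateStore("my-store");
            }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                store.put(key, value);   // writes are allowed here
                // store.delete(key);    // and so are deletes
                return KeyValue.pair(key, value);
            }

            @Override
            public void close() {
            }
        }, "my-store");
    }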