Soby Chacko
@sobychacko
I still need to get back to you on the earlier issue you raised with configuration.
Sergey
@Fonexn
OK, but can each binder have its own specific configuration? I don't understand why I don't see the applicationId from the other binder, and why the last binder's config is always the one applied.
Anyway, I am investigating this part because my aggregation process is very slow: 12,000 messages took 25 minutes!
Soby Chacko
@sobychacko
Any chance we can see a reproducible, self-contained sample for both issues?
Sergey
@Fonexn
OK, no problem for the first case. The second is more complex...
Soby Chacko
@sobychacko
I think the second case is most likely a core Kafka Streams question.
Sergey
@Fonexn
For the second case I have this code:
```
@StreamListener(GlobalReportProcessor.GLOBAL_REPORT_TO_PROCESS)
@SendTo(GlobalReportProcessor.GLOBAL_REPORT_PROCESSED)
public KStream<String, GlobalReport> receiveGlobalReportToProcess(KStream<?, Record> input) {

    return input
            .filter(
                    (key, value) -> {
                        boolean isValidRecord = false;
                        if (value.getGlobalMetadata() != null && value.getGlobalMetadata().getOutputFormatID() != null) {
                            isValidRecord = true;
                        } else {
                            log.error("ERROR receiveGlobalReportToProcess(). Skip the corrupted message: {}", value);
                        }
                        return isValidRecord;
                    }
            )
            .groupBy(
                    (key, record) -> record.getGlobalMetadata().getOutputFormatID(),
                    Serialized.with(null, new JsonSerde<>(Record.class)))
            .windowedBy(SessionWindows.with(TimeUnit.SECONDS.toMillis(this.sessionWindowWithSeconds)).until(TimeUnit.SECONDS.toMillis(this.sessionWindowUntilSeconds)))
            .aggregate(
                    GlobalReport::new,
                    (aggKey, newRecord, aggregateRecord) -> {
                        aggregateRecord.setGlobalMetadata(newRecord.getGlobalMetadata());
                        aggregateRecord.setMetadataSend(newRecord.getMetadataSend());
                        GlobalRecord record = new GlobalRecord();
                        record.setMetadata(newRecord.getMetadata());
                        record.setText(newRecord.getText());
                        aggregateRecord.addRecord(record);
                        return aggregateRecord;
                    },
                    (aggKey, leftPageList, rightPageList) -> {
                        leftPageList.getRecords().addAll(rightPageList.getRecords());
                        return leftPageList;
                    },
                    Materialized.<String, GlobalReport, SessionStore<Bytes, byte[]>>as(this.stateStoreName)
                            .withValueSerde(new JsonSerde<>(GlobalReport.class))
            )
            .toStream()
            .filter(((key, value) -> value != null))
            .map((key, value) -> new KeyValue<>(key.key(), value));
}
```
The windowedBy window is 30 seconds with commit.interval.ms at 20000 ms, and it works fine up to 6,000 messages with gzip; after that it slows down.
I have two KStream processors with different aggregations in the same app. For this case I tried to optimize the aggregation by increasing commit.interval.ms, and that is when I discovered that my app always uses the config of only one of them.
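The commit-interval tuning mentioned above can be expressed through the Kafka Streams binder's configuration namespace; a minimal sketch, using the value quoted in the conversation:

```properties
# Passed through to the Kafka Streams application built by the binder
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=20000
```

Note that commit.interval.ms configured this way applies to the Kafka Streams application as a whole, which is consistent with only one processor's setting appearing to take effect when two processors run in the same app.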
Soby Chacko
@sobychacko
I see.
Let's see if we can isolate the first case, where you are experiencing the configuration issue.
Sergey
@Fonexn
Tomorrow I will try to put together an example.
Arturo Araya
@ccellist
@olegz sorry for the delay, and thanks for the response. You’re right, supplier-in-0 can be ignored. I was testing something else (somewhat blindly). I should mention that the message I’m trying to post in the topic is of custom type Message (for which I’ve set up a Serde). I was assuming I needed application/json in order for it to process the deserializing properly. I get no errors, I just don’t see anything show up in my target topic. Or any topic for that matter.
Mansingh Shitole
@msshitole
@olegz @sobychacko Just FYI, as @garyrussell suggested, I tried making a thread sleep for 60_000 to 300_000 ms. It did not work; the application still gets shut down after that interval.
Oleg Zhurakousky
@olegz
That means you have some issue connecting to Kafka
you can temporarily try adding the web dependency to ensure the JVM does not exit at all, and hopefully the problem will manifest itself somehow
Mansingh Shitole
@msshitole
@olegz Nope. But it works if I add the spring-boot-starter-web dependency:
```
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
```
Oleg Zhurakousky
@olegz
Ohh, well, I'll look later; it's been fixed anyway, but for now use the web dependency.
Mansingh Shitole
@msshitole
Yes, I will do this temporarily and will be looking forward to the proper fix. Thank you!
@olegz :thumbsup:
Mansingh Shitole
@msshitole
@sobychacko With the latest version (Hoxton.RELEASE), our Avro message conversion is not working. Here is the exception stack trace for more details:
```
org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an array: {"type":"record","name":"Attachment","namespace":"com.avro.document","fields":[{"name":"url","type":[{"type":"string","avro.java.string":"String"},"null"]},{"name":"type","type":[{"type":"string","avro.java.string":"String"},"null"]},{"name":"description","type":[{"type":"string","avro.java.string":"String"},"null"]}]} (through reference chain: com.avro.document.Document["attachments"]->java.util.ArrayList[0]->com.avro.document.Attachment["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not an array: {"type":"record","name":"Attachment","namespace":"com.avro.document","fields":[{"name":"url","type":[{"type":"string","avro.java.string":"String"},"null"]},{"name":"type","type":[{"type":"string","avro.java.string":"String"},"null"]},{"name":"description","type":[{"type":"string","avro.java.string":"String"},"null"]}]} (through reference chain: com.avro.document.Document["attachments"]->java.util.ArrayList[0]->com.avro.document.Attachment["schema"]->org.apache.avro.Schema$RecordSchema["elementType"])
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:286) ~[spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertToInternal(ApplicationJsonMessageMarshallingConverter.java:69) ~[spring-cloud-stream-3.0.0.RELEASE.jar:3.0.0.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201) ~[spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191) ~[spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83) ~[spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputValueIfNecessary$2(BeanFactoryAwareFunctionRegistry.java:622) ~[spring-cloud-function-context-3.0.0.RELEASE.jar:3.0.0.RELEASE]
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[na:na]
    at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1631) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[na:na]
    at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:543) ~[na:na]
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:628) ~[spring-cloud-function-context-3.0.0.RELEASE.jar:3.0.0.RELEASE]
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputPublisherIfNecessary$5(BeanFactoryAwareFunctionRegistry.java:641) ~[spring-cloud-function-context-3.0.0.RELEASE.jar:3.0.0.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(Fl
```
Soby Chacko
@sobychacko
what kind of schema registry client dependencies are you using?
Mansingh Shitole
@msshitole
We have these dependencies:
```
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
    <version>2.1.1.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
</dependency>
```
Soby Chacko
@sobychacko
wonder if you can change to those.
Mansingh Shitole
@msshitole
@sobychacko It looks like StreamMessageConverter has been deprecated, so there is something we are missing. But yes, it works fine in the old version.
I have pushed a sample project here for you. Could you please have a look at it?
https://github.com/msshitole/reactive-avro-source
Soby Chacko
@sobychacko
ok.
Mansingh Shitole
@msshitole
Thank you!
Soby Chacko
@sobychacko
I will try to look at that this afternoon.
Mansingh Shitole
@msshitole
No problem. Take your time.
I tried adding these registry dependencies, but no success; still the same error.
In the above repo, I kept both our current working prod version (Greenwich.SR3) and the new one. You can test/play by uncommenting it, in case needed, just to see the working (success) and error cases.
Soby Chacko
@sobychacko

@msshitole I think I found the issue. You need to use spring.cloud.stream.bindings.emit-out-0.contentType=avro/bytes. You were using output as the binding name. Also, this is the right new dependency:

```
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>
```

After making those two changes, I am able to run the app without errors and see the output.

You no longer need to annotate the converter with StreamMessageConverter.
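Spelled out as configuration (the binding name emit-out-0 is taken from the supplier bean name in the linked sample; treat it as an assumption for other apps):

```properties
# contentType must target the derived binding name, not the old "output" channel
spring.cloud.stream.bindings.emit-out-0.contentType=avro/bytes
```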
Mansingh Shitole
@msshitole
@sobychacko That's great! Thank you, Soby. Tested, and it works fine now. Earlier the default channel was always output; with the new release it looks like it takes the supplier method name and appends -out-0. Can you shed a little light on the concept behind this?
@sobychacko And one more question: the AvroSchemaMessageConverter constructor has been deprecated. Could you please share the new way of instantiating it? Thank you!
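The -out-0 suffix follows the functional binding-name convention introduced in Spring Cloud Stream 3.x. A minimal sketch of the pattern (illustration only; the bean names emit and process below are hypothetical, and in a real app Spring derives these names internally from the function beans):

```java
// Functional bindings are named after the function bean, replacing the old
// fixed "input"/"output" channels: <beanName>-in-<index> / <beanName>-out-<index>.
public class BindingNames {
    // Name of the input binding for a Function/Consumer bean
    static String input(String beanName, int index) {
        return beanName + "-in-" + index;
    }

    // Name of the output binding for a Function/Supplier bean
    static String output(String beanName, int index) {
        return beanName + "-out-" + index;
    }
}
```

So a Supplier bean named emit publishes through emit-out-0, which is why properties such as contentType have to target that derived binding name.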
Mansingh Shitole
@msshitole
@sobychacko Additional info: the AvroSchemaMessageConverter class is present in two dependencies, and both work fine (either one of them). The constructor is only deprecated in spring-cloud-schema-registry-client, hence we will use the old dependency. We just don't want to adopt anything deprecated that will eventually be pushed into prod. Do let us know if there is any recommendation from your side. Thank you!
```
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
    <version>2.1.1.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>
```
Soby Chacko
@sobychacko
@msshitole You can read the new binding name changes for functional beans here.
Soby Chacko
@sobychacko
I suggest you use the new artifact (spring-cloud-schema-registry) and rely on the version provided through the SCSt BOM.
To fix the deprecation issue, use this constructor: new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"), new AvroSchemaServiceManagerImpl())
Mansingh Shitole
@msshitole
@sobychacko Cool! Yes, it works like a charm. I really appreciate your support. Thank you again for showing the right path.

> @msshitole You can read the new binding name changes for functional beans here.

Exactly, that is what I was looking for. I will go through it. Thank you! :)

blake-bauman
@blake-bauman

I'm running into an issue where I'm getting a ClassCastException when invoking my function, instead of the data being converted by ApplicationJsonMessageMarshallingConverter. The exception comes from trying to cast a Message to my object type instead of converting it. If I change the input type to Message<Object>, I can see that the headers include a content type of application/json. One thing I noticed in BeanFactoryAwareFunctionRegistry.doApply() is:

```
result = this.invokeFunction(this.composed ? input : this.convertInputValueIfNecessary( ... )));
```

Since I have multiple beans that are considered functions, I'm using spring.cloud.function.definition=myFunction to specify the desired bean for the input, but this marks the function as composed in the above code, which doesn't seem to go through the input value conversion. Is there a way to force type conversion for a composed function? Or a way to specify the bean to use without marking it as composed? (I'm using Spring Cloud Release Train Hoxton.RELEASE)

Sergey
@Fonexn
Hi. I have a demo ready with multiple binders, but I cannot push it to GitHub because I am behind a corporate proxy... Can I attach a zip here?
Oleg Zhurakousky
@olegz
@blake-bauman need a bit more info
can you actually post your application somehow?
I am trying to understand what triggers your function to be composed when it's a single function