Oleg Zhurakousky
@olegz
@kschlesselmann perfect! Just an FYI, we did confirm the approach with the reactive team.
hoplers
@hoplers

Hi everyone,
I'm using spring-cloud-stream 1.3 with Kafka binder 11. I try to override the ProducerListener (by creating a @Bean method returning a ProducerListener in a configuration class), but it doesn't help and the binder's ProducerListener is still the default one (created in KafkaBinderConfiguration). What am I doing wrong?

I just figured out: I am using two different binders, and to do so I have to use the 'environment' property for the binders. According to the SCS documentation, this causes the binders' contexts to be isolated from the application context, so the beans I define in my application cannot be injected into those contexts.
So the new question is: how can I inject this bean (from the main application context) into the binder's context?
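
For reference, a minimal sketch of the multi-binder setup being described, in properties form; the binder names, broker addresses, and binding name are hypothetical, and per the SCS docs each binder declared with an environment block gets its own isolated child context:

# hypothetical binder names and broker addresses
spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=broker1.example.com:9092
spring.cloud.stream.binders.kafka2.type=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=broker2.example.com:9092
# each binding then selects the binder it uses, e.g. for a hypothetical "output" binding
spring.cloud.stream.bindings.output.binder=kafka1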

Serghei Ilin
@serghei-ilin

Hi @olegz, I’ve checked your suggestion regarding the Lombok influence :point_up: November 1, 2019 5:04 PM. It turned out it’s not a Lombok issue; the logger causes it. I removed Lombok from my sample project completely. My consumer now looks like this:

import java.util.function.Consumer;

import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import reactor.core.publisher.Flux;

@Component("consumer")
public class ConsumerClass implements Consumer<Flux<Message<SampleObject>>> {

    private final DummyClass dummyClass;

    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(ConsumerClass.class);

    public ConsumerClass(DummyClass dummyClass) {
        this.dummyClass = dummyClass;
    }

    @Override
    public void accept(Flux<Message<SampleObject>> messageFlux) {
        messageFlux.subscribe(message -> System.out.println("consumer: " + message.getPayload()));
    }
}

With the logger defined I am getting the exception I mentioned. If I remove the logger line it works fine.
You can reproduce the issue in the project: https://github.com/serghei-ilin/spring-cloud-stream-function

Oleg Zhurakousky
@olegz
@serghei-ilin I can't seem to reproduce it; there must be something else in your project that causes it...
here is my consumer
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import reactor.core.publisher.Flux;

@Component
public class SimpleConsumer implements Consumer<Flux<Message<String>>> {

    private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);

    @Override
    public void accept(Flux<Message<String>> messageFlux) {
        messageFlux.subscribe(message -> System.out.println("consumer: " + message.getPayload()));
    }

}
I'll look into your project and see if I can spot something
Serghei Ilin
@serghei-ilin
Thank you @olegz. I will also try to reproduce it without the constructor but with the custom message converter, as in the project.
Bart Veenstra
@bartveenstra
Hi. Simple question regarding tasks in SCDF running on k8s
Is it expected that tasks shut down automatically after completion? Or do I need to call System.exit after the functions of the command line runner are executed?
Oleg Zhurakousky
@olegz
@serghei-ilin I just added the constructor as well and all is good
@bartveenstra wrong channel
there is a spring-cloud-dataflow channel
Bart Veenstra
@bartveenstra
Yeah. I just realized that :)
Oleg Zhurakousky
@olegz
@serghei-ilin I am actually having problems importing your project into my IDE
so the best I can do is mimic what you do and so far I can't reproduce
if you can slim it down to a bare minimum
Serghei Ilin
@serghei-ilin
Hm, ok. Thank you @olegz. Yes, I will slim it down and add things back step by step.
Michael Jefferson
@mtj8033
Is there a way to get the Spring Boot application to start up faster when Kafka is down? Right now it looks like it tries to connect for ~2 mins or so due to retries. If I set request.timeout.ms low it starts up really fast, but I'm not sure that's the best way to work around the issue. It is an application that is both producing and consuming Kafka messages.
Oleg Zhurakousky
@olegz
I am not sure what you're asking @mtj8033; the application is started and the container is trying to connect. The same retries will happen at runtime if the connection disappears for some reason. That is part of the functionality.
Gary Russell
@garyrussell
Reducing the request timeout is the only way; the binder has no control once it passes control to the AdminClient to examine and/or create the topic(s).
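
For reference, a minimal sketch of the workaround being discussed, assuming the binder's common Kafka client configuration map is where the timeout would be set in this version; the value is purely illustrative:

# application.properties — illustrative value, not a recommendation
spring.cloud.stream.kafka.binder.configuration.request.timeout.ms=5000

As Gary notes, this only shortens how long the clients wait; the retries themselves are expected behaviour.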
James Dee
@ScruffyDev_gitlab

Hi all,
We built a Spring Kotlin service using spring-cloud-stream-binder-kafka to consume a topic from AWS managed Kafka.
On a given event we would like to reprocess everything in the topic and therefore reset the offset of the consumer.

I could probably use the resetOffsets property, but that would need a restart of the service.
Now I am struggling to do this without restarting the service.
I would be thankful if you could lead me to a solution.

Ivan Franchin
@ivangfr

Hi, I've updated a project

FROM
Spring Boot - 2.1.8
Spring Cloud Stream - Greenwich.SR2

TO
Spring Boot - 2.2.0
Spring Cloud Stream - Hoxton.RC1

No problems so far, except some deprecations... I have a converter bean. Below is the version I've used so far with Greenwich.SR2:

    @Bean
    @StreamMessageConverter
    MessageConverter newsMessageConverter() {
        return new AvroSchemaMessageConverter(MimeType.valueOf("application/*+avro"));
    }

Now, with Hoxton.RC1, it is showing that the annotation @StreamMessageConverter and the constructor public AvroSchemaMessageConverter(MimeType supportedMimeType) are deprecated.

After some research, I've found out that the annotation @StreamMessageConverter is not required as it is written here.

However, how should I create an AvroSchemaMessageConverter bean?

Thanks!

Soby Chacko
@sobychacko
@ivangfr You are right - StreamMessageConverter is no longer required. As to your second question about AvroSchemaMessageConverter, there are going to be some announcements around this along with the RC2 release blog. In a nutshell, the entire schema registry code base has been moved out of core Spring Cloud Stream and migrated over here. The maven coordinates for the schema registry artifacts have been renamed and the versions start from 1.0.0. In order to get AvroSchemaMessageConverter, you need to add this dependency to your pom. Sorry for any inconvenience this may have caused.
Ivan Franchin
@ivangfr

Hi @sobychacko, no problem. Thanks for the response.

Before, I was importing the AvroSchemaMessageConverter from

import org.springframework.cloud.stream.schema.avro.AvroSchemaMessageConverter;

The dependency was

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
</dependency>

Now, I am using the dependency you suggested

import org.springframework.cloud.schema.registry.avro.AvroSchemaMessageConverter;

dependency

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>

Btw, this new library also has that constructor deprecated:

@Deprecated
public AvroSchemaMessageConverter(MimeType supportedMimeType) {
    super(supportedMimeType);
}

Anyway, I will wait for the RC2 release.

Thanks!

enkeebbx
@enkeebbx
Hello, I am just wondering if I am the only one having this issue. My SCDF (k8s) dashboard keeps freezing when I navigate to the audit record menu, and then I have to resort to truncating the audit_record table. Does anyone else have this issue?
Oleg Zhurakousky
@olegz
@enkeebbx wrong forum. Try spring-cloud-dataflow
Artem Bilan
@artembilan
at org.springframework.cloud.stream.test.binder.TestSupportBinder.bindProducer(TestSupportBinder.java:52)
Doesn't look like you use Kafka Streams Binder...
If the test one is not what you want, just remove that dependency from your POM
Artem Bilan
@artembilan
Right, and that one is MessageChannel-based
It expects the bindable to be a MessageChannel, but that is not the case with Kafka Streams.
I mean, it is a wrong assumption that TestSupportBinder is going to work with KStream bindings
I will defer to @sobychacko how to test those KStream bindings...
One more time: spring-cloud-stream-test-support is the wrong choice for testing Kafka Streams
yixingma
@yixingma
@artembilan TY
Artem Bilan
@artembilan
That's an answer to your question about that exception
yixingma
@yixingma
That clarifies
Artem Bilan
@artembilan
Everything else is not related
I guess it is best to test a KStream binding with an embedded Kafka
and of course the real KStream binder
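
For illustration, a bare-bones sketch of what Artem suggests, assuming JUnit 4, spring-kafka-test on the classpath, and a hypothetical input topic named "words"; the official sample Soby links to below shows the full approach:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
// Point the Kafka Streams binder at the broker started by @EmbeddedKafka
@SpringBootTest(properties =
        "spring.cloud.stream.kafka.streams.binder.brokers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(partitions = 1, topics = "words")
public class KafkaStreamsBinderEmbeddedKafkaTest {

    @Test
    public void kstreamBindingStartsAgainstEmbeddedBroker() {
        // The application context and the KStream binding start against the
        // embedded broker; produce to "words" with a KafkaTemplate and assert
        // on the application's output topic to exercise the topology.
    }
}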
Knut Schleßelmann
@kschlesselmann
Hi there! We have a Kafka topic here which we'd like to reprocess on a given event. I thought about resetting the consumer group's offset to the beginning of the topic. How would I do that using SCS? The best I could find so far is that I can somewhere inject an org.apache.kafka.clients.consumer.Consumer, but I cannot find any documentation on how I would do that :-(
René Schröder
@reneschroeder0000
@kschlesselmann I would also be interested in resetting the offset of a consumer group for spring-cloud-stream-binder-kafka programmatically. I think stopping the application and using the application reset tool is not a good workaround...
Soby Chacko
@sobychacko
@yixingma Not sure I follow the entire setup. You can override the broker configuration somehow in that default startup context. Can we see a small application or some code/config?
We recently added a new sample application to demonstrate various testing strategies with the Kafka Streams binder: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kafka-streams-samples/kafka-streams-inventory-count
You might want to take a look there.
Soby Chacko
@sobychacko
@kschlesselmann When auto rebalancing is not enabled, you can set resetOffsets to true on the consumer binding and then set the startOffset property on the consumer binding to earliest. I believe this will make the consumer reset the offset to the beginning.
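
For reference, a minimal sketch of the properties Soby describes, with a hypothetical consumer binding named "input"; these are binding properties, so they take effect when the binding starts:

# hypothetical binding name "input"
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false
spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true
spring.cloud.stream.kafka.bindings.input.consumer.startOffset=earliest

With autoRebalanceEnabled=false, partitions are assigned statically, so instanceCount and instanceIndex also need to be set appropriately when running multiple instances.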
Knut Schleßelmann
@kschlesselmann
@sobychacko But wouldn't I have to restart my service for that? I need to do this at runtime
I have a consumer group running and one of those consumers should reset the whole group to the beginning of the topic