iguissouma
@iguissouma
Hello, I have a strange behaviour in a production application with a StreamListener and a Kafka binder: sometimes, after a while, I consume messages that were already processed. I'm disabling autoCommitOffset and acknowledging the message manually when the header is present (this works well in a local setup using a simple Kafka in Docker). What can cause this kind of problem on the consumer side or the server side? What checks can I do? Thanks in advance.
13 replies
Mike Wallace
@argonium_gitlab

I'm writing a Java application that uses Spring Cloud Stream to read messages from an input RabbitMQ queue, process the messages, and then write a new message to an output RabbitMQ exchange. This works great when the input queue name is known at compile-time.

I now have a new requirement that the application needs to support discovering new RabbitMQ queues at runtime, and then use those new queues for input (reading messages from the new queues). Getting the list of queues is straightforward (via the REST API in RabbitMQ), but I don't see a way (in SCS) to use a variable queue name for reading messages from those new input queues.

I'm using Spring Cloud Stream v.3.0.3. Does Spring Cloud Stream support this use case?

Gary Russell
@garyrussell

There's a "back door" way to do it - add a ListenerContainerCustomizer and get a reference to the listener container:

@SpringBootApplication
public class Gitter87Application {

    public static void main(String[] args) {
        SpringApplication.run(Gitter87Application.class, args);
    }

    private final Map<String, AbstractMessageListenerContainer> containers = new ConcurrentHashMap<>();

    @Bean
    Consumer<String> input() {
        return System.out::println;
    }

    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer> cust() {
        return (container, destinationName, group) -> {
            this.containers.put(group, container);
        };
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            System.out.println("Hit Enter to add queue2");
            System.in.read();
            AbstractMessageListenerContainer container = this.containers.get("group1");
            container.addQueueNames("queue2");
            template.convertAndSend("queue2", "test");
        };
    }

}
spring.cloud.stream.bindings.input-in-0.destination=dest1
spring.cloud.stream.bindings.input-in-0.group=group1
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=direct

It's best to use a direct container; with the default simple container, the consumer(s) are stopped and restarted when adding queues - see "Choosing a Container" - https://docs.spring.io/spring-amqp/docs/current/reference/html/#choose-container

Gary Russell
@garyrussell
@argonium_gitlab :arrow_double_up:
Gary Russell
@garyrussell
With Spring Cloud Stream, there is no mechanism to start a container with 0 queues; but you can configure it with a dummy queue (even an anonymous one, by omitting the group) and remove it in the customizer before the container starts.
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer> cust() {
    return (container, destinationName, group) -> {
        this.containers.put(group, container);
        container.removeQueueNames(container.getQueueNames());
    };
}
Marcel Widmer
@marzelwidmer
Hello, I'm looking for documentation about the new functional model (https://docs.spring.io/spring-cloud-stream/docs/3.1.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-introducing) with Kotlin, i.e. Function, Supplier, Consumer. Do I maybe also need spring-cloud-function-kotlin for this? I'm trying to migrate my sample to the new API:
@SpringBootApplication
class ProducerApplication
fun main(args: Array<String>) {
    runApplication<ProducerApplication>(*args)
}
@RestController
@RequestMapping(value = ["/producer"])
class ProducerController(private val producerMessagingService: ProducerMessagingService) {

    private val log = LoggerFactory.getLogger(javaClass)

    @PostMapping(value = ["{message}"])
    fun message(@PathVariable message: String) {
        log.info("------------> Receive message {} <-------------", message)
        producerMessagingService.sendMessage(message)
    }
}
@Service
class ProducerMessagingService(private val producerSource: ProducerSource) {
    fun sendMessage(message: String) = producerSource.producerChannel().send(MessageBuilder.withPayload(message).build())
}
interface ProducerSource {
    @Output(value = "producerOutput")
    fun producerChannel(): MessageChannel
}
@Configuration
@EnableBinding(value = [ProducerSource::class])
class MessagingConfig
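For reference, a minimal sketch of the functional-model equivalent (shown in Java for brevity; the Kotlin version is analogous), assuming StreamBridge is used for imperative sends from the controller. The binding name producerOutput and the destination configuration are illustrative, not taken from the original sample.

// Sketch only: StreamBridge replaces the @Output channel; no @EnableBinding or
// binding interface is needed in the functional model.
@RestController
@RequestMapping("/producer")
public class ProducerController {

    private final StreamBridge streamBridge;

    public ProducerController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @PostMapping("/{message}")
    public void message(@PathVariable String message) {
        // "producerOutput" is an illustrative binding name; its destination is set via
        // spring.cloud.stream.bindings.producerOutput.destination=<topic/exchange>
        streamBridge.send("producerOutput", MessageBuilder.withPayload(message).build());
    }
}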
K solo
@mr_k_solo_twitter
@Override
    public void process(String key, SpecificRecord value) {
        batchList.add((PriceSpecification) value);
        log.info("Context details " + context.taskId() + " " + context.partition() + " " +
                "storeSize " + batchList.size());
        if(batchList.size() == 10){
            var batch = PriceSpecificationBatch.newBuilder()
                    .setBatchCount(batchList.size())
                    .setBatchUuid(UUID.randomUUID().toString())
                    .setCreateDateTime(Instant.now())
                    .setPriceSpecifications(batchList)
                    .build();
            context.forward(UUID.randomUUID().toString(),batch, OUT_BOUND_TOPIC);
            batchList.clear();
            lastProcessedTime.set(System.currentTimeMillis());
        }
        context.commit();
    }

it throws the following error when invoked from the following consumer
Failed to process stream task 0_2 due to the following error:","stack_trace":"<#487f5746> org.apache.kafka.streams.errors.StreamsException: Unknown downstream node: <outbound_topic_name> either does not exist or is not connected to this processor

 @Bean
    public java.util.function.Consumer<KStream<String, PriceSpecificationSource>> process() {
        final long durationPerEventInMs = Duration.ofMinutes(serviceProperties.getTimeWindow()).toMillis();
        return input -> input
                .peek((k, v) -> log.info("pre processed message. {} key: {}  value: {} %n",JsonLoggerUtil.uuid(SERVICE_CODE),k, v))
                .filter((key, value) -> validationService.validate(value))
                .map((key, value) -> new KeyValue<>(key, transformerService.transform(value)))
                .transform(() -> new DeduplicationTransformer<>(durationPerEventInMs,(key, value) -> getId(value),
                        serviceProperties.getStoreName()), serviceProperties.getStoreName())
                .process(BatchProcessor::new);
    }

It's self-explanatory what the issue is here; however, I'm not sure how to connect a topic with a processor.

Apologies, I'm desperately seeking help for the issue detailed (poorly) above. The exception is thrown while invoking the processor defined above from the consumer defined below it. Any help would be appreciated.
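For what it's worth, a minimal sketch of one way to keep the sink node in the topology: it assumes a hypothetical BatchTransformer (a Transformer that buffers records and returns the batch once 10 have been collected, otherwise null) used via transform(), with the batch routed via to() instead of context.forward() to a raw topic name.

@Bean
public java.util.function.Consumer<KStream<String, PriceSpecificationSource>> process() {
    return input -> input
            .filter((key, value) -> validationService.validate(value))
            .map((key, value) -> new KeyValue<>(key, transformerService.transform(value)))
            // BatchTransformer is hypothetical: it emits a KeyValue<String, PriceSpecificationBatch>
            // when the batch is full; returning null forwards nothing downstream
            .transform(BatchTransformer::new)
            // to(...) adds a sink node for the topic, so there is no need to forward
            // to a downstream node that is not part of the topology
            .to(OUT_BOUND_TOPIC);
}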
maheshrajamanikurup
@maheshrajamanikurup

Hi, I am trying to create a windowed KTable using spring cloud stream binder. Code as follows.

return stream -> {
   stream
       .filter((key, order) -> {

   }).mapValues(s -> {

   })
       .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
       .windowedBy(TimeWindows.of(Duration.ofMinutes(n)))
       .reduce((Reducer<String>) (value1, value2) -> {
         return value2;
       }, Materialized.<String, String, WindowStore<Bytes, byte[]>>as(
            String.valueOf(Stores.persistentWindowStore("snapshot",
               Duration.ofMinutes(n),
               Duration.ofMinutes(n),
               false))));
 };

Caused by: org.springframework.beans.factory.BeanInitializationException: Cannot setup function invoker for this Kafka Streams function.; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Name "org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier@6f99b7c8" is illegal, it contains a character other than ASCII alphanumerics, '.', '_' and '-'.
Unsure why the state store name is read as RocksDbWindowBytesStoreSupplier@6f99b7c8. Any help is appreciated.

Using Spring Boot 2.3.3.RELEASE, binder 3.0.8.RELEASE (Hoxton.SR8), Kafka Streams 2.6.0

Chr3is
@Chr3is
Hey, @StreamListener was marked as deprecated. My question now is: is there a migration guide to the functional approach? For example, how do I handle injected headers like these?
    @StreamListener(IncomingEvent.MyEvent)
    @SendTo(OutgoingEvent.MyEvent)
    @Nullable
    public MyOutgoingEvent handleEvent(
            @Payload MyIncomingEvent event,
            @Header(RECEIVED_TOPIC) String topic,
            @Header(RECEIVED_PARTITION_ID) int partitionId,
            @Header(CONSUMER) Consumer<?, ?> consumer,
            @Header(ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
        return super.handleEvent(event, topic, partitionId, consumer, acknowledgment);
    }
8 replies
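Not an official migration guide, but a minimal sketch of the functional equivalent, assuming the handler becomes a Function over Message<MyIncomingEvent> and the headers are read from MessageHeaders. The event types come from the question; the constructor call and processing are illustrative.

@Bean
public Function<Message<MyIncomingEvent>, MyOutgoingEvent> handleEvent() {
    return message -> {
        MyIncomingEvent event = message.getPayload();
        // the @Header parameters become lookups on the incoming MessageHeaders
        String topic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class);
        Integer partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
        Acknowledgment acknowledgment =
                message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        // process the event, acknowledge manually if auto-commit is disabled,
        // and return the outgoing payload (sent to the output binding)
        if (acknowledgment != null) {
            acknowledgment.acknowledge();
        }
        return new MyOutgoingEvent(event); // illustrative
    };
}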
Iulia Iancu
@iulia-iancu
Hi! I have a question related to spring cloud stream binder rabbit 3.1.1 version. I'm trying to configure the consumer app to create a queue. However when I start the consumer application it does not create a queue. Here is the configuration I use for the consumer app:
spring.cloud:
  stream:
    bindings:
      process-in-0:
        destination: pips.from-crawler
        group: pips.to-api.fixture
For the producer app I use this configuration:
spring.cloud:
  stream:
    function:
      bindings:
        produce-out-0: pips.from-crawler
    source: produce
The producer app creates the exchange pips.from-crawler, which appears in the RabbitMQ UI. However, the consumer app does not create a queue and bind it to pips.from-crawler.
I tried searching online for the issue but I can't find any hints. I don't understand why the consumer app does not create the queue. Do you have any ideas about what I can do to solve the problem? Thank you!
3 replies
Bohdan Korinnyi
@BohdanKorinnyi

Hi, does anybody have an example of how I can send messages using SCS to different Kafka topics that are not known in advance and are received via an HTTP call?

I found that it's easy to implement using KafkaTemplate and the Kafka_Topic header, but what's the approach with SCS?

4 replies
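A minimal sketch of the StreamBridge approach, which is the SCS-side analogue of KafkaTemplate with a topic name resolved at runtime; the controller and payload type are illustrative.

@RestController
public class PublishController {

    private final StreamBridge streamBridge;

    public PublishController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @PostMapping("/publish/{topic}")
    public void publish(@PathVariable String topic, @RequestBody String payload) {
        // if no binding with this name exists, StreamBridge creates one on the fly
        // and uses the name as the destination (topic)
        streamBridge.send(topic, payload);
    }
}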
Iulia Iancu
@iulia-iancu
image.png
PJ
@pjiocnic
Can anyone please give me an example for Knative eventing + AWS SQS using spring cloud stream?
Iulia Iancu
@iulia-iancu
consumer:
Ivan
@ismorodin
I get an infinite loop when a MessageHandlingException is thrown and the consumer is working in batch mode
image.png
image.png
Andras Hatvani
@andrashatvani
Hey, apparently EmitterProcessor has been deprecated in Reactor 3.4 in favor of reactor.core.publisher.Sinks.
I use the former as an event producer, but I'm not sure how to implement the latter, and I don't see any reference to it in the Spring Cloud Stream docs either - any relevant hint would be appreciated.
3 replies
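A minimal sketch of the Sinks-based pattern, assuming a Supplier<Flux<Message<...>>> binding; the bean names and the MyEvent type are illustrative (a fuller example of the same idea appears further down in this log).

@Bean
public Sinks.Many<Message<MyEvent>> eventSink() {
    // replaces EmitterProcessor; buffers emissions until the binder subscribes
    return Sinks.many().unicast().onBackpressureBuffer();
}

@Bean
public Supplier<Flux<Message<MyEvent>>> eventSupplier(Sinks.Many<Message<MyEvent>> eventSink) {
    // the binder subscribes to this Flux and publishes whatever is emitted to the sink
    return eventSink::asFlux;
}

// application code then emits with:
// eventSink.tryEmitNext(MessageBuilder.withPayload(event).build());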
jesrzrz
@jesrzrz
Hi, I'm having some issues with 'InvalidStateStoreException: The state store, xxxxxxx, may have migrated to another instance'. This state store was working fine until today, when a new version of its producer was deployed. There are two instances of the producer in a Kubernetes cluster, and now only one of the two can connect to the state store. The point of failure is the QueryableStoreProvider.getStore method. I have increased the retry template parameters, but that didn't fix it. Any idea about this kind of behaviour? Thanks
Iheb Ben Romdhane
@iheb090
Hello everyone,
I want to produce a message to multiple topics with a single producer.
In the docs I found the branching method, but it works with a processor and not a producer.
Is there a way to apply branching on a Supplier, for example?
Viacheslav Petriaiev
@Viacheslav-Petriaiev

Hello
I have the problem described in reactor/reactor-core#2145.
I have a Flux which is connected to a RabbitMQ topic. When a new message is processed by my flow and uses MongoDB, I get the described exception.
What is the best way to keep the Flux connected to RabbitMQ when an error occurs, without using onErrorContinue?
I have this in my library for that:

                .onErrorContinue { th, causeElement ->
                    logger.error("Cause element: $causeElement, message: ${th.message}", th)
                }
                .doOnError { t: Throwable ->
                    logger.error(t.message ?: "Cause exception message is null", t)
                }
                .retry()

How can onErrorContinue be avoided in such a case?

Thank you

yaboong
@yaboong

Hello, I'm using the spring-cloud-stream Kafka binder with schema registry (not Kafka Streams).
What I'm trying to do: when an un-deserializable message gets into an input topic, send that message to the DLQ.

So I tried the configuration below, but the Spring Cloud Stream app keeps retrying infinitely and says

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1

What am I doing wrong? Please help.

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
  kafka:
    binder:
      brokers: localhost:9092
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          autoCommitOnError: true
          autoCommitOffset: true
    default:
      consumer:
        configuration:
          schema.registry.url: http://localhost:8081
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
6 replies
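One approach that is commonly suggested for this (an assumption here, not necessarily what the replies recommended) is to wrap the value deserializer in Spring Kafka's ErrorHandlingDeserializer, so a poison record surfaces as a failed record that error handling/DLQ logic can act on, instead of the client retrying deserialization forever. A rough sketch of the extra consumer properties:

spring.cloud.stream.kafka.binder.consumer-properties:
  value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
  # ErrorHandlingDeserializer delegates to the real Avro deserializer
  spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
  schema.registry.url: http://localhost:8081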
yaboong
@yaboong

@garyrussell
Sorry for asking a lot, but how do I make onlyLogRecordMetadata=true work?
I did exactly the same as this (https://github.com/spring-projects/spring-kafka/issues/1659#issuecomment-749025910)

@Component
class Customizer {

    Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
        factory.getContainerProperties().setOnlyLogRecordMetadata(true);
    }

}

but when deserialization fails, it still prints the whole unreadable value, like this

SeekToCurrentErrorHandler        : Backoff none exhausted for ConsumerRecord(topic = chat, partition = 0, leaderEpoch = 0, offset = 26, CreateTime = 1613666130733, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [-84, -19, 0, 5, .....so many values......., 69, 111, 114, 10])], isReadOnly = false), key = null, value = null)

In ListenerUtils, it always goes to the else block, even though I set it exactly as you mentioned.

public static String recordToString(ConsumerRecord<?, ?> record) {
    if (Boolean.TRUE.equals(LOG_METADATA_ONLY.get())) {
        return record.topic() + "-" + record.partition() + "@" + record.offset();
    }
    else {
        return record.toString();
    }
}
2 replies
yaboong
@yaboong

Hello! I'm trying to use RetryTemplate and retry when an exception (specifically RedisException) occurs in message handling.
But when the exception is thrown while handling messages, all exceptions are caught in AbstractMessageHandler.

They are then all wrapped into MessageHandlingException in IntegrationUtils.

So there's no way for RetryTemplate to distinguish which exception was actually thrown. Because of this, I can't apply RetryTemplate's retryOn or notRetryOn methods (I mean, they don't work). Am I doing it wrong? Is there another way to handle this? Please help.

2 replies
yaboong
@yaboong

In the documentation, section 1.3.3 Consuming Batches says

Retry within the binder is not supported when using batch mode, so maxAttempts will be overridden to 1

Does this mean that using retry-template-name: someRetryTemplateBean is not supported?

6 replies
D3jank
@D3jank

Hello, I've upgraded our app from Spring Boot 2.3.6 to 2.4.3 to use AWS Secrets Manager via spring.config.import=aws-secretsmanager:<secret>. It is a Cloud Stream app that uses Kafka. The app works fine on 2.4.3, but when I add the spring.config.import, I get the following:
java.lang.IllegalStateException: Could not register object [com.amazonaws.services.secretsmanager.AWSSecretsManagerClient@5323b690] under bean name 'configDataAWSSecretsManager': there is already object [com.amazonaws.services.secretsmanager.AWSSecretsManagerClient@5323b690] ...

Any idea how to fix that?

happyangel
@happyangel:matrix.org
[m]
Hello, I have a spring-cloud-stream application that uses destination-is-pattern to listen to a dynamic set of Kafka topics. This works fine when I have one instance of the application running. When I start another instance of the application with the same consumer group id, it starts up fine, but remains idle. No messages get routed to the second instance of the application. I'm wondering if it is related to using the destination-is-pattern.
Following is my sanitized configuration
spring:
    cloud:
        function:
            definition: process1;process2
        stream:
            bindings:
                input1:
                    binder: kafka
                    destination: abc.* #Topic pattern to listen on
                    group: group1
                input2:
                    binder: kafka
                    destination: def.* #Topic pattern to listen on
                    group: group2
            function:
                bindings:
                    process1-in-0: input1
                    process2-in-0: input2
            kafka:
                binder:
                    brokers: localhost:9092
                    configuration:

                        application:
                            id: appid

                bindings:
                    input1:
                        consumer:
                            destination-is-pattern: true
                            auto-commit-offset: false
                    input2:
                        consumer:
                            destination-is-pattern: true
                            auto-commit-offset: false
happyangel
@happyangel:matrix.org
[m]
It's probably because I only have one partition per topic, which results in one consumer per topic.
10 replies
René Schröder
@reneschroeder0000
Hi, is it possible to provide the kafka value-deserializer on the binding level?
spring.cloud.stream.bindings.my-input-binding.consumer-properties.value-deserializer = ... # this doesn't work
spring.cloud.stream.kafka.binder.consumer-properties.value-deserializer = ... # this works, but I would prefer to put it within the binding configuration
3 replies
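As far as I know, the Kafka binder also exposes a per-binding consumer configuration map, which would put the property at the binding level; a sketch using the binding name from the question (worth verifying against the binder docs):

spring.cloud.stream.kafka.bindings.my-input-binding.consumer.configuration.value.deserializer = ...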
Aditya Parikh
@adityaparikh91087

Running an SCS Kafka Streams app against topics that I have had to change in the course of local development.
The app keeps logging
ConsumerCoordinator : [Consumer clientId=-app--StreamThread-1-consumer, groupId=branching-kstreams-app] We received an assignment [topicA-0, topicA-2] that doesn't match our current subscription Subscribe(topicB); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription INFO --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=branching-kstreams-app-StreamThread-1-consumer, groupId=branching-kstreams-app] (Re-)joining group

and does not progress beyond that. How do I address this?
I have changed the application id, deleted the state store changelog topics, and changed the input topics, but this does not seem to go away.

blake-bauman
@blake-bauman
Is there a way to have multiple, parallel Kafka Consumers when using a Flux? I'd like to handle each partition in parallel. I tried using spring.cloud.stream.bindings.input.consumer.concurrency=3 but got an error: Concurrency > 1 is not supported by reactive consumer, given that project reactor maintains its own concurrency mechanism.
Iheb Ben Romdhane
@iheb090
Hello everyone,
Can I activate batch mode for the consumer in the Kafka Streams binder?
3 replies
happyangel
@happyangel:matrix.org
[m]
That fixed it! Many thanks Gary. I'll test out all my scenarios.
pranavvr1992
@pranavvr1992

Hi,

I have an issue with the service URL of my Spring Cloud Stream. I am launching my Spring Cloud Tasks from a Spring Cloud Stream: the stream contains http-kafka as the source and taskLauncerKafka as the sink, and I use the http-kafka service URL to launch tasks. The Kubernetes service URL changes after each deployment: on the first deployment the service name is something like sample-stream-v1, and after the second it becomes sample-stream-v2. I am on the Kubernetes platform, so I have used Kubernetes service URLs to launch the tasks, and the change in the service name after each stream deployment is difficult to manage. I also tried enabling a load balancer, but then the external IP address also changed after each stream rollout. Any solutions?

Iheb Ben Romdhane
@iheb090
Hello everyone,
Can I activate a dead letter queue (DLQ) with a consumer in batch mode with the Kafka binder?
1 reply
Igor Rudenko
@fac30ff
Hi, I have a question about Spring Cloud Stream Kafka Streams: how do I set up the source topic? It always binds to the binding name, but my producer produces to a specific topic.
10 replies
Marcus Mosttler
@mmosttler_gitlab

Hello all,
We had been using EmitterProcessor in projects that used a Supplier to publish a message. After upgrading one of them to Spring Boot 2.4.3 and Spring Cloud 2020.0.1, I noticed that EmitterProcessor has been deprecated, with notes to use Sinks instead. So I have modified the code to use Sinks and still produce the same Message as before, with key and value.

My issue is that with Sinks, when publishing the Message with a header for the message key, it fails to publish. If I remove the key header, it publishes, just without a key. I am not sure what I am missing, or whether there is a more appropriate way to implement the Supplier now.

@Configuration
public class BrandConfigPublisher
{
    @Bean
    public Supplier<Flux<Message<MyBean>>> myBeanSupplier()
    {
        return () -> sinkMyBeanProducer().asFlux();
    }

    @Bean
    public Consumer<MyBean> sinkBeanPublisher(Sinks.Many<Message<MyBean>> sinkMyBeanProducer)
    {
        return myBean -> sinkMyBeanProducer
            .emitNext(
                MessageBuilder.withPayload(myBean)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, myBean.getName())
                    .build(),
                Sinks.EmitFailureHandler.FAIL_FAST
            )
        ;
    }

    @Bean
    public Sinks.Many<Message<MyBean>> sinkMyBeanProducer()
    {
        return Sinks.many().multicast().onBackpressureBuffer();
    }
}

Any help is greatly appreciated

Igor Rudenko
@fac30ff
Hello again: as I understand it, aggregate, reduce, and count are the same kind of thing by nature. But I need a specific aggregator, so I want to use my own processor. I found the process method on KStream that takes a processor supplier, but this method returns void, so it is terminal. How can I use the functional style if I need to process a KStream and then read from my store in the same function?
1 reply
yaboong
@yaboong

Hello!
The document says (same for the consumer)

The following properties are only available for Kafka Streams producers and must be prefixed with spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. For convenience, if there are multiple output bindings and they all require a common value, that can be configured by using the prefix spring.cloud.stream.kafka.streams.default.producer.

so I tried using it like below but it doesn't work.

spring.cloud.stream.kafka.streams.default.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.default.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.default.producer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.default.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

this works

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Is there anyone who's having the same issue?

René Schröder
@reneschroeder0000
Hi. I am using scs functional style with kafka binding. Is there any way to pause/resume the input binding at runtime?
2 replies
terancet
@terancet
Hello! I've encountered a problem where a newly created Kafka consumer group cannot read messages from an existing topic. In production we have a topic with 12 partitions and a 7-week retention policy. We are using the latest version of the Spring Cloud Stream Binder Kafka Streams (Hoxton.SR10). When we deploy the new consumer group, the following exceptions happen in its consumers: TimeoutException: Failed to send request after 30000 ms and DisconnectException: null, and they don't consume any messages from the topic.
Could anybody point me in the right direction for further investigation?