Jeon ChangMin
@rafael81
Hi, I have experienced "Connection Reset" issues in cdc-source-debezium,
so I posted the issue in the Debezium project.
They told me the issue is resolved in the latest version of Debezium.
Currently, cdc-source-supplier depends on version "1.3.1.Final", which was released in 2020-12,
but the latest version is "1.6.0.Final".
Is there any plan to upgrade this dependency?
https://github.com/spring-cloud/stream-applications/blob/main/functions/supplier/cdc-debezium-supplier/pom.xml
2 replies
matthoward
@matthoward

Is there any way to get a reference to the Serdes that SCS creates? We generally use String keys and Avro values, but we have an aggregation that needs a complex Avro key. For our Materialized.as we need to specify the serde, so I was wondering if we can just inject it as a bean. I tried to add it as a parameter to my function bean but got an exception:

Parameter 1 of method processRevenue in com.axispoint.rytebox.revenue.ingestion.RevenueIngestionApplication required a single bean, but 2 were found:

- messageConverterDelegateSerde: defined by method 'messageConverterDelegateSerde' in class path resource [org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.class]
- compositeNonNativeSerde: defined by method 'compositeNonNativeSerde' in class path resource [org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.class]
3 replies
Igor Rudenko
@fac30ff
Hi there. I've got a problem: I need to pause the consumer if an S3 bucket contains any file.
So in a @PostConstruct method in my service I check whether the S3 bucket contains something (I log the file and it is present), inject BindingsEndpoint, and set the state to PAUSED. But when I actually start the application, the method annotated with @StreamListener starts consuming as if I hadn't paused anything.
4 replies
siarheiMishota
@siarheiMishota
Hi there, I have a few topics in Kafka that I read data from. If Spring cannot handle a message and an exception is thrown, Spring writes the record to the dlqName topic, because spring.cloud.stream.kafka.bindings.input.consumer.enable-dlq=true. I need the message recorded to a DLQ named after the same topic the message was read from, and I want to do it with an expression, for example spring.cloud.stream.kafka.bindings.input.consumer.dlqName=${spring.cloud.stream.bindings.input.destination}, which doesn't work.
Can I do it with a Spring expression?
Keir Bailey
@keirbailey_gitlab

Hello! Thanks in advance for the help, it's great that there is so much support for SCS!

My query is around DLQs. I am using functional-style Spring Cloud Stream with the Kafka Streams binder. I am consuming data from Kafka as a String (where the string is valid JSON). Once the stream is consumed I .mapValues(jsonString -> convertToAvroSpecificRecord). During the conversion from String to Avro I validate the data and ensure it fits the schema.

Is there a way that I can utilise the Spring Cloud DLQ infrastructure to send stream events to the DLQ topic defined in the application.yml configuration? Currently what I'm doing is returning a KStream<Key, Either<ValidationFailure<String>, A>> where A is the desired SpecificRecord object. After that I'm branching the Either stream and manually sending the ValidationFailure<String> records to a manually configured DLQ topic.

Any help on this would be great! Would love to clean up this code! Thanks

angel-trail
@angel-trail:matrix.org
[m]

Is it possible in Spring Cloud Stream, with the Kafka binder, to use batch mode and get access to the MessageHeaders?
I'm trying to do something like

spring.cloud.stream.bindings.<input>.consumer.batch-mode: true

@Bean
public Consumer<List<Message<String>>> process() {
    return messages -> {
        // do something with headers...
    };
}

This is throwing -

ERROR LoggingHandler org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@71f46378]; nested exception is java.lang.ClassCastException: class org.springframework.messaging.support.GenericMessage cannot be cast to class java.util.List (org.springframework.messaging.support.GenericMessage is in unnamed module of loader 'app'; java.util.List is in module java.base of loader 'bootstrap'), failedMessage=GenericMessage ...

Thanks in advance!

3 replies
ghilainm
@ghilainm

Hello, using the latest version of Spring Cloud Stream (3.1.3), the content type referenced in the Message headers is not honoured on the output wire.

I am using the functional programming model with the following method signature:

    @Override
    public Message<?> apply(Message<?> originalMessage) {
        return MessageBuilder.fromMessage(originalMessage)
                .setHeader(MessageHeaders.CONTENT_TYPE, someCondition ?  MimeTypeUtils.APPLICATION_XML_VALUE : "application/edifact")
                .build();
    }

The message payload received is of type byte[] and the output message payload is also of type byte[].

The contentType header is overridden to application/json, which is not expected.

1 reply
Jeon ChangMin
@rafael81
Hello, does the jdbc sink application support "UPDATE" statements?
3 replies
ranjith1235
@ranjith1235

Hello,

Is there a way to requeue a message instead of sending it to the DLQ in Kafka while using a reactive stream? For example, in the case below, if the DB is down I would like to requeue the message. Is that possible?

@Bean
public Consumer<Flux<Message<SampleDto>>> consumer() {
    return flux -> flux.subscribe(message -> {
        // db operation
    });
}

Taulant Mehmeti
@taulanti_gitlab
Hi, is there any resource or example of a Spring Cloud Stream Kafka producer that produces only once, when called?
Willem
@willemvr17_twitter
Hi All
New to Cloud Stream, but I know integration quite well. My issue is simple: returning a list of messages from a Supplier function that's a PollableBean is supposed to be split into multiple messages according to the docs, but it always ends up as one message with an array of Messages in the payload in Rabbit.

@Autowired
private SftpFolderScannerController sftpFolderScannerController;

@PollableBean(splittable = true)
public Supplier<List<Message<String>>> filenames() {
    return () -> sftpFolderScannerController.scanAll();
}
2 replies
Willem
@willemvr17_twitter
I have tried returning a list of Strings as well, with the same result: it ends up as an array in Rabbit instead of multiple messages.
Bill Bauernschmidt
@wbauern
Hello, I'm evaluating the use of Spring Cloud Stream and SCDF to basically bridge different messaging systems. For example: receive any message posted to an external RabbitMQ, transform it, and publish it to a Kafka topic, IBM MQ, or AWS SNS. I was very surprised to see that there are no prebuilt sinks for any of those that I could find. Am I not looking in the right place, or would those need to be custom written? We are also looking into Apache NiFi, which does provide all of those, but we are a Spring shop and the preference would be to go with the Spring solution if possible. Thanks for any guidance!
3 replies
Marcus Lyth
@twitz

Hello, is it possible to return a list so that each element is published as a separate message?

I tried returning a list of Message<?> and expected that to work, but it still published the event as a serialized list of Message<?>. Do I have to use a batch consumer to do batch producing?

8 replies
Knut Schleßelmann
@kschlesselmann
Hi! If I have a simple processor like Function<Flux<In>, Flux<Out>>, when does SCSt acknowledge the offset if I use the Kafka binder? Currently we are experiencing lost messages and are trying to find the cause.
22 replies
pjadav
@pjadav:matrix.org
[m]
How can I disable exchange/queue auto-creation in the RabbitMQ configuration? I want my application to throw an error if the exchange/queue is not found on the broker. With the current Spring Cloud Stream configuration, an exchange/queue that is not found gets auto-created.
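If it helps, here is a sketch of the RabbitMQ binder consumer properties that turn provisioning off; the binding name "input" is a placeholder, and the exact property set should be checked against your binder version:

```yaml
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input:
            consumer:
              # do not declare the exchange, and do not declare/bind the queue;
              # the binder then relies on them already existing on the broker
              declareExchange: false
              bindQueue: false
```

With declaration disabled, a missing queue should surface as an error instead of being silently created.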
6 replies
AttitudeL
@AttitudeL

Hi there, I have a quick question. Let's say I have the following binding to materialize a stream into a GlobalKTable:
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store-1

Can I use an interactive query against this "incoming-store-1" store from a different binding later on?

2 replies
AttitudeL
@AttitudeL

Sorry, another silly question. I have the following code which I use to pause the consumer based on timing information from the payload. The binding takes a Message whose headers contain the consumer, topic, partition, and acknowledgment, alongside the payload. I then check the payload's timing info and either pause the consumer or process the payload.

public java.util.function.Consumer<Message<Mock>> processor() {
    return message -> {
        MessageHeaders headers = message.getHeaders();
        Mock payload = message.getPayload();
        Consumer<?, ?> consumer = headers.get(KafkaHeaders.CONSUMER, Consumer.class);
        String topic = headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
        Integer partitionId = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
        Acknowledgment acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        ......................
    };
}

My question is: I need to join with a KTable to update some information from the payload. However, since I don't have a KStream in this binding, I cannot do the leftJoin directly with the payload. I wonder if I can somehow do the leftJoin inside this processor binding, or whether I need to produce the payload to another topic and then create a new topology to do the leftJoin there?

2 replies
realaseem
@realaseem
Hi, I have a simple consumer using the Spring Cloud Stream binder, and when scaling up and increasing the number of threads, a few messages get processed by two consumers of the same consumer group. We have 64 partitions and 16 instances of the app with 4 threads each. Our configuration is below; any idea which setting could be wrong?
spring:
  application:
    name: my-consumer
  profiles:
    active: local
  kafka:
    bootstrap-servers: localhost:9092
  cloud:
    stream:
      function:
        definition: process
      bindings:
        process-in-0:
          destination: myTopic
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            configuration:
              commit:
                interval:
                  ms: 1000
              num:
                stream:
                  threads: 4
            deserializationExceptionHandler: sendToDlq
4 replies
AttitudeL
@AttitudeL

Hi, I upgraded Spring Boot to the latest 2.5.4 and noticed some weird behaviour. I have a bunch of Kafka Streams integration tests using EmbeddedKafka. Some tests fail on the second run after the version upgrade. It seems the state doesn't get cleaned up after the first run, or before the second run.

I didn't change anything in the code and I have @DirtiesContext at the test class level. Is anyone else facing the same issue?

NOTE: after manually cleaning up the /tmp folder locally, the tests pass again, but still fail on the second run.

9 replies
MS
@marksinclair_twitter
Hello, I'm using spring-cloud-stream with Kafka and I'm trying to implement a producer warm-up strategy to improve instance start-up performance. Is there a recommended approach? I tried creating a dummy producer, however it doesn't warm up all producers. Let me know your thoughts.
vineethjoyson1
@vineethjoyson1
Hi all, when I enable Swagger in my application, which already has a Kafka producer and client, Spring tries to create a topic for a Swagger bean on that Kafka cluster. This fails and the application will not start. If I disable Swagger, everything works fine:
main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: inMemorySwaggerResourcesProvider-out-0
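The log line suggests the functional binder auto-detected Swagger's inMemorySwaggerResourcesProvider bean as a function and tried to bind it. A hedged sketch of one common remedy is to pin the function definition to your own bean (the name "myProducer" below is a placeholder):

```yaml
spring:
  cloud:
    function:
      # bind only this function; other candidate Supplier/Function beans
      # (such as Swagger's) are then ignored by the binder
      definition: myProducer
```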
8 replies
FlxGschlr
@FlxGschlr

Hello,
we are currently looking for a solution to shut down our application gracefully, so that
no messages read from Kafka via Spring Cloud Stream reactive functions are lost. Our workflow:

    Read message from kafka -> do stuff with the message content -> save result in mongodb

Since our migration to Spring Boot OCI images, we get the following error when shutting down our application:

    org.springframework.data.mongodb.ClientSessionException: state should be: server session pool is open; nested exception is java.lang.IllegalStateException: state should be: server session pool is open
    at org.springframework.data.mongodb.core.MongoExceptionTranslator.translateExceptionIfPossible(MongoExceptionTranslator.java:148)
    at org.springframework.data.mongodb.core.ReactiveMongoTemplate.potentiallyConvertRuntimeException(ReactiveMongoTemplate.java:2814)
    at org.springframework.data.mongodb.core.ReactiveMongoTemplate.lambda$translateException$90(ReactiveMongoTemplate.java:2797)
    at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3676)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:140)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:140)
    ...

Any ideas? We tried the following but to no avail:

  1. (https://stackoverflow.com/questions/67614658/can-i-apply-graceful-shutdown-when-using-spring-cloud-stream-kafka-3-0-3-release)

     @Bean
     public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
         return (container, dest, group) -> container.getContainerProperties()
                 .setShutdownTimeout(30_000L);
     }

thx

3 replies
Kirill Andreev
@Flame239
Hi :wave:
Stumbled across the same issue as @marksinclair_twitter.
When I send a Kafka message for the first time after application startup, it's super slow (up to 2s). According to the logs, Spring creates producers not during application startup but when the first message is sent.
How can we create the producers beforehand?
24 replies
AttitudeL
@AttitudeL
Hi, after upgrading to the latest Spring Boot and Spring Cloud Stream, I'm seeing these in the log: Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update from the stream thread. I'm using Kafka. These seem to be logged constantly and I'm a little concerned that the disk space is getting eaten up. I wonder if there's a way to turn such logs off?
6 replies
Marx
@Marx2
Hi, what can be the cause of StreamBridge not being available in the context during JUnit 5 tests? I have
@Import(TestChannelBinderConfiguration.class)
and the property
spring.cloud.stream.source=foo;bar
and the dependency
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>3.1.3</version>
    <type>test-jar</type>
    <scope>test</scope>
    <classifier>test-binder</classifier>
</dependency>
yet I'm getting
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.cloud.stream.function.StreamBridge' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
Salman khandu
@salman-khandu

Can we declare a DLQ processor using a Spring Cloud function, like:

```
@Bean
public Consumer<Message<?>> dlqprocess(DLQProcess dlqProcess) {
    // 'do' is a reserved word in Java and cannot be a method name;
    // assuming DLQProcess exposes a process(Message<?>) method instead
    return t -> dlqProcess.process(t);
}
```

I read the Spring Cloud Stream Rabbit binder doc https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-rabbit.html#_non_partitioned_destinations where it declares a @RabbitListener to process DLQ messages. But I want to make it generic, the same as a regular consumer.

2 replies
AttitudeL
@AttitudeL
Hi could someone please take a look at this issue spring-cloud/spring-cloud-stream#2223
1 reply
dmarmugi
@dmarmugi

Hi, good afternoon. I think there's an inconsistency in the docs:
I'm looking at https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html#_retry_with_the_rabbitmq_binder which says

To force a message to be dead-lettered, either throw an AmqpRejectAndDontRequeueException or set requeueRejected to true (the default) and throw any exception.

but then https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties says

requeueRejected
Whether delivery failures should be re-queued when retry is disabled or republishToDlq is false.
Default: false.

1 reply
Salman khandu
@salman-khandu
Is there any way to change the original payload before sending it to the DLQ for processing? I want to wrap another object around the original payload, so that my DLQ consumer does its processing based on the wrapper object.
skorpmeister
@skorpmeister
Does anyone have an application.properties using a Function to consume and publish for RabbitMQ?
Currently I have this method:

public class RabbitProcessor implements Function<MessageDTO, MessageRabbit> {

    @Override
    public MessageRabbit apply(MessageDTO messageDTO) {
        LOGGER.debug("Received msg {}", messageDTO);
        return new MessageRabbit();
    }
}

I'm using Spring Boot with Spring Cloud.
Roussi Abdelghani
@roussi

Hello all, I have an issue with my consumer; I get this error:

o.a.k.c.c.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

knowing that the configuration is like below:

  cloud:
    stream:
      function:
        routing:
          enabled: true
      source: testProducer
      bindings:
        functionRouter-in-0:
          destination: test-consumer-topic
          group: test-consumer
        testProducer-out-0:
          destination: test-producer-topic
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: 127.0.0.1:9092
          auto-create-topics: false
          headerMapperBeanName: kafkaHeaderMapper
          configuration:
            auto.offset.reset: earliest
          producer-properties:
            '[key.serializer]': org.apache.kafka.common.serialization.StringSerializer
            '[value.serializer]': org.springframework.kafka.support.serializer.JsonSerializer
        bindings:
          testProducer-out-0:
            producer:
              messageKeyExpression: headers['kafka_messageKey']
8 replies
skorpmeister
@skorpmeister
@olegz are you working with rabbitmq for the Function part?
Salman khandu
@salman-khandu
Is there any good tool to monitor an application which is built on top of Spring Cloud Stream?
skorpmeister
@skorpmeister

Local config for developer environment.

server:
  port: 30703

spring:
  cloud:
    stream:
      bindings:
        in-0:
          destination: testPefExchange
          binder: rabbit1
        out-0:
          destination: testPefExchange
          group: goyoooooo
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: xxxxx
                virtual-host: xxxxx
                port: 5672
                username: xxxx
                password: xxxx

How can I add routing keys to my config?
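A sketch of per-binding routing-key properties for the RabbitMQ binder, assuming out-0 is the producer binding and in-0 the consumer binding; the property names should be verified against the rabbit binder reference:

```yaml
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          out-0:
            producer:
              # SpEL expression; note the extra quoting needed for a literal key
              routingKeyExpression: '''my.routing.key'''
          in-0:
            consumer:
              # routing key used when binding the queue to the exchange
              bindingRoutingKey: my.routing.key
```

A header-based expression such as headers['routeTo'] can replace the literal if the key varies per message.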
RaickyDerwent
@RaickyDerwent
Hello! I'm trying out Spring Cloud Stream for the first time and I'm running into this exception
Caused by: org.apache.avro.AvroRuntimeException: Not an array: {"type":"record","name":"CDRFile","namespace":"com.example.avrocdrgen.types","fields":[{"name":"fileName","type":["null","string"]}]}
    at org.apache.avro.Schema.getElementType(Schema.java:361)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:689)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
    ... 46 more
Code : https://github.com/RaickyDerwent/hellofunctions
I don't think it's a problem with the schema, because I tried the same example with KafkaTemplate and it worked fine.
3 replies
versionguider
@versionguider
Hello, I am trying to bind two Kafka clusters but I cannot get it to work in my own project. I then tried the sample multi-binder-two-kafka-clusters project, using the docker-compose file the project provides, but the binding does not work there either. Both the dataIn and dataOut topics are created on the first cluster, which serves on 9092. The second cluster doesn't receive anything; even the topics are not created. No errors or warnings appear during execution. I only see 4 warnings during shutdown:
1 reply

WARN 8120 --- [extShutdownHook] o.s.cloud.stream.binding.BindingService : Trying to unbind 'process-in-0', but no binding found.
WARN 8120 --- [extShutdownHook] o.s.cloud.stream.binding.BindingService : Trying to unbind 'process-in-0', but no binding found.
WARN 8120 --- [extShutdownHook] o.s.cloud.stream.binding.BindingService : Trying to unbind 'receive-in-0', but no binding found.
WARN 8120 --- [extShutdownHook] o.s.cloud.stream.binding.BindingService : Trying to unbind 'receive-in-0', but no binding found.

Can you give me any suggestions for this cluster binding?

Marx
@Marx2
Hi, maybe it's a stupid question, but I don't understand how queue filtering works. Let's say I have a few independent bindings and I want to filter messages using different criteria. How can I do that? routing-expression and filterFunction are only single properties, so how can I define multiple independent filter criteria for each binding separately?
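One hedged option is function composition: give each binding its own filter step by composing functions in the definition (all function and destination names below are invented for illustration):

```yaml
spring:
  cloud:
    function:
      # ';' separates independent bindings, '|' composes functions,
      # so each chain can carry its own filtering step
      definition: filterOrders|handleOrders;filterAudits|handleAudits
    stream:
      bindings:
        # composed bindings are named by concatenating the function names
        filterOrdershandleOrders-in-0:
          destination: orders
        filterAuditshandleAudits-in-0:
          destination: audits
```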
1 reply
brightinnovator
@brightinnovator
Can someone help me with a recent working microservices demo or sample application to deploy in AWS and test? Please kindly help me. I need to learn it as quickly as possible.
2 replies
Sergey Teplov
@serjteplov

Hi!
Having a hard time getting the message deserialized as Message<ProductModelDto> on the consumer side.
Code on the producer side:

ProductModelDto dto = new ProductModelDto(1L, "name", "desc", 100.5);
streamBridge.send(PRODUCTS_OUT, MessageBuilder.withPayload(dto).build());

Code on consumer side:

    @Bean
    public Consumer<Message<ProductModelDto>> productCreated() {
        return message -> {
            log.info("payload = {}", message.getPayload());
            log.info("payload class = {}", message.getPayload().getClass().getName());
            log.info("sourceData = {}", message.getHeaders().get("sourceData"));
        };
    }

Output:

payload = ProductModelDto(id=null, name=null, description=null, price=null)
payload class = ru.security.common.model.product.ProductModelDto
sourceData = (Body:'{"id":1,"name":"name","description":"desc","price":100.5}' MessageProperties [headers={}, timestamp=Tue Sep 07 11:13:02 MSK 2021, messageId=3840075f-1142-f94d-be37-7be950d73f54, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=product-service.product-created-or-changed, receivedRoutingKey=product-service.product-created-or-changed, deliveryTag=1, consumerTag=amq.ctag-3i5ECkRTPs5_O5ZW5af-MA, consumerQueue=product-service.product-created-or-changed.some-group4])

I expect to receive the payload which was sent by the producer, but the resulting payload is ProductModelDto(id=null, name=null, description=null, price=null).

2 replies
Igor Rudenko
@fac30ff
Hi! I want to use Spring Cloud Function, and one of the requirements is 2 input and 9 output topics. All the examples use KStream, like Function<KStream<Object, Object>, KStream<Object, Object>> method() { return input -> input.branch(predicate1, predicate2); }, but I need to branch based on info from the Message headers abstraction. How can I retrieve the headers and do the branching if it only works with KStream?
2 replies