AttitudeL
@AttitudeL
Hi, could someone please take a look at this issue: spring-cloud/spring-cloud-stream#2223
1 reply
dmarmugi
@dmarmugi

Hi, good afternoon. I think there's an inconsistency in the docs:
I'm looking at https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html#_retry_with_the_rabbitmq_binder which says

To force a message to be dead-lettered, either throw an AmqpRejectAndDontRequeueException or set requeueRejected to true (the default) and throw any exception.

but then https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html#_rabbitmq_consumer_properties says

requeueRejected
Whether delivery failures should be re-queued when retry is disabled or republishToDlq is false.
Default: false.

1 reply
Salman khandu
@salman-khandu
Is there any way to change the original payload before sending it to the DLQ for processing? I want to wrap other objects around the original payload for some processing, so that my DLQ consumer does its processing based on the wrapper object.
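One approach that is sometimes suggested (a sketch under assumptions, not a verified recipe): instead of letting the binder republish the raw payload, listen on the binding's error channel, which the binder names <destination>.<group>.errors, and republish a wrapper object to the DLQ destination yourself with StreamBridge. The orders/my-group names and the DlqWrapper type below are hypothetical.

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
public class DlqRerouter {

    private final StreamBridge streamBridge;

    public DlqRerouter(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // "orders" and "my-group" are placeholders for the real destination and group
    @ServiceActivator(inputChannel = "orders.my-group.errors")
    public void reroute(ErrorMessage errorMessage) {
        Message<?> failed = errorMessage.getOriginalMessage();
        Object original = (failed != null) ? failed.getPayload() : null;
        // wrap the original payload with whatever context the DLQ consumer needs
        streamBridge.send("orders.dlq", new DlqWrapper(original, "extra context"));
    }

    // hypothetical wrapper type the DLQ consumer would process
    public static class DlqWrapper {
        public Object original;
        public String context;

        public DlqWrapper(Object original, String context) {
            this.original = original;
            this.context = context;
        }
    }
}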
skorpmeister
@skorpmeister
Does anyone have an application.properties using a Function to consume and publish with RabbitMQ?
Currently I have this method:

import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RabbitProcessor implements Function<MessageDTO, MessageRabbit> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitProcessor.class);

    @Override
    public MessageRabbit apply(MessageDTO messageDTO) {
        LOGGER.debug("Received msg {}", messageDTO);
        return new MessageRabbit();
    }
}

I'm using Spring Cloud with Spring Boot.
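A minimal application.properties sketch for a functional Rabbit processor, assuming the class above is registered as a bean named rabbitProcessor; the destination names are placeholders:

spring.cloud.function.definition=rabbitProcessor
spring.cloud.stream.bindings.rabbitProcessor-in-0.destination=input-exchange
spring.cloud.stream.bindings.rabbitProcessor-in-0.group=my-group
spring.cloud.stream.bindings.rabbitProcessor-out-0.destination=output-exchange

With the Rabbit binder on the classpath, rabbitProcessor-in-0 consumes from a queue bound to input-exchange (the group becomes part of the queue name) and rabbitProcessor-out-0 publishes the returned MessageRabbit to output-exchange.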
Roussi Abdelghani
@roussi

Hello all, I have an issue with my consumer; I get this error:

o.a.k.c.c.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

The configuration is as below:

  cloud:
    stream:
      function:
        routing:
          enabled: true
      source: testProducer
      bindings:
        functionRouter-in-0:
          destination: test-consumer-topic
          group: test-consumer
        testProducer-out-0:
          destination: test-producer-topic
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: 127.0.0.1:9092
          auto-create-topics: false
          headerMapperBeanName: kafkaHeaderMapper
          configuration:
            auto.offset.reset: earliest
          producer-properties:
            '[key.serializer]': org.apache.kafka.common.serialization.StringSerializer
            '[value.serializer]': org.springframework.kafka.support.serializer.JsonSerializer
        bindings:
          testProducer-out-0:
            producer:
              messageKeyExpression: headers['kafka_messageKey']
8 replies
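That CommitFailedException usually means the time between two polls exceeded max.poll.interval.ms (for example because processing a batch took too long), so the broker evicted the consumer from the group. A hedged sketch of binder-level consumer properties to tune; the values are illustrative only:

spring:
  cloud:
    stream:
      kafka:
        binder:
          consumer-properties:
            '[max.poll.records]': 100           # fewer records per poll
            '[max.poll.interval.ms]': 600000    # more time allowed between polls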
skorpmeister
@skorpmeister
@olegz are you working with rabbitmq for the Function part?
Salman khandu
@salman-khandu
Is there any good tool to monitor an application that is built on top of Spring Cloud Stream?
skorpmeister
@skorpmeister

Local config for developer environment.

server:
  port: 30703

spring:
  cloud:
    stream:
      bindings:
        in-0:
          destination: testPefExchange
          binder: rabbit1
        out-0:
          destination: testPefExchange
          group: goyoooooo
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: xxxxx
                virtual-host: xxxxx
                port: 5672
                username: xxxx
                password: xxxx

How can I add routing keys to my config?
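The Rabbit binder has per-binding routing-key properties: bindingRoutingKey on the consumer side (the key used to bind the queue to the exchange) and routingKeyExpression (a SpEL expression) on the producer side. A sketch against the in-0/out-0 bindings above, with a placeholder key:

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          in-0:
            consumer:
              bindingRoutingKey: my.routing.key           # placeholder
          out-0:
            producer:
              routingKeyExpression: '''my.routing.key'''  # SpEL, so a literal needs the extra quotes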
RaickyDerwent
@RaickyDerwent
Hello! I'm trying out Spring Cloud Stream for the first time and I'm running into this exception
Caused by: org.apache.avro.AvroRuntimeException: Not an array: {"type":"record","name":"CDRFile","namespace":"com.example.avrocdrgen.types","fields":[{"name":"fileName","type":["null","string"]}]}
    at org.apache.avro.Schema.getElementType(Schema.java:361)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:689)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
    ... 46 more
Code : https://github.com/RaickyDerwent/hellofunctions
I don't think it's a problem with the schema, because I tried the same example with KafkaTemplate and it worked fine.
3 replies
versionguider
@versionguider
Hello, I am trying to bind two Kafka clusters, but I cannot get it working in my own project. I then tried the multi-binder-two-kafka-clusters sample project, using the Docker Compose file the samples provide, but binding does not work there either. Both the dataIn and dataOut topics are created on the first cluster, which serves on 9092. The second cluster doesn't receive anything; its topics are not even created. No errors or warnings appear during execution. I see only four warnings during shutdown:
1 reply

WARN 8120 --- [extShutdownHook] o.s.cloud.stream.binding.BindingService : Trying to unbind 'process-in-0', but no binding found.
WARN 8120 --- [extShutdownHook] o.s.cloud.stream.binding.BindingService : Trying to unbind 'process-in-0', but no binding found.
WARN 8120 --- [extShutdownHook] o.s.cloud.stream.binding.BindingService : Trying to unbind 'receive-in-0', but no binding found.
WARN 8120 --- [extShutdownHook] o.s.cloud.stream.binding.BindingService : Trying to unbind 'receive-in-0', but no binding found.

Can you give me any suggestions for this cluster binding?

Marx
@Marx2
Hi, maybe it's a stupid question, but I don't understand how queue filtering works. Let's say I have a few independent bindings and I want to filter messages using different criteria. How can I do that? routing-expression and filterFunction are only single properties, so how can I define multiple independent filter criteria for each binding separately?
1 reply
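One pattern that may fit (a sketch, assuming your version supports function composition with | in the definition): give each binding its own composed pipeline, pairing a dedicated filter with its consumer. A Function that returns null drops the message. All names below are hypothetical; the composed binding is typically named filterAconsumeA-in-0.

spring.cloud.function.definition: filterA|consumeA;filterB|consumeB

@Bean
public Function<Message<String>, Message<String>> filterA() {
    // keep only messages matching this binding's criterion; returning null drops the message
    return m -> "A".equals(m.getHeaders().get("type")) ? m : null;
}

@Bean
public Consumer<Message<String>> consumeA() {
    return m -> System.out.println("A: " + m.getPayload());
}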
brightinnovator
@brightinnovator
Can someone point me to the latest working microservices demo or sample application to deploy in AWS and test? Please kindly help; I need to learn it as quickly as possible.
2 replies
Sergey Teplov
@serjteplov

Hi!
Having a hard time getting the message deserialized as Message<ProductModelDto> on the consumer side.
Code on the producer side:

ProductModelDto dto = new ProductModelDto(1L, "name", "desc", 100.5);
streamBridge.send(PRODUCTS_OUT, MessageBuilder.withPayload(dto).build());

Code on consumer side:

    @Bean
    public Consumer<Message<ProductModelDto>> productCreated() {
        return message -> {
            log.info("payload = {}", message.getPayload());
            log.info("payload class = {}", message.getPayload().getClass().getName());
            log.info("sourceData = {}", message.getHeaders().get("sourceData"));
        };
    }

Output:

payload = ProductModelDto(id=null, name=null, description=null, price=null)
payload class = ru.security.common.model.product.ProductModelDto
sourceData = (Body:'{"id":1,"name":"name","description":"desc","price":100.5}' MessageProperties [headers={}, timestamp=Tue Sep 07 11:13:02 MSK 2021, messageId=3840075f-1142-f94d-be37-7be950d73f54, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=product-service.product-created-or-changed, receivedRoutingKey=product-service.product-created-or-changed, deliveryTag=1, consumerTag=amq.ctag-3i5ECkRTPs5_O5ZW5af-MA, consumerQueue=product-service.product-created-or-changed.some-group4])

I expect to receive the payload that was sent by the producer, but the resulting payload is ProductModelDto(id=null, name=null, description=null, price=null).

2 replies
Igor Rudenko
@fac30ff
Hi! I want to use Spring Cloud Function, and one of the requirements is 2 input and 9 output topics. All the examples use KStream, e.g. Function<KStream<Object, Object>, KStream<Object, Object>> method() { return input -> input.branch(predicate1, predicate2); }, but I need to branch based on information from the Message headers abstraction. How can I retrieve that and do the branching, if branching only works with KStream?
2 replies
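If the blocker is that KStream branching can't see Message headers, one alternative sometimes suggested (a sketch using the regular Kafka binder rather than Kafka Streams, so not binder-verified for your setup) is to consume a Message, read the header, and fan out with StreamBridge. The route-key header and destination naming are hypothetical.

@Bean
public Consumer<Message<String>> route(StreamBridge streamBridge) {
    return message -> {
        // "route-key" is a placeholder header carrying the routing decision
        String key = (String) message.getHeaders().get("route-key");
        streamBridge.send("output-topic-" + key, message.getPayload());
    };
}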
armkillbill
@armkillbill
1 reply
AttitudeL
@AttitudeL

Hi there,

I'm using spring-cloud-stream for kafka binding. I wanted to set the consumer of the binding to use MANUAL_IMMEDIATE ack mode. Below is the config in yml format.

spring.cloud.stream:
  function:
    definition: foo
  kafka.binder:
    brokers: localhost:9092
    autoCreateTopics: false
  kafka.bindings:
    foo-in-0.consumer:
      autoCommitOffset: false
      ackMode: MANUAL_IMMEDIATE

Could someone please confirm this is correct? I checked the log for the consumer config and I see that enable.auto.commit has been set to false, which is correct, but I'm unable to see the ack mode in the log, since it's a Spring-specific abstraction.

Thanks!

3 replies
Sergey Teplov
@serjteplov
Hi! I'm having difficulty testing binder functions with mocked beans. The question is posted on Stack Overflow: https://stackoverflow.com/questions/69504957/spring-integration-test-binder-doesnt-mock-dependency
Could you have a look at it?
8 replies
AttitudeL
@AttitudeL

@garyrussell I have a question regarding the Acknowledgment.nack() lifecycle. The documentation says nack will negatively acknowledge the current record, discard the remaining polled records while it sleeps, and redeliver the current record after it wakes up.

My question: if I restart the application while nack is sleeping, say for 30 seconds, would the consumer thread's sleep get interrupted, and would it then immediately re-seek the partition and redeliver the current record?

I checked the source code and it looks like doSeeks is invoked before Thread.sleep. If that's the case, I guess it's OK to interrupt the sleep, since the seek has already put the record back at the front of the queue. Please correct me if I'm wrong. Thanks!

6 replies
bharatnpti
@bharatnpti
@olegz - how can I use Spring Cloud Stream in the functional style to create a single exchange with multiple queues and route messages dynamically at runtime to different queues based on a header (or any other attribute)? I tried with StreamBridge but could not produce an end-to-end working example. The application will contain both a producer and a consumer. Any working example would be a great help.
1 reply
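A sketch of one way this is commonly wired, with placeholder names: publish everything through a single StreamBridge output binding whose Rabbit producer computes the routing key from a header, and bind each queue to the same exchange with its own key (for example via the producer's requiredGroups or explicit Queue/Binding beans).

spring:
  cloud:
    stream:
      bindings:
        dynamicOut-0:                    # hypothetical binding used only via StreamBridge
          destination: my-exchange
      rabbit:
        bindings:
          dynamicOut-0:
            producer:
              routingKeyExpression: headers['routeTo']

streamBridge.send("dynamicOut-0",
        MessageBuilder.withPayload(payload).setHeader("routeTo", "queue-a").build());

(MessageBuilder here is org.springframework.messaging.support.MessageBuilder; each message lands in the queue whose binding key matches its routeTo header.)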
AttitudeL
@AttitudeL

Hi,

Is anyone aware of a compatibility issue with the latest spring-cloud version?

I upgraded my app from 2020.0.3 to 2020.0.4 and one of the binding functions is throwing an NPE.

Caused by: java.lang.NullPointerException
    at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.<init>(KafkaStreamsBindableProxyFactory.java:83)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211)
    ... 99 more

The strange thing is that spring-cloud is trying to bind one of the functions as a Kafka Streams application, but it isn't one, which results in the NPE...

7 replies
Luis Copetti
@lhcopetti

Hey all, I am currently trying to write a functional-style consumer using batchMode, but I am getting the error below. If I set batchMode to false and remove the List<> parameters, my sample project works. Is there an additional setting I should use to be able to work with a List of Message<?>?

batchMode: true
public Consumer<List<Message<?>>> handler()

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
4 replies
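For batch mode, the signatures the converter handles are typically List<Payload> or Message<List<Payload>>; List<Message<?>> asks Jackson to construct Message instances, which is exactly what the error complains about. A sketch (MyPojo is a hypothetical payload type); per-record headers, if needed, should be available under KafkaHeaders.BATCH_CONVERTED_HEADERS:

@Bean
public Consumer<Message<List<MyPojo>>> handler() {
    return message -> {
        List<MyPojo> batch = message.getPayload();
        batch.forEach(p -> System.out.println("received " + p));
    };
}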
K solo
@mr_k_solo_twitter

Hi everyone. I'm collecting a series of metrics for a Kafka Streams application; the application is a Spring Boot application using the micrometer-core library. The issue I have is that I'd like a consolidated value for the meters of a specific name. To make this a little clearer: these metrics are presented as an array of n items, with n being the number of threads configured for the Kafka Streams application. I must add that I can consolidate the values by adding up the 'FunctionCounter' totals. I don't, however, have a mechanism to trigger the 'aggregator method' when the meters I'm interested in are updated. The aggregator method is copied below:

private Double aggregateValues(String idName){
    return meterRegistry.getMeters().stream()
        .filter(meter -> meter.getId().getName().startsWith(idName))
        .filter(FunctionCounter.class::isInstance)
        .map(FunctionCounter.class::cast)
        .mapToDouble(FunctionCounter::count)
        .sum();
}

I created a configuration bean to attempt the same, no joy.

@Configuration
@Component
@Slf4j
public class Metrics {

    @Bean
    public FunctionCounter getAggregateCounter(MeterRegistry registry) {
        List<FunctionCounter> counters = registry.getMeters().stream().
            filter(meter -> meter.getId().getName().
                startsWith("Output_Message_Count"))
            .filter( FunctionCounter.class::isInstance )
            .map(FunctionCounter.class::cast)
            .collect(Collectors.toList());

        FunctionCounter counter = FunctionCounter
                .builder("Combined_Output_Message_Count", counters, state -> state.stream().mapToDouble(FunctionCounter::count).sum())
                .description("a description of what this counter does")
                .tags("region", "test")
                .register(registry);
        return counter;
    }
}

An example of the raw data output from actuator/prometheus endpoint is listed below

# HELP kafka_stream_thread_task_created_total The total number of newly created tasks
# TYPE kafka_stream_thread_task_created_total counter
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-4",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-3",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-2",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-1",} 5.0

The end goal is to have a meter that consolidates the totals (20, in the case of the raw data shown above).
There has been talk of this work being done in a proposal, but it hasn't progressed beyond the initial proposal stage.
The proposal can be viewed at the following URL:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-674%3A+Metric+Reporter+to+Aggregate+Metrics+in+Kafka+Streams

2 replies
Ghost
@ghost~616fc00b6da0373984884308
This message was deleted
3 replies
Andreas Evers
@andreasevers

Hi folks. I am running into a ClassCastException when using the Kafka binder together with the Kafka Streams binder, and I'm not using the spring-cloud-stream-test-support dependency (this is happening while running the app normally):

class jdk.proxy2.$Proxy134 cannot be cast to class org.springframework.messaging.MessageChannel (jdk.proxy2.$Proxy134 is in module jdk.proxy2 of loader 'app'; org.springframework.messaging.MessageChannel is in unnamed module of loader 'app')

I've got a regular Consumer bean consuming a certain KTable, but I also have a REST endpoint that can post messages to another topic. For that I am using the StreamBridge. Wiring it all up gives me the ClassCastException.

4 replies
Salman khandu
@salman-khandu
I want to do performance testing of Spring Cloud Stream with the RabbitMQ binder. Is there any way to do this with Spring Cloud Stream?
iguissouma
@iguissouma
Hello, I have an application using multiple binders (RabbitMQ and Kafka). I'm reading a message from a RabbitMQ queue, constructing a new message with a new payload, setting some headers, and sending it to a different queue and to a Kafka topic. The partner consuming from the Kafka topic is complaining about the content of the amqp_*, spring_json_header_types, and source_data headers. Is there any reason why these RabbitMQ/Spring-related headers are sent to the Kafka topic? Is that the expected behaviour? Is there anything I can do to send only the minimal required headers? I'm using the old annotation style (@StreamListener) and injecting multiple @Output channels, with Spring Cloud Stream version Hoxton.SR8.
2 replies
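The Kafka binder maps outbound message headers by default, so headers added on the Rabbit inbound side travel along. One hedged option: register a custom KafkaHeaderMapper that excludes them and point the binder at it with headerMapperBeanName (patterns per DefaultKafkaHeaderMapper; negated patterns exclude, first match wins). Whether this helps on Hoxton.SR8 with @StreamListener is an assumption worth verifying.

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    // KafkaHeaderMapper/DefaultKafkaHeaderMapper are in org.springframework.kafka.support;
    // exclude AMQP bookkeeping headers, map everything else
    return new DefaultKafkaHeaderMapper("!amqp_*", "!source_data", "*");
}

spring:
  cloud:
    stream:
      kafka:
        binder:
          headerMapperBeanName: kafkaBinderHeaderMapper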
Salman khandu
@salman-khandu
I have posted a question on Stack Overflow regarding concurrency not working with the Artemis binder: https://stackoverflow.com/questions/69753400/spring-cloud-stream-artemis-binder-concurrency-not-working. Can anyone please help?
AttitudeL
@AttitudeL

I have the following stream binding function:

public BiFunction<KStream<String, String>, KTable<String, String>, KStream<String, String>> process() {
 ...
}

As you can see, I chose to materialize the second input parameter as a KTable. My question: is it possible to set the first input binding's consumer to use latest for auto.offset.reset, and the second input binding (the KTable consumer) to use earliest?

27 replies
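The Kafka Streams binder has a per-binding consumer property, startOffset (earliest or latest), which may cover this; note it typically only applies when the group has no committed offset yet. A sketch, assuming the function bean is named process:

spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            process-in-0:
              consumer:
                startOffset: latest
            process-in-1:
              consumer:
                startOffset: earliest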
AttitudeL
@AttitudeL
I have a question regarding the num.stream.threads setting for the stream app. Let's say that I have a stream app consuming from 2 input topics and I have 1 partition per topic and also have num.stream.threads set to 1. In this case do I get this single stream thread assigned to the partition for both topics?
4 replies
Andras Hatvani
@andrashatvani
Hi, when can the integration of kafka 3.0.0 with Apple Silicon support be expected?
6 replies
aliaksandr-pleski
@aliaksandr-pleski

Hi,
I have an SCS app that works with Kafka Streams and should also be able to consume messages from RabbitMQ (Spring Boot 2.5.5 and Spring Cloud 2020.0.3).
These are my dependencies:

    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'

This is my code:

@Bean
public BiConsumer<KStream<String, Event1>, KTable<String, Event2>> processEvents() {
    // this one is working if I remove spring-cloud-stream-binder-rabbit from dependencies and configuration
}

@Bean
public Consumer<Event1> gpsEvents() {
    // this one should consume events from Rabbit
    return System.out::println;
}

This is my application.yaml:

spring:
  rabbitmq:
    addresses: amqps://rabbitmq-url
  cloud:
    stream:
      function:
        definition: processEvents;gpsEvents
      bindings:
        processEvents-in-0:
          binder: kafka
          destination: kafka-topic1
        processEvents-in-1:
          binder: kafka
          destination: kafka-topic2
        gpsEvents-in-0:
          binder: rabbit
          destination: rabbit-queue

When I run this I'm getting
No bean named 'kafka-KafkaStreamsBinderConfigurationProperties' available
If I set default-binder: kafka and remove binder: kafka from the processEvents configuration, I'm getting
java.lang.ClassCastException: class com.sun.proxy.$Proxy110 cannot be cast to class org.springframework.messaging.MessageChannel

Any advice on how to make the kafka-streams and rabbit binders work together? Thanks in advance.

16 replies
enterprisesoftwaresolutions
@enterprisesoftwaresolutions

Hi! I'm trying to implement a delayed retry queue with Kafka + the latest Spring Cloud Stream. The basic idea: create a function which receives a message, waits until a timestamp contained in a message header, then forwards it. I have two questions:

1) How do I avoid the Kafka consumer timing out and triggering a rebalance?
2) How do I skip the deserialization/serialization configuration so I can just "shovel" the content regardless of its type?

Thanks in advance!

3 replies
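On question 2: declaring the function over byte[] generally bypasses payload conversion, so the content is shovelled through untouched. A sketch (and note that the waiting itself is question 1's problem, since a long pause between polls risks exceeding max.poll.interval.ms):

@Bean
public Function<Message<byte[]>, Message<byte[]>> shovel() {
    // payload stays raw bytes; no JSON (de)serialization is applied
    return message -> message;
}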
rvnhere
@rvnhere
Hello Team, I'm looking to merge two Kafka topics into one using Spring Cloud Kafka Streams. Both input topics contain similar entities, very similar to the example provided at https://youtu.be/5NoU7D4OGA0?t=167. Could you please advise how to achieve this with Spring Cloud Kafka Streams? Please redirect me if an example is already available.
2 replies
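A minimal sketch with the Kafka Streams binder, assuming BiFunction-style bindings are available in your version (Event is a hypothetical type): merge-in-0 and merge-in-1 would map to the two input topics and merge-out-0 to the combined topic.

@Bean
public BiFunction<KStream<String, Event>, KStream<String, Event>, KStream<String, Event>> merge() {
    // KStream.merge interleaves the two streams without joining or deduplicating
    return (left, right) -> left.merge(right);
}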
enterprisesoftwaresolutions
@enterprisesoftwaresolutions
Is there a way to get the source topic name from the Message object in a functional Kafka consumer/processor?
2 replies
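With the regular (non-Streams) Kafka binder and a Message-based function, the source topic should be available as the KafkaHeaders.RECEIVED_TOPIC header; a sketch (inside a KStream processor the headers aren't exposed the same way):

@Bean
public Consumer<Message<String>> consume() {
    return message -> {
        String topic = (String) message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
        System.out.println("from " + topic + ": " + message.getPayload());
    };
}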
AttitudeL
@AttitudeL
Quick question: I currently use logAndContinue to handle deserialization errors; however, I want to log more information. Is it possible to write a custom error handler and assign it to deserializationExceptionHandler?
2 replies
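One hedged route: implement Kafka Streams' own DeserializationExceptionHandler and register it through the binder's pass-through configuration instead of the logAndContinue shorthand. A sketch (the class and package names are hypothetical):

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DetailedLoggingHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record, Exception exception) {
        // log whatever extra detail is needed, then keep processing like logAndContinue
        System.err.printf("deserialization failed: topic=%s partition=%d offset=%d%n",
                record.topic(), record.partition(), record.offset());
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default.deserialization.exception.handler: com.example.DetailedLoggingHandler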
Salman khandu
@salman-khandu
Has anyone used the SQS binder? I am facing an issue where the consumer application doesn't get the model object in the Spring function Consumer<T> function(). I have raised an issue: idealo/spring-cloud-stream-binder-sqs#37.
3 replies
rvnhere
@rvnhere

Hello Team, I might be missing something basic with deserialization configuration; could you please advise?

Application 1
@Bean
public Function<KStream<String, TaskRequest>, KStream<String, TaskRequest>> process1() {
}

The __TypeId__ header is set to the value "com.xxx.model.TaskRequest" for the data sent to the output topic.

Application 2
@Bean
public Function<KStream<String, ObjectNode>, KStream<String, ObjectNode>> process2() {
}

Is it possible to receive the data from the output topic of Application 1 as a generic Jackson ObjectNode? Doing so fails with the deserialization error below:

"class":"org.springframework.messaging.converter.MessageConversionException","msg":"failed to resolve class name. Class not found [com.xxx.model.TaskRequest]; nested exception is java.lang.ClassNotFoundException: com.xxx.model.TaskRequest","stack":["org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)

2 replies
Sherry Ummen
@sherry-ummen

Hello, we are getting an error when using Cloud Stream + the Kafka binder, and it happens only for one specific bean. We're not sure why.

The error stack is :

2021-11-25 11:48:44.777 ERROR 74215 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : java.lang.NoClassDefFoundError: kotlinx/coroutines/Dispatchers
    at org.springframework.cloud.function.context.config.CoroutinesUtils.invokeSuspendingSupplier(CoroutinesUtils.kt:103)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.invoke(KotlinLambdaToFunctionAutoConfiguration.java:163)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.apply(KotlinLambdaToFunctionAutoConfiguration.java:127)
    at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.get(KotlinLambdaToFunctionAutoConfiguration.java:179)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:657)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:506)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.get(SimpleFunctionRegistry.java:517)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.get(PartitionAwareFunctionWrapper.java:83)
    at org.springframework.integration.dsl.IntegrationFlows$1.doReceive(IntegrationFlows.java:174)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:444)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Our Gradle dependencies look something like this:

plugins {
    id("org.springframework.boot") version "2.5.6"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    id("com.github.davidmc24.gradle.plugin.avro") version "1.2.1"
    kotlin("jvm") version "1.5.31"
    kotlin("plugin.spring") version "1.5.31"
    kotlin("plugin.jpa") version "1.5.31"
    kotlin("plugin.allopen") version "1.5.31"
}

version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
    mavenCentral()
    maven(url = "https://packages.confluent.io/maven/")
}

extra["springCloudVersion"] = "2020.0.4"
extra["testcontainersVersion"] = "1.16.0"

We're not able to figure out why it fails.

2 replies
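The stack trace goes through spring-cloud-function's Kotlin wrapper invoking a suspending supplier, and the missing class is kotlinx.coroutines.Dispatchers, so kotlinx-coroutines likely isn't on the runtime classpath. Adding it may resolve the error (an educated guess from the trace, not a confirmed fix):

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
}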
Salman khandu
@salman-khandu
I have used Spring Cloud Stream with the Rabbit binder. I want to send a manual acknowledgment from a consumer. How can I do this with a Spring Cloud function?

@Bean
public Consumer<Model> consumer() {
    return model -> {
        // send manual ack here
    };
}
2 replies
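A sketch of manual acknowledgment with the Rabbit binder, assuming its acknowledgeMode consumer property and the AMQP channel headers (the binding name consumer-in-0 is a placeholder):

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          consumer-in-0:
            consumer:
              acknowledgeMode: MANUAL

@Bean
public Consumer<Message<Model>> consumer() {
    return message -> {
        Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
        Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
        try {
            channel.basicAck(deliveryTag, false); // ack this single delivery
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    };
}

(Channel is com.rabbitmq.client.Channel; AmqpHeaders is org.springframework.amqp.support.AmqpHeaders.)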
Miguel González
@magg
Hello there. Can you tell me how stopping/starting a Kafka Streams binding is implemented, or point me to the class where it happens? I'm referring to this: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.5/reference/html/spring-cloud-stream-binder-kafka.html#_binding_visualization_and_control_in_kafka_streams_binder. I have just tried calling the actuator endpoint. I'm assuming it's calling kafkaStreams.close() and kafkaStreams.start(); is that correct? I would like to implement something similar for the regular Kafka Streams library...
2 replies
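Under the hood the binder manages a StreamsBuilderFactoryBean per function, registered as stream-builder-<function name>; its stop()/start() lifecycle closes and recreates the KafkaStreams instance, which appears to be what the actuator control drives. A sketch of calling it directly ("process" is a placeholder function name; the & prefix targets the FactoryBean itself):

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Component;

@Component
public class StreamsRestarter {

    private final StreamsBuilderFactoryBean factoryBean;

    public StreamsRestarter(@Qualifier("&stream-builder-process") StreamsBuilderFactoryBean factoryBean) {
        this.factoryBean = factoryBean;
    }

    public void restart() {
        factoryBean.stop();   // closes the underlying KafkaStreams
        factoryBean.start();  // rebuilds and starts a new instance
    }
}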