jay
@legendjaks
Instead of configuring Spring Cloud Stream bindings in a properties file under spring.cloud.stream.bindings, is there any way to configure this programmatically, e.g. using @Bean?
Multiple services will produce to one topic and only one service will consume. So if I can configure it in code, I can reuse the code
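A minimal sketch of one way to share the producing side without per-service binding properties, using StreamBridge (the component and topic name below are placeholders, not an official recipe):

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

// Hypothetical shared component that each producing service can reuse.
@Component
public class SharedTopicPublisher {

    private static final String TOPIC = "my-shared-topic"; // placeholder

    private final StreamBridge streamBridge;

    public SharedTopicPublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    public void publish(Object event) {
        // StreamBridge resolves the output binding at runtime, so no
        // spring.cloud.stream.bindings.* entry is needed per producer.
        streamBridge.send(TOPIC, event);
    }
}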
jay
@legendjaks
In the imperative model, SeekToCurrentErrorHandler can be used for handling errors in Kafka. Is there anything available for reactive Kafka consumers?
jay
@legendjaks

In the imperative model, SeekToCurrentErrorHandler can be used for handling errors in Kafka. Is there anything available for reactive Kafka consumers?

Got it. It's out of scope of SCSt. spring-cloud/spring-cloud-stream#1922
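Since the binder leaves reactive error handling to the pipeline itself, the common pattern is Reactor's own operators; a minimal sketch under that assumption (function name, retry policy and fallback are illustrative):

import java.time.Duration;
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

@Configuration
public class ReactiveErrorHandlingConfig {

    // Illustrative consumer: retries each record with backoff inside the
    // pipeline instead of relying on a container-level error handler.
    @Bean
    public Function<Flux<Message<String>>, Mono<Void>> consume() {
        return flux -> flux
                .concatMap(msg -> process(msg)
                        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
                        .onErrorResume(ex -> Mono.empty())) // e.g. log or route to your own DLQ
                .then();
    }

    private Mono<Void> process(Message<String> msg) {
        return Mono.empty(); // placeholder business logic
    }
}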

Vladimir Kabanov
@vkabanorv
Hi all! Sorry if this is a noob question. How/where can I get spring-cloud-stream-binder-kafka-streams version 3.1.3-SNAPSHOT for use in my Java project? The most recent version on Maven Central is 3.1.2. I need 3.1.3 because I'm running into this issue: https://stackoverflow.com/questions/66881210/spring-cloud-stream-kafka-streams-binder-kafkaexception-could-not-start-stream
5 replies
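Snapshots are not published to Maven Central; assuming the standard Spring setup, adding the Spring snapshot repository to the pom should let the build resolve 3.1.3-SNAPSHOT:

<repositories>
    <repository>
        <id>spring-snapshots</id>
        <url>https://repo.spring.io/snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>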
Sidney Lee
@sesidlee_twitter
hi, guys. does anyone know how to configure the internal repartition/changelog topics in Spring?
https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#internal-topic-parameters
we are defining value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy to enable multiple event types within one of our topics, but this does not seem to be applied to the internal repartition/changelog topics associated with that topic... any suggestions?
6 replies
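If it helps: Kafka Streams applies settings carrying the topic. prefix to the internal topics it creates, and the binder passes spring.cloud.stream.kafka.streams.binder.configuration.* straight into StreamsConfig. Whether the serde strategy reaches the internal topics' serdes this way is an assumption to verify:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              # serde setting; propagation to internal-topic serdes is an assumption
              value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
              # the "topic." prefix targets internal repartition/changelog topics
              topic.retention.ms: 86400000  # illustrative value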
Ali Yeganeh
@AliYeganeh
This message was deleted
2 replies
jokefaker
@jokefaker

Hi, I'm using Spring Boot 2.3.5 and Spring Cloud Hoxton.SR8.
Here is my Spring Cloud Stream Kafka config:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          kafka-input:
            consumer:
              enableDlq: true
              dlqName: error.dlq
              dlqPartitions: 1
              dlqProducerProperties:
                topic:
                  properties:
                    retention.ms: 86500000
                  replication-factor: 2
      bindings:
        kafka-input:
          partitionCount: 2
          group: test

I have three problems/questions:

  1. The partitionCount property does not work when topics are auto-created for an input binding, though it does work for an output binding. Is that expected?
  2. The dlqProducerProperties are not applied when the DLQ topic is auto-created; the topic properties and replication-factor are not used.
  3. The dlqPartitions property is not documented in the reference guide, but I found it in the code. Should I use it to control the number of partitions of the DLQ topic?

4 replies
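On question 1, a hedged pointer: the binder only grows topics for consumer-side bindings when autoAddPartitions is enabled, so these binder-level settings may be what is missing here:

# let the binder add partitions when a binding needs more than exist
spring.cloud.stream.kafka.binder.autoAddPartitions=true
# binder-wide minimum used when provisioning topics (illustrative value)
spring.cloud.stream.kafka.binder.minPartitionCount=2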
Sahil Chaddha
@Sahil333
Hi All, I have multiple event types on a given topic, but I am only interested in a subset of them. I have a single consumer group and a separate @StreamListener for each event type. Because I have auto-commit set to false, the events I am not interested in are never committed, which inflates the reported offset lag for my consumer group beyond its actual value. So, as a solution, I am thinking of a default stream listener that is picked last, when no other filter matches, and that only commits the offset manually; that way the lag for my consumer group will reflect the actual values.
Can someone suggest how to do this with SCSt?
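A hedged sketch of what that default listener could look like in the annotation model, using a condition plus the acknowledgment header (the header name, event types and condition are placeholders; assumes autoCommitOffset=false so the offset is committed only here):

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;

@EnableBinding(Sink.class)
public class CatchAllListener {

    // Placeholder condition: matches whatever the typed listeners don't claim.
    @StreamListener(
            target = Sink.INPUT,
            condition = "!{'typeA','typeB'}.contains(headers['event_type'])")
    public void commitUninteresting(Message<?> message,
            @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) {
        // Only commit the offset; the payload is intentionally ignored.
        ack.acknowledge();
    }
}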
Kevin Lewis
@kevinrlewis
Hello, I am running into an issue using Spring Cloud Stream to publish to Kafka. The error is: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule. I get this when I build through mvn and then run the jar; running it within my IDE (IntelliJ) works fine. I'm guessing it is a dependency versioning issue. I'm using spring-cloud-dependencies 2020.0.1 and spring-boot-starter-parent 2.4.5. Any help would be appreciated! Thank you.
2 replies
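A hedged guess based on the IDE-vs-jar difference: this error often shows up when the jar is built with maven-shade, which can merge or drop metadata that the SASL login modules rely on. If that is the setup, repackaging with Boot's own plugin (which nests dependency jars intact) is the usual fix:

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>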
AttitudeL
@AttitudeL

Dear developers, I noticed that when I use nack() I see [Consumer clientId=consumer-anonymous.00b11061-91b5-49cd-b5de-1497631e25c8-26, groupId=anonymous.00b11061-91b5-49cd-b5de-1497631e25c8] in the log; somehow a groupId with an anonymous prefix was created. I wonder if this is intentional? Also, when I run integration tests I see a bunch of

2021-04-20 15:04:32 - [Consumer clientId=consumer-anonymous.00b11061-91b5-49cd-b5de-1497631e25c8-26, groupId=anonymous.00b11061-91b5-49cd-b5de-1497631e25c8] Connection to node -1 (/127.0.0.1:63714) could not be established. Broker may not be available.
2021-04-20 15:04:32 - [Consumer clientId=consumer-anonymous.00b11061-91b5-49cd-b5de-1497631e25c8-26, groupId=anonymous.00b11061-91b5-49cd-b5de-1497631e25c8] Bootstrap broker 127.0.0.1:63714 (id: -1 rack: null) disconnected

in the logs. There aren't any test failures but I wonder if I did something wrong?

I'm using 2.3.10.RELEASE for spring-boot and 3.0.11.RELEASE for spring-cloud-stream.

Gary Russell
@garyrussell
You get an anonymous group if the consumer binding has no group property.
AttitudeL
@AttitudeL
oh ok

@garyrussell thanks, adding group now removes the anonymous prefix. My apologies, I thought the applicationId would override the group in my case.

However, I still get Connection to node -1 (/127.0.0.1:64891) could not be established. Broker may not be available. at the end of the log after running the test. Are bindings supposed to drop their consumers at the end?

AttitudeL
@AttitudeL
Maybe I should manually disconnect the consumer in the @AfterEach clause in my test?
Gary Russell
@garyrussell
If you are creating your own consumers in your tests then, yes, you should close them at the end of the test.
AttitudeL
@AttitudeL
No this consumer group was created via binding. The consumers I created in my test have all been disconnected after each test
spring.cloud.stream:
  bindings:
    retry-processor-in-0:
      group: retry-group
      destination: retry_topic
      consumer:
        maxAttempts: 1
@garyrussell This is how I created the consumer group
Gary Russell
@garyrussell
Something strange then; the binding should be stopped by the ApplicationContext before it destroys the EmbeddedKafkaBroker. That said, it's just log noise - as you said, the tests all pass.
AttitudeL
@AttitudeL
Ok I will take a closer look in the log
Thanks
AttitudeL
@AttitudeL
@garyrussell strange... after I reverted the spring-cloud-stream upgrade from 3.0.11.RELEASE back to 3.0.7.RELEASE, the noise is gone.
So most likely something was introduced between those releases? I can't tell for sure, but the noise did disappear from the log
AttitudeL
@AttitudeL
From the log I discovered that after the shutdown a new consumer is somehow created for that group, and because the broker has already been shut down it cannot connect. I don't know why a new consumer would be created...
AttitudeL
@AttitudeL
It seems 3.0.9.RELEASE made the difference: 3.0.8 produces no noise, but with 3.0.9 the binding suddenly seems to create a new consumer after the EmbeddedKafkaBroker shuts down, which results in it not being able to connect.
Gary Russell
@garyrussell
The binding shouldn't start again. If you can provide a small, simple, complete example that exhibits the behavior, attach it to a new issue against spring-cloud-stream on GitHub.
matthoward
@matthoward
I'm struggling to get a set of streams to work together (with the new functional model). I have two separate Consumer<KStream> beans writing to KTables, and I'm trying to join those KTables with a BiFunction<KTable, KTable, KStream>. I think I'm not defining the bindings correctly for the joining processor: the join never receives the changes written to the source KTables unless I set the destination to the actual changelog topic name. For KTable inputs, should I set the binding's destination to the name of the KTable or something totally different? I have consumer.materializedAs set to the KTable name, but without the destination name the binding just points at some empty new topic it creates instead of the changelog.
13 replies
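For reference, a minimal shape of such a joining processor in the functional model (all names and types are placeholders): the bindings would be join-in-0, join-in-1 and join-out-0, with the in-0/in-1 destinations pointing at the topics that feed the two tables (not the changelogs) and consumer.materializedAs naming the stores; that destination choice is the working assumption in this thread, not confirmed doctrine.

import java.util.function.BiFunction;

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JoinProcessorConfig {

    @Bean
    public BiFunction<KTable<String, String>, KTable<String, String>, KStream<String, String>> join() {
        // Joins the two tables and emits the result as a stream.
        return (left, right) -> left
                .join(right, (l, r) -> l + ":" + r)
                .toStream();
    }
}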
AttitudeL
@AttitudeL

The binding shouldn't start again. If you can provide a small, simple, complete example that exhibits the behavior, attach it to a new issue against spring-cloud-stream on GitHub.

Thanks, I will try to extract the code and create a simple, complete project.

Leonardo Torres
@leonardo_ta_twitter
Hello, I am looking for a way to configure transactions for only certain producers. When I use
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix, all the producers in my microservice become transactional
6 replies
Leonardo Torres
@leonardo_ta_twitter
Hi, regarding this code in the class org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner (using spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar):

public void afterPropertiesSet() {
    if (this.metadataRetryOperations == null) {
        RetryTemplate retryTemplate = new RetryTemplate();

        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(10);
        retryTemplate.setRetryPolicy(simpleRetryPolicy);

        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(100);
        backOffPolicy.setMultiplier(2);
        backOffPolicy.setMaxInterval(1000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        this.metadataRetryOperations = retryTemplate;
    }
}

I need to set a RetryTemplate with specific values at startup. How can I do this? Something like a customizer for KafkaTopicProvisioner.
3 replies
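There is no dedicated customizer that I know of, but since afterPropertiesSet() only builds the default when metadataRetryOperations is null, one hedged option is to inject your own template through the public setter before initialization, e.g. with a BeanPostProcessor. Caveat: binder beans may live in a child context, in which case the post-processor has to be registered there rather than in the main application context.

import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaTopicProvisionerCustomizer implements BeanPostProcessor {

    // Runs before afterPropertiesSet(), so the null check there keeps
    // this template instead of building the default one.
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        if (bean instanceof KafkaTopicProvisioner) {
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
            retryPolicy.setMaxAttempts(3); // your values here
            retryTemplate.setRetryPolicy(retryPolicy);
            FixedBackOffPolicy backOff = new FixedBackOffPolicy();
            backOff.setBackOffPeriod(500);
            retryTemplate.setBackOffPolicy(backOff);
            ((KafkaTopicProvisioner) bean).setMetadataRetryOperations(retryTemplate);
        }
        return bean;
    }
}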
Navid Ghahremani
@ghahramani
Hi, I am using reactive Spring Cloud Function to read from Kafka, and I was wondering: is there a way to pause the binder from consuming when the database (MongoDB) is down?
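One hedged option, which also comes up later in this thread, is the actuator BindingsEndpoint, which can pause and resume a binding by name (assumes the bindings actuator endpoint is available; the binding name and trigger below are placeholders):

import org.springframework.cloud.stream.endpoint.BindingsEndpoint;
import org.springframework.stereotype.Component;

@Component
public class ConsumerPauser {

    private final BindingsEndpoint bindings;

    public ConsumerPauser(BindingsEndpoint bindings) {
        this.bindings = bindings;
    }

    // Call this from whatever probes MongoDB health.
    public void onDatabaseState(boolean up) {
        bindings.changeState("myFunction-in-0",
                up ? BindingsEndpoint.State.RESUMED : BindingsEndpoint.State.PAUSED);
    }
}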
Vladimir Kabanov
@vkabanorv
Hi all! I'm using the Kafka Streams binder, trying to implement an app with a processor. When running against our production Kafka (v1.0.0) I get the following error: Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: Creating topics with default partitions/replication factor are only supported in CreateTopicRequest version 4+. The following topics need values for partitions and replicas. As I understand it, there is a way to configure the number of partitions and replicas passed in the request; can anyone confirm this? I've tried these parameters:

spring.cloud.stream.kafka.binder.replicationFactor=3
spring.cloud.stream.kafka.binder.minPartitionCount=6
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.autoAddPartitions=true

but to no avail =(
2 replies
Navid Ghahremani
@ghahramani

Hi,
I am trying to use manual commit in Spring Cloud Stream Kafka (reactive, functional version), but it seems that no matter what config I use, it still auto-commits. Could someone point me to an example that does manual commit?

I have used

spring.kafka.consumer.enable-auto-commit: false
spring.cloud.stream.kafka.default.consumer.auto-commit-offset: false
spring.cloud.stream.kafka.bindings.my-input-in-0.consumer.auto-commit-offset: false
spring.cloud.stream.bindings.my-input-in-0.consumer.auto-commit-offset: false
Knut Schleßelmann
@kschlesselmann

Since the update to Spring Boot 2.4.5 and Spring Cloud 2020.0.2 I end up with

2021-04-29 09:36:49.159 ERROR [dispatcher,,] 44062 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'wtf.destination'; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=byte[32], headers={id=6af1ee72-9b99-af85-8b3d-2b5a46aa63ac, kafka_messageKey=43, contentType=application/json, timestamp=1619681809158}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:450)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:140)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
    …

if I try to receive from the OutputDestination. Any ideas what's going on here?

4 replies
Navid Ghahremani
@ghahramani

Hey guys, I am trying to pause the consumer in Spring Cloud Stream reactive, but I get an error saying the KafkaConsumer is not thread-safe...

Is there a way to pause the consumer in reactive mode?

5 replies
Leonardo Torres
@leonardo_ta_twitter
Hi guys, I am trying to write to 3 topics inside the same @StreamListener method, using Kafka Streams with exactly_once and StreamBridge, but I get this error: The binder 'xxxkstream' cannot bind a org.springframework.cloud.stream.messaging.DirectWithAttributesChannel
    @StreamListener(WorkerValueKStreamsChannel.WORKER_VALUE_KSTREAMS_INPUT)
    @SendTo(WorkerValueKStreamsChannel.WORKER_VALUE_KSTREAMS_OUTPUT)
    @AbsisOperation(eventType = AbsisOperation.EventType.LIGHT_EVENT)
    public KStream<?, TransformedWorkerValue> process(KStream<?, WorkerValue> input) {

        KStream<Object, TransformedWorkerValue> transformed = input.flatMap((key, value) -> {
            List<KeyValue<Object, TransformedWorkerValue>> result = new LinkedList<>();
            TransformedWorkerValue transvalue = new TransformedWorkerValue();
            transvalue.setID(value.getID());
            transvalue.setEDAD(value.getEDAD());
            transvalue.setNOMBRE(value.getNOMBRE());
            transvalue.setHERMANOS(value.getHERMANOS());
            transvalue.setEMPRESA("YYYYY");
            transvalue.setFECHALTA(Instant.now());
            transvalue.setSALARIO(value.getSALARIO());
            transvalue.setNUEVOSALARIO(value.getSALARIO());
            transvalue.setCHGTMST(Instant.now());
            result.add(KeyValue.pair(key, transvalue));
            return result;
        });

        transformed.peek((k, v) -> {
            if (v.getEDAD() > 20) {
                streamBridge.send("firstdynamictarget-output", v);
            } else {
                streamBridge.send("seconddynamictarget-output", v);
            }
        });

        return transformed;
    }
4 replies
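A hedged reading of the error: StreamBridge needs a message-channel binder, and the Kafka Streams binder ('xxxkstream') cannot serve one. If the regular Kafka binder is also on the classpath, pinning the dynamic targets to it might help:

# assumes both a message-channel "kafka" binder and the kstream binder are configured
spring.cloud.stream.bindings.firstdynamictarget-output.binder=kafka
spring.cloud.stream.bindings.seconddynamictarget-output.binder=kafka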
Navid Ghahremani
@ghahramani

Hey guys,
I am trying to pause the binding when the system health check status is down or unknown. I created a function that does the health check and pauses the binding, but I need to use ack = MANUAL to prevent committing the offset when the health check is down, and I am having a hard time with it for two reasons:

1. Messages are processed in parallel, not sequentially, even though I use Schedulers.single() for both subscribeOn and publishOn, so I see multiple About to check the health check logs even while the previous message is still processing.
2. doOnNext is not called FIFO; for instance, it acknowledges offset 2 and then offset 1.

Here is what I did:

Configuration:

spring.cloud.stream.kafka.bindings.caseClose-in-0.consumer.ack-mode: MANUAL_IMMEDIATE
spring.kafka.consumer.enable-auto-commit: false
spring.kafka.consumer.max-poll-records: 1

The stream:

    @Bean
    fun caseClose(
        service: CaseCloseService,
        healthCheckService: HealthCheckService,

        streamBridge: StreamBridge,
        bindingsEndpoint: BindingsEndpoint
    ) = Function<Flux<Message<String>>, Mono<Void>> { emitter ->
        emitter
            .publishOn(Schedulers.single())
            .subscribeOn(Schedulers.single())
            .flatMap { message ->
                println("About to check the health check")
                healthCheckService
                    .retrieveStatus()
                    .publishOn(Schedulers.single())
                    .subscribeOn(Schedulers.single())
                    .flatMap { status ->
                        when (status) {
                            Status.UP -> Mono.just(message)
                            else -> {
                                bindingsEndpoint.changeState("caseClose-in-0", BindingsEndpoint.State.PAUSED)
                                Mono.empty()
                            }
                        }
                    }
            }
            .flatMap { request ->
                Mono
                    .just("Test")
                    .map { request }
                    .onErrorResume { exception -> handleDlq(request, streamBridge, exception) }
            }
            .onErrorContinue { exception, model ->
                logger.error("Unable to finish the message successfully, request: $model ", exception)
            }
            .doOnNext { request ->
                println("About to acknowledge partition: ${request.headers[KafkaHeaders.RECEIVED_PARTITION_ID]} and offset: ${request.headers[KafkaHeaders.OFFSET]}")
                val acknowledgment = request.headers[KafkaHeaders.ACKNOWLEDGMENT] as? Acknowledgment?
                acknowledgment?.acknowledge() ?: logger.warn("No acknowledge object in the message headers")
            }
            .then()
    }

    private fun handleDlq(
        request: Message<String>,
        streamBridge: StreamBridge,
        exception: Throwable
    ): Mono<Message<String>> {
        val receivedTopic = request.headers[KafkaHeaders.RECEIVED_TOPIC]
        val attempts = request.headers[KafkaHeaders.DELIVERY_ATTEMPT, AtomicInteger::class.java] ?: AtomicInteger(1)
        val topic = if (attempts.get() < 3) {
            attempts.incrementAndGet()
            receivedTopic.toString()
        } else {
            val dlq = receivedTopic.toString() + "-dlq"
            dlq
        }
        val message = MessageBuilder
            .fromMessage(request)
            .setHeaderIfAbsent("spring.cloud.stream.sendto.destination", topic)
            .setHeader(KafkaHeaders.DELIVERY_ATTEMPT, attempts)
            .setHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC, receivedTopic)
            .setHeader(KafkaHeaders.DLT_EXCEPTION_FQCN, exception.javaClass.name)
            .setHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE, exception.message)
            .setHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE, exception.stackTraceToString())
            .build()
        streamBridge.send(topic, message)

        request.headers.kafkaAcknowledge()

        return Mono.empty()
    }
13 replies
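Both symptoms are consistent with flatMap, which subscribes to its inner publishers eagerly and merges results in completion order; Schedulers.single() pins the work to one thread but does not serialize the inner flows. A hedged fix is replacing the flatMap stages with concatMap, which subscribes to one inner publisher at a time and preserves source order. A small standalone sketch of the difference:

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatMapOrdering {

    public static void main(String[] args) {
        // With flatMap the five inner Monos would run concurrently and could
        // complete out of order; concatMap processes them strictly 1..5.
        Flux.range(1, 5)
                .concatMap(offset -> Mono.just(offset)
                        .delayElement(Duration.ofMillis(50)))
                .doOnNext(offset -> System.out.println("ack offset " + offset))
                .blockLast();
    }
}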
Arvind Kumar GS
@arvindkgs
Is there a Spring property to configure the connection timeout for a Spring Cloud Stream binding?
forgetaboutme
@forgetaboutme
@garyrussell I figured it would be easier to ask you here, but is it possible to reset the offset if the offset is already committed?
3 replies
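One supported route in the Kafka binder is a KafkaBindingRebalanceListener bean: it is invoked on partition assignment and lets you seek regardless of committed offsets (hedged sketch; the binding name is a placeholder):

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.stereotype.Component;

@Component
public class SeekToBeginningListener implements KafkaBindingRebalanceListener {

    @Override
    public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions, boolean initial) {
        // Example: rewind one binding to the beginning on the first
        // assignment, ignoring previously committed offsets.
        if ("my-binding".equals(bindingName) && initial) {
            consumer.seekToBeginning(partitions);
        }
    }
}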
Swethaa Ramesh
@swethaaramesh_twitter

Hi, I had a question on why my Spring Boot tests keep trying to establish a connection to a localhost Kafka when I have only the Kafka binder and the cloud stream test dependencies:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>${spring-cloud.kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <version>${cloudstream.test.version}</version>
    <scope>test</scope>
</dependency>

1 reply
Camilo Ariza
@aariza1
Hello, I have a question regarding the BindingProperties and ConsumerProperties classes. I thought I would find the autoCommitOffset property in ConsumerProperties, as described in the docs for manual acknowledgement, but I cannot find it. Does anyone have an idea?
5 replies
Pietro Galassi
@pietrogalassi
Hi all. Please help me figure out this scenario: I have a microservice with more than one instance. It reads from a topic (ordersTopic) with at least 24 partitions, aggregates data (sums), and puts the results on another topic (countTopic). The same microservice then reads countTopic in order to apply some logic to the count. Given the multiple instances, can I get bugs in the count numbers? Thanks a lot.
2 replies
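For what it's worth, the usual way to keep such counts correct across instances is to partition countTopic by the aggregation key, so all updates for a key land on one partition and are read by exactly one instance. A hedged properties sketch, with placeholder binding and key names:

# route all records with the same key to the same countTopic partition
spring.cloud.stream.bindings.countOut-out-0.producer.partitionKeyExpression=payload.orderId
spring.cloud.stream.bindings.countOut-out-0.producer.partitionCount=24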
Iulia Iancu
@iulia-iancu
Hello! I have a project in which I use Spring Boot 2.4.5, Spring Cloud 2020.0.2 and spring-cloud-starter-stream-rabbit 3.1.2. When a message is received in the consumer app I see this error: Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.tfg.sdapi.archiver.pojo.dto.ArchiverMessageDto ([B is in module java.base of loader 'bootstrap'; com.tfg.sdapi.archiver.pojo.dto.ArchiverMessageDto is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @1bc7681b)
I tried to search for it but I couldn't find any hints.
2 replies
Do you have any ideas about what's going on and how I can fix it?
Thank you!
The problem seems to happen when deserializing in the consumer
if I use String instead of my custom class it works
Knut Schleßelmann
@kschlesselmann
@garyrussell It seems that ackMode is now the preferred way to control manual acknowledgements in Kafka. What about actually acknowledging the messages? Is it still intended to use the message header as in https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.2/reference/html/spring-cloud-stream-binder-kafka.html#_usage_examples ? At least the code mentions some AcknowledgingMessageListener you'd have to provide?
1 reply
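For reference, the header-based acknowledgement from the linked docs still works in the functional model; a sketch assuming ackMode is set to MANUAL for the binding (binding and function names are placeholders):

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@Configuration
public class ManualAckConfig {

    // Assumes spring.cloud.stream.kafka.bindings.consume-in-0.consumer.ackMode=MANUAL
    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            // ... process the payload ...
            Acknowledgment ack = message.getHeaders()
                    .get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            if (ack != null) {
                ack.acknowledge(); // commits the offset
            }
        };
    }
}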
Enrique Fernández-Polo Puertas
@enrique-fernandez-polo

Hello! I have a project in which I use Spring Boot 2.4.5, Spring Cloud 2020.0.2 and spring-cloud-starter-stream-rabbit 3.1.2. When a message is received in the consumer app I see this error: Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.tfg.sdapi.archiver.pojo.dto.ArchiverMessageDto ([B is in module java.base of loader 'bootstrap'; com.tfg.sdapi.archiver.pojo.dto.ArchiverMessageDto is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @1bc7681b)
I tried to search for it but I couldn't find any hints.

Same issue here! I am using Kotlin btw... The message works if I use StreamBridge

Enrique Fernández-Polo Puertas
@enrique-fernandez-polo
The content-type is always application/octet-stream, but if I wrap the message in Message<MyClass> the content-type is application/json and it works. I don't get it; JSON is supposed to be the default, isn't it? How can I force JSON serialization for every message?
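A hedged pointer: the binding-level contentType property, or the spring.cloud.stream.default shortcut, should force JSON conversion for outbound messages; the binding name below is a placeholder:

# force JSON for one binding...
spring.cloud.stream.bindings.myOutput-out-0.contentType=application/json
# ...or as the default for all bindings
spring.cloud.stream.default.contentType=application/json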