AttitudeL
@AttitudeL

The binding shouldn't start again. If you can provide a small, simple, complete example that exhibits the behavior, attach it to a new issue against spring-cloud-stream on GitHub.

Thanks, I will try to extract that code and create a simple, complete project.

Leonardo Torres
@leonardo_ta_twitter
Hello, I am looking for a way to configure transactions for only certain producers. When I use spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix:, all the producers in my microservice become transactional.
6 replies
Leonardo Torres
@leonardo_ta_twitter
Hi, regarding this code in the class org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner (using spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar):

public void afterPropertiesSet() {
    if (this.metadataRetryOperations == null) {
        RetryTemplate retryTemplate = new RetryTemplate();

        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(10);
        retryTemplate.setRetryPolicy(simpleRetryPolicy);

        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(100);
        backOffPolicy.setMultiplier(2);
        backOffPolicy.setMaxInterval(1000);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        this.metadataRetryOperations = retryTemplate;
    }
}

I need to set a RetryTemplate at startup with specific values. How can I do this? Something like a customizer for KafkaTopicProvisioner.
3 replies
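One possible work-around for the question above, since there does not appear to be a dedicated customizer for KafkaTopicProvisioner in 3.0.x: a plain BeanPostProcessor that injects a custom RetryTemplate before afterPropertiesSet() installs the default. This is only a sketch and assumes setMetadataRetryOperations(...) is public on the provisioner, as it appears to be in the class pasted above; the bean name and retry values are made up.

import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class ProvisionerRetryConfig {

    @Bean
    public static BeanPostProcessor kafkaTopicProvisionerRetryCustomizer() {
        return new BeanPostProcessor() {
            @Override
            public Object postProcessBeforeInitialization(Object bean, String beanName) {
                if (bean instanceof KafkaTopicProvisioner) {
                    // Runs before afterPropertiesSet(), so its null check skips the default template
                    RetryTemplate retryTemplate = new RetryTemplate();
                    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
                    retryPolicy.setMaxAttempts(3);            // instead of the default 10
                    retryTemplate.setRetryPolicy(retryPolicy);
                    FixedBackOffPolicy backOff = new FixedBackOffPolicy();
                    backOff.setBackOffPeriod(500);            // constant 500 ms between attempts
                    retryTemplate.setBackOffPolicy(backOff);
                    ((KafkaTopicProvisioner) bean).setMetadataRetryOperations(retryTemplate);
                }
                return bean;
            }
        };
    }
}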
Navid Ghahremani
@ghahramani
Hi, I am using reactive Spring Cloud Function for reading from Kafka, and I was wondering: is there a way to pause the binder from consuming when the database (MongoDB) is down?
Vladimir Kabanov
@vkabanorv
Hi all! I'm using the Kafka Streams binder, trying to implement an app with a processor. When running against our production Kafka v1.0.0 I get the following error: Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: Creating topics with default partitions/replication factor are only supported in CreateTopicRequest version 4+. The following topics need values for partitions and replicas. As I understand it, there is a way to configure the number of partitions and replicas being passed in the request - can anyone confirm this? I've tried using these parameters, but to no avail:

spring.cloud.stream.kafka.binder.replicationFactor=3
spring.cloud.stream.kafka.binder.minPartitionCount=6
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.autoAddPartitions=true
2 replies
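Worth double-checking for the question above: the spring.cloud.stream.kafka.binder.* properties configure the message-channel Kafka binder, while a Kafka Streams processor is provisioned by the Kafka Streams binder, whose properties live under the kafka.streams.binder prefix. A hedged sketch, assuming the streams binder accepts the same replicationFactor/minPartitionCount property names:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            replicationFactor: 3      # assumption: same names as the channel binder, scoped to the streams binder
            minPartitionCount: 6
            autoCreateTopics: true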
Navid Ghahremani
@ghahramani

Hi,
I am trying to use manual commit in Spring Cloud Stream Kafka (reactive, functional style), but it seems that no matter what config I use, it still auto-commits. Could someone point me to an example that does a manual commit?

I have used

spring.kafka.consumer.enable-auto-commit: false
spring.cloud.stream.kafka.default.consumer.auto-commit-offset: false
spring.cloud.stream.kafka.bindings.my-input-in-0.consumer.auto-commit-offset: false
spring.cloud.stream.bindings.my-input-in-0.consumer.auto-commit-offset: false
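For reference, and echoing a later message in this log, recent binder versions (3.1+) replace the auto-commit-offset consumer property with ack-mode; a hedged sketch of what manual acknowledgement would look like there, reusing the binding name from the message above:

spring.cloud.stream.kafka.bindings.my-input-in-0.consumer.ack-mode: MANUAL

With that set, offsets are only committed when the Acknowledgment message header is acknowledged in the handler.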
Knut Schleßelmann
@kschlesselmann

Since the update to Spring Boot 2.4.5 and Spring Cloud 2020.0.2 I end up with

2021-04-29 09:36:49.159 ERROR [dispatcher,,] 44062 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'wtf.destination'; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=byte[32], headers={id=6af1ee72-9b99-af85-8b3d-2b5a46aa63ac, kafka_messageKey=43, contentType=application/json, timestamp=1619681809158}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:450)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:140)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
    …

if I try to receive from the OutputDestination. Any ideas what's going on here?

4 replies
Navid Ghahremani
@ghahramani

Hey guys, I am trying to pause the consumer in Spring Cloud Stream reactive mode, but it shows me an error saying that KafkaConsumer is not thread-safe...

Is there a way to pause the consumer in reactive mode?

5 replies
Leonardo Torres
@leonardo_ta_twitter
Hi guys, I am trying to write to 3 topics inside the same @StreamListener method, using Kafka Streams with exactly_once and StreamBridge, but I get the error: The binder 'xxxkstream' cannot bind a org.springframework.cloud.stream.messaging.DirectWithAttributesChannel
    @StreamListener(WorkerValueKStreamsChannel.WORKER_VALUE_KSTREAMS_INPUT)
    @SendTo(WorkerValueKStreamsChannel.WORKER_VALUE_KSTREAMS_OUTPUT)
    @AbsisOperation(eventType = AbsisOperation.EventType.LIGHT_EVENT)
    public KStream<?, TransformedWorkerValue> process(KStream<?, WorkerValue> input) {

        KStream<Object, TransformedWorkerValue> transformed = input.flatMap(
                (key, value) -> {
                    List<KeyValue<Object, TransformedWorkerValue>> result = new LinkedList<>();
                    TransformedWorkerValue transvalue = new TransformedWorkerValue();
                    transvalue.setID(value.getID());
                    transvalue.setEDAD(value.getEDAD());
                    transvalue.setNOMBRE(value.getNOMBRE());
                    transvalue.setHERMANOS(value.getHERMANOS());
                    transvalue.setEMPRESA("YYYYY");
                    transvalue.setFECHALTA(Instant.now());
                    transvalue.setSALARIO(value.getSALARIO());
                    transvalue.setNUEVOSALARIO(value.getSALARIO());
                    transvalue.setCHGTMST(Instant.now());
                    result.add(KeyValue.pair(key, transvalue));
                    return result;
                });

        transformed.peek((k, v) -> {
            if (v.getEDAD() > 20) {
                streamBridge.send("firstdynamictarget-output", v);
            } else {
                streamBridge.send("seconddynamictarget-output", v);
            }
        });

        return transformed;
    }
4 replies
Navid Ghahremani
@ghahramani

Hey guys,
I am trying to pause the binding when the system health check status is down or unknown. I created a function that does the health check and pauses the binding, but I need to use ack = MANUAL to prevent committing the offset when the health check is down, and I am having a hard time for two reasons:

1- Messages are processed in parallel and not sequentially, even though I use Schedulers.single() for both subscribeOn and publishOn, so I am seeing multiple "About to check the health check" logs even while the previous message is still processing
2- doOnNext is not called FIFO, so for instance it acknowledges offset 2 and then 1

Here is what I did:

Configuration:

spring.cloud.stream.kafka.bindings.caseClose-in-0.consumer.ack-mode: MANUAL_IMMEDIATE
spring.kafka.consumer.enable-auto-commit: false
spring.kafka.consumer.max-poll-records: 1

The stream:

    @Bean
    fun caseClose(
        service: CaseCloseService,
        healthCheckService: HealthCheckService,

        streamBridge: StreamBridge,
        bindingsEndpoint: BindingsEndpoint
    ) = Function<Flux<Message<String>>, Mono<Void>> { emitter ->
        emitter
            .publishOn(Schedulers.single())
            .subscribeOn(Schedulers.single())
            .flatMap { message ->
                println("About to check the health check")
                healthCheckService
                    .retrieveStatus()
                    .publishOn(Schedulers.single())
                    .subscribeOn(Schedulers.single())
                    .flatMap { status ->
                        when (status) {
                            Status.UP -> Mono.just(message)
                            else -> {
                                bindingsEndpoint.changeState("caseClose-in-0", BindingsEndpoint.State.PAUSED)
                                Mono.empty()
                            }
                        }
                    }
            }
            .flatMap { request ->
                Mono
                    .just("Test")
                    .map { request }
                    .onErrorResume { exception -> handleDlq(request, streamBridge, exception) }
            }
            .onErrorContinue { exception, model ->
                logger.error("Unable to finish the message successfully, request: $model ", exception)
            }
            .doOnNext { request ->
                println("About to acknowledge partition: ${request.headers[KafkaHeaders.RECEIVED_PARTITION_ID]} and offset: ${request.headers[KafkaHeaders.OFFSET]}")
                val acknowledgment = request.headers[KafkaHeaders.ACKNOWLEDGMENT] as? Acknowledgment?
                acknowledgment?.acknowledge() ?: logger.warn("No acknowledge object in the message headers")
            }
            .then()
    }

    private fun handleDlq(
        request: Message<String>,
        streamBridge: StreamBridge,
        exception: Throwable
    ): Mono<Message<String>> {
        val receivedTopic = request.headers[KafkaHeaders.RECEIVED_TOPIC]
        val attempts = request.headers[KafkaHeaders.DELIVERY_ATTEMPT, AtomicInteger::class.java] ?: AtomicInteger(1)
        val topic = if (attempts.get() < 3) {
            attempts.incrementAndGet()
            receivedTopic.toString()
        } else {
            val dlq = receivedTopic.toString() + "-dlq"
            dlq
        }
        val message = MessageBuilder
            .fromMessage(request)
            .setHeaderIfAbsent("spring.cloud.stream.sendto.destination", topic)
            .setHeader(KafkaHeaders.DELIVERY_ATTEMPT, attempts)
            .setHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC, receivedTopic)
            .setHeader(KafkaHeaders.DLT_EXCEPTION_FQCN, exception.javaClass.name)
            .setHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE, exception.message)
            .setHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE, exception.stackTraceToString())
            .build()
        streamBridge.send(topic, message)

        request.headers.kafkaAcknowledge()

        return Mono.empty()
    }
13 replies
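A generic Reactor point that may explain both observations above: flatMap subscribes to its inner publishers eagerly, so work can overlap and results can complete out of order, while concatMap subscribes to one inner publisher at a time and preserves order. A small, self-contained Java sketch (hypothetical names, not the code above):

import reactor.core.publisher.Flux;

public class ConcatMapOrderingSketch {

    public static void main(String[] args) {
        Flux.range(1, 5)
            // concatMap processes one element at a time, in order;
            // flatMap here could interleave and finish out of order
            .concatMap(ConcatMapOrderingSketch::process)
            .doOnNext(offset -> System.out.println("acknowledging offset " + offset))
            .blockLast();
    }

    private static Flux<Integer> process(int offset) {
        return Flux.just(offset); // placeholder for the health check / business logic
    }
}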
Arvind Kumar GS
@arvindkgs
Is there a Spring property to configure the timeout for connecting to a Spring Cloud Stream binding?
forgetaboutme
@forgetaboutme
@garyrussell I figured it would be easier to ask you here, but is it possible to reset the offset if the offset is already committed?
3 replies
Swethaa Ramesh
@swethaaramesh_twitter

Hi, I had a question on why my Spring Boot tests keep trying to establish a connection to a localhost Kafka when I have only the Kafka binder and cloud stream test dependencies:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>${spring-cloud.kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <version>${cloudstream.test.version}</version>
    <scope>test</scope>
</dependency>

1 reply
Camilo Ariza
@aariza1
Hello, I have a question regarding the BindingProperties and ConsumerProperties classes. I thought I would find the autoCommitOffset property in ConsumerProperties, as described in the docs for manual acknowledgement, but I cannot find that property. Does anyone have an idea?
5 replies
Pietro Galassi
@pietrogalassi
Hi all. Please help me figure out this scenario: I have a microservice with more than one instance. The microservice reads from a topic (ordersTopic) with at least 24 partitions and aggregates the data (sums), then puts the results onto another topic (countTopic). That countTopic is then read by the same microservice in order to apply some logic to the count. With multiple instances, can I get bugs in the count numbers? Thanks a lot.
2 replies
Iulia Iancu
@iulia-iancu
Hello! I have a project in which I use spring-boot 2.4.5, spring cloud 2020.0.2 and spring-cloud-starter-stream-rabbit 3.1.2. When the message is received in the consumer app I see this error: Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.tfg.sdapi.archiver.pojo.dto.ArchiverMessageDto ([B is in module java.base of loader 'bootstrap'; com.tfg.sdapi.archiver.pojo.dto.ArchiverMessageDto is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @1bc7681b)
I tried to search for it but I couldn't find any hints.
2 replies
Do you have any ideas about what's going on and how I can fix it?
Thank you!
The problem seems to happen when deserializing in the consumer.
If I use String instead of my custom class, it works.
Knut Schleßelmann
@kschlesselmann
@garyrussell It seems that ackMode is now the preferred way to control manual acknowledgements in Kafka. What about actually acknowledging the messages? Is it still intended to use the message header as in https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.2/reference/html/spring-cloud-stream-binder-kafka.html#_usage_examples ? At least the code mentions some AcknowledgingMessageListener you'd have to provide?
1 reply
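Per the linked usage example, acknowledging is still done through the message header rather than through a listener you provide; a minimal sketch of a functional consumer doing that (it assumes ack-mode is set to MANUAL or MANUAL_IMMEDIATE for the binding, and the bean name is hypothetical):

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@Configuration
public class ManualAckConfig {

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            // With ack-mode MANUAL / MANUAL_IMMEDIATE the binder adds the Acknowledgment header
            Acknowledgment ack = message.getHeaders()
                    .get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            if (ack != null) {
                ack.acknowledge();   // commit the offset for this record
            }
        };
    }
}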
Enrique Fernández-Polo Puertas
@enrique-fernandez-polo

Hello! I have a project in which I use spring-boot 2.4.5 and spring cloud 2020.0.2. and spring-cloud-starter-stream-rabbit 3.1.2. When the message is received in the consumer app I see this error: Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.tfg.sdapi.archiver.pojo.dto.ArchiverMessageDto ([B is in module java.base of loader 'bootstrap'; com.tfg.sdapi.archiver.pojo.dto.ArchiverMessageDto is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @1bc7681b)
I tried to search for it but I couldn't find any hints.

Same issue here! I am using Kotlin btw... The message works if I use StreamBridge

Enrique Fernández-Polo Puertas
@enrique-fernandez-polo
The content-type is always application/octet-stream, but if I wrap the payload in Message<MyClass> the content-type is application/json and it works. I don't get it, JSON is supposed to be the default one, isn't it? How can I force JSON serialization for every message?
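If the goal is simply to force JSON for every outbound message, the content type can also be pinned per binding; a small sketch (the binding name here is hypothetical):

spring:
  cloud:
    stream:
      bindings:
        myOutput-out-0:
          content-type: application/json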
ghilainm
@ghilainm

Hi guys, just a question: I have a GlobalChannelInterceptor with the pattern 'input-*'. I am using StreamBridge to send a message to the dynamic destination 'aggregator-in'. The channel interceptor is applied, which is not expected. Any idea why?

Digging into the code, it seems that the pattern is simply not applied :). But I probably overlooked something.

    private void addInterceptors(AbstractMessageChannel messageChannel) {
        String[] interceptorNames = this.applicationContext.getBeanNamesForType(ChannelInterceptor.class);
        List<ChannelInterceptor> interceptors = messageChannel.getInterceptors();
        for (String interceptorName : interceptorNames) {
            ChannelInterceptor interceptor = this.applicationContext.getBean(interceptorName, ChannelInterceptor.class);
            if (!CollectionUtils.containsInstance(interceptors, interceptor)) {
                messageChannel.addInterceptor(interceptor);
            }
        }
    }
1 reply
jay
@legendjaks
I am using SCSt functions for producing and consuming messages. I am facing a strange issue where my consumers suddenly stopped working. The producers keep working fine (StreamBridge). When I change the consumer group of the consumers, they start working again. Is there any scenario in which SCSt consumers will stop working? When this happens, Kafka doesn't show that the consumers are connected, and the log says the channel has an unknown consumer.
jay
@legendjaks
Do the Kafka server and spring-kafka versions need to match? My Kafka is 2.0 and spring-kafka is 2.6.
Eric Deandrea
@edeandrea

What I'm trying to do seems pretty simple, but I can't figure it out. I have one function (Supplier<Flux<Integer>>) which generates data on a 5 second interval and publishes it to a Kafka topic. I have another function (currently a Consumer<Flux<Integer>>) that listens on the Kafka topic and does some stuff (in my simple example it just multiplies the number by some constant). This part I have working just fine.

What I'm struggling with is that I want to take the result of that computation and serve it as a server-sent event through a Spring WebFlux controller. I don't want that listener function to publish it back to Kafka - instead I'd prefer just to keep it in memory.

It seems pretty simple, but I can't seem to figure out that last part. I tried changing the consumer to a Function<Flux<Integer>, Flux<Double>>, but I'm not sure how to wire that into my controller class so that my controller method can return Flux<ServerSentEvent<Double>>.

I came across this example (https://dzone.com/articles/spring-cloud-stream-a-brief-guide), but it uses EmitterProcessor, which is deprecated in favor of the Sinks class. It just feels wrong to me to have to dive down to the Project Reactor level to accomplish this, but I can't think of a better way to do it natively in Spring Cloud Stream.
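One possible shape for that last part, staying with Reactor's Sinks as the in-memory bridge between the consumer and the controller. All names below are hypothetical and this is only a sketch of one approach, not an official Spring Cloud Stream pattern; the binding stays a Consumer, so nothing is published back to Kafka, and the controller simply replays whatever the sink emits.

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Configuration
class ComputedValuesConfig {

    // In-memory bridge between the Kafka consumer and the WebFlux controller
    @Bean
    public Sinks.Many<Double> computedValues() {
        return Sinks.many().multicast().onBackpressureBuffer();
    }

    // Reactive consumer: multiply each number and publish the result into the sink
    // instead of back to Kafka (with a reactive Consumer the flux is handed over once,
    // so the sketch subscribes to it explicitly).
    @Bean
    public Consumer<Flux<Integer>> multiply(Sinks.Many<Double> computedValues) {
        return numbers -> numbers
                .map(n -> n * 2.5)
                .doOnNext(computedValues::tryEmitNext)
                .subscribe();
    }
}

@RestController
class ComputedValuesController {

    private final Sinks.Many<Double> computedValues;

    ComputedValuesController(Sinks.Many<Double> computedValues) {
        this.computedValues = computedValues;
    }

    @GetMapping(path = "/values", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Double>> stream() {
        return computedValues.asFlux()
                .map(value -> ServerSentEvent.builder(value).build());
    }
}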

hnazmatrix
@hnazmatrix:matrix.org
[m]
Hello, I am using the Spring Cloud Stream Kafka binders (functional style), and the Spring Boot "actuator/health" endpoint seems to hang and never return. The Kafka stream in my application is not critical and it can live with the failure of the stream threads. I tried disabling the check with management.health.binders.enabled: false, but this does not seem to work. My pods are restarting because the actuator health endpoint times out. The actuator itself is working (a curl on /actuator/xyz returns 404). Please suggest if there is any other setting to turn off.
PS: My org.apache.kafka.streams.processor.internals.StreamThread is running fine, yet it still takes about 10 minutes to return a "DOWN":

{"status":"DOWN","groups":["liveness","readiness"]}

      function:
        definition: processLpMaster
      bindings:
        processLpMaster-in-0:
          destination: PR741
      kafka:
        binder:
          healthTimeout: 5
        streams:
          binder:
            functions:
              processLpMaster:
                applicationId: lpMasterSync
            brokers: localhost:9092
            autoCreateTopics: false
            configuration:
              schema.registry.url: http://localhost:8083
              specific.avro.reader: true
              commit.interval.ms: 1000
              num.stream.threads: 1
          bindings:
            processLpMaster-in-0:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

management:
  metrics:
    enable:  # Enable/disable Spring Boot Actuator metrics
      all: true
    export:
      stackdriver:
        enabled: false
        metric-target-project-id: ff-operations-lower
        batchSize: 10
        project-id: ld-ff-operations-lower
        reportingFrequencyInSeconds: 60
  health:
    binders:
      enabled: false
hnazmatrix
@hnazmatrix:matrix.org
[m]
Never mind, figured out - Google pubsub health check was timing out. Disabled that one.
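For anyone hitting the same thing: individual Spring Boot health indicators can be turned off with management.health.<key>.enabled. A hedged sketch, assuming the Pub/Sub indicator key is pubsub:

management:
  health:
    pubsub:
      enabled: false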
FabianF
@FabianF92

Hi, I have a stream using the kafka-streams-binder that sends data to a transformer which implements the ValueTransformer interface:

    @Bean
    public Function<KStream<String, byte[]>, KStream<String, byte[]>> transformerStream11(){
        LOGGER.debug("Transformer stream #11 initialized");
        return stream -> stream.transformValues(() -> beanFactory.getBean(MyTransformer.class));
    }

I know that it is possible to send consumer/producer errors to a DLQ.
Is there also a possibility to define global error handling, like a DLQ, for application errors
that are thrown inside the transformer class (MyTransformer.class)?

3 replies
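The binder-provided DLQ generally covers consumer/producer (e.g. deserialization) errors rather than exceptions thrown from application code, so one possible work-around is to guard the logic itself and forward failures with StreamBridge. A hypothetical sketch (the bean name, the transformer-errors destination and the flatMapValues shape are all made up for illustration; this is not a binder error-handling API):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class GuardedTransformerConfig {

    @Bean
    public Function<KStream<String, byte[]>, KStream<String, byte[]>> guardedStream(StreamBridge streamBridge) {
        return stream -> stream.flatMapValues(value -> {
            List<byte[]> forwarded = new ArrayList<>();
            try {
                forwarded.add(transform(value));                 // the application logic that may throw
            }
            catch (Exception ex) {
                streamBridge.send("transformer-errors", value);  // hypothetical DLQ destination
            }
            return forwarded;                                    // an empty list drops the failed record
        });
    }

    private byte[] transform(byte[] value) {
        return value; // placeholder for the real transformation (e.g. the logic in MyTransformer)
    }
}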
hnazmatrix
@hnazmatrix:matrix.org
[m]

Hi, I am running 3 Kafka streaming pods (Spring Cloud Stream Kafka, functional style). I am seeing this strange behavior:

  • The consumers are getting kicked out of the consumer group at random.
  • It starts off well, then I see the consumer groups rebalancing, and eventually all consumers are kicked out.

We have the defaults for the settings below:

heartbeat.interval.ms = 3 secs
session.timeout.ms = 10 secs

I understand it could be because the brokers did not receive a heartbeat. But how can this happen all of a sudden (assuming the network is not an issue here)?

1 reply
The consumer side seems to have the following exceptions:
  • Error sending fetch request (sessionId=1445202357, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms.
  • Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
  • Join group failed with org.apache.kafka.common.errors.UnknownMemberIdException: The coordinator is not aware of this member.
The broker seems to have just this:
  • Member lpLocation-dev-lpMasterSync-08f64051-8c08-44f3-8469-51654fe963d7-StreamThread-1-consumer-a371b1c5-5021-41a7-9d28-b904ca46a340 in group lpLocation-dev-lpMasterSync has failed, removing it from the group
Spring Cloud: 2020.0.0-RC1
Confluent Kafka: 6.0
Kafka: 2.6
Kafka Streams java library: org.apache.kafka:kafka-streams:2.5.1
Any idea why the consumers are falling off?
hnazmatrix
@hnazmatrix:matrix.org
[m]
Thanks Sobychacko, I enabled debug logs on the client side. It looks like the SSL handshake was failing intermittently, which was causing the heartbeat to fail. I will post here if I find anything more.
Smark11
@Smark11
Anyone know if there is a way to force a producer flush on a Supplier<Flux>?
1 reply
Smark11
@Smark11
We need to make sure all of the messages have been flushed to Kafka before committing to another stream.
Vasanth B K
@nurturingmyself
Need help with stopping a particular binding. For example, with spring.cloud.stream.function.bindings.input-in-0=topic1, how do I stop the function input from receiving messages from topic1 through a REST API? I also need to know how to start it again.
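For context, Spring Cloud Stream exposes binding lifecycle control through the bindings actuator endpoint: once the endpoint is exposed, a GET on /actuator/bindings lists the bindings, and a POST to /actuator/bindings/<bindingName> with a JSON body such as {"state":"STOPPED"} (or "STARTED") changes the binding state. A minimal exposure sketch:

management:
  endpoints:
    web:
      exposure:
        include: bindings,health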
desiderantes
@desiderantes:matrix.org
[m]
Hi, I'm trying to port a Kafka Streams app to Spring Cloud Stream. I have a TopicNameExtractor to choose a topic dynamically based on the payload type. I rewrote the topology to have a Consumer<KStream<ProtoClass, ProtoClass>> that does stream -> stream.map(...).to(topicNameExtractor), but when I do that it ignores any producer property that I set (in particular, the key and value serdes) and always defaults to the ByteArraySerde. How can I fix that?
Jeon ChangMin
@rafael81
Hi, can I ask a question about cdc-debezium in this room?
Smark11
@Smark11
Has anyone received an error when trying to deserialize to S3EventNotification? "Could not read JSON: Cannot construct instance of com.amazonaws.services.s3.event.S3EventNotification (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)"
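That is the usual Jackson complaint for a type without a default constructor; one hypothetical work-around is to consume the payload as a String and build the object with the AWS SDK's own static parser (S3EventNotification.parseJson in the v1 SDK). A sketch, with a made-up bean name:

import java.util.function.Consumer;

import com.amazonaws.services.s3.event.S3EventNotification;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class S3EventConfig {

    @Bean
    public Consumer<String> s3Events() {
        return json -> {
            // Jackson cannot construct S3EventNotification directly (no default constructor),
            // so parse the raw JSON payload with the SDK's static factory instead.
            S3EventNotification notification = S3EventNotification.parseJson(json);
            notification.getRecords().forEach(record ->
                    System.out.println(record.getS3().getObject().getKey()));
        };
    }
}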