Gary Russell
@garyrussell
Boot auto-configures a KafkaAdmin so you just need the topic bean (and spring.kafka.bootstrap-servers).
jay
@legendjaks

Correct; auto-provisioning of the dead letter topic is only done when using binder-level DLQ; you can simply add a NewTopic @Bean to provision the topic. See https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics

Great. Tried and it worked. Thanks
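For reference, a minimal NewTopic bean along the lines Gary describes might look like the sketch below (topic name, partition and replica counts are placeholders; NewTopic is org.apache.kafka.clients.admin.NewTopic, TopicBuilder is org.springframework.kafka.config.TopicBuilder):

@Bean
public NewTopic dlqTopic() {
    // picked up and provisioned by Boot's auto-configured KafkaAdmin at startup
    return TopicBuilder.name("my-topic.dlq")
            .partitions(3)
            .replicas(1)
            .build();
}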

ms0205
@ms0205
I am getting "Backoff none exhausted for ConsumerRecord" from "org.springframework.kafka.listener.SeekToCurrentErrorHandler". I am guessing, this is happening after my application closes after an exception - ""message":"Failed to stop bean 'inputBindingLifecycle'". is there any way to clean these bindings or recover from this?
3 replies
I am using Kafka binder
parthashah
@Parthashah

Hi Team,

I am new to Cloud Stream, but the documentation is great and easy to understand.
I am stuck on an issue with handling errors in the Kinesis binder.

My sample project uses the Kafka and Kinesis (2.1.0) binders with the Ilford dependencies.
I am trying to tap <destination-channel>.<Group>.errors and errorChannel with a @ServiceActivator to push the data to a DLQ:

@ServiceActivator(inputChannel = "errorChannel")
public void onError(Message<?> message) {
    // push the failed message to the DLQ here
}

For Kafka failures I receive the errors on errorChannel,
but the Kinesis binder is not publishing any message there.
I can see <destination-channel>.<Group>.errors in the logs, and its default subscriber (the system logger) prints the error message to the console, but I am not able to capture it in my function.

Can anybody point me in the right direction here?

4 replies
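A hedged sketch of subscribing to the binding-specific error channel by its conventional <destination>.<group>.errors name (the destination and group below are placeholders; ErrorMessage is org.springframework.messaging.support.ErrorMessage):

@ServiceActivator(inputChannel = "my-kinesis-destination.my-group.errors")
public void handleBindingError(ErrorMessage errorMessage) {
    // errorMessage.getPayload() is the Throwable; the failed message is
    // available via errorMessage.getOriginalMessage()
    // forward it to a DLQ, log it, etc.
}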
parthashah
@Parthashah
Console logs for reference
 ERROR [] [SimpleAsyncTaskExecutor-9] o.s.c.s.c.ApplicationJsonMessageMarshallingConverter  Failed to convert payload [B@18dc3602
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.example.model.Message` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (123123123)
 at [Source: (byte[])"123123123"; line: 1, column: 1]

at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1455)
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1081)
    at com.fasterxml.jackson.databind.deser.ValueInstantiator.createFromInt(ValueInstantiator.java:262)
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromInt(StdValueInstantiator.java:356)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromNumber(BeanDeserializerBase.java:1359)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:178)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:166)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3563)
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.lambda$convertParameterizedType$0(ApplicationJsonMessageMarshallingConverter.java:151)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:166)
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:104)
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)
    at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.fromMessage(SmartCompositeMessageConverter.java:61)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputMessageIfNecessary(SimpleFunctionRegistry.java:1066)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:864)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:579)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:434)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:79)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:717)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:559)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at
Sidney Lee
@sesidlee_twitter
Hi, guys.
I was wondering if anyone could point me to some best practice or a good example for configuring multiple event types on the same topic. We have a need where event ordering is important in two rather different use cases. Our event types can contain a lot of fields, so common Avro schemas or unions won't work (and we are moving away from Confluent as a platform, if that is relevant).
Any tips would be greatly appreciated.
40 replies
Igor Rudenko
@fac30ff
Hi, ladies and gentlemen.
I have a question: is it possible, with the functional style (Spring Cloud Stream Kafka Streams), to consume an Avro object (with an Avro Serde), store an intermediate object in a state store as JSON (with a JSON Serde), and then produce Avro again? If you know about this approach, please answer in the thread :)
7 replies
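A hedged sketch of what that could look like with the functional style (all type, method, and store names here are placeholders; JsonSerde is org.springframework.kafka.support.serializer.JsonSerde, and the input/output bindings are assumed to use an Avro Serde via native decoding/encoding):

@Bean
public Function<KStream<String, AvroEvent>, KStream<String, AvroEvent>> process() {
    return input -> input
            .groupByKey()
            // the state store holds a plain POJO serialized as JSON
            .aggregate(MiddlewareState::new,
                    (key, event, state) -> state.update(event),
                    Materialized.<String, MiddlewareState, KeyValueStore<Bytes, byte[]>>as("middleware-store")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(new JsonSerde<>(MiddlewareState.class)))
            .toStream()
            // map back to the Avro type; the output binding's Avro Serde serializes it
            .map((key, state) -> KeyValue.pair(key, state.toAvro()));
}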
Navid Ghahremani
@ghahramani
This message was deleted

Hey guys, I have a question. I am using the reactive form of the new Spring Cloud Stream functions for my Kafka consumer, and I have a requirement to read each partition sequentially: it should not consume multiple messages at the same time. I tried the approach below:

spring.cloud.stream.binding.my-bind.consumer.concurrency: 1

I also tried to use

.subscribeOn(Schedulers.single())

but neither of them works and it still consumes multiple messages from the same partition at the same time :(
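One Reactor-side option that is sometimes suggested (an assumption, not necessarily the binder's answer; handle(...) is a placeholder returning a Mono) is to force strictly sequential processing inside the function itself with concatMap, which subscribes to one inner publisher at a time:

@Bean
public Function<Flux<Message<String>>, Mono<Void>> consume() {
    return flux -> flux
            // concatMap processes one message at a time, in order
            .concatMap(message -> handle(message))
            .then();
}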

Leonardo Torres
@leonardo_ta_twitter
Hello guys, good morning. I am looking for a way to use a different serializer for each binding, rather than declaring a default one at the binder level.
Is it possible with ClientFactoryCustomizer? I want to try it but can't find any example.
I am using Spring Cloud Stream Kafka with native encoding/decoding.
5 replies
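With native encoding enabled, per-binding Kafka producer properties can usually be set through the binding's configuration map instead of a binder-wide default; a sketch with placeholder binding names and serializers:

spring:
  cloud:
    stream:
      bindings:
        output-a:
          producer:
            useNativeEncoding: true
        output-b:
          producer:
            useNativeEncoding: true
      kafka:
        bindings:
          output-a:
            producer:
              configuration:
                value.serializer: org.apache.kafka.common.serialization.StringSerializer
          output-b:
            producer:
              configuration:
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer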
jay
@legendjaks
I want to use the Spring Cloud Stream function model with a reactive Kafka producer. When I tried StreamBridge, I got an error that it is a blocking method. When I checked this example (https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleProducer.java), it uses KafkaSender for the producer. Is there any other way, besides ReactiveKafkaProducerTemplate and KafkaSender, to send messages reactively?
4 replies
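One pattern often suggested as a non-blocking alternative to StreamBridge (a sketch only; binding and type names are placeholders) is to back a Supplier binding with a Reactor sink and emit into it:

private final Sinks.Many<Message<String>> sink =
        Sinks.many().unicast().onBackpressureBuffer();

@Bean
public Supplier<Flux<Message<String>>> produceProduct() {
    // the binder subscribes to this Flux and publishes every emitted message
    return sink::asFlux;
}

public void send(String payload) {
    sink.emitNext(MessageBuilder.withPayload(payload).build(),
            Sinks.EmitFailureHandler.FAIL_FAST);
}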
hnazmatrix
@hnazmatrix:matrix.org
[m]

Hi, I am consuming from a 3-partition topic using the Spring Cloud Stream Kafka functional model. I am running the application on Kubernetes. The moment I scale the application to 2 pods, the consumer count on the Confluent dashboard goes to 0. Any idea why this happens? I tried increasing concurrency using spring.cloud.stream.instanceCount=3, but even with just one pod I see only one consumer thread in the application.

Can anyone explain what I am missing ?

spring:
  profiles:
    active: ${environment}
  application:
    name: lp-location
  cloud:
    gcp:
      project-id: operations-lower
    stream:
      function:
        definition: processMaster
      bindings:
        processMaster-in-0:
          destination: MASTER
      kafka:
        streams:
          binder:
            functions:
              processLpMaster:
                applicationId: Location-MasterSync
            brokers: localhost:9092
            autoCreateTopics: false
            configuration:
              schema.registry.url: http://localhost:8081
              specific.avro.reader: true
              commit.interval.ms: 1000
          bindings:
            processMaster-in-0:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
10 replies
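Note this configuration targets the Kafka Streams binder, where per-instance parallelism is usually driven by the Streams configuration rather than instanceCount; a hedged sketch (the thread count is a placeholder):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              num.stream.threads: 3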
Ali Yeganeh
@AliYeganeh
Hi
How can I intercept messages in Spring Cloud Stream?
2 replies
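If "intercept" means seeing messages as they pass through the binding channels, one option for the message-channel based binders (a sketch, not the only approach) is a global Spring Integration ChannelInterceptor:

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor messageLoggingInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            // inspect (or mutate) the message before it is sent on the channel
            System.out.println("Message on " + channel + ": " + message.getHeaders());
            return message;
        }
    };
}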
Andras Hatvani
@andrashatvani
Hi, is there a way to automatically deserialize JSON/byte-array-serialized events into the corresponding type? @sobychacko
7 replies
Ali Yeganeh
@AliYeganeh
Is it possible to log inbound messages from a stream function?
5 replies
AttitudeL
@AttitudeL

I have some questions regarding manual ack on consumer. I'm implementing a retry mechanism which pauses the consumer for a certain duration based on the consumed payload's retry timestamp. I'm trying to disable the consumer's auto commit offset and only acknowledge the offset before I send the payload to retry. So basically if the payload is not yet ready for the retry I would not acknowledge the offset and immediately pause the consumer. After the consumer gets resumed it will be able to consume the same payload (since the offset did not advance) and check the timestamp to determine if the payload is ready for retry.

In my app setting I have the following config: spring.cloud.stream.kafka.bindings.retryProcessor-in-0.consumer.autoCommitOffset=false.
In the java code I have the following:

MessageHeaders headers = message.getHeaders();
ApiPayload apiPayload = (ApiPayload) message.getPayload();
Consumer<?, ?> consumer = headers.get(KafkaHeaders.CONSUMER, Consumer.class);
String topic = headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
Integer partitionId = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
Acknowledgment acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (apiPayload.getRetryTime() > System.currentTimeMillis()) {
    consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
} else {
    if (acknowledgment != null) {
        acknowledgment.acknowledge();
    }
    // do the retry...
}

However, when I was testing the code, the offset seems to be AUTO committed, because after the consumer is resumed it does not consume the message again.

13 replies
jay
@legendjaks

By default SCSt Kafka uses ByteArraySerializer for both key & value. I am using the same for compatibility with existing code.

When I send a message via StreamBridge, the producer & consumer work fine.

e.g.,

val message = MessageBuilder.withPayload(product)
                .setHeader("partitionKey", product.name)
                .build()
streamBridge.send(postProductBinding, message)

But when I try to use KafkaSender (https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleProducer.java), I get

Caused by: java.lang.ClassCastException: class [B cannot be cast to class c.p..dto.ProductDto ([B is in module java.base of loader 'bootstrap'; c.p..dto.ProductDto is in unnamed module of loader 'app')

Here is my producer configuration,

fun buildProducerProperties():  Map<String, Any> {

    val configs = mutableMapOf<String, Any>()
    configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = brokers
    configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
    configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
    return configs
}

@Bean
fun <T> kafkaSender(): KafkaSender<ByteArray, T> {

    val senderOptions: SenderOptions<ByteArray, T> = SenderOptions.create(buildProducerProperties())
    return KafkaSender.create(senderOptions)
}

val kafkaSender: KafkaSender<ByteArray, ByteArray>
val payload = toByteArray(product)
kafkaSender.send(Flux.just(payload).map { p -> SenderRecord.create(ProducerRecord("create_product", p), null) })
        .timeout(Duration.ofMinutes(1))
        .doOnError { throwable -> log.error("Error Sending", throwable) }
        .subscribe { res ->
            val meta = res.recordMetadata()
            log.info(meta.topic() + " " + meta.offset())
        }
21 replies
jay
@legendjaks
Instead of configuring Spring Cloud Stream bindings in the properties file under spring.cloud.stream.bindings, is there any way to configure them programmatically, e.g. using a @Bean?
Multiple services will produce to one topic and only one service will consume, so if I can configure it in code, I can re-use the code.
jay
@legendjaks
In imperative way SeekToCurrentErrorHandler can be used for handling errors in Kafka. Is there anything available for Reactive kafka consumers?
jay
@legendjaks

In imperative way SeekToCurrentErrorHandler can be used for handling errors in Kafka. Is there anything available for Reactive kafka consumers?

Got it. It's out of scope for SCSt. spring-cloud/spring-cloud-stream#1922

Vladimir Kabanov
@vkabanorv
Hi all! Sorry if this is a noob question. How/where can I get spring-cloud-stream-binder-kafka-streams version 3.1.3-SNAPSHOT for use in my Java project? The most recent version on maven is 3.1.2. I need 3.1.3 because I'm running into this issue https://stackoverflow.com/questions/66881210/spring-cloud-stream-kafka-streams-binder-kafkaexception-could-not-start-stream
5 replies
Sidney Lee
@sesidlee_twitter
hi, guys. does anyone know how to configure the internal repartition/changelog topics in spring?
https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#internal-topic-parameters
we are defining value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy to enable multiple event types within one of our topics, but this does not seem to be applied to the internal repartition/changelog topics associated with that topic... any suggestions?
6 replies
Ali Yeganeh
@AliYeganeh
This message was deleted
2 replies
jokefaker
@jokefaker

Hi, I’m using Spring Boot 2.3.5 and Spring Cloud Hoxton.SR8
here is my Spring Cloud Stream Kafka config

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          kafka-input:
            consumer:
              enableDlq: true
              dlqName: error.dlq
              dlqPartitions: 1
              dlqProducerProperties:
                topic:
                  properties:
                    retention.ms: 86500000
                  replication-factor: 2
      bindings:
        kafka-input:
          partitionCount: 2
          group: test

I have three problems/questions:

  1. The partitionCount property does not work when auto-creating topics for an input binding, but it does work for an output binding. Is that the expected behavior?
  2. The dlqProducerProperties are not applied when auto-creating topics; the topic properties and replication-factor are not used.
  3. dlqPartitions is not documented in the reference guide, but I found the property in the code. Should I use it to control the number of partitions of the DLQ topic?

4 replies
Sahil Chaddha
@Sahil333
Hi All, I have multiple event types on a given topic but I am only interested in a subset of them. I have a single consumer group but multiple @StreamListeners, one per event type. This leads to offset lag from the events I am not interested in, which doesn't reflect the actual lag for my consumer group, because I have auto-commit set to false. So, as a solution, I am thinking of having a default stream listener that is picked last, when no filter matches, and that only commits the offset manually. That way the lag for my consumer group will reflect the actual values.
Can someone suggest how to do this with SCSt?
Kevin Lewis
@kevinrlewis
Hello, I am running into an issue using Spring Cloud Streams to publish to Kafka. Error is: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule. I get this when I build through mvn and then run the jar. Running this within my IDE, Intellij, works fine. I'm guessing it is a dependency versioning issue. I'm using spring-cloud-dependencies 2020.0.1 and spring-boot-starter-parent 2.4.5. Any help would be appreciated! Thank you.
2 replies
AttitudeL
@AttitudeL

Dear developers. I noticed that when I use nack() I see [Consumer clientId=consumer-anonymous.00b11061-91b5-49cd-b5de-1497631e25c8-26, groupId=anonymous.00b11061-91b5-49cd-b5de-1497631e25c8] in the log; somehow a groupId with an anonymous prefix was created. I wonder if this is intentional? Also, when I run integration tests I see a bunch of

2021-04-20 15:04:32 - [Consumer clientId=consumer-anonymous.00b11061-91b5-49cd-b5de-1497631e25c8-26, groupId=anonymous.00b11061-91b5-49cd-b5de-1497631e25c8] Connection to node -1 (/127.0.0.1:63714) could not be established. Broker may not be available.
2021-04-20 15:04:32 - [Consumer clientId=consumer-anonymous.00b11061-91b5-49cd-b5de-1497631e25c8-26, groupId=anonymous.00b11061-91b5-49cd-b5de-1497631e25c8] Bootstrap broker 127.0.0.1:63714 (id: -1 rack: null) disconnected

in the logs. There aren't any test failures but I wonder if I did something wrong?

I'm using 2.3.10.RELEASE for spring-boot and 3.0.11.RELEASE for spring-cloud-stream.

Gary Russell
@garyrussell
You get an anonymous group if the consumer binding has no group property.
AttitudeL
@AttitudeL
Oh ok.

@garyrussell Thanks, adding group now removes the anonymous prefix. My apologies, I thought the applicationId would override the group in my case.

However, I still get Connection to node -1 (/127.0.0.1:64891) could not be established. Broker may not be available. at the end of the log after running the test. Are bindings supposed to drop their consumers at the end?

AttitudeL
@AttitudeL
Maybe I should manually disconnect the consumer in the @AfterEach clause in my test?
Gary Russell
@garyrussell
If you are creating your own consumers in your tests then, yes, you should close them at the end of the test.
AttitudeL
@AttitudeL
No this consumer group was created via binding. The consumers I created in my test have all been disconnected after each test
spring.cloud.stream:
  bindings:
    retry-processor-in-0:
      group: retry-group
      destination: retry_topic
      consumer:
        maxAttempts: 1
@garyrussell This is how I created the consumer group
Gary Russell
@garyrussell
Something strange then; the binding should be stopped by the ApplicationContext before it destroys the EmbeddedKafkaBroker. That said, it's just log noise - as you said, the tests all pass.
AttitudeL
@AttitudeL
Ok I will take a closer look in the log
Thanks
AttitudeL
@AttitudeL
@garyrussell Strange... after I reverted the spring-cloud-stream upgrade from 3.0.11.RELEASE back to 3.0.7.RELEASE, the noise is gone.
So most likely something was introduced between those releases? I can't tell for sure, but the noise did disappear from the log.
AttitudeL
@AttitudeL
From the log I discovered that after the shutdown a new consumer is somehow created for that group, and because the broker has already been shut down it is not able to connect. I don't know why a new consumer would be created...
AttitudeL
@AttitudeL
It seems 3.0.9.RELEASE made the difference: with 3.0.8 there is no noise, but with 3.0.9 the binding all of a sudden seems to create a new consumer after the EmbeddedKafkaBroker shuts down, which results in it not being able to connect.
Gary Russell
@garyrussell
The binding shouldn't start again. If you can provide a small, simple, complete example that exhibits the behavior, attach it to a new issue against spring-cloud-stream on GitHub.
matthoward
@matthoward
I'm struggling to get a set of streams to work together (with the new functional model). I have 2 separate Consumer<KStream> beans writing to KTables, and then I'm trying to do a join on those KTables with a BiFunction<KTable, KTable, KStream>... I think I'm not defining the bindings correctly for the joining processor - the join never receives the changes written to the source KTables unless I set the destination to the actual changelog topic name. For KTable inputs, should I set the binding destination to the name of the KTable or something totally different? I have consumer.materializedAs set to the KTable name, but without the destination name it just points to some empty new topic it creates for the binding instead of the changelog.
13 replies
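For reference, a hedged sketch of the shape of such a joining processor (type names are placeholders; the function's in-0/in-1 bindings map to the two KTable inputs):

@Bean
public BiFunction<KTable<String, Left>, KTable<String, Right>, KStream<String, Joined>> joinTables() {
    return (left, right) -> left
            // KTable-KTable join; emits downstream whenever either side changes
            .join(right, (l, r) -> new Joined(l, r))
            .toStream();
}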
AttitudeL
@AttitudeL

The binding shouldn't start again. If you can provide a small, simple, complete example that exhibits the behavior, attach it to a new issue against spring-cloud-stream on GitHub.

Thanks I will try to extract that code out and create a simple complete project.

Leonardo Torres
@leonardo_ta_twitter
Hello, I am looking for a way to configure transactions for only certain producers. When I use
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix:, all the producers in my microservice become transactional.
6 replies
Leonardo Torres
@leonardo_ta_twitter
Hi, regarding this code in class org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner (using spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar):

public void afterPropertiesSet() {
    if (this.metadataRetryOperations == null) {
        RetryTemplate retryTemplate = new RetryTemplate();

        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(10);
        retryTemplate.setRetryPolicy(simpleRetryPolicy);

        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(100);
        backOffPolicy.setMultiplier(2);
        backOffPolicy.setMaxInterval(1000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        this.metadataRetryOperations = retryTemplate;
    }
}

I need to set a RetryTemplate with specific values at startup. How can I do this? Something like a customizer for KafkaTopicProvisioner.
3 replies