Knut Schleßelmann
@kschlesselmann
@garyrussell Seems that ackMode is now the preferred way to control manual acknowledgements in Kafka. What about actually acknowledging the messages? Is it still intended to use the message header like https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.2/reference/html/spring-cloud-stream-binder-kafka.html#_usage_examples ? At least the code mentions some AcknowledgingMessageListener you'd have to provide?
1 reply
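A minimal sketch of the header-based acknowledgement the linked docs describe, assuming the binding runs in manual-ack mode (ackMode: MANUAL on recent versions, autoCommitOffset: false on older ones):

    import java.util.function.Consumer;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.Message;

    @Configuration
    public class ManualAckConfig {

        @Bean
        public Consumer<Message<String>> manualAck() {
            return message -> {
                // the header is only populated when manual acknowledgement is enabled
                Acknowledgment ack = message.getHeaders()
                        .get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
                if (ack != null) {
                    ack.acknowledge();
                }
            };
        }
    }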
Enrique Fernández-Polo Puertas
@enrique-fernandez-polo

Hello! I have a project in which I use spring-boot 2.4.5, spring cloud 2020.0.2 and spring-cloud-starter-stream-rabbit 3.1.2. When the message is received in the consumer app I see this error: Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.tfg.sdapi.archiver.pojo.dto.ArchiverMessageDto ([B is in module java.base of loader 'bootstrap'; com.tfg.sdapi.archiver.pojo.dto.ArchiverMessageDto is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @1bc7681b)
I tried to search for it but I couldn't find any hints.

Same issue here! I am using Kotlin btw... The message works if I use StreamBridge

Enrique Fernández-Polo Puertas
@enrique-fernandez-polo
The content-type is always application/octet-stream, but if I wrap the message in Message<MyClass> the content-type is application/json and it works. I don't get it, json is supposed to be the default one, isn't it? How can I force json serialization for every message?
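The per-binding property spring.cloud.stream.bindings.<binding>.content-type: application/json forces JSON declaratively; a hedged sketch of forcing it per message through StreamBridge (the binding name and dto are illustrative):

    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.util.MimeTypeUtils;

    // fragment: inside a component that has a StreamBridge injected
    streamBridge.send("archiver-out-0",
            MessageBuilder.withPayload(dto)
                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                    .build());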
ghilainm
@ghilainm

Hi guys, just a question: I have a GlobalChannelInterceptor with a pattern 'input-*'. I am using StreamBridge to send a message to a dynamic destination 'aggregator-in'. The channel interceptor is applied, which is not expected. Any idea why?

Digging into the code, it seems that the pattern is simply not applied :). But I probably overlooked something.

    private void addInterceptors(AbstractMessageChannel messageChannel) {
        String[] interceptorNames = this.applicationContext.getBeanNamesForType(ChannelInterceptor.class);
        List<ChannelInterceptor> interceptors = messageChannel.getInterceptors();
        for (String interceptorName : interceptorNames) {
            ChannelInterceptor interceptor = this.applicationContext.getBean(interceptorName, ChannelInterceptor.class);
            if (!CollectionUtils.containsInstance(interceptors, interceptor)) {
                messageChannel.addInterceptor(interceptor);
            }
        }
    }
1 reply
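For reference, a hedged sketch of the pattern-scoped registration on the Spring Integration side (bean name illustrative). Per the method pasted above, StreamBridge appears to add every ChannelInterceptor bean to the channels it creates without consulting the pattern, which would explain the behavior:

    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.config.GlobalChannelInterceptor;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.ChannelInterceptor;

    @Bean
    @GlobalChannelInterceptor(patterns = "input-*")  // should not match 'aggregator-in'
    public ChannelInterceptor inputOnlyInterceptor() {
        return new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                // interceptor logic here
                return message;
            }
        };
    }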
jay
@legendjaks
I am using SCSt functions for producing & consuming messages. I am facing a strange issue where my consumers suddenly stopped working. The producers are working fine (StreamBridge). When I change the consumer group of the consumers, it starts working again. Is there any scenario in which SCSt consumers would stop working? When this happens, Kafka doesn't show the consumers as connected and the log says the channel has an unknown consumer.
jay
@legendjaks
Do the Kafka server and spring-kafka versions need to match? My Kafka is 2.0 and spring-kafka is 2.6.
Eric Deandrea
@edeandrea

What I'm trying to do seems pretty simple but I can't figure it out. I have one function (Supplier<Flux<Integer>>) which generates data on a 5 second interval & publishes it to a Kafka topic. I have another function (currently a Consumer<Flux<Integer>>) that listens on the Kafka topic and does some stuff (in my simple example, just multiplies the number by some constant). This part I have working just fine.

What I'm struggling with is that I want to take the result of that computation & serve it as a server-sent event through a Spring WebFlux controller. I don't want that listener function to publish it back to Kafka - instead I'd prefer to just keep it in memory.

It seems pretty simple but I can't seem to figure out that last part. I tried changing the consumer to a Function<Flux<Integer>, Flux<Double>>, but I'm not sure how to "wire" that into my controller class so that my controller method can return Flux<ServerSentEvent<Double>>.

I came across this example (https://dzone.com/articles/spring-cloud-stream-a-brief-guide), but it uses EmitterProcessor, which is deprecated in favor of the Sinks class. It just feels wrong to have to dive down to the Project Reactor level to accomplish this, but I can't think of a better way to do it natively in Spring Cloud Stream.
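One way to do it (a hedged sketch; it still touches Reactor's Sinks, but only as an in-memory hand-off, and all names are illustrative): keep the consumer, push each result into a sink, and let the controller stream the sink:

    import java.util.function.Consumer;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.MediaType;
    import org.springframework.http.codec.ServerSentEvent;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;

    @Configuration
    class ResultBridge {

        @Bean
        public Sinks.Many<Double> resultSink() {
            // in-memory hand-off between the binding and the web layer
            return Sinks.many().multicast().onBackpressureBuffer();
        }

        @Bean
        public Consumer<Flux<Integer>> multiplier(Sinks.Many<Double> resultSink) {
            // reactive consumers must subscribe themselves
            return flux -> flux.map(i -> i * 2.0)
                    .doOnNext(resultSink::tryEmitNext)
                    .subscribe();
        }
    }

    @RestController
    class ResultController {

        private final Sinks.Many<Double> resultSink;

        ResultController(Sinks.Many<Double> resultSink) {
            this.resultSink = resultSink;
        }

        @GetMapping(path = "/results", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<ServerSentEvent<Double>> results() {
            return resultSink.asFlux().map(v -> ServerSentEvent.builder(v).build());
        }
    }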

hnazmatrix
@hnazmatrix:matrix.org
[m]
Hello, I am using the Spring Cloud Stream Kafka binders (functional style). The Spring Boot actuator/health endpoint seems to hang and never return. The Kafka stream in my application is not critical and it can live with the failure of the stream threads. I tried disabling it with management.health.binders.enabled: false, but this does not seem to work and my pods are restarting because the actuator health endpoint times out. I confirmed the actuator itself is working (a curl on /actuator/xyz returns 404). Please suggest if there is any other setting to turn off.

PS: My org.apache.kafka.streams.processor.internals.StreamThread is running fine, yet it still takes about 10 minutes to return a "DOWN":
{"status":"DOWN","groups":["liveness","readiness"]}

      function:
        definition: processLpMaster
      bindings:
        processLpMaster-in-0:
          destination: PR741
      kafka:
        binder:
          healthTimeout: 5
        streams:
          binder:
            functions:
              processLpMaster:
                applicationId: lpMasterSync
            brokers: localhost:9092
            autoCreateTopics: false
            configuration:
              schema.registry.url: http://localhost:8083
              specific.avro.reader: true
              commit.interval.ms: 1000
              num.stream.threads: 1
          bindings:
            processLpMaster-in-0:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
management:
  metrics:
    enable:  # Enable/disable Spring Boot Actuator metrics
      all: true
    export:
      stackdriver:
        enabled: false
        metric-target-project-id: ff-operations-lower
        batchSize: 10
        project-id: ld-ff-operations-lower
        reportingFrequencyInSeconds: 60
  health:
    binders:
      enabled: false
hnazmatrix
@hnazmatrix:matrix.org
[m]
Never mind, figured it out - the Google Pub/Sub health check was timing out. Disabled that one.
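For anyone hitting the same thing: Boot derives each indicator's toggle from its bean name, so the Pub/Sub check can be switched off the same way as the binders one. A hedged YAML sketch, assuming the Spring Cloud GCP starter is what contributes the indicator:

    management:
      health:
        binders:
          enabled: false
        pubsub:
          enabled: false  # Spring Cloud GCP Pub/Sub health indicator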
FabianF
@FabianF92

Hi, I have a stream using the kafka-streams binder that sends data to a transformer which implements the ValueTransformer interface:

    @Bean
    public Function<KStream<String, byte[]>, KStream<String, byte[]>> transformerStream11() {
        LOGGER.debug("Transformer stream #11 initialized");
        return stream -> stream.transformValues(() -> beanFactory.getBean(MyTransformer.class));
    }

I know that it is possible to send consumer/producer errors to a DLQ.
Is there also a possibility to define global error handling, like a DLQ for application errors
that are thrown inside the transformer class (MyTransformer.class)?

3 replies
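The binder's DLQ support covers deserialization/consumption errors rather than exceptions thrown inside the topology, so one option is to do it by hand. A hedged sketch where transform() stands in for the MyTransformer logic, transformer-dlq is a hypothetical topic, and a KafkaTemplate<String, byte[]> bean is assumed to be configured separately:

    import java.util.List;
    import java.util.function.Function;

    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.KafkaTemplate;

    @Bean
    public Function<KStream<String, byte[]>, KStream<String, byte[]>> transformerStream11(
            KafkaTemplate<String, byte[]> dlqTemplate) {
        return stream -> stream.flatMapValues(value -> {
            try {
                return List.of(transform(value));            // hypothetical transform logic
            } catch (Exception e) {
                dlqTemplate.send("transformer-dlq", value);  // manual dead-letter publish
                return List.<byte[]>of();                    // drop the record from the main flow
            }
        });
    }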
hnazmatrix
@hnazmatrix:matrix.org
[m]

Hi, I am running 3 Kafka Streams pods (Spring Cloud Stream Kafka, functional). I am seeing this strange behavior:

  • The consumers are getting kicked out of the consumer group at random.
  • It starts off well, then I see consumer groups rebalancing and eventually all consumers are kicked out.

We have the defaults for the following:

heartbeat.interval.ms = 3 secs
session.timeout.ms = 10 secs

I understand it could be because the brokers did not receive a heartbeat. But how can this happen all of a sudden (assuming the network is not an issue here)?

1 reply
The consumer side seems to have the following exceptions:
  • Error sending fetch request (sessionId=1445202357, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms.
  • Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
  • Join group failed with org.apache.kafka.common.errors.UnknownMemberIdException: The coordinator is not aware of this member.
The broker seems to have just this:
  • Member lpLocation-dev-lpMasterSync-08f64051-8c08-44f3-8469-51654fe963d7-StreamThread-1-consumer-a371b1c5-5021-41a7-9d28-b904ca46a340 in group lpLocation-dev-lpMasterSync has failed, removing it from the group
Spring cloud : 2020.0.0-RC1
confluent kafka - 6.0
Kafka : 2.6
Kafka Streams java library : org.apache.kafka:kafka-streams:2.5.1
Any idea why the consumers are falling off?
hnazmatrix
@hnazmatrix:matrix.org
[m]
Thanks Sobychacko, I enabled debug logs on the client side. It looks like the SSL handshake was failing intermittently, which was causing the heartbeat to fail. Will post here if I find anything more.
Smark11
@Smark11
Anyone know if there is a way to force a producer flush on a Supplier<Flux>?
1 reply
Smark11
@Smark11
We need to make sure all of the messages have been flushed to Kafka prior to committing to another stream.
Vasanth B K
@nurturingmyself
Need help stopping a particular binding. For example, given spring.cloud.stream.function.bindings.input-in-0=topic1, how do I stop the function input from receiving messages from topic1 through a REST API? I also need to know the way to start it again.
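Spring Cloud Stream exposes this through the actuator bindings endpoint (documented as "Binding visualization and control"). A hedged sketch of driving it over HTTP, assuming management.endpoints.web.exposure.include=bindings, port 8080, and a binding named input-in-0:

    import java.util.Map;

    import org.springframework.http.MediaType;
    import org.springframework.web.reactive.function.client.WebClient;

    // fragment: e.g. from an admin tool or test
    WebClient client = WebClient.create("http://localhost:8080");
    client.post()
            .uri("/actuator/bindings/input-in-0")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(Map.of("state", "STOPPED"))  // "STARTED" to resume
            .retrieve()
            .toBodilessEntity()
            .block();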
desiderantes
@desiderantes:matrix.org
[m]
Hi, I'm trying to port a Kafka Streams app to Spring Cloud Stream. I have a TopicNameExtractor to choose a topic dynamically based on payload type. I rewrote the topology to have a Consumer<KStream<ProtoClass, ProtoClass>> that does stream -> stream.map(...).to(topicNameExtractor), but when I do that it ignores any producer property that I set (in particular, key and value serdes) and always defaults to the ByteArraySerde. How can I fix that?
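When the topology calls to(...) itself, the serdes can be passed explicitly instead of relying on binder producer properties. A hedged sketch, assuming a Serde<ProtoClass> bean is available; the extraction rule is illustrative:

    import java.util.function.Consumer;

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.processor.TopicNameExtractor;
    import org.springframework.context.annotation.Bean;

    @Bean
    public Consumer<KStream<ProtoClass, ProtoClass>> route(Serde<ProtoClass> protoSerde) {
        TopicNameExtractor<ProtoClass, ProtoClass> byPayloadType =
                (key, value, ctx) -> "topic-" + value.getClass().getSimpleName();  // illustrative rule
        return stream -> stream.to(byPayloadType, Produced.with(protoSerde, protoSerde));
    }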
Jeon ChangMin
@rafael81
Hi, can I ask a question about cdc-debezium in this room?
Smark11
@Smark11
Anyone receive an error when trying to deserialize to the S3EventNotification? "Could not read JSON: Cannot construct instance of com.amazonaws.services.s3.event.S3EventNotification (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)"
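That error is Jackson's: S3EventNotification has no default constructor, so databind can't build it. A hedged sketch that consumes the raw JSON and uses the AWS SDK's own parser instead (binding name illustrative):

    import java.util.function.Consumer;

    import org.springframework.context.annotation.Bean;

    import com.amazonaws.services.s3.event.S3EventNotification;

    @Bean
    public Consumer<String> s3Events() {
        return json -> {
            // parse with the SDK's factory method instead of Jackson databind
            S3EventNotification notification = S3EventNotification.parseJson(json);
            notification.getRecords().forEach(r ->
                    System.out.println("s3 key: " + r.getS3().getObject().getKey()));
        };
    }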
danishgarg
@danishgarg

Hi. I am looking for an example of creating a functional-style processor using the Spring Cloud Stream Kafka binder (without Kafka Streams) that can consume a batch of n messages from one topic and publish m messages to another topic (m < n). I have tried the following:

    public Function<List<IncomingEvent>, List<Message<OutgoingEvent>>> aggregate() {
        return batch -> {
            Map<Key, List<IncomingEvent>> resultsMap = batch.stream()
                    .collect(Collectors.groupingBy(IncomingEvent::getKey));
            List<Message<OutgoingEvent>> aggregationResults = new ArrayList<>();
            for (Map.Entry<Key, List<IncomingEvent>> resultGroup : resultsMap.entrySet()) {
                OutgoingEvent outgoingEvent = this.getOutgoingEvent(resultGroup.getKey(), resultGroup.getValue());
                aggregationResults.add(
                        MessageBuilder.withPayload(outgoingEvent)
                                .setHeader(KafkaHeaders.MESSAGE_KEY, outgoingEvent.getKey())
                                .build());
            }
            return aggregationResults;
        };
    }
However, this produces a single event whose payload is an array of messages. I tried changing the return type of the function from List<Message> to Flux<Message> and returning Flux.fromIterable(aggregationResults); that does seem to publish multiple messages, but they come out as serialized Flux instances (with scanAvailable and prefetch properties) rather than the actual Message payloads. I could not find any example of achieving this on the web; it would be very helpful to see one.

2 replies
blake-bauman
@blake-bauman
When using a Function<Flux, Flux>, is there a way to assign one Flux to each partition in a Consumer? I've tried setting spring.cloud.stream.bindings.input.consumer.concurrency=N but it still seems to be a single Flux, with the Consumer assigned all partitions.
Dmitriy
@MoOFueL
Hi. Is there a specific reason that the SubscribableChannels generated for StreamBridge output bindings always have unknown.channel.name as their fullChannelName? This unknown.channel.name gets written into Jaeger spans as message_bus.destination. Is it an error in my configuration or is it the accepted behavior?
akash-wadhwa
@akash-wadhwa
How can I disable Spring Cloud Stream based upon a Spring profile?
Dmitriy
@MoOFueL

Hi. Is there a specific reason that the SubscribableChannels generated for StreamBridge output bindings always have unknown.channel.name as their fullChannelName? This unknown.channel.name gets written into Jaeger spans as message_bus.destination. Is it an error in my configuration or is it the accepted behavior?

@olegz Can you help me with this?

Dhiraj singh
@dhirajsin
We received an assignment [profile-0] that doesn't match our current subscription Subscribe(profile.request); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
1 reply
Trying to define multiple Kafka Streams processors within a single application.
1 reply
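A hedged sketch of two independent Kafka Streams processors in one app (function names illustrative); both are activated by listing them in the function definition, and each gets its own <name>-in-0/<name>-out-0 bindings:

    import java.util.function.Consumer;
    import java.util.function.Function;

    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.context.annotation.Bean;

    // requires: spring.cloud.function.definition=enrich;audit  (semicolon-separated)
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> enrich() {
        return stream -> stream.mapValues(String::toUpperCase);
    }

    @Bean
    public Consumer<KStream<String, String>> audit() {
        return stream -> stream.foreach((k, v) -> System.out.println(k + " -> " + v));
    }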
realaseem
@realaseem
Hi, can anyone please let me know how I can override the default LogAndContinueExceptionHandler for deserialization exceptions? I need to log and perform some operation before continuing. I am using Spring Cloud Stream.
9 replies
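A hedged sketch of a replacement handler; it is plain Kafka Streams API, registered through the binder configuration, e.g. spring.cloud.stream.kafka.streams.binder.configuration.default.deserialization.exception.handler: com.example.NotifyAndContinueHandler (class name illustrative):

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class NotifyAndContinueHandler implements DeserializationExceptionHandler {

        @Override
        public DeserializationHandlerResponse handle(ProcessorContext context,
                ConsumerRecord<byte[], byte[]> record, Exception exception) {
            // custom side effect before skipping the record
            System.err.printf("skipping %s-%d@%d: %s%n",
                    record.topic(), record.partition(), record.offset(), exception.getMessage());
            return DeserializationHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }
    }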
Troy Taillefer
@tjtaill
Hi, anyone have a good example of fanning out from a single input to multiple outputs with different binders? Guidance and hints on how to do this would also be greatly appreciated.
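One way that should work (a hedged sketch; binding names are illustrative, and each output binding selects its binder via the per-binding binder property):

    import java.util.function.Consumer;

    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.context.annotation.Bean;

    // assumed per-binding binder selection:
    //   spring.cloud.stream.bindings.fanKafka.binder=kafka
    //   spring.cloud.stream.bindings.fanRabbit.binder=rabbit
    @Bean
    public Consumer<String> fanout(StreamBridge bridge) {
        return payload -> {
            bridge.send("fanKafka", payload);
            bridge.send("fanRabbit", payload);
        };
    }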
srikanth
@Srikanthkarkala_twitter
Hi, I'm using Spring Cloud Stream for connecting to Kafka. I have not found any example of setting the JAAS config with "OAuthBearerLoginModule" to authenticate a producer/consumer using a service principal. Any references or suggestions, please?
3 replies
K solo
@mr_k_solo_twitter

Hello everyone, I’m having a particularly ‘nigglesome’ problem with interceptors I desperately need help with.
I'm using a ProducerInterceptor and a ConsumerInterceptor within a Spring Cloud Stream application and have configured them via a StreamsBuilderFactoryConfig bean. They are firing as expected and I can retrieve some attributes of the producer and consumer records; there is however a problem retrieving the actual 'key' and 'value' of these records. I've tried to assign/cast them to the expected types but I get a class cast exception. My initial instinct suggests this might be a serialisation/deserialisation issue. There isn't however any indication of this elsewhere, as the application works fine. The consumer binder has default key/value serdes set as:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde

Whilst the producer binder has its key/value serdes set as:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Example output I get when printing the key/value from a producer record is:

ProducerRecord(topic=.supplier.batch.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=[B@1089613d, value=[B@4a4e10fd, timestamp=1623572275792)

Received Message timestamp =1623571603723, partition =3, offset = 92, key = [B@49e41727, value = [B

The following exception is thrown when trying to assign the key to a String instance:
Error executing interceptor onConsume callback
java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')

I've copied the 'onConsume' method of the consumer interceptor to provide a little more detail:

    @Override
    public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> records) {
        log.info("---<< ConsumerRecords being received {} >>---", records);
        for (ConsumerRecord<String, Object> record : records) {
            if (record.topic().equals(LOCATION_SUPPLIER_SOURCE)) {
                log.info("---<< setting timestamp >>---");
                String key = record.key();  // throws ClassCastException: the record actually holds byte[]
                System.out.printf("Received Message: timestamp =%s, partition =%s, offset = %d, key = %s, value = %s\n",
                        record.timestamp(), record.partition(), record.offset(), record.key(), record.value().getClass().getName());
            }
        }
        return records;
    }
26 replies
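In case it helps while the thread sorts it out: Kafka Streams configures its clients with byte-array (de)serializers and runs the serdes inside the topology, so interceptors see raw byte[] keys/values - which matches the [B@... output above. A hedged sketch that types the interceptor accordingly and deserializes explicitly:

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class RawKeyLoggingInterceptor implements ConsumerInterceptor<byte[], byte[]> {

        private final StringDeserializer keyDeserializer = new StringDeserializer();

        @Override
        public ConsumerRecords<byte[], byte[]> onConsume(ConsumerRecords<byte[], byte[]> records) {
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // deserialize by hand, since the serde layer has not run yet
                String key = keyDeserializer.deserialize(record.topic(), record.key());
                System.out.printf("offset=%d key=%s%n", record.offset(), key);
            }
            return records;
        }

        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }
    }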
Soumendra Daas
@Sdaas

Hi, I am trying to figure out how to start my Kafka consumer (using Spring Cloud Stream Kafka, of course) at a custom offset value. I will be using MirrorMaker 2.0 to replicate the data to a secondary cluster. To start the application I will need to figure out the translated offset and then make the consumer "seek" to that offset. Is there any way in Spring Cloud Stream Kafka to make the consumer "seek" to a particular offset at startup?

I have two alternate approaches in mind: (a) determine the offset translation outside the application, then use kafka-consumer-groups to set the offset, or (b) do this in KafkaRebalanceListener::onPartitionsAssigned (will need to throw in some logic to ensure that it does not happen during every rebalance, etc.)

3 replies
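Option (b) maps onto the Kafka binder's KafkaBindingRebalanceListener, whose initial flag is true only for the first assignment. A hedged sketch where translatedOffsetFor stands in for the hypothetical MirrorMaker offset translation:

    import java.util.Collection;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
    import org.springframework.context.annotation.Bean;

    @Bean
    public KafkaBindingRebalanceListener rebalanceListener() {
        return new KafkaBindingRebalanceListener() {

            @Override
            public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                    Collection<TopicPartition> partitions, boolean initial) {
                if (initial) {  // seek only on the first assignment, not on every rebalance
                    partitions.forEach(tp -> consumer.seek(tp, translatedOffsetFor(tp)));
                }
            }

            private long translatedOffsetFor(TopicPartition tp) {
                return 0L;  // hypothetical: look up the MirrorMaker-translated offset here
            }
        };
    }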
michaelqi793
@michaelqi793
Is it possible for the middleware to do something after the application Consumer successfully consumes the message? It looks like Spring Cloud Stream allows middleware to provide an error handler. Is there a way to provide a success handler?
6 replies
ashish ranjan
@san_cse_twitter
This message was deleted
1 reply
David Riccitelli
@ziodave
hello, I have a processor backed by a Function. The function succeeds, but its output is too large, yielding org.apache.kafka.common.errors.RecordTooLargeException: The message is 1292103 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
From the business perspective it's fine for the message not to be delivered (I can't change the max size),
however the Function stops working after the above error;
future calls yield: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers.
Is there a way to recover from this error?
2 replies