hnazmatrix
@hnazmatrix:matrix.org
[m]
Spring Cloud: 2020.0.0-RC1
Confluent Kafka: 6.0
Kafka: 2.6
Kafka Streams Java library: org.apache.kafka:kafka-streams:2.5.1
Any idea why the consumers are falling off?
hnazmatrix
@hnazmatrix:matrix.org
[m]
Thanks Sobychacko, I enabled debug logs on the client side. It looks like the SSL handshake was failing intermittently, which was causing the heartbeat to fail. I'll post here if I find anything more.
Smark11
@Smark11
Anyone know if there is a way to force a producer flush on a Supplier<Flux>?
1 reply
Smark11
@Smark11
We need to make sure all of the messages have been flushed to Kafka before committing to another stream.
Vasanth B K
@nurturingmyself
Need help stopping a particular binding. For example, with spring.cloud.stream.function.bindings.input-in-0=topic1, how do I stop the function input from receiving messages from topic1 through a REST API? I also need to know how to start it again.
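A minimal sketch of one way to do this, assuming the actuator bindings support: delegate to BindingsEndpoint from your own REST controller (the paths and the binding name input-in-0 are illustrative). The same state changes are also available directly over HTTP as POST /actuator/bindings/input-in-0 with body {"state":"STOPPED"} or {"state":"STARTED"} when the bindings actuator endpoint is exposed.

import org.springframework.cloud.stream.endpoint.BindingsEndpoint;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class BindingControlController {

    private final BindingsEndpoint bindings;

    public BindingControlController(BindingsEndpoint bindings) {
        this.bindings = bindings;
    }

    // stop a binding, e.g. POST /bindings/input-in-0/stop
    @PostMapping("/bindings/{name}/stop")
    public void stop(@PathVariable String name) {
        bindings.changeState(name, BindingsEndpoint.State.STOPPED);
    }

    // start it again, e.g. POST /bindings/input-in-0/start
    @PostMapping("/bindings/{name}/start")
    public void start(@PathVariable String name) {
        bindings.changeState(name, BindingsEndpoint.State.STARTED);
    }
}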
desiderantes
@desiderantes:matrix.org
[m]
Hi, I'm trying to port a Kafka Streams app to Spring Cloud Stream. I have a TopicNameExtractor to choose a topic dynamically based on payload type. I rewrote the topology to have a Consumer<KStream<ProtoClass, ProtoClass>> that does stream -> stream.map(...).to(topicNameExtractor), but when I do that it ignores any producer property that I set (in particular, the key and value serdes) and always defaults to the ByteArraySerde. How can I fix that?
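A hedged sketch of the usual Kafka Streams-side fix: when to(...) is called inside your own Consumer<KStream>, the binder's outbound serde settings do not take effect, so they can be passed explicitly via Produced (protoKeySerde and protoValueSerde are hypothetical names for your Protobuf serdes):

return stream -> stream
        .map((key, value) -> KeyValue.pair(key, value)) // your existing mapping
        .to(topicNameExtractor, Produced.with(protoKeySerde, protoValueSerde));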
Jeon ChangMin
@rafael81
Hi, can I ask a question about cdc-debezium in this room?
Smark11
@Smark11
Anyone receive an error when trying to deserialize to the S3EventNotification? "Could not read JSON: Cannot construct instance of com.amazonaws.services.s3.event.S3EventNotification (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)"
danishgarg
@danishgarg

Hi. I am looking for an example of creating a functional-style processor using the Spring Cloud Stream Kafka binder (without Kafka Streams) that can consume a batch of n messages from one topic and publish m messages to another topic (m < n). I have tried the following:

public Function<List<IncomingEvent>, List<Message<OutgoingEvent>>> aggregate() {
    return batch -> {
        // group the incoming batch by key
        Map<Key, List<IncomingEvent>> resultsMap = batch.stream()
                .collect(Collectors.groupingBy(IncomingEvent::key));
        List<Message<OutgoingEvent>> aggregationResults = new ArrayList<>();
        for (Map.Entry<Key, List<IncomingEvent>> resultGroup : resultsMap.entrySet()) {
            OutgoingEvent outgoingEvent = this.getOutgoingEvent(resultGroup.getKey(), resultGroup.getValue());
            aggregationResults.add(
                    MessageBuilder.withPayload(outgoingEvent)
                            .setHeader(KafkaHeaders.MESSAGE_KEY, outgoingEvent.getKey())
                            .build());
        }
        return aggregationResults;
    };
}
However, this produces a single event whose payload is an array of messages. I tried changing the return type of the function from List<Message> to Flux<Message> and returning Flux.fromIterable(aggregationResults); that does publish multiple messages, but they appear to be serialized instances of Flux (with scanAvailable and prefetch properties) rather than the actual Messages. I could not find any example of achieving this on the web; it would be very helpful to see one.

2 replies
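A hedged sketch of one possible direction, not a confirmed fix: making both sides of the function reactive, so the framework subscribes to the returned Flux and sends each Message individually instead of serializing the Flux itself (toMessages is a hypothetical helper holding the grouping logic from the snippet above):

@Bean
public Function<Flux<List<IncomingEvent>>, Flux<Message<OutgoingEvent>>> aggregate() {
    // each List<IncomingEvent> is one consumed batch; flatMap flattens the
    // aggregated results so every Message goes out as its own record
    return batches -> batches.flatMap(batch -> Flux.fromIterable(toMessages(batch)));
}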
blake-bauman
@blake-bauman
When using a Function<Flux, Flux> is there a way to assign one Flux to each partition in a Consumer? I've tried setting spring.cloud.stream.bindings.input.consumer.concurrency=N but it still seems to be a single Flux where the Consumer is assigned all partitions.
Dmitriy
@MoOFueL
Hi. Is there a specific reason that the SubscribableChannels generated for StreamBridge output bindings always have unknown.channel.name as their fullChannelName? This unknown.channel.name gets written into Jaeger spans as message_bus.destination. Is this an error in my configuration, or is it expected behavior?
akash-wadhwa
@akash-wadhwa
How can I disable Spring Cloud Stream based on a Spring profile?
Dmitriy
@MoOFueL

Hi. Is there a specific reason that the SubscribableChannels generated for StreamBridge output bindings always have unknown.channel.name as their fullChannelName? This unknown.channel.name gets written into Jaeger spans as message_bus.destination. Is this an error in my configuration, or is it expected behavior?

@olegz Can you help me with this?

Dhiraj singh
@dhirajsin
We received an assignment [profile-0] that doesn't match our current subscription Subscribe(profile.request); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
1 reply
Multiple Kafka Streams processors within a single application
Trying to define multiple Kafka Streams processors within a single application.
1 reply
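For reference, a minimal hedged sketch of two independent Kafka Streams processors in one application (the Order/Payment types and function names are hypothetical). Both are activated by listing them in spring.cloud.stream.function.definition: processOrders;processPayments, and each gets its own derived bindings:

@Bean
public Function<KStream<String, Order>, KStream<String, Order>> processOrders() {
    // bindings: processOrders-in-0 / processOrders-out-0
    return input -> input.filter((key, order) -> order.isValid());
}

@Bean
public Consumer<KStream<String, Payment>> processPayments() {
    // binding: processPayments-in-0
    return input -> input.foreach((key, payment) -> System.out.println("payment " + key));
}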
realaseem
@realaseem
Hi, can anyone please let me know how I can override the default LogAndContinueExceptionHandler for deserialization exceptions? I need to log and perform some operation before continuing. I am using Spring Cloud Stream.
9 replies
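A hedged sketch of one way to do this with the Kafka Streams binder: implement Kafka's own DeserializationExceptionHandler and register it through the binder configuration, e.g. spring.cloud.stream.kafka.streams.binder.configuration.default.deserialization.exception.handler: com.example.LoggingContinueHandler (the class name is hypothetical):

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingContinueHandler implements DeserializationExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(LoggingContinueHandler.class);

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record, Exception exception) {
        // log and run any custom side effect before skipping the record
        log.error("Failed to deserialize record from {}-{} at offset {}",
                record.topic(), record.partition(), record.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}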
Troy Taillefer
@tjtaill
Hi, anyone have a good example of fanning out from a single input to multiple outputs with different binders? Guidance and hints on how to do this would also be greatly appreciated.
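A hedged sketch of one documented pattern: a function with multiple outputs expressed as a Tuple of Fluxes. Each output binding (fanOut-out-0, fanOut-out-1) can then be routed to a different binder via spring.cloud.stream.bindings.fanOut-out-0.binder=<binderName>:

import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Bean
public Function<Flux<String>, Tuple2<Flux<String>, Flux<String>>> fanOut() {
    return input -> {
        // share the upstream so both outputs see every element exactly once
        Flux<String> shared = input.publish().autoConnect(2);
        return Tuples.of(shared.map(String::toUpperCase),
                shared.map(String::toLowerCase));
    };
}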
srikanth
@Srikanthkarkala_twitter
Hi, I'm using Spring Cloud Stream to connect to Kafka. I have not found any example of setting the JAAS config with OAuthBearerLoginModule to authenticate a producer/consumer using a service principal. Any references or suggestions, please?
3 replies
K solo
@mr_k_solo_twitter

Hello everyone, I'm having a particularly 'nigglesome' problem with interceptors that I desperately need help with.
I'm using a ProducerInterceptor and a ConsumerInterceptor within a Spring Cloud Stream application, configured via a StreamsBuilderFactoryConfig bean. They fire as expected, and I can retrieve some attributes of the producer and consumer records; there is, however, a problem retrieving the actual key and value of these records. I've tried to assign/cast them to the expected types, but I get a ClassCastException. My initial instinct is that this is a serialization/deserialization issue, although there is no indication of that elsewhere, as the application otherwise works fine. The consumer binder has default key/value serdes set as:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde

while the producer binder has key/value serdes set as:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Example output from printing the key/value of a producer record:

ProducerRecord(topic=.supplier.batch.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=[B@1089613d, value=[B@4a4e10fd, timestamp=1623572275792)

Received Message timestamp =1623571603723, partition =3, offset = 92, key = [B@49e41727, value = [B

The following exception is thrown when trying to assign the key to a String instance:
Error executing interceptor onConsume callback
java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')

I've copied the 'onConsume' method of the consumer interceptor to provide a little more detail:

@Override
public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> records) {
    log.info("---<< ConsumerRecords being received {} >>---", records);
    for (ConsumerRecord<String, Object> record : records) {
        if (record.topic().equals(LOCATION_SUPPLIER_SOURCE)) {
            log.info("---<< setting timestamp for  >>---");
            // this assignment is where the ClassCastException surfaces: inside
            // Kafka Streams the interceptor runs on the internal byte-array
            // consumer, so record.key() is actually a byte[] at runtime; the
            // configured serdes are only applied later, inside the topology
            String key = record.key();
            System.out.printf("Received Message: timestamp =%s, partition =%s, offset = %d, key = %s, value = %s%n",
                    record.timestamp(), record.partition(), record.offset(), key, record.value().getClass().getName());
        }
    }
    return records;
}
26 replies
Soumendra Daas
@Sdaas

Hi, I am trying to figure out how to start my Kafka consumer (using Spring Cloud Stream Kafka, of course) at a custom offset value. I will be using MirrorMaker 2.0 to replicate the data to a secondary cluster. To start the application I will need to figure out the translated offset and then make the consumer seek to that offset. Is there any way in Spring Cloud Stream Kafka to make the consumer seek to a particular offset at startup?

I have two alternate approaches in mind: (a) determine the offset translation outside the application and then use kafka-consumer-groups to set the offset, or (b) do this in KafkaRebalanceListener::onPartitionsAssigned (with some logic to ensure it does not happen during every rebalance).

3 replies
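A minimal sketch of approach (b), assuming the Kafka binder's KafkaBindingRebalanceListener; the initial flag distinguishes the first assignment at startup from later rebalances (lookupTranslatedOffset is a hypothetical helper for the MirrorMaker offset translation):

import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;

@Bean
public KafkaBindingRebalanceListener rebalanceListener() {
    return new KafkaBindingRebalanceListener() {
        @Override
        public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                Collection<TopicPartition> partitions, boolean initial) {
            if (initial) { // seek only on startup, not on every rebalance
                for (TopicPartition tp : partitions) {
                    consumer.seek(tp, lookupTranslatedOffset(tp));
                }
            }
        }
    };
}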
michaelqi793
@michaelqi793
Is it possible for the middleware to do something after the application Consumer successfully consumes the message? It looks like Spring Cloud Stream allows middleware to provide an error handler. Is there a way to provide a success handler?
6 replies
ashish ranjan
@san_cse_twitter
This message was deleted
1 reply
David Riccitelli
@ziodave
hello, I have a processor backed by a Function. The function succeeds, but its output is too large, yielding org.apache.kafka.common.errors.RecordTooLargeException: The message is 1292103 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
From the business perspective, it's fine for the message not to be delivered (I can't change the max size).
However, the Function stops working after the above error.
Future calls yield: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers.
Is there a way to recover from this error?
2 replies
David Riccitelli
@ziodave
Would something like this help?
@Bean
Function<Flux<PojoIn>, Flux<PojoOut>> myProcessor(MyService myService) {
    return flux -> flux.concatMap(myService::doSomething)
            .onErrorResume(t -> {
                if (log.isErrorEnabled()) log.error("An error occurred: {}", t.getMessage(), t);
                return Mono.empty();
            });
}
David Riccitelli
@ziodave
nope it doesn't
the problem is that it looks like the function is then disconnected from the input topic: Failure was detected during execution of the reactive function

maybe this is relevant - is there a way to catch this?

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@6af5820a]; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1348377 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration., failedMessage=GenericMessage [payload=byte[1348200], headers={contentType=application/json, id=42341146-3252-2754-9d39-2d45572cfafa, timestamp=1624377510439}]

David Riccitelli
@ziodave
is @garyrussell around? 😇
Daniel Hall
@zheamourth_gitlab
Does anyone know offhand when support for compression was added to spring-cloud-stream-binder-kafka? I'm seeing documentation for it in 3.0.8 but not 3.0.7. Was 3.0.8 the version where it was added?
3 replies
AttitudeL
@AttitudeL

I have a stream application with 2 consumers. Is there a way to configure one consumer's "auto.offset.reset" to be "earliest" and the other's to be "latest"? I'm using functional-style binding.

Currently I configure "auto.offset.reset" as follows:

spring.cloud.stream:
  function:
    definition: foo
  kafka.binder:
    brokers: localhost:9092
    autoCreateTopics: false
    configuration:
      auto.offset.reset: earliest

However, this config affects all consumers instead of just the one I want in the binding function foo.

Can anyone please help? Thanks a lot.

27 replies
RikJansen81
@RikJansen81

Hello, to use some of the new features in Kafka Streams 2.4 (specifically KIP-307: Allow to define custom processor names with KStreams DSL), I changed the org.springframework.boot version to 2.3.0.M2 and everything seems to work fine. I would like to name the source and sink processors so I don't have names like 'KSTREAM-SOURCE-0000000002' and 'KSTREAM-SINK-0000000013', but the Kafka Streams binder doesn't support it and it won't be available until Hoxton.SR5. Is there any way I can name the source and sink topics so I don't get these autogenerated names?

I'm looking for the same configuration options. Is it possible to configure the function bindings so that we can actually name the source/sink processors?

8 replies
Moid Mohammad
@moid.mohammad:matrix.org
[m]
@sobychacko: Hello, I am getting this ClassCastException while working with Spring Cloud Stream.
stack trace:
java.lang.ClassCastException: class com.sun.proxy.$Proxy83 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy83 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:91) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:202) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.8.jar:5.3.8]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
1 reply
Ankita Luthra
@ankitaluthra1
Hi, can someone point me to example or sample code that uses a custom/single DLQ for multiple input/output binders? In our use case we have multiple sources and sinks (IBM MQ, Google Pub/Sub, and Kafka); if an exception occurs during processing or deserialization, then after the configured retries the error message for all binders should go to a single Kafka topic (the DLQ), not to the DLQ of the source binder. I am using the functional style of Spring Cloud Stream.
5 replies
Roussi Abdelghani
@roussi
Hi, is there an example of producing a message (received from an endpoint) using the functional style?
6 replies
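A minimal sketch using StreamBridge, which is the usual route for sending from non-stream code such as a REST endpoint (the path, payload type, and the events-out-0 binding name are assumptions):

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {

    private final StreamBridge streamBridge;

    public EventController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @PostMapping("/events")
    public void publish(@RequestBody String event) {
        // "events-out-0" must match an output binding configured with a destination
        streamBridge.send("events-out-0", event);
    }
}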
Jeon ChangMin
@rafael81
Hi, is there any reason the hdfs sink was deprecated in the pre-packaged applications at https://dataflow.spring.io/docs/applications/pre-packaged/?
5 replies
r2lexs
@r2lexs
hello
can someone help?
I used AvroSchemaRegistryClientMessageConverter (without Confluent) with content-type: application/*+avro and consumer batch-mode: true, but the AvroSchemaRegistryClientMessageConverter is not applied for a payload with an ArrayList of byte[].
Does any default mechanism exist for using Avro (without Confluent) + batch mode?
1 reply
Roussi Abdelghani
@roussi

Hi all,
I have multiple consumers that consume from the same topic based on a particular header, event_type. I've tried many configs but couldn't get it to work. I'm trying not to duplicate the binding configs, since the consumers share the same one.

This is the configuration I'm using:

spring:
  cloud:
    stream:
      function:
        definition: consumerA;consumerB;consumerC
      bindings:
        default:
          destination: same_topic
          group: same_group
          consumer:
            ackEachRecord: true
            autoCommitOffset: true
            autoCommitOnError: false
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          configuration:
            auto.offset.reset: earliest
          headerMapperBeanName: kafkaHeaderMapper

with a custom KafkaHeaderMapper:

@Configuration
public class KafkaHeaderMapperConfig {


    @Bean(name = "kafkaHeaderMapper")
    public KafkaHeaderMapper customHeaderMapper(){
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        Map<String, Boolean> conf = new HashMap<>();
        conf.put("event_type", true);
        mapper.setRawMappedHeaders(conf);
        mapper.setCharset(StandardCharsets.UTF_8);
        return mapper;
    }

}

And this is my custom MessageRoutingCallback (which doesn't trigger when a message is pushed to the topic):

public class CustomMessageRoutingCallback implements MessageRoutingCallback {

    public static final String EVENT_TYPE = "event_type";

    @Override
    public String functionDefinition(Message<?> message) {
        return Optional.of(message.getHeaders())
            .filter(headers -> headers.containsKey(EVENT_TYPE))
            .map(messageHeaders -> messageHeaders.get(EVENT_TYPE))
            .map(eventType -> EventTypeToBinding.valueOf((String)eventType))
            .map(EventTypeToBinding::getBinding)
            .orElseThrow(() -> new IllegalStateException("event_type was not recognized !! supported values are " + Arrays.toString(EventTypeToBinding.values())));
    }
}
@Getter
@RequiredArgsConstructor
public enum EventTypeToBinding {
    EVENT_A("consumerA-in-0"),
    EVENT_B("consumerB-in-0"),
    EVENT_C("consumerC-in-0");

    private final String binding;
}

And finally the declaration of my beans :

/**
 * Configuration class for all bindings used in the application
 */
@Configuration
public class MessageConfiguration {

    /**
     * {@link MessageRoutingCallback} for mapping to the appropriate
     * binding by checking the {@link CustomMessageRoutingCallback#EVENT_TYPE}
     * @return
     */
    @Bean
    public MessageRoutingCallback messageRoutingCallback() {
        return new CustomMessageRoutingCallback();
    }

    @Bean
    public Consumer<A> consumerA() {
        return a -> doSomething();
    }

    @Bean
    public Consumer<B> consumerB() {
        return b -> doSomething();
    }

    @Bean
    public Consumer<C> consumerC() {
        return c -> doSomething();
    }
}

I got confused by the configuration properties - for example, should I use spring.cloud.stream.bindings.<channelName>. or spring.cloud.stream.kafka.bindings.<channelName>. ?

What am I missing?

31 replies
yaboong
@yaboong

Hello guys :)

I added spring-cloud-starter-config and spring-cloud-starter-bootstrap to encrypt a key like this:

management.metrics.export.datadog.apiKey: '{cipher}2dc3840...'

But after adding the dependencies and applying the {cipher} to my properties,
the spring.cloud.stream.kafka.streams.binder.functions.{input}.application-id property is not bound to KafkaStreamsConsumerProperties.

The application-id property binding works perfectly if I don't use the encryption supported by the spring-cloud-starter-config and spring-cloud-starter-bootstrap dependencies.

Any help would be appreciated.

4 replies
ranjith1235
@ranjith1235

Hello all,

I am using a Spring Cloud Stream reactive consumer to receive messages from RabbitMQ via the following function (just a sample):

@Bean
public Consumer<Flux<Message<SampleMessageDto>>> consumer() {
    return payload -> payload.map(Message::getPayload).log().subscribe();
}

But the problem is that if I send any data that does not match SampleMessageDto, I still get the data as a byte[] in the payload. How can I filter these messages out?

1 reply
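A hedged sketch of one workaround: because the generic type is erased at runtime, a record that failed conversion can still arrive here with a byte[] payload, so an explicit runtime type check can drop such records (the upcast to Object avoids an implicit checkcast before the test):

@Bean
public Consumer<Flux<Message<SampleMessageDto>>> consumer() {
    return payload -> payload
            .filter(msg -> ((Object) msg.getPayload()) instanceof SampleMessageDto)
            .map(Message::getPayload)
            .log()
            .subscribe();
}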
Jeon ChangMin
@rafael81
Hi, I have experienced "Connection Reset" issues with cdc-source-debezium,
so I posted the issue in the Debezium project.
They told me that the issue was resolved in the latest version of Debezium.
Currently, cdc-source-supplier depends on version 1.3.1.Final, which was released in 2020-12,
but the latest version is 1.6.0.Final.
Is there any plan to upgrade this version?
https://github.com/spring-cloud/stream-applications/blob/main/functions/supplier/cdc-debezium-supplier/pom.xml
2 replies
matthoward
@matthoward

Is there any way to get a reference to the serdes that SCS creates? We generally use String keys and Avro values, but we have an aggregation that needs a complex Avro key. For our Materialized.as we need to specify the serde, so I was wondering if we can just inject it as a bean. I tried to add it as a parameter to my function bean but got an exception:

Parameter 1 of method processRevenue in com.axispoint.rytebox.revenue.ingestion.RevenueIngestionApplication required a single bean, but 2 were found:

- messageConverterDelegateSerde: defined by method 'messageConverterDelegateSerde' in class path resource [org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.class]
- compositeNonNativeSerde: defined by method 'compositeNonNativeSerde' in class path resource [org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.class]
3 replies
Igor Rudenko
@fac30ff
Hi there. I've got a problem: I need to pause a consumer if an S3 bucket contains any file.
So I use @PostConstruct in my service, where I check whether the S3 bucket contains something; if it does, I inject BindingsEndpoint and set the state to PAUSED (I log this file and it is present). But when I actually start the application, the method with @StreamListener starts consuming as if I hadn't paused anything.
4 replies
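A hedged sketch of one likely cause and workaround: @PostConstruct runs before the bindings are started later in the application lifecycle, so a pause issued there can be a no-op or get overridden; deferring the check until the application is ready and then pausing through BindingsEndpoint avoids that ordering problem (bucketHasFiles and the binding name input-in-0 are hypothetical):

import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.stream.endpoint.BindingsEndpoint;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class S3GatedPauser {

    private final BindingsEndpoint bindings;

    public S3GatedPauser(BindingsEndpoint bindings) {
        this.bindings = bindings;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void pauseIfBucketHasFiles() {
        if (bucketHasFiles()) {
            bindings.changeState("input-in-0", BindingsEndpoint.State.PAUSED);
        }
    }

    private boolean bucketHasFiles() {
        return true; // stand-in for the real S3 listObjects check
    }
}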