Paolo Contessi
@conteit
Hi @olegz and all the Spring Cloud Stream team, I'm trying to implement a system where clients connect to a service that forwards messages to a Kafka topic whenever a client connects, provides data, or disconnects. Another kafka-streams service listens to that topic and builds a table of client availability and a per-client summary of the provided data. When the first service shuts down I'm able to gracefully terminate the client connections, but I don't have time to publish to the Kafka topic that the client has disconnected, because the FluxMessageChannel gets disposed immediately. Is there any way to delay the producer disposal, or am I wrong in approaching availability this way? Thank you for your time
Sahil Chaddha
@Sahil333
Hi, I want to change the name of the thread used for @StreamListener methods, as the current name is too long in the logs. I looked in the documentation but couldn't find anything. Is there a way?
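One possible approach (a hedged, untested sketch, assuming the Kafka binder): replace the container's consumer task executor so the thread name prefix is short. The prefix "listener-" is an arbitrary example.

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> threadNameCustomizer() {
        // Replace the default executor, whose thread names derive from the
        // (long) container bean name, with one using a short prefix.
        return (container, destinationName, group) -> container.getContainerProperties()
                .setConsumerTaskExecutor(new SimpleAsyncTaskExecutor("listener-"));
    }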
ashevade1
@ashevade1
Hi, I am new to this, so please excuse me if this is a basic question. Is it possible to write a Spring application that abstracts away which streaming system it is using? For example, the same code for Kafka should also work for Kinesis. How can we achieve that kind of abstraction? Any help or pointers are appreciated.
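That abstraction is exactly what Spring Cloud Stream's functional model provides: the business code never references the broker, and the binder dependency on the classpath (e.g. spring-cloud-stream-binder-kafka vs spring-cloud-stream-binder-kinesis) decides the middleware. A minimal sketch:

    @Bean
    public Function<String, String> uppercase() {
        // no Kafka or Kinesis API here; the binding to a topic/stream is pure configuration
        return String::toUpperCase;
    }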
Nagarajan Selvaraj
@iam-nagarajan

Hi, I am using Spring Cloud Stream with the Kafka binder. One of our applications is bundled into a fat jar using the Maven Shade plugin. We also inject the BindingsEndpoint bean into one of our classes to start a specific binding after a delay from application startup. We were using Hoxton.RELEASE and have now upgraded to Hoxton.SR8. After this, when the application is bundled with the Shade plugin, we get the following error during startup:

The following candidates were found but could not be injected:

        - Bean method 'bindingsEndpoint' in 'BindingsEndpointAutoConfiguration' not loaded because @ConditionalOnAvailableEndpoint no property management.endpoint.bindings.enabled found so using user defined default from management.endpoints.enabled-by-default; @ConditionalOnAvailableEndpoint no 'management.endpoints' property marked it as exposed

I have also tried setting management.endpoint.bindings.enabled to true; it results in the following error:

The following candidates were found but could not be injected:

    - Bean method 'bindingsEndpoint' in 'BindingsEndpointAutoConfiguration' not loaded because @ConditionalOnAvailableEndpoint found property management.endpoint.bindings.enabled with value true; @ConditionalOnAvailableEndpoint no 'management.endpoints' property marked it as exposed

Following is the actuator configuration

management:
  endpoints:
    enabled-by-default: true
    web:
      exposure:
        include: info, health
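One thing worth ruling out (a hedged guess based on the condition text in the error, which complains that no property marked the endpoint as exposed): add bindings to the exposure list, e.g.

    management:
      endpoints:
        enabled-by-default: true
        web:
          exposure:
            include: info, health, bindings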
Marcus Lyth
@twitz

Hi,

I am trying to use Spring Cloud Stream with Azure and can't seem to read anything from the queue. I can connect without any problem and I can send messages to the queue without problems. Whenever I read a message I get

error occurred during processing message in 'MethodInvokingMessageProcessor'

and

nested exception is org.springframework.messaging.converter.MessageConversionException: Could Not Convert Input, failedMessage=GenericMessage [payload=byte[423], headers={id=795f6447-9eab-3afb-9897-3aefdac0d444, raw_id=ID:ad6172e6-cb75-440f-965d-f081a8740a46:1:1:1-1, contentType=application/x-java-serialized-object, timestamp=1602616117542}]

All this works fine using a regular JmsTemplate and I can't figure out what I am doing wrong.

The message being read was published by another application using a JmsTemplate.
Marcus Lyth
@twitz
I think I solved this. I feel stupid, but if I understand correctly, the JmsTemplate publishes JMS messages using one wire protocol, while the Spring Cloud Stream Binder for Azure publishes AMQP messages using Azure's definition of the wire protocol, which is why they don't understand each other.
Marcus Lyth
@twitz
So using a converter for the JmsTemplate, so that it publishes a message type the binder understands, seems to do the trick.
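For reference, a minimal sketch of that converter setup on the publishing side, assuming JSON text messages are acceptable to the consumer (the "_type" header name is just a common choice):

    @Bean
    public JmsTemplate jsonJmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate template = new JmsTemplate(connectionFactory);
        // send JSON TextMessages instead of application/x-java-serialized-object
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);   // body becomes a JSON string
        converter.setTypeIdPropertyName("_type");    // payload class carried in a header
        template.setMessageConverter(converter);
        return template;
    }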
sonneybouy
@sonneybouy
Hi, I am currently exploring the possibility of starting our application even if some of our AWS dependencies are down (we use DynamoDB and Kinesis Streams). Currently, without the connection, I can see that the kinesisBinderHealthIndicator, among other beans defined in KinesisBinderConfiguration, fails to instantiate, and that causes the application to fall over. Is there a way around this? We are using springBootVersion = "2.2.5.RELEASE" & springCloudKinesisBinderVersion = "2.0.0.RELEASE", if that helps
Juan Antonio Breña Moral
@jabrena
Good morning, I am reviewing the samples at https://github.com/spring-cloud/spring-cloud-stream-samples, but I can't find an example of error handling using the functional approach. Any pointers?
8 replies
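Until a sample turns up, a minimal hedged sketch of one common pattern with the functional model; handle and log are hypothetical placeholders:

    @Bean
    public Consumer<String> process() {
        return payload -> {
            try {
                handle(payload); // hypothetical business logic
            } catch (Exception e) {
                // log and drop; alternatively let the exception propagate and configure
                // spring.cloud.stream.bindings.<binding>.consumer.max-attempts and, on Kafka,
                // spring.cloud.stream.kafka.bindings.<binding>.consumer.enableDlq=true
                log.error("failed to process payload", e);
            }
        };
    }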
Marcello Teodori
@mteodori
Hello, is it also possible to provision a virtual host in RabbitMQ via API using the binder, or only topic exchanges? Or is there support in any of the RabbitMQ-related dependencies for calling the management HTTP API to do that? Thank you!
2 replies
natraj09
@natraj09
Hi Spring Cloud Team ,
We started noticing this error in production. We can adjust max.poll.interval.ms as suggested in the error message. We have a health check, and the binder state is still "UP". Should the binder state be DOWN in this case?
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) ~[spring-kafka-2.5.5.RELEASE.jar!/:2.5.5.RELEASE]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) ~[spring-kafka-2.5.5.RELEASE.jar!/:2.5.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1265) ~[spring-kafka-2.5.5.RELEASE.jar!/:2.5.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1022) ~[spring-kafka-2.5.5.RELEASE.jar!/:2.5.5.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1213) ~[kafka-clients-2.5.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1140) ~[kafka-clients-2.5.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096) ~[kafka-clients-2.5.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076) ~[kafka-clients-2.5.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-2.5.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-2.5.1.jar!/:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
1 reply
ivkrishnai
@ivkrishnai

Hi All, I have made a simple streaming application that creates a state store for a topic as a KTable and updates the table continuously. I would like to query by key and get the latest value. The following is the sample code:

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, inputProps.get("application.id"));
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, inputProps.get("bootstrap.servers"));

    final StreamsBuilder builder = new StreamsBuilder();
    KeyValueBytesStoreSupplier stateStore = Stores.inMemoryKeyValueStore(storeName);

    KTable<String, KafkaEventDto> table = builder.table(
            topicName,
            Materialized.<String, KafkaEventDto>as(stateStore)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(countryMessageSerde));

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);

    final StoreQueryService storeQueryService = new StoreQueryService(streams, storeName);

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

    streams.start();

and another class:

    public class StoreQueryService {

        private static KafkaStreams streams;
        private static String storeName;

        public StoreQueryService(KafkaStreams streams, String storeName) {
            StoreQueryService.streams = streams;
            StoreQueryService.storeName = storeName;
        }

        public static KafkaEventDto getValueByKey(String key) {
            final ReadOnlyKeyValueStore<String, KafkaEventDto> store =
                    streams.store(storeName, QueryableStoreTypes.keyValueStore());
            if (store == null) {
                throw new AppException("The state store is null");
            }

            final KafkaEventDto value = store.get(key);
            System.out.println("key : " + key);
            System.out.println("value : " + value);
            return value;
        }
    }

I want to convert this to use the Spring libraries, but I didn't find relevant samples and am not able to get clues on where to start. Could you please suggest a way to do this (any templates/samples)? Thanks in advance!

2 replies
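A rough, untested sketch of the Spring Cloud Stream equivalent, using the Kafka Streams binder: declare the table as a function binding and query it through the binder's InteractiveQueryService; the store and binding names here are examples:

    @Bean
    public Consumer<KTable<String, KafkaEventDto>> process() {
        // the binder builds and starts the topology; setting
        // spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.materializedAs=my-store
        // materializes the input topic into a queryable state store
        return table -> { };
    }

    // elsewhere, e.g. in a REST controller:
    public KafkaEventDto getValueByKey(InteractiveQueryService queries, String key) {
        ReadOnlyKeyValueStore<String, KafkaEventDto> store =
                queries.getQueryableStore("my-store", QueryableStoreTypes.keyValueStore());
        return store.get(key);
    }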
Sunil Yadav
@aiodsunil
@aiod-sunil
Hi All, do we have protobuf support with Kafka in Cloud Stream?
I am trying to use it.
Gonzalo Borobio
@gborobio73

Hello! I'm not sure if this is the right place to ask this, so apologies if not. I'm trying to test a stream processor using TopologyTestDriver from org.apache.kafka.streams and I get the error:

The class  is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

I have tried all the answers I found on Stack Overflow and in many other places, without success.
I create the TopologyTestDriver with these props:

props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
props["spring.kafka.consumer.properties.spring.json.trusted.packages"] = "*"

I'm using Kotlin, btw. Thanks a lot for your help!

Gonzalo Borobio
@gborobio73

Oh :point_up: I’m using spring cloud stream and my processor looks like this:

    @Bean
    fun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
        return Function { input-> input.map { key, value ->
            println("\nPAYLOAD KEY: ${key.name}\n");
            println("\nPAYLOAD value: ${value.address}\n");
            val output = FooAddressPlus()
            output.address = value.address
            output.name = value.name
            output.plus = "$value.name-$value.address"
            KeyValue(key, output)
        }}
    }

Thanks a lot for the help!

Gonzalo Borobio
@gborobio73
Please note :point_up: that I have a stream processor, not a consumer/producer, so I cannot set properties like spring.kafka.consumer.properties.spring.json.trusted.packages, nor define my own consumer/producer factories, AFAIK. Thanks a lot!
1 reply
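One hedged idea (shown in Java; FooAddress comes from the processor above): when the serdes are constructed in test code and handed to TopologyTestDriver, driver-level properties never reach them, so the trusted packages have to be set on the serde instance itself:

    JsonSerde<FooAddress> valueSerde = new JsonSerde<>(FooAddress.class);
    // configure the serde directly; the second argument false means "this is a value serde"
    valueSerde.configure(Map.of(JsonDeserializer.TRUSTED_PACKAGES, "*"), false);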
Yashodhan Ghadge
@codexetreme_gitlab

Hello all, I am trying to configure our Kafka to use spring.cloud.stream.kafka.binder.jaas.loginModule: org.apache.kafka.common.security.scram.ScramLoginModule, but I keep getting this error:

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

is there some mistake in my config?

1 reply
The JAAS config is simply not picked up from the above config.
1 reply
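For comparison, a hedged sketch of a fuller SCRAM setup (property names from the Kafka binder docs; the mechanism, protocol, and credentials are placeholders). Besides the login module, the client usually also needs the SASL mechanism and security protocol set in the binder configuration:

    spring:
      cloud:
        stream:
          kafka:
            binder:
              jaas:
                loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                options:
                  username: my-user
                  password: my-password
              configuration:
                security.protocol: SASL_SSL
                sasl.mechanism: SCRAM-SHA-256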
Viacheslav Petriaiev
@Viacheslav-Petriaiev
4 replies
Sidney Lee
@sesidlee_twitter

hi, guys! I need a little help configuring spring.cloud.stream.kafka.streams.binder.configuration.application.server for interactive queries in Kubernetes. I saw an example of setting this up as follows:

spec:
  containers:
    - env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP

and then to access the POD_IP as an environment variable, using a placeholder in the application.yaml, i.e.
spring.cloud.stream.kafka.streams.binder.configuration.application.server: ${POD_IP}

so my question is, is this the correct approach?

Ashith Raghunath
@dranzerashi_gitlab

Hi, I have a requirement where, within the same application, I have to bind to the same destination multiple times, but with a different consumer group for each binding, so that each stream listener receives every message from the topic individually. But I don't see any documentation on whether this is possible.

bindings:
        binding1:
          destination: some-topic
          group: g1
          concurrency: 3
          contentType: application/*+avro
        binding2:
          destination: some-topic
          group: g2
          concurrency: 3
          contentType: application/*+avro
        binding3:
          destination: some-topic
          group: g3
          concurrency: 3
          contentType: application/*+avro

The bindings are as above. I need the stream listeners to listen on each group separately. Is there any way to achieve this?

4 replies
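For what it's worth, a minimal sketch of how this can look with the annotation model (binding names match the YAML above); since each binding has its own group, each listener receives every message independently:

    public interface MultiGroupSink {
        @Input("binding1") SubscribableChannel binding1();
        @Input("binding2") SubscribableChannel binding2();
        @Input("binding3") SubscribableChannel binding3();
    }

    @EnableBinding(MultiGroupSink.class)
    public class Listeners {
        @StreamListener("binding1")
        public void fromG1(Message<?> message) { /* group g1's copy */ }

        @StreamListener("binding2")
        public void fromG2(Message<?> message) { /* group g2's copy */ }

        @StreamListener("binding3")
        public void fromG3(Message<?> message) { /* group g3's copy */ }
    }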
FlxGschlr
@FlxGschlr

Hello, I've got the following problem: after upgrading to Hoxton.SR7, executing integration tests in an AWS environment takes much more time (~1s -> ~30s until the exception). See the following logs:

SR6 (~1s):

09:49:32      2020-10-30 08:49:32.201  INFO [,,,] 896 --- [    Test worker] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: integration
09:49:32      2020-10-30 08:49:32.278  WARN [,,,] 896 --- [    Test worker] com.amazonaws.util.EC2MetadataUtils      : Unable to retrieve the requested metadata (/latest/meta-data/public-keys). The requested metadata is not found at http://169.254.169.254/latest/meta-data/public-keys
09:49:32  
09:49:32      com.amazonaws.SdkClientException: The requested metadata is not found at http://169.254.169.254/latest/meta-data/public-keys
09:49:32          at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:122) ~[aws-java-sdk-core-1.11.415.jar:na]
09:49:32          at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:82) ~[aws-java-sdk-core-1.11.415.jar:na]
09:49:32          at com.amazonaws.util.EC2MetadataUtils.getItems(EC2MetadataUtils.java:400) ~[aws-java-sdk-core-1.11.415.jar:na]
09:49:32          at com.amazonaws.util.EC2MetadataUtils.getData(EC2MetadataUtils.java:369) ~[aws-java-sdk-core-1.11.415.jar:na]
09:49:32          at com.amazonaws.util.EC2MetadataUtils.getData(EC2MetadataUtils.java:365) ~[aws-java-sdk-core-1.11.415.jar:na]

SR7 (~30s):

16:47:57      2020-10-29 15:47:57.784  INFO [,,,] 1065 --- [    Test worker] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: integration
16:48:00      2020-10-29 15:47:59.837  WARN [,,,] 1065 --- [    Test worker] com.amazonaws.util.EC2MetadataUtils      : Unable to retrieve the requested metadata (/latest/meta-data/public-keys). The requested metadata is not found at http://169.254.169.254/latest/meta-data/public-keys
16:48:00  
16:48:00      com.amazonaws.SdkClientException: The requested metadata is not found at http://169.254.169.254/latest/meta-data/public-keys
16:48:00          at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:89) ~[aws-java-sdk-core-1.11.792.jar:na]
16:48:00          at com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:70) ~[aws-java-sdk-core-1.11.792.jar:na]
16:48:00          at com.amazonaws.internal.InstanceMetadataServiceResourceFetcher.readResource(InstanceMetadataServiceResourceFetcher.java:75) ~[aws-java-sdk-core-1.11.792.jar:na]
16:48:00          at com.amazonaws.internal.EC2ResourceFetcher.readResource(EC2ResourceFetcher.java:66) ~[aws-java-sdk-core-1.11.792.jar:na]
16:48:00          at com.amazonaws.util.EC2MetadataUtils.getItems(EC2MetadataUtils.java:402) ~[aws-java-sdk-core-1.11.792.jar:na]
16:48:00          at com.amazonaws.util.EC2MetadataUtils.getData(EC2MetadataUtils.java:371) ~[aws-java-sdk-core-1.11.792.jar:na]
16:48:00          at com.amazonaws.util.EC2MetadataUtils.getData(EC2MetadataUtils.java:367) ~[aws-java-sdk-core-1.11.792.jar:na]

Any ideas?

rbrose
@rbrose
Hi
1 reply
If I start JUnit tests I get this error: Caused by: java.lang.NoClassDefFoundError: org/springframework/cloud/stream/binding/AbstractBindableProxyFactory
In the pom I have:
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
          <version>2.1.0.RELEASE</version>
        </dependency>
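AbstractBindableProxyFactory moved between spring-cloud-stream versions, so a NoClassDefFoundError at test time usually points at mixed versions on the classpath. One hedged fix is to import the release-train BOM and drop the explicit binder version (Hoxton.SR8 is used here only as an example of a recent train):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-dependencies</artifactId>
          <version>Hoxton.SR8</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>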
Andras Hatvani
@andrashatvani
hi, @olegz @garyrussell is spring-cloud-stream(-binder-kafka-streams) compatible with, and recommended for use with, Spring Boot 2.4?
And does it work with KoFu (the Spring Kotlin DSL), so that the streams can be declared in that declarative way?
Navid Ghahremani
@ghahramani
Hey guys, I am really stuck on testing Spring Cloud Stream in functional mode. I have read the documentation, and the sample mentioned there uses the test binder without network activity; it also doesn't respect any annotations, since you start the application via the SpringApplicationBuilder class. I want to test my Kafka Function<Flux<String>, Flux<Int>> binding via Testcontainers; it was much easier when I was using the annotation-based model. Is there documentation showing how I can achieve this?
1 reply
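A hedged sketch of one way to do this: skip the test binder entirely and point the real Kafka binder at a Testcontainers broker (class and topic names are examples; assumes JUnit 5 and Spring Boot 2.2.6+ for @DynamicPropertySource):

    @SpringBootTest
    @Testcontainers
    class FunctionIntegrationTest {

        @Container
        static KafkaContainer kafka = new KafkaContainer();

        @DynamicPropertySource
        static void kafkaProperties(DynamicPropertyRegistry registry) {
            // point the real Kafka binder at the container instead of the test binder
            registry.add("spring.cloud.stream.kafka.binder.brokers", kafka::getBootstrapServers);
        }

        @Test
        void roundTrip() {
            // produce to the function's input topic and assert on the output topic
            // with a plain KafkaProducer/KafkaConsumer
        }
    }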
Knut Schleßelmann
@kschlesselmann
Hi! Currently we use the metric spring.integration.send.count to monitor our functions, but it seems to be missing since we updated from Spring Boot 2.3.3 to 2.3.5. Is this a known change, a bug, or something weird on our side?
3 replies
abdullahly
@abdullahly

Hi, I'm using the Spring Cloud kafka binder (Hoxton.SR7)

Q1- How do I enable logContainerConfig to confirm settings are being applied ?

The docs indicate that we can use "spring.cloud.stream.default.<property>=<value>" to avoid repetition for consumer defaults. However, when I set these, the IDE doesn't detect them as existing fields; I'm guessing because they're passed to a map.

spring.cloud.stream.default.ackEachRecord=true
spring.cloud.stream.default.autoCommitOffset=true

Q2 - is this the correct config for setting the default consumer group for all consumers in an app?

spring.cloud.stream.bindings.default.group=my-group

I've been getting weird results; sometimes consumers get assigned to an anonymous group. At first I thought it only happened if I had this default group set AND at least one consumer with a specific group defined, but it's been a bit unpredictable; I need to do a bit more testing.

Gary Russell
@garyrussell
  1. Add a ListenerContainerCustomizer @Bean

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
        return (container, destinationName, group) -> {
            container.getContainerProperties().setLogContainerConfig(true);
        };
    }
  2. For kafka-specific defaults, it's s.c.s.kafka.bindings.default...

I am not sure why you are seeing those weird results, but it is not advisable to use the same group for multiple listeners - a rebalance on any topic will cause a rebalance on all.

abdullahly
@abdullahly

Thanks for the quick response @garyrussell !

Re: using the same group, even if they are running in the same app/container? Just confirming, thanks for pointing it out

Gary Russell
@garyrussell
Yes; even in the same app - as far as Kafka is concerned they are all discrete consumers; if they are all in the same group (even when listening to different topics) a rebalance on one will cause a rebalance on all.
abdullahly
@abdullahly
Thanks for the insight!
Knut Schleßelmann
@kschlesselmann
Hm … looks like our build time went up a fair bit (~3 min to ~15 min), but only on our CI system, after migrating to something newer than Hoxton.SR6. On our local systems we do not see this … anyone have an idea what could've happened? We're using the kafka binder in production and the test binder in our tests. On CI the builds run inside a Docker container. It looks like there are ~30s delays between tests that we do not see on our local machines. Maybe some kind of network timeout? I'm writing here because everything was fine in one of our services (Spring Boot 2.3.5 and Hoxton.SR8) until we attached it to Kafka :-/
1 reply
blake-bauman
@blake-bauman
If I have a Kafka topic with 5 partitions and I want to process each partition in parallel, do I need to bring up 5 JVM instances? Or is there a way to have SCSt use 5 consumers in one instance?
1 reply
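No need for 5 JVMs: the binding's consumer concurrency property spins up multiple consumers in one instance. A sketch, assuming an input binding named input (destination and group are placeholders):

    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: my-topic
              group: my-group
              consumer:
                concurrency: 5   # five consumers in one JVM, one per partition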
ajohnson-ventera
@ajohnson-ventera

Hi, I'm looking for some help with spring-cloud-stream-kinesis-binder. When I use it directly against AWS it works fine, but when I try to use either localstack or kinesalite for local development, it throws an unmarshalling error. If anyone has any insight into why it would be doing that, I would really appreciate it.

result when using localstack

2020-11-09 13:15:53.864  INFO 26400 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=LATEST, sequenceNumber='null', timestamp=null, stream='test-kinesis-stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2020-11-09 13:16:22.106  INFO 26400 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception com.amazonaws.SdkClientException: Unable to execute HTTP request: Current token (VALUE_STRING) not VALUE_EMBEDDED_OBJECT, can not access as binary
 at [Source: (com.amazonaws.event.ResponseProgressInputStream); line: -1, column: 589] during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=LATEST, sequenceNumber='null', timestamp=null, stream='test-kinesis-stream', shard='shardId-000000000000', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.

more information about the configuration

application.yml

  cloud:
    stream:
      kinesis:
        binder:
          checkpoint:
            table: spring-service-kinesis-checkpoint
          locks:
            table: spring-service-kinesis-locks
      bindings:
        input:
          destination: test-kinesis-stream

AwsConfig.java (localhost:4566 is the localstack service)

@Configuration
public class AwsConfig {
  @Autowired
  public AWSCredentialsProvider awsCredentialsProvider;

  @Bean
  public AmazonKinesisAsync amazonKinesis() {
    return AmazonKinesisAsyncClientBuilder.standard().withCredentials(this.awsCredentialsProvider)
        .withEndpointConfiguration(new EndpointConfiguration("http://localhost:4566", "us-east-1")).build();
  }
}

SpringApplication.java

@SpringBootApplication
@EnableBinding(Sink.class)
public class SpringApplication {
  public static void main(final String[] args) {
    SpringApplication.run(SpringApplication.class, args);
  }

  @StreamListener("input")
  public void input(String foo) {
    System.out.println("Hello: " + foo);
  }
}
rbrose
@rbrose
Hi
I get this error on startup:
the bootstrap-servers URL is ignored and Kafka uses the default localhost:9092
6 replies
maheshrajamanikurup
@maheshrajamanikurup
Hi, I am looking for some help with making the InteractiveQuery feature of Kafka work with the Spring Kafka binder when we have multiple instances running. I learned that if we set the 'spring.cloud.stream.kafka.streams.binder.configuration.application.server' property with the instance host and port, it should work. But with cloud deployment, PCF, and auto-scaling, I am not sure how to set it up.
1 reply
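One hedged option on Cloud Foundry, assuming the CF-provided instance variables are visible to the app, is to build application.server from them the same way the Kubernetes example above uses POD_IP:

    spring:
      cloud:
        stream:
          kafka:
            streams:
              binder:
                configuration:
                  application.server: ${CF_INSTANCE_IP}:${CF_INSTANCE_PORT}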
Andras Hatvani
@andrashatvani
hi, any news regarding Spring Boot 2.4 compatibility?
4 replies
samragu
@samragu

Hi, I would like some help with sending Avro messages through a functional supplier:

    @Bean
    public Supplier<Sensor> supplier() {
        Random random = new Random();
        return () -> Sensor.newBuilder()
            .setId("id")
            .setTemperature(random.nextFloat())
            .setAcceleration(random.nextFloat())
            .setVelocity(random.nextFloat())
            .build();
    }

and the configuration has

      bindings:
        sensor:
          destination: sensor-exchange
          group: sensor-queue
          content-type: application/*+avro

This is not sending the message as Avro, but as JSON. What do I configure for the supplier to send the message as Avro?

6 replies
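One hedged guess: with the default converters the payload is rendered as JSON unless native encoding is enabled, so the output binding (supplier-out-0 under the functional naming convention) may need useNativeEncoding plus an Avro serializer, e.g. with the Confluent serializer (the registry URL is a placeholder):

    spring:
      cloud:
        stream:
          bindings:
            supplier-out-0:
              destination: sensor-exchange
              producer:
                useNativeEncoding: true
          kafka:
            bindings:
              supplier-out-0:
                producer:
                  configuration:
                    value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                    schema.registry.url: http://localhost:8081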
Nipun Arora
@nipunarora

Hi Folks, I am trying a transformer in Spring Cloud Stream to consume JSON and generate an Avro schema record. Can you give me an idea of the right way to write this?
After following all the documentation I wrote the following transformer:

  @Bean
  public Function<String, GenericRecord> transformMaestroDirectSearchRequest() {
    return value -> {
      try {
        GenericRecord genericRecord = TransformJsonToAvro.convertJsonToAvro(
            value, transformJsonToAvro.getMaestroDirectSearchSchema());
        return genericRecord;
      } catch (Exception e){
        e.printStackTrace();
        return null;
      }
    };
  }

But I keep getting this error when I run the integration test:

 nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an enum:

The integration test:
  @Test
  public void maestroSearchRequestTransformTest() throws IOException, URISyntaxException {

    GenericMessage<byte[]> inputMessage =
        new GenericMessage<>(Utils.getJsonFile("Maestro_direct_Req.json").getBytes());

    this.input.send(inputMessage);
    Message<byte[]> receive = this.output.receive();
    System.out.println(receive.getPayload());
  }

Any ideas on how I can resolve this?

14 replies
samragu
@samragu
Joe Pardi
@joepardi
How do I launch a spring cloud data flow task and pass an argument to my boot application that contains spaces? When I use task launch my-task --arguments "--spring.profiles.active=local --app.username=alpha beta gamma", it sets it to alpha only.
Nipun Arora
@nipunarora

:wave: Hi folks, how can I specify the concurrency for a consumer in my configuration… for instance, if I want multiple instances of a consumer to work in parallel on a topic, such that they asynchronously consume the messages?

My current configuration is as follows -

  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      function:
        definition: receive;produce
      bindings:
        receive-in-0:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
        produce-out-0:
          destination: employee-details
          content-type: application/*+avro
3 replies
Alessandro Sivieri
@sivieri
Hi everyone! I am currently using Spring Cloud Stream 3.0.2, and I would like to start using Avro and the schema registry, but I cannot find a reliable (and especially working) source explaining how to specify the schema registry URL...
schema.registry.url seems to be necessary (otherwise the application fails immediately), but I don't know how to specify it once for all the producers (only my output topics are in Avro).
Guides usually mention enabling the registry with @EnableSchemaRegistryClient, but it is not clear to me whether it is needed, whether it requires a function producing the SchemaRegistryClient bean or it is enough to specify a property with the URL (and which one; there are at least a couple of possibilities in the documentation), and how to make it work in unit tests (should a URL starting with mock:// work, or do I need a test config producing an instance of MockSchemaRegistryClient? I tried the former and it seems to work, but then I miss the schema.registry.url properties, and the latter conflicts with what I assume is created by the annotation...).
Thank you for your time!
5 replies
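One hedged possibility for setting the URL once for every producer, via the Kafka binder's binder-wide producer properties map (untested; localhost is a placeholder):

    spring:
      cloud:
        stream:
          kafka:
            binder:
              producer-properties:
                schema.registry.url: http://localhost:8081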
rbrose
@rbrose
Hi, one simple question: what is the difference between Kafka and Kafka Streams?
1 reply