mmelsen
@mmelsen
@sobychacko would be nice, thanks
Devender Gollapally
@devender

Hi All, apologies if this is not the right forum.

We are using spring-cloud-stream:1.2.2.RELEASE along with spring-cloud-starter-stream-kafka:1.2.1.RELEASE.

The issue we are facing is that sometimes our consumer simply stops receiving messages (FYI, we don't use Reactor or async). Our code is as simple as this:

@StreamListener(ImageChannels.INPUT)
public void handle(Map<String, Object> map) {
    Event event = Event.fromMap(map);
    log.trace("Received Event {}", event);

    if (imageFilters.test(event)) {
        governorService.handleEvent(event);
    }
}

Upon restarting the service, it resumes receiving messages. We also don't see any exceptions when this happens. I read somewhere that if the consumer does not send back an ACK, then no new messages are delivered. Is there a way to check that, or a way to always send an ACK on receiving a message?

Any suggestions on how to debug this are appreciated.

Gary Russell
@garyrussell

The acks are sent by default so I don't think that's the problem. Many times, problems like this are caused by threads being "stuck" somewhere in user code. Next time it happens take a thread dump to see what the container threads are doing.
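If attaching jstack to the running process isn't convenient, a thread dump can also be captured from inside the JVM with the standard java.lang.management API. A minimal sketch (the class name here is just illustrative):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;

public class ThreadDumpUtil {

    // Dump every live thread's stack, roughly what `jstack <pid>` prints.
    // Note: ThreadInfo.toString() truncates very deep stacks; jstack shows them in full.
    public static void dumpAllThreads() {
        ThreadInfo[] threads = ManagementFactory.getThreadMXBean()
                .dumpAllThreads(true, true); // include locked monitors and synchronizers
        for (ThreadInfo thread : threads) {
            System.out.print(thread);
        }
    }

    public static void main(String[] args) {
        dumpAllThreads();
    }
}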

That said, 1.2.x is very old; it is based on spring-kafka 1.1.x which is no longer supported. I would recommend upgrading to at least spring-cloud-stream 1.3.x and overriding the spring-kafka version to 1.3.9. This version uses a much simpler threading model thanks to KIP-62. With older versions, we had to have 2 threads: one to keep the consumer "alive" and one to call the listener. If the listener gets behind, the consumer thread pause()s the consumer and resume()s it when the listener catches up. Pause/resume allows us to keep the consumer alive without retrieving more records on each poll().
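For illustration only (this is not the binder's actual code), the pause/resume pattern with the plain kafka-clients API looks roughly like the sketch below; the hand-off queue, the threshold of 10 batches, and the class name are made up, and the modern poll(Duration) signature is used:

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseResumeSketch {

    // The polling thread keeps the consumer in the group; a separate (slow) listener
    // thread drains the backlog queue.
    public static void pollLoop(KafkaConsumer<String, String> consumer,
                                BlockingQueue<ConsumerRecords<String, String>> backlog) {
        boolean paused = false;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                backlog.add(records);
            }
            Set<TopicPartition> assignment = consumer.assignment();
            if (!paused && backlog.size() > 10) {
                consumer.pause(assignment);   // keep polling (stay alive) but fetch nothing
                paused = true;
            } else if (paused && backlog.isEmpty()) {
                consumer.resume(assignment);  // listener caught up, start fetching again
                paused = false;
            }
        }
    }
}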

I have seen problems in the past where the resume() did not take effect and resulted in the behavior you see. spring-kafka 1.3.x gets rid of this complexity because the kafka-clients now keeps the consumer "alive" on its own, with a background heartbeat mechanism.

It would be even better to upgrade to spring-cloud-stream 2.2.x, to get modern versions of the full stack.
Devender Gollapally
@devender
Thank you much appreciated
Jeff Maxwell
@jmax01
@sabbyanandan Hey, sorry about the delay, I gave my two weeks' notice to my current employer so my priorities have shifted slightly. I should be able to get back to it in the next couple of days.
Sabby Anandan
@sabbyanandan
Thank you for the update, @jmax01. Good luck with the new role! :trophy: Looking forward to the continued collaboration.
Jose A. Iñigo
@codependent
Hi there! I am trying the 3.0.0 branch of the SCS Kafka project and have found a couple of problems, one in the M2 version and another one in the current BUILD-SNAPSHOT. Since 3.0.0 is currently in development, should I report these problems on GitHub or just wait for something more stable? BTW, could you tell me the current roadmap for this branch? Next scheduled milestone, RCs...
Soby Chacko
@sobychacko
Hi @codependent Can you please share your observations here? If they turn out to be issues, we can then create corresponding GH issues.
As to the roadmap, the RC should happen before the Spring One conference (Oct), targeting GA after that. There will be a couple of milestones between now and the first RC (at least M3 and M4).
Jose A. Iñigo
@codependent

Hi @sobychacko sure, here we go: using Spring Boot 2.0.0.M4/BUILD-SNAPSHOT and SCS 3.0.0.M2 I have set spring.cloud.stream.kafka.streams.binder.brokers=someURL. I end up with this:

WARN log: Connection to node -1 (localhost/127.0.0.1:9092) could not be established

I've seen that in KafkaStreamsBinderSupportAutoConfiguration.streamConfigGlobalProperties() the KafkaStreamsConfiguration kafkaStreamsConfiguration parameter contains a localhost:9092 value for bootstrap-servers that comes from the default Spring Boot initialization of org.springframework.boot.autoconfigure.kafka.KafkaProperties, so it is not overridden by the binder config (if (ObjectUtils.isEmpty(properties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))) {). To work around this I had to add spring.kafka.bootstrap-servers=someURL.

The second problem appears in the same project with 3.0.0.BUILD-SNAPSHOT:
Caused by: java.lang.IllegalArgumentException: 'avroSchemaServiceManager' cannot be null
    at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.0.BUILD-SNAPSHOT.jar:5.2.0.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.<init>(AvroSchemaRegistryClientMessageConverter.java:171) ~[spring-cloud-stream-schema-3.0.0.BUILD-SNAPSHOT.jar:3.0.0.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.schema.avro.AvroMessageConverterAutoConfiguration.avroSchemaMessageConverter(AvroMessageConverterAutoConfiguration.java:63) ~[spring-cloud-stream-schema-3.0.0.BUILD-SNAPSHOT.jar:3.0.0.BUILD-SNAPSHOT]
Jose A. Iñigo
@codependent
In AvroMessageConverterAutoConfiguration.avroSchemaMessageConverter(), the following autowired fields are null (this worked in 3.0.0.M2):
    @Autowired
    private AvroMessageConverterProperties avroMessageConverterProperties;
    @Autowired
    private AvroSchemaServiceManager avroSchemaServiceManager;
Soby Chacko
@sobychacko
@codependent I will have a look.
Jose A. Iñigo
@codependent
Thank you so much!!
Soby Chacko
@sobychacko
@codependent The first problem you are describing is a bug that we already addressed on the snapshot. If you pull the Kafka Streams binder 3.0.0 snapshot, it should work.
As for the second problem, I still need to investigate.
Jose A. Iñigo
@codependent
@sobychacko thanks for the update. I've refreshed the SNAPSHOT deps but can't really go on since the java.lang.IllegalArgumentException problem is still there. I'll wait until you can verify it. Just let me know if you need me to provide more detail to reproduce it.
Soby Chacko
@sobychacko
Sorry, I haven't gotten to that yet as I was in the middle of a few other issues. Feel free to provide more details. I will get to it ASAP.
Jose A. Iñigo
@codependent
No worries. I'll see if it also happens with the Kotlin e2e sample, since the actual project in which it is showing up is similar (nativeEncoding/decoding, Avro, schema registry and so on...).
Soby Chacko
@sobychacko
ok
Jose A. Iñigo
@codependent
Confirmed, the ShippingServiceApplication from the samples crashes with the same exception. I understand you must be very busy, so no worries, there's no hurry for a solution to this ;)
jPrest
@jPrest

Hi, I have a question regarding stateful message processing across multiple messages using the cloud function support. It seems to me that it is impossible to do something like the following, because every message becomes its own Flux.

    @Bean
    public Consumer<Flux<Message<?>>> sink() {
        return (input) -> input
            .buffer(Duration.ofSeconds(10))
            .doOnNext(System.out::println);
    }

Am I right with this assumption?

Oleg Zhurakousky
@olegz
Yes, and while we are in the process of simplifying it with Processor (to avoid Flux creation per Message), one must still understand that until the actual binder and the underlying API (e.g., Kafka, Rabbit, etc.) are natively reactive, the above is more API sugar for those who prefer the reactive programming model . . .
that said, there are some major changes in the process that should address at least some of these concerns
does that help?
@jPrest ^^
ghilainm
@ghilainm
Hi guys, small question: are headers filtered by SCS by default to only allow certain headers? I have added a custom header to my message but it does not show up in Kafka.
jPrest
@jPrest

Yes, thanks a lot for the input.

The ability to reason about a stream of messages over time is a key feature for cloud stream in my opinion.
This would put cloud stream in a good spot where full-blown streaming solutions like Flink are not needed, because Spring Cloud Stream is capable enough to handle some complex scenarios.
And for me this was what spring-cloud-reactive promised.
So if the stream of messages in cloud function is not modeled as a single Flux, IMO we lose most of the advantages brought by the reactive API right now.

To make the binders reactive, couldn't we put a Flux Consumer above a PollableConsumer, which would act as a bridge?

I think this is a major concern, so I'm thinking about opening an issue for this topic. Do you think that would be the right approach?

@olegz
Knut Schleßelmann
@kschlesselmann
Is there currently a way to pull records from AWS kinesis (with https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis) at a dynamic pace? e.g. my service has processed 100 records so it's ok to pull 100 more?
Darren Rambaud
@xyzst
Is anybody here familiar with implementing a binder or are there any guides?
It's not clear to me how a binder connects to an actual messaging system/broker. I tried navigating the class tree of a binder implementation (RabbitMQ) but I just got lost in the abstraction.
Oleg Zhurakousky
@olegz
@xyzst We are planning to publish a binder implementation guide. That said, you can look at our test binder, which uses Spring Integration as the messaging system. We use it for internal testing, but the binder itself is like any other binder and is probably the simplest implementation - https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/test/TestChannelBinder.java
You can also look at the other binders (Kafka, Rabbit) as well as the community-maintained binders listed on our main page - https://spring.io/projects/spring-cloud-stream
Knut Schleßelmann
@kschlesselmann
Is there currently a way to configure Spring Cloud Stream (especially using the Kafka binder) in a reactive service so that it only processes a specified number of messages at once? I connect to a MongoDB to process my data and SCS is overwhelming my service …
Darren Rambaud
@xyzst
@olegz thanks for the reply, I will take a look at the test binder. Looks simple enough compared to the other binders. But it's still not clear where the connection to the message broker is managed.
ishivesh
@ishivesh
Hi, I have a small question. Is there any way to set the Kafka producer "useTopicHeader" property to true from application properties in an SCDF application?
For reference: "useTopicHeader" is used to override the topic name using the Kafka header KafkaHeaders.TOPIC.
Oleg Zhurakousky
@olegz
@kschlesselmann Please look at spring-cloud/spring-cloud-stream#1742 and see if that is what you're looking for
Soby Chacko
@sobychacko
@xyzst There is a provisioner subsystem that's part of the binder infrastructure. It comes as a provisioning API that each binder can implement. That's where the core implementation-specific middleware communication is handled.
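Roughly, a binder implements the ProvisioningProvider SPI from org.springframework.cloud.stream.provisioning to create destinations on the middleware before producers and consumers are bound. A sketch for an imaginary "acme" broker; the Acme names are made up, and the signatures below are quoted from memory of the SPI:

import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;

public class AcmeProvisioningProvider
        implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {

    @Override
    public ProducerDestination provisionProducerDestination(String name, ProducerProperties properties)
            throws ProvisioningException {
        // a real binder would call the broker's admin API here to create "name" if missing
        return new AcmeDestination(name);
    }

    @Override
    public ConsumerDestination provisionConsumerDestination(String name, String group, ConsumerProperties properties)
            throws ProvisioningException {
        // e.g. create the destination plus a group-scoped subscription/queue
        return new AcmeDestination(name);
    }

    // Minimal destination wrapper; real binders return middleware-specific metadata here.
    private static class AcmeDestination implements ProducerDestination, ConsumerDestination {

        private final String name;

        AcmeDestination(String name) {
            this.name = name;
        }

        @Override
        public String getName() {
            return this.name;
        }

        @Override
        public String getNameForPartition(int partition) {
            return this.name + "-" + partition;
        }
    }
}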
Darren Rambaud
@xyzst
Is it better to deploy two SCS apps to support bidirectional messaging (e.g., A -> B and B -> A), or just have one SCS app?
Knut Schleßelmann
@kschlesselmann
@olegz Hm … I'm not sure what this feature will achieve. It looks like I just retrieve batches from my stream. Where will this enable me to throttle my application?
René S
@reneschroeder0000
@kschlesselmann @olegz I would also be interested in this. Was thinking of creating a Flux by polling the PollableMessageSource n times and putting the results in a new flux. Any idea on how to achieve this?
fcoayuso
@fcoayuso
Hi, is it possible to delete consumed messages from a Spring Cloud Stream processor once the message has been processed?
René S
@reneschroeder0000
@kschlesselmann @olegz this is my first approach to processing batches from a PollableMessageSource. Doesn't feel quite right though. Any thoughts?
@Component
@EnableBinding(PolledConsumer::class)
class ReactiveConsumer(
        val service: DummyService,
        val consumer: PolledConsumer
) : ApplicationRunner {
    private val logger = KotlinLogging.logger { }

    override fun run(args: ApplicationArguments) {
        Flux.create<List<DummyElement>> { sink ->
            sink.onRequest {
                logger.info { "requested $it elements" }
                val elements: MutableList<DummyElement> = mutableListOf()

                // `it` here is the demand passed to onRequest; poll exactly that many times
                for (i in 0 until it.toInt()) {
                    consumer.destIn().poll({ message ->
                        val element: DummyElement = message.payload as DummyElement
                        elements.add(element)
                    }, object : ParameterizedTypeReference<DummyElement>() {})
                }
                sink.next(elements)
            }
        }
                .subscribe(BackpressureSubscriber(service))
    }

    private class BackpressureSubscriber(val service: DummyService) : BaseSubscriber<List<DummyElement>>() {

        override fun hookOnNext(elements: List<DummyElement>) {
            elements.toFlux()
                    .flatMap { service.process(it) }
                    .blockLast()
            request(10)
        }

        override fun hookOnSubscribe(subscription: Subscription) {
            request(10)
        }
    }
}

interface PolledConsumer {
    @Input("input")
    fun destIn(): PollableMessageSource
}
Jakub Kubryński
@jkubrynski
Hi! In cloud-stream I can turn off the exchange declaration by setting declareExchange to false. However, RabbitAdmin.initialize still tries to declare the exchanges. Any ideas on how I can really turn off auto-declaration of exchanges?
Tim
@tim-klug
Hi! I discovered a strange behavior after updating our services to the latest Spring starter release. We use Spring's messaging system to serialize and deserialize payloads sent as Kafka messages. In our tests we use a producer that sends a message to a topic, where some stream topology kicks in and routes the message according to our business rules. When I consume from a topic to check whether the desired events happened during the test, I get a deserialization exception. The cause is that the message type is not in the trusted packages. The strange thing is that the class that was not found is the one from the producer payload. So it looks like the message type is placed in the header by the producer and then forwarded all the way through the pipeline to a consumer. Is this behavior expected? Should I remove the content type from the header at the producer? THX
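Assuming the test consumer uses spring-kafka's JsonDeserializer (which is what raises the "not in the trusted packages" error), two common ways around it are to trust the producer's payload package on the consumer side, or to stop the producer from adding type info headers. A rough sketch; the package name com.example.events, the broker address, and the class name are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class TestConsumerConfigSketch {

    public static KafkaConsumer<String, Object> trustedConsumer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-assertions");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Option 1: trust the producer's payload package so the forwarded type header can be resolved.
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
        return new KafkaConsumer<>(props);
    }

    // Option 2 (producer side): keep the payload as plain JSON and don't add type info headers at all.
    public static Map<String, Object> producerOverrides() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return props;
    }
}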
Felipe Romani
@felipearomani

Hi! I'm using Cloud Stream with Kafka Streams, and I've got a question about KTable. When my binding is a KStream type, Spring Cloud Stream parses my JSON payload (value) into my object automatically, but when I use KTable as the input binding I get an error saying "java.lang.ClassCastException: class java.lang.String cannot be cast to class br.com.wspot.accounting.streamprocessor.models.RawAccounting". Below are my code and my application.properties:

## This example works great!

@EnableBinding({Stream.AccountingProcessor.class })
public class Stream {

    @StreamListener
    public void process(@Input(AccountingProcessor.INPUT_ACCT) KStream<String, RawAccounting> rawAccountingKStream) {

        rawAccountingKStream.foreach((key, value) -> System.out.println(value));
    }

    public interface AccountingProcessor {
        String INPUT_ACCT = "inputAcct";

        @Input(AccountingProcessor.INPUT_ACCT)
        KStream<String, RawAccounting> rawAcct();
    }
}
## This example throws the following exception: java.lang.ClassCastException: class java.lang.String cannot be cast to class br.com.wspot.accounting.streamprocessor.models.RawAccounting

@EnableBinding({Stream.AccountingProcessor.class })
public class Stream {

    @StreamListener
    public void process(@Input(AccountingProcessor.INPUT_ACCT) KTable<String, RawAccounting> rawAccountingKStream) {

        rawAccountingKStream.toStream().foreach((key, value) -> System.out.println(value));
    }

    public interface AccountingProcessor {
        String INPUT_ACCT = "inputAcct";

        @Input(AccountingProcessor.INPUT_ACCT)
        KTable<String, RawAccounting> rawAcct();
    }
}
## My application.properties

spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde

spring.cloud.stream.bindings.inputAcct.destination=wspot.accounting.raw
spring.cloud.stream.bindings.inputAcct.content-type=application/json
spring.cloud.stream.kafka.streams.bindings.inputAcct.consumer.application-id=wspot-acct-raw-processor-id

spring.cloud.stream.kafka.binder.brokers=localhost:9092

Maybe I missed some concept; it's the first time I'm using Spring Cloud Stream.

Soby Chacko
@sobychacko
@felipearomani I think I answered it here: spring-cloud/spring-cloud-stream#1789
Felipe Romani
@felipearomani
@sobychacko yes, thank you a lot! That solved my problem!