    Daniel Zou
    @dzou
    Maybe... ;)
    Thanks for the contact.
    Oskar Olsson
    @olssonoskar

    Hello everyone!

    I'm having some issues enabling Stackdriver Trace in a project that is using FirestoreReactiveRepository.
    With tracing enabled via spring-cloud-gcp-starter-trace, something as basic as fetching a document by a specific id results in:

    'org.springframework.cloud.gcp.data.firestore.FirestoreDataException: Error while reading entries by id; nested exception is io.grpc.StatusRuntimeException: UNIMPLEMENTED: GRPC target method can't be resolved.'

    Is this not currently supported?

    dmitry-s
    @dmitry-s
    Hi @olssonoskar !
    Could you submit an issue on our GitHub tracker? Also, please include some sample code that replicates the issue.
    Thanks
    Oskar Olsson
    @olssonoskar
    Will do, thanks @dmitry-s
    Volkan Yazıcı
    @vy
    Hello! I'm trying to use spring-integration in combination with spring-cloud-gcp to handle messages in batches. For this purpose, AggregatingMessageHandler expects me to provide a CorrelationStrategy, and I don't have one for Pub/Sub. Receiving messages in batches in random order is perfectly fine by me. How shall I pass a CorrelationStrategy in this case?
    Artem Bilan
    @artembilan
    I think there is no "message group" concept in Pub/Sub at all
    so, you are on your own to determine whatever "key" could be used as a group identifier
    and that key is what the CorrelationStrategy has to return for every incoming message.
    Volkan Yazıcı
    @vy

    @artembilan I've come up with this one:

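    import org.springframework.integration.aggregator.CorrelationStrategy
    import org.springframework.messaging.Message

    // Always returns the same key, so every message ends up in one group.
    object SpringConstantCorrelationStrategy : CorrelationStrategy {

        override fun getCorrelationKey(message: Message<*>?): Any = "constant"

    }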

    Though I am not sure if it'll do what it is expected to do.

    Artem Bilan
    @artembilan
    It does
    but you are going to have only one group for all the messages
    and you are going to process all the released group instances only in a single thread
    Volkan Yazıcı
    @vy
    @artembilan What do you mean? I won't be able to receive messages in batches?
    Artem Bilan
    @artembilan
    Please, read more about an aggregator in Spring Integration docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator
    Volkan Yazıcı
    @vy

    and you are going to process all the released group instances only in a single thread

    In that case, shall I return a random String from a set whose size equals the desired concurrency, e.g., the available CPU count?
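
The idea in this question can be sketched in plain Java, assuming a fixed pool of group keys sized to the desired concurrency. `SpreadKeys` and the `group-N` key names are hypothetical and not part of Spring Integration; a CorrelationStrategy would return `nextKey()` as its correlation key. Whether this actually yields parallel processing depends on how released groups are dispatched downstream, which the sketch does not cover.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper: hands out one of `concurrency` fixed group keys at random.
final class SpreadKeys {

    private final String[] keys;

    SpreadKeys(int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be at least 1");
        }
        keys = new String[concurrency];
        for (int i = 0; i < concurrency; i++) {
            keys[i] = "group-" + i; // assumed naming, any distinct values would do
        }
    }

    // Picks a key uniformly at random; safe to call from multiple threads.
    String nextKey() {
        return keys[ThreadLocalRandom.current().nextInt(keys.length)];
    }
}
```

A typical pool size would be `Runtime.getRuntime().availableProcessors()`. Each key forms its own group, so a batch fills up more slowly than with a single constant key.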

    Artem Bilan
    @artembilan
    The CorrelationStrategy gives us a hook to determine which group the current message belongs to
    when a ReleaseStrategy is satisfied, the aggregator releases the whole group with the messages that have arrived in it so far
    I somehow believe that there could be some business key in all your Pub/Sub messages to distinguish their group in your app
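
To make the two hooks concrete, here is a minimal, dependency-free Java sketch of the behavior described above. `BatchingAggregator` is a hypothetical stand-in, not the Spring Integration aggregator: a key function plays the role of the CorrelationStrategy, and a fixed group size plays the role of the ReleaseStrategy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration only; NOT the Spring Integration API.
final class BatchingAggregator<M> {

    private final Function<M, Object> correlationKey; // role of a CorrelationStrategy
    private final int releaseSize;                     // role of a size-based ReleaseStrategy
    private final Consumer<List<M>> output;            // receives each released group
    private final Map<Object, List<M>> groups = new HashMap<>();

    BatchingAggregator(Function<M, Object> correlationKey, int releaseSize,
                       Consumer<List<M>> output) {
        this.correlationKey = correlationKey;
        this.releaseSize = releaseSize;
        this.output = output;
    }

    // Adds the message to its group and releases the group once it is full.
    synchronized void handle(M message) {
        Object key = correlationKey.apply(message);
        List<M> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (group.size() >= releaseSize) {
            groups.remove(key);
            output.accept(group);
        }
    }
}
```

With a constant key function such as `m -> "constant"`, every message lands in one group, so a batch is released every `releaseSize` messages. A partially filled group stays in memory until more messages arrive, so a real setup would also need some time-based release.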
    Volkan Yazıcı
    @vy
    Is it possible to receive messages in batches in pubSubTemplate#subscribe()?
    Artem Bilan
    @artembilan
    Well, we have just discussed how to do that using an Aggregator from Spring Integration. The PubSubInboundChannelAdapter is fully based on that pubSubTemplate#subscribe().
    Otherwise, you probably need to think about pullAndAck():
    Volkan Yazıcı
    @vy
    I was wondering if I can do without Spring Integration.
    Artem Bilan
    @artembilan
    /**
     * Pull and auto-acknowledge a number of messages from a Google Cloud Pub/Sub subscription.
     * @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
     * subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
     * @param maxMessages the maximum number of pulled messages. If this value is null then
     * up to Integer.MAX_VALUE messages will be requested.
     * @param returnImmediately returns immediately even if subscription doesn't contain enough
     * messages to satisfy {@code maxMessages}
     * @return the list of received messages
     */
    List<PubsubMessage> pullAndAck(String subscription, Integer maxMessages, Boolean returnImmediately);
    Or similar
    subscribe() is not batchable
    it is a listener and reacts to the incoming message immediately
    with pull you are on your own
    you make a decision when and how to call this method
    We can probably think about batch support for the PubSubMessageSource
    but that is not going to be possible with the subscribe semantics
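
Since the decision of when and how to pull is left to the caller, one option is a small polling loop on an ExecutorService. The Java sketch below is an illustration under stated assumptions: `BatchPuller` is a hypothetical class, and its `puller` argument stands in for a call such as `pullAndAck(subscription, maxMessages, returnImmediately)` shown above. Error handling and retries are left out.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical wrapper: repeatedly pulls a batch and hands it to a handler.
final class BatchPuller<M> implements AutoCloseable {

    private final Supplier<List<M>> puller;       // stand-in for a pullAndAck(...) call
    private final Consumer<List<M>> batchHandler; // processes one whole batch at a time
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    BatchPuller(Supplier<List<M>> puller, Consumer<List<M>> batchHandler) {
        this.puller = puller;
        this.batchHandler = batchHandler;
    }

    // Starts the polling loop on the background thread.
    void start() {
        executor.execute(this::pollLoop);
    }

    private void pollLoop() {
        while (running) {
            List<M> batch = puller.get();
            if (batch.isEmpty()) {
                try {
                    Thread.sleep(50); // brief pause so an empty subscription is not polled in a tight loop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            } else {
                batchHandler.accept(batch);
            }
        }
    }

    // Stops the loop after the current iteration and lets the worker thread exit.
    @Override
    public void close() {
        running = false;
        executor.shutdown();
    }
}
```

Several such pullers, or a larger thread pool, would be needed to process batches concurrently; the single-threaded executor here only keeps the sketch short.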
    Volkan Yazıcı
    @vy
    To tell the truth, I am working on a feature for a project that already uses Spring Integration for processing Pub/Sub messages. They had never considered batching. Now I am trying to get it done... I am so frustrated that I dunno whether I shall stick to Spring Integration or roll my own ExecutorService-backed PubSubTemplate#pullAndAck() wrapper. This is not something new for me. My previous frustrations with Spring's Pub/Sub integration have led me to develop reactor-pubsub. The reactor-pubsub experience is like a fresh breeze compared to all this madness. Don't get me wrong, I am not pointing any fingers. Spring Integration is a huge project that tries to do everything that is possible to do. Though this generalist approach comes at the cost of sometimes missing very fundamental and simple needs.
    That said, I am very thankful for your prompt and elaborate replies @artembilan.
    Artem Bilan
    @artembilan
    Well, essentially it is not Spring Integration's fault.
    It is just a fact that subscribe() doesn't return a batch of messages to us
    I think there was just no request to support batch for pull from Spring Integration channel adapter perspective...
    It might sound fundamental to you, but as you see, in all these years only you have come to us asking for this feature in the channel adapter implementation
    Therefore it feels like it is not so fundamental :smile:
    What is fundamental is that we provide a general batching solution with an Aggregator EIP implementation
    Volkan Yazıcı
    @vy
    When people start to experience performance problems due to subscriber speed, they tend to make the mistake of thinking it is the queueing medium they need to replace, i.e., Pub/Sub. Though the problem is actually the interface its APIs expose.
    Artem Bilan
    @artembilan
    I'll leave a possible answer to your last comment to the Google team members; I know little about the Pub/Sub API
    Volkan Yazıcı
    @vy
    Pub/Sub wire-API (i.e., Protobuf API) is okay. It is the official driver that sucks, IMHO.
    Mike Eltsufin
    @meltsufin
    You might be interested in this: spring-cloud/spring-cloud-gcp#2023
    We never merged it though. Maybe we should prioritize it if it's something you're looking for.
    Volkan Yazıcı
    @vy
    In case you're interested, you can read the historical account of reactor-pubsub.
    Artem Bilan
    @artembilan
    I see our very own Sergei Egorov as a contributor to your project :smile:
    Volkan Yazıcı
    @vy
    He provided extensive reviews on the initial version, which I am very thankful for.
    Mike Eltsufin
    @meltsufin
    We've discussed this with your coworker @mzeijen and he submitted some improvements to Spring Cloud GCP reactive Pub/Sub support.
    Volkan Yazıcı
    @vy
    My only regret about the current state of reactor-pubsub is its reliance on HTTP/1.1 rather than HTTP/2. But this is due to the lack of HTTP/2 client support in reactor-netty. Once it is there, I will migrate from JSON-over-HTTP/1.1 to Protobuf-over-HTTP/2.
    @meltsufin Right, I am aware of it.