spring-integration
in combination with spring-cloud-gcp
to handle messages in batches. For this purposes, AggregatingMessageHandler
expects me to provide a CorrelationStrategy
, of which I don't have any for Pub/Sub. Receiving messages in batches in random order is perfectly fine by me. How shall I pass a CorrelationStrategy
in this case?
CorrelationStrategy
against every incoming message.
ReleaseStrategy
is satisfied, the aggregator releases a whole group with messages arrived to it so far
pullAndAck()
otherwise:
/**
* Pull and auto-acknowledge a number of messages from a Google Cloud Pub/Sub subscription.
* @param subscription canonical subscription name, e.g., "subscriptionName", or the fully-qualified
* subscription name in the {@code projects/<project_name>/subscriptions/<subscription_name>} format
* @param maxMessages the maximum number of pulled messages. If this value is null then
* up to Integer.MAX_VALUE messages will be requested.
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
* @return the list of received messages
*/
List<PubsubMessage> pullAndAck(String subscription, Integer maxMessages, Boolean returnImmediately);
subscribe
is not batch'able
pull
you are on your own
subscribe
semantics
ExecutorService
-backed PubSubTemplate#pullAndAck()
wrapper. This is not something new for me. My previous frustrations with Spring's Pub/Sub integration has led me to develop reactor-pubsub. reactor-pubsub
's experience is like a fresh breeze compared to all this madness. Don't get me wrong, I am not pointing any fingers. Spring Integration is a huge project and trying to do everything that is possible to do. Though this generalist approach comes with the cost of sometimes missing very fundamental and simple needs.
subscribe()
doesn't return for us a batch of messages
pull
from Spring Integration channel adapter perspective...