DefaultAWSCredentialsProvider
is used for services - which seems only be true if you aren’t using the spring boot starters. The ContextCredentialsAutoConfiguration
always creates a custom credentials provider chain that gets used for the amazon service calls - over time things have been added (like profile support) to that chain, but it’s still doesn’t support all the options of the default one
I want know if I can control the number of consumers when using Spring Cloud AWS Messaging and @MessageMapping annotation.
If I understood, I should create my own AsyncTaskExecutor. I really don't know if this is the right way.
Let me know if you could help me, guys. Thanks in advance.
@SqsListener
annotation appear to be extremely aggressive in their frequency. SQS charges per request and the Spring Cloud AWS defaults seem to be around 16 requests per second. I would like to use long pollling and adjust the interval. If anyone can point me to where I can set that I'd appreciate it. Obviously this can easily be done by using the Amazon SDK directly but I'm hoping there's a way to do this within Spring.
@kanderson450 Thanks for your attention. Actually I have the same need but in order to increase or decrease rps according to queue (it'll be decided by profile, each one with its configurations). I would like to hardly increase the number of consumers to deal with 150 rps.
If I'm right, I should pass other instance of AsyncTaskExecutor to SimpleMessageListenerContainer
https://github.com/spring-cloud/spring-cloud-aws/blob/master/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainer.java
But, is this the right way?
Thanks, guys. But that isn't my real question. Let's suppose that my application is consuming 50 records per second. If I want scale up to 150 rps, the only option that I have is scaling horizontally or can I increase (and control) my number of consumers?
protected AsyncTaskExecutor createDefaultTaskExecutor() {
String beanName = getBeanName();
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix(beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
int spinningThreads = this.getRegisteredQueues().size();
if (spinningThreads > 0) {
threadPoolTaskExecutor.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);
int maxNumberOfMessagePerBatch = getMaxNumberOfMessages() != null ? getMaxNumberOfMessages() : DEFAULT_WORKER_THREADS;
threadPoolTaskExecutor.setMaxPoolSize(spinningThreads * maxNumberOfMessagePerBatch);
}
// No use of a thread pool executor queue to avoid retaining message to long in memory
threadPoolTaskExecutor.setQueueCapacity(0);
threadPoolTaskExecutor.afterPropertiesSet();
return threadPoolTaskExecutor;
}
Can I do it just overwriting AsyncTaskExecutor configuration?
public QueueBufferConfig queueBufferConfig(@Value("${QUEUE_CONSUMER_COUNT}") int consumerCount) {
QueueBufferConfig config = new QueueBufferConfig();
// use long polling to avoid going nuts with empty fetches
config.setLongPoll(true);
config.setLongPollWaitTimeoutSeconds(20);
// more in-flight batches can help maximize throughput per box
config.setMaxInflightReceiveBatches(consumerCount * 2);
config.setMaxDoneReceiveBatches(consumerCount+1);
return config;
}
@Lazy
@Bean(destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQS( QueueBufferConfig queueBufferConfig, RegionProvider regionProvider, AWSCredentialsProvider awsCredentialsProvider) {
AmazonSQSAsyncClient amazonSQSAsyncClient;
amazonSQSAsyncClient = new AmazonSQSAsyncClient(awsCredentialsProvider);
if (regionProvider != null) {
amazonSQSAsyncClient.setRegion(regionProvider.getRegion());
}
return new AmazonSQSBufferedAsyncClient(amazonSQSAsyncClient, queueBufferConfig);
}