DefaultAWSCredentialsProvider
is used for services - which seems only be true if you aren’t using the spring boot starters. The ContextCredentialsAutoConfiguration
always creates a custom credentials provider chain that gets used for the amazon service calls - over time things have been added (like profile support) to that chain, but it’s still doesn’t support all the options of the default one
I want know if I can control the number of consumers when using Spring Cloud AWS Messaging and @MessageMapping annotation.
If I understood, I should create my own AsyncTaskExecutor. I really don't know if this is the right way.
Let me know if you could help me, guys. Thanks in advance.
@SqsListener
annotation appear to be extremely aggressive in their frequency. SQS charges per request and the Spring Cloud AWS defaults seem to be around 16 requests per second. I would like to use long pollling and adjust the interval. If anyone can point me to where I can set that I'd appreciate it. Obviously this can easily be done by using the Amazon SDK directly but I'm hoping there's a way to do this within Spring.
@kanderson450 Thanks for your attention. Actually I have the same need but in order to increase or decrease rps according to queue (it'll be decided by profile, each one with its configurations). I would like to hardly increase the number of consumers to deal with 150 rps.
If I'm right, I should pass other instance of AsyncTaskExecutor to SimpleMessageListenerContainer
https://github.com/spring-cloud/spring-cloud-aws/blob/master/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainer.java
But, is this the right way?
Thanks, guys. But that isn't my real question. Let's suppose that my application is consuming 50 records per second. If I want scale up to 150 rps, the only option that I have is scaling horizontally or can I increase (and control) my number of consumers?
protected AsyncTaskExecutor createDefaultTaskExecutor() {
String beanName = getBeanName();
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix(beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
int spinningThreads = this.getRegisteredQueues().size();
if (spinningThreads > 0) {
threadPoolTaskExecutor.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);
int maxNumberOfMessagePerBatch = getMaxNumberOfMessages() != null ? getMaxNumberOfMessages() : DEFAULT_WORKER_THREADS;
threadPoolTaskExecutor.setMaxPoolSize(spinningThreads * maxNumberOfMessagePerBatch);
}
// No use of a thread pool executor queue to avoid retaining message to long in memory
threadPoolTaskExecutor.setQueueCapacity(0);
threadPoolTaskExecutor.afterPropertiesSet();
return threadPoolTaskExecutor;
}
Can I do it just overwriting AsyncTaskExecutor configuration?
public QueueBufferConfig queueBufferConfig(@Value("${QUEUE_CONSUMER_COUNT}") int consumerCount) {
QueueBufferConfig config = new QueueBufferConfig();
// use long polling to avoid going nuts with empty fetches
config.setLongPoll(true);
config.setLongPollWaitTimeoutSeconds(20);
// more in-flight batches can help maximize throughput per box
config.setMaxInflightReceiveBatches(consumerCount * 2);
config.setMaxDoneReceiveBatches(consumerCount+1);
return config;
}
@Lazy
@Bean(destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQS( QueueBufferConfig queueBufferConfig, RegionProvider regionProvider, AWSCredentialsProvider awsCredentialsProvider) {
AmazonSQSAsyncClient amazonSQSAsyncClient;
amazonSQSAsyncClient = new AmazonSQSAsyncClient(awsCredentialsProvider);
if (regionProvider != null) {
amazonSQSAsyncClient.setRegion(regionProvider.getRegion());
}
return new AmazonSQSBufferedAsyncClient(amazonSQSAsyncClient, queueBufferConfig);
}
Hey @ryangardner! I'm so sorry for delay. I was in a really really critical project and I need to change my focus.
And, I noted that this chat isn't very active.
So, thank you so much for reply me. Seriously you need change your solution for Camel?
Is there no native solution for spring cloud aws?
I'll try overwrite AsyncTaskExecutor mentioned previously and I'll get back with the results.