Processing more than 10 SQS messages concurrently with Spring Cloud AWS Messaging

2021-09-14


Problem

The issue with Spring Cloud AWS Messaging and its @SqsListener annotation is that it cannot process more than 10 messages from a queue concurrently. Why is that?
Amazon SQS uses a polling model: a client sends requests to the SQS endpoint to consume available messages. If more than one message is available when a request arrives, the endpoint may return several messages in the response. The client controls the maximum number of returned messages via the MaxNumberOfMessages[1] parameter, but SQS caps this value at 10.
So what do you do if you need to process more than 10 messages simultaneously? You make multiple requests. The problem is that Spring Cloud AWS won’t make another request until all messages from the previous request have been processed.
Let’s look at the Spring Cloud AWS Messaging code in the latest version at the time of writing, 2.3.2. In particular, we’re interested in a while loop in the SimpleMessageListenerContainer class, which is responsible for polling a queue and pushing received messages to the @SqsListener-annotated handler method:

while (isQueueRunning(this.logicalQueueName)) {
    try {
        ReceiveMessageResult receiveMessageResult = getAmazonSqs()
            .receiveMessage(this.queueAttributes.getReceiveMessageRequest());
        final List<MessageGroup> messageGroups = queueAttributes.isFifo()
            ? groupByMessageGroupId(receiveMessageResult) : groupByMessage(receiveMessageResult);
        CountDownLatch messageBatchLatch = new CountDownLatch(messageGroups.size());
        for (MessageGroup messageGroup : messageGroups) {
            if (isQueueRunning(this.logicalQueueName)) {
                MessageGroupExecutor messageGroupExecutor = new MessageGroupExecutor(this.logicalQueueName,
                    messageGroup, this.queueAttributes);
                getTaskExecutor()
                    .execute(new SignalExecutingRunnable(messageBatchLatch, messageGroupExecutor));
            }
            else {
                messageBatchLatch.countDown();
            }
        }
        try {
            messageBatchLatch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    catch (Exception e) {
        /* ... */
    }
}

What is going on here:

  • First, it attempts to receive messages. Spring Cloud sets MaxNumberOfMessages to 10 by default[2], which is the maximum value allowed by SQS.
  • For each received message, it asynchronously invokes the @SqsListener-annotated handler method via a thread pool executor. By default, Spring Cloud configures the number of workers in that pool as (MaxNumberOfMessages + 1) * [number of @SqsListener-annotated methods].
  • Then it calls messageBatchLatch.await() to wait until all messages in the batch have been processed.
  • Finally, it repeats from the beginning.

For each queue defined with @SqsListener in your app, SimpleMessageListenerContainer runs this while loop on one of the executor’s threads (hence the +1 in the formula above). Because of MaxNumberOfMessages, the loop never submits more than 10 messages to the executor per iteration, so enlarging the thread pool won’t make any difference. As a result, the application can end up doing far less work than it could.
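To make the cap concrete, here is a minimal simulation of the loop using only java.util.concurrent. It is a sketch, not the library’s code: the class name BatchLatchSimulation, the method peakConcurrency and the sleep that stands in for a handler are all assumptions made for illustration, and no SQS calls are involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the container's polling loop; no AWS calls are made.
final class BatchLatchSimulation {

    /** Runs the polling rounds and returns the highest number of handlers seen running at once. */
    static int peakConcurrency(int poolSize, int batchSize, int rounds, long handlerMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int round = 0; round < rounds; round++) {
                // One simulated receiveMessage call: at most batchSize messages come back.
                CountDownLatch batchLatch = new CountDownLatch(batchSize);
                for (int m = 0; m < batchSize; m++) {
                    executor.execute(() -> {
                        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                        try {
                            Thread.sleep(handlerMillis); // simulated I/O-bound handler
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            running.decrementAndGet();
                            batchLatch.countDown();
                        }
                    });
                }
                batchLatch.await(); // no new poll until the whole batch is done
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        int peak = peakConcurrency(100, 10, 5, 50);
        System.out.println("pool size 100, peak concurrent handlers: " + peak);
    }
}
```

Even with 100 threads in the pool, the reported peak never exceeds the batch size of 10, because the latch holds back the next round until the current one has finished.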
If the @SqsListener handler method were CPU-bound, you would probably be okay with this, depending on the number of available CPUs and the desired concurrency. More often, however, message processing involves I/O, such as interaction with a database or web services. I/O can be time-consuming, and while you’re waiting for packets to be sent or received you don’t use the CPU. This means that for I/O-bound processing you shouldn’t limit concurrency to the number of available CPUs; otherwise your application may do much less than its resources allow. For instance, the default thread pool of embedded Tomcat in Spring Boot has 200 workers[3], far more than the number of CPUs on a typical node.
Things can get even worse. Imagine that 9 of the 10 received messages take 1 second each to process, but the 10th takes 10 seconds. In this case, 9 of the 10 threads process the first 9 messages in 1 second and then sit idle for the next 9 seconds, until the “heavy” 10th message has been processed and the next batch of messages has been received. The threads do nothing while there may be plenty of work waiting in SQS.
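The cost of such a straggler can be put into a number. The sketch below (the class BatchUtilization and its method are hypothetical names, not part of any library) computes the share of thread-time spent on useful work while one batch is in flight, assuming one thread per message and that every thread stays tied up until the slowest message is done.

```java
// Hypothetical helper for the arithmetic in the example above.
final class BatchUtilization {

    /** Fraction of thread-time spent working while one batch is in flight. */
    static double utilization(double[] processingSeconds) {
        double busy = 0;
        double longest = 0;
        for (double seconds : processingSeconds) {
            busy += seconds;
            longest = Math.max(longest, seconds);
        }
        // Every thread is held until the slowest message of the batch finishes.
        return busy / (processingSeconds.length * longest);
    }

    public static void main(String[] args) {
        double[] batch = {1, 1, 1, 1, 1, 1, 1, 1, 1, 10};
        System.out.printf("utilization: %.0f%%%n", utilization(batch) * 100);
    }
}
```

For the batch above this gives 19 busy thread-seconds out of 100 available, i.e. 19% utilization.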

Quick solution

The described issue was reported to Spring Cloud AWS[4][5] and had not been resolved at the time of writing. However, there is a fairly simple workaround.

First, we create a custom thread pool executor of the desired size that will process the messages:

@Configuration
public class AppConfig {

    private int poolSize = 100;

    @Bean
    public ThreadPoolTaskExecutor messageListenerExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(poolSize);
        executor.setQueueCapacity(0);
        executor.setRejectedExecutionHandler(new BlockingSubmissionPolicy(3000));
        return executor;
    }
}

public class BlockingSubmissionPolicy implements RejectedExecutionHandler {

    private final long timeout;

    public BlockingSubmissionPolicy(long timeout) {
        this.timeout = timeout;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            BlockingQueue<Runnable> queue = executor.getQueue();
            if (!queue.offer(r, this.timeout, TimeUnit.MILLISECONDS)) {
                throw new RejectedExecutionException("Timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            /* ... */
        }
    }
}

Note that we supply it with a custom task rejection handler. When the executor cannot accept a task, this handler blocks the submitting thread for up to the given timeout (3 seconds here) instead of rejecting the task immediately. This is necessary to provide back pressure, i.e. to keep the application from reading out the whole SQS queue.
We also set the queue capacity to 0 to prevent unnecessary buffering of messages in application memory. As a result, submitting a new task blocks while all of the workers are busy processing other messages.
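The blocking behaviour can be checked without Spring. With a queue capacity of 0, ThreadPoolTaskExecutor uses a SynchronousQueue, so the sketch below builds a plain ThreadPoolExecutor the same way. The class BackPressureDemo, its nested BlockingPolicy (a copy of the idea behind BlockingSubmissionPolicy) and the method submitAllMillis are names made up for this illustration; unlike the handler above, this copy rethrows on interruption, which is a choice made for the sketch.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: measures how long the submitter is held back.
final class BackPressureDemo {

    // Same idea as BlockingSubmissionPolicy, inlined to keep the sketch self-contained.
    static final class BlockingPolicy implements RejectedExecutionHandler {
        private final long timeoutMillis;

        BlockingPolicy(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // Blocks until a worker takes the task or the timeout expires.
                if (!executor.getQueue().offer(r, timeoutMillis, TimeUnit.MILLISECONDS)) {
                    throw new RejectedExecutionException("Timeout");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted", e);
            }
        }
    }

    /** Submits the given number of tasks and returns how long the submission itself took. */
    static long submitAllMillis(int workers, int tasks, long taskMillis) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), new BlockingPolicy(3000));
        long start = System.nanoTime();
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> sleep(taskMillis));
            }
        } finally {
            executor.shutdown(); // tasks already accepted still run to completion
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("2 workers, 4 tasks: submission took "
                + submitAllMillis(2, 4, 200) + " ms");
    }
}
```

With 2 workers and 4 tasks of 200 ms each, the third submission cannot complete until one of the first two tasks finishes, so the submitting thread is held for roughly the duration of a task. That is the back pressure the polling loop will feel.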

We won’t use this executor in place of the default one in SimpleMessageListenerContainer. As mentioned earlier, increasing the number of workers in that pool won’t solve the problem, because the bottleneck is elsewhere. Instead, the @SqsListener-annotated method submits the message processing to the custom executor:

public class MessageListener {

    @Autowired
    private AsyncTaskExecutor messageListenerExecutor;

    @SqsListener(value = "test-queue", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void onMessage(Object message, Acknowledgment ack) {
        messageListenerExecutor.submit(() -> {
            try {
                // message processing goes here
                ack.acknowledge();
            } catch (Exception e) {
                /* */
            }
        });
    }
}

The onMessage method is still invoked on the SimpleMessageListenerContainer thread pool, but the actual message processing now happens in the custom executor. If the executor has enough free workers to take all the messages, submission is instantaneous, letting the loop in SimpleMessageListenerContainer fetch the next batch. When the executor is at capacity, submission blocks until a worker becomes free, delaying the next request to SQS.
Since messages are now processed asynchronously, it’s important to use the SqsMessageDeletionPolicy.NEVER policy and delete messages yourself using the Acknowledgment handle.
Please note that this solution should not be used with FIFO queues, as the required processing order could be violated.
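To see the effect of the hand-off in isolation, the following sketch simulates it with plain java.util.concurrent classes. Everything in it is an assumption made for illustration: the class HandOffSimulation, the fixed pool that stands in for the container’s executor, and a rejection handler that waits indefinitely (rather than with a timeout) to keep the code short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of the hand-off; no Spring or AWS classes are involved.
final class HandOffSimulation {

    /** Returns the highest number of messages observed in processing at the same time. */
    static int peakConcurrency(int workers, int batchSize, int batches, long processingMillis) {
        // Stand-in for the container's own pool: it only performs the short hand-off.
        ExecutorService containerPool = Executors.newFixedThreadPool(batchSize);
        // Stand-in for the custom executor: no buffering, the submitter waits when it is full.
        ThreadPoolExecutor workerPool = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(),
                (task, pool) -> {
                    try {
                        pool.getQueue().put(task); // wait for a free worker
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int b = 0; b < batches; b++) {
                CountDownLatch handedOff = new CountDownLatch(batchSize);
                for (int m = 0; m < batchSize; m++) {
                    containerPool.execute(() -> {
                        try {
                            // The simulated listener method: submit the work and return.
                            workerPool.execute(() -> {
                                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                                try {
                                    Thread.sleep(processingMillis);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                } finally {
                                    running.decrementAndGet();
                                }
                            });
                        } finally {
                            handedOff.countDown();
                        }
                    });
                }
                handedOff.await(); // released once the batch is handed off, not processed
            }
            containerPool.shutdown();
            workerPool.shutdown();
            workerPool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            containerPool.shutdownNow();
            workerPool.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent messages: " + peakConcurrency(30, 10, 6, 200));
    }
}
```

Because the latch now only waits for the hand-off, the loop can start several batches before the first one finishes, and the peak is bounded by the size of the worker pool rather than by the batch size.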

References

  1. https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
  2. https://github.com/awspring/spring-cloud-aws/blob/v2.3.2/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/AbstractMessageListenerContainer.java#L63
  3. https://github.com/spring-projects/spring-boot/blob/2.5.x/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/web/ServerProperties.java#L819
  4. https://github.com/spring-cloud/spring-cloud-aws/issues/166
  5. https://github.com/awspring/spring-cloud-aws/issues/23

About Anton Shumsky
