How and why to poll longer than 20 seconds with AWS SQS.

Posted: 12/22/2020 Last updated: 12/27/2020

SQS is a pretty good message queue for a distributed system (not to confuse it with a message broker). It is another tool in a microservices arch to It is a great solution to scale HTTP requests or run a data/jobs pipeline. Another use case would be if you just have a ton of workers and producers and you want to scale and process the both sides. For what it claims to be it works very well. You can make sure jobs run once via settings message invisibility (which I will talk about later).

What is a poll time?

When you are in the GUI/AWS Console you can click on a queue > send and receive message > receive messages > edit poll settings.

Poll duration: how long a process sits and waits for messages and receives them. How many messages will be accepted by the process? Well that’s the:

Maximum message count: the number of messages that the process accepts to run/execute/ do something with.

If you have 1 process with 1 thread and it is intended to execute the message it might make sense to have max message count to 1 then poll again for another message after it's done. If you accept at least 2 messages you need to make sure you process the first one to completion with exception handling so that the 2nd one can get processed with it being skipped (unless your invisibility time is short but more on that later).

Now if we take a look at the boto3 library for python here is the call/poll to a queue.

response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        MessageAttributeNames=["message_type"],
        VisibilityTimeout=VISIBILITY_TIMEOUT,  # 0 - 43200 seconds
        WaitTimeSeconds=0,  # 0 - 20 seconds (delay of message retrieval/how long it actually polls for)
)

So here we see that we can add a WaitTimeSeconds which is the time the process waits for a message. In most cases it makes sense to keep it 0 so you can instantly get a message. However, if your program is continuously polling the queue server it will cause unnecessary cost especially if the total time of your job is small. In other words if your implementation is something like this:

while True:
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        MessageAttributeNames=["message_type"],
        VisibilityTimeout=VISIBILITY_TIMEOUT,  # 0 - 43200 seconds
        WaitTimeSeconds=0,  # 0 - 20 seconds (delay of message retrieval/how long it actually polls for)
    )
    # Do something with response/message some kind of processing
total time of each message to be processed = WaitTimeSeconds + processing time

Implementation of consistent poll time

Let’s say the average messages take 1 second to process so the python process will poll every 1 second. Now if there is great variability of how long each message takes to poll it becomes difficult to determine the true count and duration of each poll. Basically you have an undeterminitic system here and you can incur lots of instantaneous empty polling which is extra cost and a waste (unless you know your system will always have messages).

One solution is to process the jobs in another thread or process to keep poll times consistent. You can long poll and just keep firing the jobs off in a thread pool/asyncio/whatever multi process lib that works for you. Lei Mo’s post on the different options for concurrent python is excellent and this realpython article. (note: concurecery might have different implementations and results based on Python interpreter; default to CPython). For example:

while True:
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        MessageAttributeNames=["message_type"],
        VisibilityTimeout=VISIBILITY_TIMEOUT,  # 0 - 43200 seconds
        WaitTimeSeconds=20,  # 0 - 20 seconds (delay of message retrieval/how long it actually polls for)
    )
    message = response.get("Messages")
    t = threading.Thread(target=process_the_message, args(message,))
    t.start()

Now this will poll consistently every 20 seconds and process the message on another thread (note: process_the_message would run concurrently because of the GIL in CPython). If you want it to run on another CPU do something like (essentially make sure the implementation and library are doing parallel processing via PyPy or Pyston):

with multiprocessing.Pool() as pool:
    pool.map(process_the_message, message)

Another implementation is to use a proper cron-like scheduler to consistently poll for a new message which is what I ended up going with for a particular microservice once.

APSchechuler is a powerful cron-like package where you can run the scheduler in the background and processes in the foreground.

def poll_for_messsage() -> None:
    """Primary polling function that pulls messages off of SQS."""
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        MessageAttributeNames=["message_type"],
        VisibilityTimeout=VISIBILITY_TIMEOUT,  # 0 - 43200
        WaitTimeSeconds=0,  # 0 - 20 (delay of message retrieval)
    )
    message = response.get("Messages")
    process_the_message(message)

# APSchuler
s_lg.info(f"Starting polling from Ingestion to {QUEUE_URL}")
scheduler = BackgroundScheduler()  # set time zone
scheduler.start()
scheduler.add_job(
    func=poll_for_messsage,
    trigger="interval",
    seconds=POLL_INTERVAL,
    id="poller",
)
# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())

If the the process is still processing when it tries to poll you might get a log message like this:

Execution of job "<name of your job> (trigger: interval[0:00:30], next run at: 2020-12-17 23:21:53 UTC)" skipped: maximum number of running instances reached (1)

Most likely if you are trying to poll you are using a web framework like Flask, Django, FastAPI etc. You can actually do this with one of the frameworks and have a callback function to process it as if it was an API call. This is actually how Elastic Beanstalk works, in that it polls and then does an HTTP Loopback call.

Why poll longer than 20 seconds?

Currently you can long poll at max wait interval of 20 seconds or min poll interval (1 sec). However, what if you want your system to poll for a variable length of time? And you are not using “serverless” architecture where you can raise compute as needed because your jobs run on the scale of minutes and hours not seconds.

This is a good use case to customize and make your polling time. If 80% of the time your queue is empty you would want polling to be very high. Especially if you have a large volume of compute each process polling incurs cost. Then for the 20% of time when a job/etl/something is running you have your system shorten the polling duration AND skip polling while a job is being processed.

In short you may want to poll longer than 20 seconds to: save on incurring empty polling costs, if you are processing jobs synchronously then you might increase the poll wait interval to make sure you jobs are finished before change the poll time to a shorter time when you know you have some jobs to run.