User’s Guide
Application
The squids.App serves as the central object for configuration, registering tasks, and creating consumers. It is
also responsible for knowing how to connect to SQS. It takes several arguments, but the ones you will use most often are
name and boto_config, the latter of which is optional. name is a string identifier for your application, while
boto_config is a dictionary of configuration values that are passed to boto3.session.Session.client.
import squids

app = squids.App(
    "my-app",
    boto_config={"aws_access_key_id": "abc", "aws_secret_access_key": "secret"},
)
Once you have created a squids.App instance, you can begin registering, sending,
and consuming tasks. Registering a task looks like this:
@app.task("emails")
def email_customer(to_addr, body):
    ...
This will register the email_customer function as a task with the app. The App.task()
decorator takes a queue argument, which is the name of the SQS queue the task should be sent to.
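The sketch below makes the registration step concrete. It shows how a decorator like App.task() can record a function against a queue name while leaving the function callable as usual. MiniApp and its tasks dict are hypothetical and do not reflect squids' implementation. The real decorated object also provides send() and send_job(), which this sketch leaves out.

```python
from typing import Callable, Dict, Tuple


class MiniApp:
    """Hypothetical stand-in for squids.App, showing only the decorator pattern."""

    def __init__(self, name: str) -> None:
        self.name = name
        # Registry of task name -> (queue name, function).
        self.tasks: Dict[str, Tuple[str, Callable]] = {}

    def task(self, queue: str) -> Callable[[Callable], Callable]:
        def decorator(fn: Callable) -> Callable:
            self.tasks[fn.__name__] = (queue, fn)
            fn.queue = queue  # remember which queue the task belongs to
            return fn  # returned unchanged, so direct calls still run locally
        return decorator


app = MiniApp("my-app")


@app.task("emails")
def email_customer(to_addr, body):
    return f"sent {body!r} to {to_addr}"


print(app.tasks["email_customer"][0])  # emails
print(email_customer("foo@domain.com", "Hello!"))  # sent 'Hello!' to foo@domain.com
```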
Sending Tasks
Once you have registered your tasks with the application, you can send them to their
queue using Task.send() or Task.send_job(), as demonstrated below:
email_customer.send("foo@domain.com", "Hello!")
email_customer.send_job(
    args=("foo@domain.com", "Hello!"),
    kwargs={},
    options={"DelaySeconds": 5},
)
Calling send or send_job will ensure that the arguments and keyword arguments match the
signature of the function. If they don’t, a TypeError will be raised. Both send and send_job
return the same response as SQS.Client.send_message.
The difference between send and send_job is that send_job takes the arguments and keyword arguments
as explicit args and kwargs parameters and also accepts an options dict. That dict accepts all the same
arguments as SQS.Client.send_message except for MessageBody.
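The signature check can be done without calling the function, using inspect.signature(...).bind(), which raises TypeError on a mismatch. The sketch below pairs that check with building the keyword arguments for SQS.Client.send_message from an options dict. The helper build_send_message_kwargs, the JSON body layout, and the queue URL are all hypothetical placeholders. None of them reflect squids' actual message format.

```python
import inspect
import json


def email_customer(to_addr, body):
    """Stand-in for the task registered earlier."""


def build_send_message_kwargs(fn, queue_url, args=(), kwargs=None, options=None):
    """Hypothetical helper: validate args, then build send_message kwargs."""
    kwargs = kwargs or {}
    # bind() raises TypeError if args/kwargs don't fit fn's signature;
    # fn itself is never called here.
    inspect.signature(fn).bind(*args, **kwargs)
    # Placeholder body layout for illustration, not squids' wire format.
    body = json.dumps({"task": fn.__name__, "args": list(args), "kwargs": kwargs})
    # Start from the caller's options (e.g. DelaySeconds), then set the
    # fields the helper owns so options cannot override MessageBody.
    params = dict(options or {})
    params.update(QueueUrl=queue_url, MessageBody=body)
    return params


params = build_send_message_kwargs(
    email_customer,
    "https://sqs.us-east-1.amazonaws.com/123456789012/emails",  # placeholder URL
    args=("foo@domain.com", "Hello!"),
    options={"DelaySeconds": 5},
)
print(params["DelaySeconds"])  # 5

try:
    # Missing the 'body' argument, so bind() raises TypeError.
    build_send_message_kwargs(email_customer, "unused", args=("foo@domain.com",))
except TypeError as exc:
    print("rejected:", exc)
```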
When you send a task, it will, by default, serialize the arguments and keyword arguments using json. This means that anything that cannot be serialized to json cannot be passed to
send or send_job. This behavior can be customized by providing the serde keyword argument to squids.App with a class that is of type squids.serde.Serde.
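The sketch below shows why a custom serializer can help. Plain json rejects a datetime argument, while a small encoder/decoder pair round-trips it. The DateTimeJSON class and its serialize/deserialize method names are assumptions made for illustration. Check squids.serde.Serde for the interface a real serde class must implement.

```python
import datetime
import json


class DateTimeJSON:
    """Hypothetical serializer; method names are assumptions, not the Serde API."""

    @staticmethod
    def serialize(obj) -> str:
        def default(value):
            # Called by json.dumps only for objects it can't encode natively.
            if isinstance(value, datetime.datetime):
                return {"__datetime__": value.isoformat()}
            raise TypeError(f"{type(value).__name__} is not serializable")

        return json.dumps(obj, default=default)

    @staticmethod
    def deserialize(text: str):
        def hook(d):
            # Turn tagged dicts back into datetime objects.
            if "__datetime__" in d:
                return datetime.datetime.fromisoformat(d["__datetime__"])
            return d

        return json.loads(text, object_hook=hook)


when = datetime.datetime(2024, 1, 2, 3, 4, 5)
payload = {"args": ["foo@domain.com", when], "kwargs": {}}

# Plain json.dumps rejects the datetime argument...
try:
    json.dumps(payload)
except TypeError as exc:
    print("json:", exc)

# ...while the custom serializer round-trips it.
restored = DateTimeJSON.deserialize(DateTimeJSON.serialize(payload))
print(restored == payload)  # True
```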
You can still run your functions synchronously if you want.
email_customer("foo@domain.com", "Hello!")
Doing this will not send a task through the SQS queue, but instead simply call the function and execute it in the calling process like normal.
Consuming Tasks
Once you have sent a task into an SQS queue you’ll likely want to run it eventually. To run the task
you need to consume it. We can get a consumer for a queue by calling App.create_consumer().
create_consumer takes a single argument which is the queue name. Once we have the consumer we
can begin to consume and run our tasks like so:
consumer = app.create_consumer("emails")
while True:
    consumer.consume(
        options={"WaitTimeSeconds": 5, "MaxNumberOfMessages": 10, "VisibilityTimeout": 30}
    )
Consumer.consume() will fetch messages from the emails SQS queue and run the function
associated with each received message. In our case it’ll run the email_customer function. The
options keyword argument is an optional dict that takes the same values as SQS.Client.receive_message.
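SQS enforces ranges on the receive_message parameters used in options:

- WaitTimeSeconds must be 0 to 20.
- MaxNumberOfMessages must be 1 to 10.
- VisibilityTimeout must be 0 to 43200 seconds (12 hours).

A helper like check_receive_options can catch an out-of-range value before the loop starts. It is hypothetical and not part of squids.

```python
# SQS ReceiveMessage parameter limits (inclusive ranges).
RECEIVE_LIMITS = {
    "WaitTimeSeconds": (0, 20),
    "MaxNumberOfMessages": (1, 10),
    "VisibilityTimeout": (0, 43200),  # 12 hours
}


def check_receive_options(options: dict) -> dict:
    """Hypothetical pre-flight check for the options passed to consume()."""
    for key, value in options.items():
        if key in RECEIVE_LIMITS:
            low, high = RECEIVE_LIMITS[key]
            if not low <= value <= high:
                raise ValueError(f"{key}={value} is outside {low}..{high}")
    return options


check_receive_options(
    {"WaitTimeSeconds": 5, "MaxNumberOfMessages": 10, "VisibilityTimeout": 30}
)
try:
    check_receive_options({"MaxNumberOfMessages": 25})
except ValueError as exc:
    print(exc)  # MaxNumberOfMessages=25 is outside 1..10
```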
Often you’ll want to consume your tasks in another process to keep from blocking your main
program. If you don’t want to build your own consumer clients, you can use the squids command line consumer
tool, which makes consuming tasks simple.
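Whichever process runs it, the while True loop in the consuming example never exits on its own. Below is a minimal sketch of a stoppable loop, assuming you control the consumer process. run_consumer is a hypothetical helper that calls any consume function until a threading.Event is set. With squids, consume_once could be lambda: consumer.consume(options={"WaitTimeSeconds": 5}).

```python
import threading


def run_consumer(consume_once, stop: threading.Event) -> int:
    """Hypothetical helper: call consume_once() until stop is set.

    Returns how many times consume_once() was called.
    """
    calls = 0
    while not stop.is_set():
        consume_once()
        calls += 1
    return calls


# Demo with a fake consume function that requests a stop after 3 polls.
stop = threading.Event()
polls = []


def fake_consume():
    polls.append("poll")
    if len(polls) == 3:
        stop.set()


print(run_consumer(fake_consume, stop))  # 3
```

In a real worker process you could set the event from a signal handler, for example signal.signal(signal.SIGTERM, lambda signum, frame: stop.set()). The loop then exits after the current consume call returns. Because a consume call with WaitTimeSeconds can block for up to that many seconds, shutdown may take up to one polling interval.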
Application Hooks
There are four hooks you can register with your application:
App.pre_send() - Runs on the producer side, just before the task is sent to the SQS queue.
App.post_send() - Runs on the producer side, just after the task is sent to the SQS queue.
App.pre_task() - Runs on the consumer side after the message is consumed, but just before the task is run.
App.post_task() - Runs on the consumer side after the message is consumed and the task is run.
@app.pre_send
def before_send(queue_name, body):
    ...

@app.post_send
def after_send(queue_name, body, response):
    ...

@app.pre_task
def before_task(task):
    ...

@app.post_task
def after_task(task):
    ...
These hooks provide a good opportunity for performing logging or metrics related to the production and consumption of tasks.
Command Line Consumer
SQuidS ships with a command line consumer, squids. You can always build your own consumers
(see Consuming Tasks), but this one provides a great starting point that you can use to quickly
scale out your rate of consumption.
usage: squids [-h] -q QUEUE [-w WORKERS] -a APP [--polling-wait-time {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}] [--visibility-timeout VISIBILITY_TIMEOUT]
[--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}]
optional arguments:
-h, --help show this help message and exit
-q QUEUE, --queue QUEUE
The name of the SQS queue to process.
-w WORKERS, --workers WORKERS
The number of workers to run. Defaults to the number of CPUs in the system
-a APP, --app APP Path to the application class something like package.module:app where app is an instance of squids.App
--polling-wait-time {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}
The WaitTimeSeconds for polling for messages from the queue. Consult the AWS SQS docs on long polling for more information about this setting. https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling
--visibility-timeout VISIBILITY_TIMEOUT
The VisibilityTimeout duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
Set the logging level for the consumer. Logs will be handled using the logging.StreamHandler with the stream set to stdout
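The -a/--app value uses the package.module:app convention. Below is a minimal sketch of resolving such a string with importlib. load_object is hypothetical and may not match how squids loads the application. In this sketch the module must be importable from sys.path.

```python
import importlib


def load_object(path: str):
    """Hypothetical resolver for a 'package.module:attr' string."""
    module_name, sep, attr = path.partition(":")
    if not (sep and module_name and attr):
        raise ValueError(f"expected 'package.module:attr', got {path!r}")
    module = importlib.import_module(module_name)  # must be on sys.path
    return getattr(module, attr)


# Demo against a standard-library module instead of a real squids app.
dumps = load_object("json:dumps")
print(dumps({"ok": True}))  # {"ok": true}
```

A CLI built this way would typically also check that the loaded object is an instance of squids.App, as the -a help text requires.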
The command line consumer works by creating a pool of worker processes. The consumer then passes the tasks it receives to the workers to run, which increases consumption throughput. The consumer will never consume more than 2x the number of workers at a time, to avoid feeding tasks faster than the workers can process them.
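The 2x cap on in-flight tasks can be modeled with a bounded semaphore: acquire a slot before submitting each task, and release it when the task finishes. The sketch below uses a ThreadPoolExecutor so it runs anywhere, whereas the squids consumer itself uses worker processes. run_bounded is hypothetical. The slot is released from a future's done callback in the submitting process. The same pattern therefore carries over to ProcessPoolExecutor, provided handle is a picklable, module-level function.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor


def run_bounded(messages, handle, workers=None):
    """Hypothetical sketch: never have more than 2 * workers tasks outstanding."""
    workers = workers or os.cpu_count() or 1
    slots = threading.BoundedSemaphore(2 * workers)
    futures = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for message in messages:
            # Blocks while 2 * workers submitted tasks are still unfinished.
            slots.acquire()
            future = pool.submit(handle, message)
            # Free the slot when the task finishes, whether it succeeded or raised.
            future.add_done_callback(lambda _f: slots.release())
            futures.append(future)
    # Results come back in submission order; failures re-raise here.
    return [f.result() for f in futures]


print(run_bounded(range(5), lambda n: n * n, workers=2))  # [0, 1, 4, 9, 16]
```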
If you need to increase the consumption rate further, you can run the consumer on additional machines or pods.