API Reference

class squids.App(name: str, boto_config: typing.Dict | None = None, serde: typing.Type[squids.serde.Serde] = squids.serde.JSONSerde, delete_late: bool = False)

Bases: object

The central object for registering tasks and creating consumers.

Parameters:
  • name – An identifier for the application.

  • boto_config – An optional configuration dict which takes the same values as boto3.session.Session.client.

  • serde – An optional Serde subclass that is used to serialize and deserialize message bodies when sending and consuming. If not provided, JSONSerde will be used.

  • delete_late – An optional boolean that determines when a message is deleted from SQS. If the value is True, the message is deleted only after the task has been processed. This means an unhandled exception in the task could leave the message undeleted, so it is redelivered when the visibility timeout expires. It also means a task must finish its work within the visibility timeout window or the message could be redelivered. The default value of False is generally encouraged unless your tasks are idempotent or you have appropriate DLQ handling and/or de-duplication configured.
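Because delete_late=True allows the same message to be delivered more than once, tasks need to tolerate duplicates. The following is a minimal sketch of one way to make a handler idempotent, assuming each delivery carries a stable message ID. The names make_idempotent, seen_ids and processed are hypothetical and not part of squids, and a real deployment would keep the seen IDs in a durable store rather than an in-memory set.

```python
from typing import Callable, Dict, List, Set


def make_idempotent(
    handler: Callable[[Dict], None], seen: Set[str]
) -> Callable[[str, Dict], bool]:
    """Wrap `handler` so a redelivered message ID is processed at most once."""

    def wrapper(message_id: str, payload: Dict) -> bool:
        if message_id in seen:
            return False  # duplicate delivery: skip the work
        handler(payload)
        seen.add(message_id)  # record the ID only after the handler succeeds
        return True

    return wrapper


processed: List[Dict] = []
seen_ids: Set[str] = set()
handle = make_idempotent(processed.append, seen_ids)

first = handle("msg-1", {"order": 42})   # processed
second = handle("msg-1", {"order": 42})  # skipped as a duplicate
```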

add_task(task_cls: Type[Task]) -> Task

Provides a way for custom Task subclasses to be registered and created as runnable tasks. Usage looks like:

class MyTask(squids.Task):
    queue = "some-queue"

    def run(self, some_arg):
        # The task body goes here
        ...

my_task = app.add_task(MyTask)
my_task.send('some_value')

Parameters:

task_cls – A subclass of Task which usually will have overridden the Task.run() method.

Returns:

An instance of task_cls that can be used for sending tasks to task_cls.queue.

create_consumer(queue_name: str) -> Consumer

A convenience method for creating a Consumer instance.

Parameters:

queue_name – A string identifying the queue.

Returns:

An instance of Consumer.

get_queue_by_name(queue_name: str) -> str

A convenience method for getting a queue URL by name.

Parameters:

queue_name – A string identifying the queue.

Returns:

The queue URL.

post_send(func: PostSendCallback) -> PostSendCallback

Decorator for registering a callback that is invoked on the producer side after the message is sent to the queue.

The callback takes three arguments:

  • queue - A string indicating the queue the message was sent to.

  • body - The dict that was serialized and sent into the queue.

  • response - A dict which is the return value from SQS.Client.send_message.

Parameters:

func – The function to be decorated.

Returns:

The decorated function.
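As an illustration, the sketch below registers a post_send callback that records the ID SQS assigned to each sent message. The helper register_send_audit and the list sent_ids are hypothetical names; app is assumed to be an existing App instance, and MessageId is the key under which the send_message response carries the ID.

```python
from typing import Any, Dict, List


def register_send_audit(app: Any, sent_ids: List[str]) -> None:
    """Attach a post_send callback to `app` (assumed to be a squids.App)."""

    @app.post_send
    def record_message_id(queue: str, body: Dict, response: Dict) -> None:
        # The send_message response includes the assigned ID as "MessageId".
        sent_ids.append(f"{queue}:{response['MessageId']}")
```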

post_task(func: Callable[[Task], None]) -> Callable[[Task], None]

Decorator for registering a callback that is invoked on the consumer side after the message is consumed and the task is run.

The callback takes a single argument, task, which is an instance of Task.

Parameters:

func – The function to be decorated.

Returns:

The decorated function.

pre_send(func: Callable[[str, Dict], None]) -> Callable[[str, Dict], None]

Decorator for registering a callback that is invoked on the producer side before the message is sent to the queue.

The callback takes two arguments:

  • queue - A string indicating the queue the message will be sent to.

  • body - A dict that will be serialized and sent into the queue.

Parameters:

func – The function to be decorated.

Returns:

The decorated function.
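A pre_send callback can be used for lightweight bookkeeping on the producer. The sketch below counts outgoing messages per queue; register_send_counter is a hypothetical helper, and app is assumed to be an existing App instance.

```python
from collections import Counter
from typing import Any, Dict


def register_send_counter(app: Any) -> Counter:
    """Attach a pre_send callback that counts sends per queue name."""
    sends_per_queue: Counter = Counter()

    @app.pre_send
    def count_send(queue: str, body: Dict) -> None:
        # Runs just before the message is sent to `queue`.
        sends_per_queue[queue] += 1

    return sends_per_queue
```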

pre_task(func: Callable[[Task], None]) -> Callable[[Task], None]

Decorator for registering a callback that is invoked on the consumer side after the message is consumed, but right before the task is run.

The callback takes a single argument, task, which is an instance of Task.

Parameters:

func – The function to be decorated.

Returns:

The decorated function.
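Paired with post_task, a pre_task callback can measure how long each task takes. This is a rough sketch: register_task_timer is a hypothetical helper, app is assumed to be an existing App instance, and keying on id(task) assumes both callbacks receive the same Task object and that the object is not run concurrently, which this reference does not confirm.

```python
import time
from typing import Any, Dict, List


def register_task_timer(app: Any, durations: List[float]) -> None:
    """Attach pre_task/post_task callbacks that record task run times."""
    started: Dict[int, float] = {}

    @app.pre_task
    def mark_start(task: Any) -> None:
        started[id(task)] = time.monotonic()

    @app.post_task
    def record_duration(task: Any) -> None:
        start = started.pop(id(task), None)
        if start is not None:  # ignore tasks with no recorded start
            durations.append(time.monotonic() - start)
```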

task(queue: str) -> Callable

A decorator method which takes a queue name and registers the decorated function with the app.

Parameters:

queue – The name of the queue that this task should go to by default when being sent.

Returns:

The decorated function, which is augmented with send and send_job attributes.
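A minimal sketch of the decorator form follows. The queue name "emails", the send_email function and both helper functions are hypothetical; app is assumed to be an existing App instance.

```python
from typing import Any, Callable


def register_email_task(app: Any) -> Callable:
    """Register a function-based task on `app` and return it."""

    @app.task("emails")
    def send_email(to: str, subject: str) -> str:
        # The task body runs on the consumer side.
        return f"sent '{subject}' to {to}"

    return send_email


def enqueue_welcome(send_email: Any, to: str) -> Any:
    """Send the task to its default queue via the added send attribute."""
    return send_email.send(to, subject="Welcome")
```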

class squids.Consumer(app: App, queue_url: str)

Bases: object

Object which consumes messages from the provided SQS queue and executes the appropriate Task for a message. Usually you’ll create a consumer instance through the App.create_consumer() method instead of directly instantiating one yourself.

Parameters:
  • app – An instance of App.

  • queue_url – The queue URL.

consume(options: Dict | None = None) -> None

Consumes messages from the associated queue and runs the appropriate Task for each message. Calling this only consumes as many messages as SQS.Client.receive_message returns. If you want to consume continuously, you’ll need to put calls to Consumer.consume() in a loop.

Parameters:

options – A dict of optional values to pass to SQS.Client.receive_message.

Returns:

None
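Since a single call handles only one batch, continuous consumption needs a loop around Consumer.consume(). The sketch below shows one possible shape; consume_until and should_stop are hypothetical names, consumer is assumed to be a Consumer instance, and the option keys are standard receive_message parameters.

```python
from typing import Any, Callable, Dict, Optional

# Long polling (up to 20 seconds) with at most 10 messages per call.
LONG_POLL_OPTIONS = {"WaitTimeSeconds": 20, "MaxNumberOfMessages": 10}


def consume_until(
    consumer: Any,
    should_stop: Callable[[], bool],
    options: Optional[Dict] = None,
) -> int:
    """Call consumer.consume() repeatedly until should_stop() returns True."""
    polls = 0
    while not should_stop():
        consumer.consume(options)
        polls += 1
    return polls
```

Passing a should_stop callable, for example one that checks a flag set by a signal handler, lets the loop exit between batches rather than in the middle of one.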

consume_messages(options: Dict | None = None) -> Iterator[MessageTypeDef]

Consume messages from the associated queue. Unlike Consumer.consume() this method returns a generator over all the messages returned by calling SQS.Client.receive_message.

Parameters:

options – A dict of optional values to pass to SQS.Client.receive_message.

Returns:

A generator that yields message dicts.
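For example, the generator can be used to inspect raw messages without running any task. In this sketch collect_bodies is a hypothetical helper and consumer is assumed to be a Consumer instance; Body is the standard key for the payload of an SQS message dict. This reference does not state whether messages obtained this way are deleted, so the sketch only reads them.

```python
from typing import Any, Dict, List, Optional


def collect_bodies(consumer: Any, options: Optional[Dict] = None) -> List[str]:
    """Return the raw body of each message from one receive call."""
    return [message["Body"] for message in consumer.consume_messages(options)]
```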

run_task(message: MessageTypeDef) -> Any

class squids.Task(app: App, queue: str, func: Callable | None = None, pre_task: Callable[[Task], None] | None = None, post_task: Callable[[Task], None] | None = None)

Bases: object

An object that wraps some task to be done, usually a function or some other callable. You’ll rarely, if ever, need to instantiate this yourself; instead, use App.task() or App.add_task() to handle instantiating it.

Parameters:
  • app – An instance of App.

  • queue – A queue name that this task should be sent to by default.

  • func – The job to be done. If this is None then Task.run() should be overridden.

  • pre_task – An optional callback function to be invoked right before the task is run. See App.pre_task() for more details on the callback.

  • post_task – An optional callback function to be invoked right after the task is run. See App.post_task() for more details on the callback.

run(*args, **kwargs) -> Any

Executes the provided task. If you are creating your own Task subclass then this method should be overridden.

Parameters:
  • args – Positional arguments for the task.

  • kwargs – Keyword arguments for the task.

Returns:

Any

send(*args, **kwargs) -> SendMessageResultTypeDef

A convenience version of Task.send_job() that accepts the task’s positional and keyword arguments directly (splatted). It does not support the extra options or queue arguments provided by Task.send_job().

Parameters:
  • args – Positional arguments for the task.

  • kwargs – Keyword arguments for the task.

Returns:

The response from SQS, which is the same value returned by SQS.Client.send_message.

send_job(args: Tuple | None = None, kwargs: Dict | None = None, options: Dict | None = None, queue: str | None = None) -> SendMessageResultTypeDef

Send a task into the associated SQS queue.

Parameters:
  • args – Positional arguments for the task.

  • kwargs – Keyword arguments for the task.

  • options – A dict of optional arguments when sending into the queue. Takes the same values as SQS.Client.send_message minus the MessageBody.

  • queue – Forces sending a message to a specific queue. This overrides the default queue.

Returns:

The response from SQS, which is the same value returned by SQS.Client.send_message.
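The sketch below shows how the four arguments might be combined to send a delayed message, optionally to a non-default queue. The helper send_delayed and its arguments are hypothetical, and task is assumed to be a registered Task; DelaySeconds is a standard send_message parameter.

```python
from typing import Any, Optional


def send_delayed(
    task: Any,
    order_id: int,
    delay_seconds: int = 30,
    queue: Optional[str] = None,
) -> Any:
    """Send `task` with a delivery delay, optionally overriding the queue."""
    return task.send_job(
        args=(order_id,),
        kwargs={"notify": True},
        options={"DelaySeconds": delay_seconds},
        queue=queue,  # None keeps the task's default queue
    )
```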

class squids.serde.JSONSerde

Bases: Serde

classmethod deserialize(body: str) -> Dict[str, Any]

classmethod serialize(body: Dict[str, Any]) -> str

class squids.serde.Serde

Bases: object

classmethod deserialize(body: str) -> Dict[str, Any]

classmethod serialize(body: Dict[str, Any]) -> str
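A custom serializer implements both class methods and is passed to App through the serde parameter. The sketch below is illustrative only: the Serde class defined here is a local stand-in that mirrors the documented interface so the example runs on its own (real code would import squids.serde.Serde instead), and CompactJSONSerde is a hypothetical name.

```python
import json
from typing import Any, Dict


class Serde:
    """Local stand-in for squids.serde.Serde (same two class methods)."""

    @classmethod
    def serialize(cls, body: Dict[str, Any]) -> str:
        raise NotImplementedError

    @classmethod
    def deserialize(cls, body: str) -> Dict[str, Any]:
        raise NotImplementedError


class CompactJSONSerde(Serde):
    """JSON with sorted keys and no extra whitespace."""

    @classmethod
    def serialize(cls, body: Dict[str, Any]) -> str:
        return json.dumps(body, separators=(",", ":"), sort_keys=True)

    @classmethod
    def deserialize(cls, body: str) -> Dict[str, Any]:
        return json.loads(body)


encoded = CompactJSONSerde.serialize({"b": 1, "a": [1, 2]})
decoded = CompactJSONSerde.deserialize(encoded)
```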