API Reference
- class squids.App(name: str, boto_config: Dict | None = None, serde: Type[Serde] = JSONSerde, delete_late: bool = False)
Bases: object
The central object for registering tasks and creating consumers.
- Parameters:
name – An identifier for the application.
boto_config – An optional configuration dict which takes the same values as boto3.session.Session.client.
serde – An optional Serde subclass that is used to serialize and deserialize message bodies when sending and consuming. If not provided, JSONSerde will be used.
delete_late – An optional boolean that determines when a message is deleted from SQS. If the value is True, the message is deleted after the task has been processed. This means an unhandled exception in the task could leave the message undeleted, so it is re-delivered when the visibility timeout expires. It also means a task must finish processing within the visibility timeout window or its message could be re-delivered. The default value of False is generally encouraged unless your tasks are idempotent or you have appropriate DLQ handling and/or de-duplication configured.
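The difference between the two delete_late settings can be illustrated with a small in-memory model. This is only a sketch: InMemoryQueue and process_one are hypothetical stand-ins, not part of squids or boto3, and the sketch assumes that delete_late=False deletes the message before the task runs.

```python
from collections import deque


class InMemoryQueue:
    """Hypothetical stand-in for an SQS queue; not part of squids or boto3."""

    def __init__(self, messages):
        self.visible = deque(messages)
        self.in_flight = []

    def receive(self):
        # Receiving hides the message until it is deleted or times out.
        message = self.visible.popleft()
        self.in_flight.append(message)
        return message

    def delete(self, message):
        self.in_flight.remove(message)

    def expire_visibility_timeout(self):
        # Anything still in flight becomes visible again (re-delivery).
        self.visible.extend(self.in_flight)
        self.in_flight.clear()


def process_one(queue, task, delete_late):
    """Receive one message and run ``task`` on it, deleting early or late."""
    message = queue.receive()
    if not delete_late:
        queue.delete(message)  # assumption: deleted before the task runs
    try:
        task(message)
    except Exception:
        return False  # with delete_late=True the message was never deleted
    if delete_late:
        queue.delete(message)  # deleted only after the task succeeded
    return True


def failing_task(message):
    raise RuntimeError("boom")


early = InMemoryQueue(["job-1"])
process_one(early, failing_task, delete_late=False)
early.expire_visibility_timeout()

late = InMemoryQueue(["job-1"])
process_one(late, failing_task, delete_late=True)
late.expire_visibility_timeout()

print(list(early.visible))  # the failed message is gone
print(list(late.visible))   # the failed message is back for another attempt
```

In this model a failing task loses the message when it is deleted early and sees it again when it is deleted late, which is why late deletion pairs best with idempotent tasks or DLQ handling.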
- add_task(task_cls: Type[Task]) -> Task
Provides a way for custom Task subclasses to be registered and created as runnable tasks. Usage looks like:

    class MyTask(squids.Task):
        queue = "some-queue"

        def run(self, some_arg):
            # The task body goes here
            ...

    my_task_instance = app.add_task(MyTask)
    my_task_instance.send('some_value')

- Parameters:
task_cls – A subclass of Task which usually will have overridden the Task.run() method.
- Returns:
An instance of task_cls that can be used for sending tasks to task_cls.queue.
- create_consumer(queue_name: str) -> Consumer
A convenience method for creating a Consumer instance.
- Parameters:
queue_name – A string identifying the queue.
- Returns:
An instance of Consumer.
- get_queue_by_name(queue_name: str) -> str
A convenience method for getting a queue URL by name.
- Parameters:
queue_name – A string identifying the queue.
- Returns:
The queue URL.
- post_send(func: PostSendCallback) -> PostSendCallback
Decorator for registering a callback that is invoked producer side after the message is sent to the queue.
The callback takes three arguments:
queue - A string indicating the queue the message was sent to.
body - The dict that was serialized and sent into the queue.
response - A dict response which is the return value from
- Parameters:
func – The function to be decorated.
- Returns:
The decorated function.
- post_task(func: Callable[[Task], None]) -> Callable[[Task], None]
Decorator for registering a callback that is invoked consumer side after the message is consumed and the task is run.
The callback takes a single argument, task, which is an instance of Task.
- Parameters:
func – The function to be decorated.
- Returns:
The decorated function.
- pre_send(func: Callable[[str, Dict], None]) -> Callable[[str, Dict], None]
Decorator for registering a callback that is invoked producer side before the message is sent to the queue.
The callback takes two arguments:
queue - A string indicating the queue the message will be sent to.
body - A dict that will be serialized and sent into the queue.
- Parameters:
func – The function to be decorated.
- Returns:
The decorated function.
- pre_task(func: Callable[[Task], None]) -> Callable[[Task], None]
Decorator for registering a callback that is invoked consumer side after the message is consumed, but right before the task is run.
The callback takes a single argument, task, which is an instance of Task.
- Parameters:
func – The function to be decorated.
- Returns:
The decorated function.
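The ordering of the four hooks described above can be sketched with a small stand-in. MiniApp and MiniTask are hypothetical and only model the registration pattern; they are not the squids classes. The sketch assumes each decorator returns the function unchanged, and it uses a placeholder dict instead of a real SQS response.

```python
class MiniTask:
    """Hypothetical stand-in for a task object; not squids.Task."""

    def __init__(self, name, func):
        self.name = name
        self.func = func


class MiniApp:
    """Hypothetical stand-in for hook registration; not squids.App."""

    def __init__(self):
        self.hooks = {}

    def _hook(self, name, func):
        self.hooks[name] = func
        return func  # assumption: the decorated function is returned unchanged

    def pre_send(self, func):
        return self._hook("pre_send", func)

    def post_send(self, func):
        return self._hook("post_send", func)

    def pre_task(self, func):
        return self._hook("pre_task", func)

    def post_task(self, func):
        return self._hook("post_task", func)

    def send(self, queue, body):
        # Producer side: pre_send, then the (faked) send, then post_send.
        if "pre_send" in self.hooks:
            self.hooks["pre_send"](queue, body)
        response = {"MessageId": "fake-id"}  # placeholder, not a real SQS response
        if "post_send" in self.hooks:
            self.hooks["post_send"](queue, body, response)
        return response

    def run(self, task, *args):
        # Consumer side: pre_task, then the task itself, then post_task.
        if "pre_task" in self.hooks:
            self.hooks["pre_task"](task)
        result = task.func(*args)
        if "post_task" in self.hooks:
            self.hooks["post_task"](task)
        return result


app = MiniApp()
events = []


@app.pre_send
def before_send(queue, body):
    events.append(("pre_send", queue))


@app.post_send
def after_send(queue, body, response):
    events.append(("post_send", response["MessageId"]))


@app.pre_task
def before_task(task):
    events.append(("pre_task", task.name))


@app.post_task
def after_task(task):
    events.append(("post_task", task.name))


app.send("emails", {"to": "a@example.com"})
app.run(MiniTask("send_email", lambda to: events.append(("run", to))), "a@example.com")
print(events)
```

Hooks of this shape are a natural place for logging, metrics, or tracing around sends and task runs, since they keep that code out of the task bodies.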
- task(queue: str) -> Callable
A decorator method which takes a queue name and registers the decorated function with the app.
- Parameters:
queue – The name of the queue that this task should go to by default when being sent.
- Returns:
The decorated function, which is augmented to have send and send_job attributes.
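A decorator that augments a function with send and send_job attributes might look roughly like the sketch below. Everything in it is an assumption for illustration: the module-level task function, the sent list standing in for SQS, and the JSON body layout are all hypothetical, as is the decorated function remaining directly callable.

```python
import json

sent = []  # stands in for SQS; each entry is (queue, serialized body)


def task(queue):
    """Hypothetical decorator; the message body layout here is an assumption."""

    def decorator(func):
        def send_job(args=None, kwargs=None):
            body = {
                "task": func.__name__,
                "args": list(args or ()),
                "kwargs": dict(kwargs or {}),
            }
            sent.append((queue, json.dumps(body)))

        def send(*args, **kwargs):
            send_job(args=args, kwargs=kwargs)

        # Attach the producers to the function and hand the same function back.
        func.send = send
        func.send_job = send_job
        return func

    return decorator


@task("emails")
def send_email(to, subject="hi"):
    return f"email to {to}: {subject}"


send_email.send("a@example.com", subject="welcome")  # enqueue for a consumer
result = send_email("b@example.com")  # in this sketch, still callable directly
print(sent)
print(result)
```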
- class squids.Consumer(app: App, queue_url: str)
Bases: object
Object which consumes messages from the provided SQS queue and executes the appropriate Task for a message. Usually you’ll create a consumer instance through the App.create_consumer() method instead of directly instantiating one yourself.
- Parameters:
app – An instance of App.
queue_url – The queue URL.
- consume(options: Dict | None = None) -> None
Consumes messages from the associated queue and runs the appropriate Task for each message. Calling this only consumes as many messages as SQS.Client.receive_message returns. If you want to consume continuously, you’ll need to call this method in a loop.
- Parameters:
options – A dict of optional values to pass to SQS.Client.receive_message.
- Returns:
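Since a single call handles only one batch, continuous consumption needs a loop around it. The sketch below shows one possible shape using a stop flag for clean shutdown. consume_forever and FakeConsumer are hypothetical names; the fake only mimics the consume(options) signature so the loop can run without AWS. WaitTimeSeconds and MaxNumberOfMessages are parameters of SQS.Client.receive_message.

```python
import threading


def consume_forever(consumer, stop_event, options=None):
    """Call ``consumer.consume`` repeatedly until ``stop_event`` is set."""
    polls = 0
    while not stop_event.is_set():
        consumer.consume(options)
        polls += 1
    return polls


class FakeConsumer:
    """Hypothetical stand-in exposing the same ``consume(options)`` method."""

    def __init__(self, stop_event, max_polls):
        self.stop_event = stop_event
        self.max_polls = max_polls
        self.seen_options = []

    def consume(self, options=None):
        self.seen_options.append(options)
        # Ask the loop to stop once enough polls have happened.
        if len(self.seen_options) >= self.max_polls:
            self.stop_event.set()


stop = threading.Event()
fake = FakeConsumer(stop, max_polls=3)
# Long polling (WaitTimeSeconds) avoids a tight loop of empty responses.
polls = consume_forever(fake, stop, {"WaitTimeSeconds": 20, "MaxNumberOfMessages": 10})
print(polls)
```

With a real application, the object returned by App.create_consumer() could be passed in place of the fake, and the stop flag could be set from a signal handler.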
- consume_messages(options: Dict | None = None) -> Iterator[MessageTypeDef]
Consume messages from the associated queue. Unlike Consumer.consume(), this method returns a generator over all the messages returned by calling SQS.Client.receive_message.
- Parameters:
options – A dict of optional values to pass to SQS.Client.receive_message.
- Returns:
A generator that yields message dicts.
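Because the result is a generator of message dicts, it can be fed lazily into other generators. The sketch below inspects messages without running any task. preview and fake_consume_messages are hypothetical helpers; the fake stands in for a real consumer, and the sketch only relies on the MessageId and Body keys that SQS message dicts carry.

```python
def preview(messages, limit=40):
    """Yield (MessageId, truncated Body) pairs from an iterator of message dicts."""
    for message in messages:
        yield message["MessageId"], message["Body"][:limit]


def fake_consume_messages():
    """Hypothetical stand-in generator for Consumer.consume_messages()."""
    yield {"MessageId": "1", "ReceiptHandle": "r1", "Body": '{"n": 1}'}
    yield {"MessageId": "2", "ReceiptHandle": "r2", "Body": '{"n": 2}'}


previews = list(preview(fake_consume_messages()))
print(previews)
```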
- run_task(message: MessageTypeDef) -> Any
- class squids.Task(app: App, queue: str, func: Callable | None = None, pre_task: Callable[[Task], None] | None = None, post_task: Callable[[Task], None] | None = None)
Bases: object
An object that wraps some task to be done, usually a function or some other callable. You’ll rarely, if ever, need to instantiate this yourself; instead, use App.task() or App.add_task() to handle instantiating it.
- Parameters:
app – An instance of App.
queue – A queue name that this task should be sent to by default.
func – The job to be done. If this is None then Task.run() should be overridden.
pre_task – An optional callback function to be invoked right before the task is run. See App.pre_task() for more details on the callback.
post_task – An optional callback function to be invoked right after the task is run. See App.post_task() for more details on the callback.
- run(*args, **kwargs) -> Any
Executes the provided task. If you are creating your own Task subclass then this method should be overridden.
- Parameters:
args – Positional arguments for the task.
kwargs – Keyword arguments for the task.
- Returns:
Any
- send(*args, **kwargs) -> SendMessageResultTypeDef
Splat args and kwargs version of Task.send_job() which does not support the extra options or queue argument provided by Task.send_job().
- Parameters:
args – Positional arguments for the task.
kwargs – Keyword arguments for the task.
- Returns:
The response from SQS which is the same returned by SQS.Client.send_message.
- send_job(args: Tuple | None = None, kwargs: Dict | None = None, options: Dict | None = None, queue: str | None = None) -> SendMessageResultTypeDef
Send a task into the associated SQS queue.
- Parameters:
args – Positional arguments for the task.
kwargs – Keyword arguments for the task.
options – A dict of optional arguments when sending into the queue. Takes the same values as SQS.Client.send_message minus the MessageBody.
queue – Forces sending a message to a specific queue. This overrides the default queue.
- Returns:
The response from SQS which is the same returned by SQS.Client.send_message.
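The relationship between send and send_job described above can be sketched as follows. MiniTask is a hypothetical stand-in, not squids.Task: it records what would be sent instead of calling SQS, and the record layout is an assumption. DelaySeconds is a parameter of SQS.Client.send_message.

```python
class MiniTask:
    """Hypothetical stand-in for the send/send_job relationship; not squids.Task."""

    def __init__(self, queue):
        self.queue = queue
        self.sent = []  # records what would be handed to SQS

    def send_job(self, args=None, kwargs=None, options=None, queue=None):
        record = {
            "queue": queue or self.queue,  # an explicit queue overrides the default
            "body": {"args": list(args or ()), "kwargs": dict(kwargs or {})},
            "options": dict(options or {}),
        }
        self.sent.append(record)
        return record

    def send(self, *args, **kwargs):
        # Splat form: no way to pass options or a queue override.
        return self.send_job(args=args, kwargs=kwargs)


task = MiniTask("default-queue")
task.send(1, 2, retries=3)
task.send_job(
    args=(1, 2),
    kwargs={"retries": 3},
    options={"DelaySeconds": 30},
    queue="priority-queue",
)
print(task.sent)
```

The splat form is convenient for the common case, while send_job is the one to reach for when a message needs per-send options or a different destination queue.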