kiwipy.rmq.tasks module

class kiwipy.rmq.tasks.RmqIncomingTask(subscriber: RmqTaskSubscriber, message: IncomingMessage)

Bases: object

_finalise()
_on_task_done(outcome)

Schedule a task to call _task_done when the outcome is done.

_outcome_destroyed(outcome_ref)
async _task_done(outcome: Future)
property body: str
property no_reply: bool
process() → Future
processing() → Generator[Future, None, None]

Processing context. The task should be done by the time the context exits; otherwise it is assumed that the caller does not want to process it, and it is sent back to the queue.
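
The behaviour described above can be illustrated with a small toy model. This is only a sketch of the idea and not kiwipy's implementation: the ToyTask class and its state strings are hypothetical, and a concurrent.futures.Future stands in for the outcome future.

```python
import contextlib
from concurrent.futures import Future


class ToyTask:
    """Hypothetical stand-in for an incoming task; not kiwipy's class."""

    def __init__(self, body):
        self.body = body
        self.state = "pending"

    @contextlib.contextmanager
    def processing(self):
        # Hand the caller a future that will hold the task's outcome.
        outcome = Future()
        self.state = "processing"
        try:
            yield outcome
        finally:
            # A finished outcome means the task was handled. Otherwise the
            # task is treated as declined and goes back to the queue.
            self.state = "finished" if outcome.done() else "requeued"


handled = ToyTask("1 + 1")
with handled.processing() as outcome:
    outcome.set_result(2)

declined = ToyTask("2 + 2")
with declined.processing():
    pass  # leave the context without setting an outcome
```

After this runs, handled ends up in the illustrative "finished" state, while declined ends up "requeued" because its outcome was never set.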

async requeue()
property state: str
class kiwipy.rmq.tasks.RmqTaskPublisher(connection, queue_name='kiwipy.tasks', exchange_name='kiwipy.messages', exchange_params=None, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), confirm_deliveries=True, testing_mode=False)

Bases: BasePublisherWithReplyQueue

Publishes messages to the RMQ task queue and gets the response

_channel: Optional[aio_pika.Channel]
_exchange: Optional[aio_pika.Exchange]
_reply_queue: Optional[aio_pika.Queue]
async task_send(task, no_reply: bool = False) → Future

Send a task for processing by a task subscriber.

All task messages are made persistent by setting delivery_mode=2.

Parameters
  • task – The task payload

  • no_reply – Don’t send a reply containing the result of the task

Returns

A future representing the result of the task
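
As a rough usage sketch, sending a task and waiting for its result could look like the following. The helper send_and_wait and the EchoPublisher stub are hypothetical and exist only so the example runs without a broker. The sketch also assumes that the future returned by task_send can be awaited on the running event loop.

```python
import asyncio


async def send_and_wait(publisher, task, timeout=10.0):
    """Send `task` and wait for its result (hypothetical helper)."""
    # task_send is a coroutine; awaiting it yields the result future.
    future = await publisher.task_send(task, no_reply=False)
    # Assumption: the returned future is awaitable on the running loop.
    return await asyncio.wait_for(future, timeout=timeout)


class EchoPublisher:
    """Stub with the same task_send signature, for demonstration only."""

    async def task_send(self, task, no_reply: bool = False):
        future = asyncio.get_running_loop().create_future()
        future.set_result({"echo": task})
        return future


result = asyncio.run(send_and_wait(EchoPublisher(), {"op": "add", "args": [1, 2]}))
```

With a real publisher, the result would come from whichever task subscriber picked up the message. Here the stub simply echoes the payload back.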

class kiwipy.rmq.tasks.RmqTaskQueue(connection, exchange_name='kiwipy.messages', queue_name='kiwipy.tasks', decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0, testing_mode=False)

Bases: object

Combines a task publisher and a task subscriber into a single work queue that can both send and receive tasks.

async add_task_subscriber(subscriber, identifier=None)
async connect()
async disconnect()
next_task(no_ack=False, fail=True, timeout=5.0)
async remove_task_subscriber(identifier)
async task_send(task, no_reply: bool = False)

Send a task to the queue

class kiwipy.rmq.tasks.RmqTaskSubscriber(connection: aio_pika.connection.Connection, exchange_name: str = 'kiwipy.messages', queue_name: str = 'kiwipy.tasks', testing_mode=False, decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0)

Bases: BaseConnectionWithExchange

Listens for tasks coming in on the RMQ task queue

TASK_QUEUE_ARGUMENTS = {'x-message-ttl': 604800000}
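
The TTL value above is easier to read as a duration. RabbitMQ interprets x-message-ttl in milliseconds, so the arithmetic can be checked with a short sketch (the variable name ttl is only illustrative):

```python
from datetime import timedelta

TASK_QUEUE_ARGUMENTS = {"x-message-ttl": 604800000}

# x-message-ttl is given in milliseconds; convert it to a timedelta.
ttl = timedelta(milliseconds=TASK_QUEUE_ARGUMENTS["x-message-ttl"])
print(ttl.days)  # 7
```

In other words, a task message left in the queue expires after seven days.
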
_build_response_message(body, incoming_message)

Create an aio_pika Message as a response to a task being dealt with.

Parameters
Returns

The response message

Return type

aio_pika.Message

_channel: Optional[aio_pika.Channel]
async _create_task_queue()

Create and bind the task queue

_exchange: Optional[aio_pika.Exchange]
async _on_task(message: IncomingMessage)
Parameters

message – The aio_pika RMQ message

async _send_response(msg_body, incoming_message)
_task_queue: Optional[aio_pika.Queue]
async add_task_subscriber(subscriber, identifier=None)
async connect()
next_task(no_ack=False, fail=True, timeout=5.0) → Generator[RmqIncomingTask, None, None]

Get the next task from the queue.

Raises

kiwipy.exceptions.QueueEmpty – When the queue has no tasks within the timeout
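
A hedged sketch of pulling tasks one at a time follows. It assumes that next_task is used as an async context manager and that processing() is a regular context manager yielding an outcome future. Neither is stated explicitly above, so treat both as assumptions. The helper handle_next, the stub classes, and the local QueueEmpty stand-in are all hypothetical and let the example run without a broker.

```python
import asyncio
import contextlib
from concurrent.futures import Future


class QueueEmpty(Exception):
    """Local stand-in for kiwipy.exceptions.QueueEmpty."""


class StubTask:
    """Hypothetical task exposing body and a processing() context."""

    def __init__(self, body):
        self.body = body
        self.result = None

    @contextlib.contextmanager
    def processing(self):
        outcome = Future()
        yield outcome
        if outcome.done():
            self.result = outcome.result()


class StubQueue:
    """Hypothetical queue mimicking the next_task signature."""

    def __init__(self, bodies):
        self._tasks = [StubTask(body) for body in bodies]
        self.done = []

    @contextlib.asynccontextmanager
    async def next_task(self, no_ack=False, fail=True, timeout=5.0):
        if not self._tasks:
            raise QueueEmpty("no tasks within the timeout")
        task = self._tasks.pop(0)
        yield task
        self.done.append(task)


async def handle_next(queue, handler, empty_exc, timeout=5.0):
    """Process one task; return False once the queue reports empty."""
    try:
        async with queue.next_task(timeout=timeout) as task:
            # Set the outcome so the task is not sent back to the queue.
            with task.processing() as outcome:
                outcome.set_result(handler(task.body))
    except empty_exc:
        return False
    return True


async def main():
    queue = StubQueue([2, 3])
    while await handle_next(queue, lambda x: x * x, QueueEmpty, timeout=0.1):
        pass
    return [task.result for task in queue.done]


results = asyncio.run(main())
```

The exception type is passed in as a parameter so that the same helper could be tried with kiwipy.exceptions.QueueEmpty against a real subscriber.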

async remove_task_subscriber(identifier)