kiwipy.rmq.tasks module
- class kiwipy.rmq.tasks.RmqIncomingTask(subscriber: RmqTaskSubscriber, message: IncomingMessage)
Bases:
object
- class kiwipy.rmq.tasks.RmqTaskPublisher(connection, queue_name='kiwipy.tasks', exchange_name='kiwipy.messages', exchange_params=None, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), confirm_deliveries=True, testing_mode=False)
Bases:
BasePublisherWithReplyQueue
Publishes messages to the RMQ task queue and gets the response.
- _channel: Optional[aio_pika.Channel]
- _exchange: Optional[aio_pika.Exchange]
- _reply_queue: Optional[aio_pika.Queue]
- async task_send(task, no_reply: bool = False) -> Future
Send a task for processing by a task subscriber.
All task messages are made persistent by setting delivery_mode=2.
- Parameters
task – The task payload
no_reply – Don’t send a reply containing the result of the task
- Returns
A future representing the result of the task
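The two-step await implied by the signature above (an async method that returns a future) can be illustrated with a minimal in-memory sketch. This is not kiwipy's implementation and involves no RabbitMQ: `InMemoryTaskPublisher`, `worker`, and `main` are hypothetical names, and returning `None` when `no_reply` is set is an assumption made for this sketch only.

```python
import asyncio


class InMemoryTaskPublisher:
    """Hypothetical stand-in (not part of kiwipy) that mimics the shape of task_send."""

    def __init__(self, queue: asyncio.Queue):
        self._queue = queue

    async def task_send(self, task, no_reply: bool = False):
        if no_reply:
            # Assumption for this sketch: nothing to wait on when no reply is wanted
            await self._queue.put((task, None))
            return None
        # The future stands for the eventual result of the task
        future = asyncio.get_running_loop().create_future()
        await self._queue.put((task, future))
        return future


async def worker(queue: asyncio.Queue):
    """Hypothetical subscriber: handles one task and resolves its future, if any."""
    task, future = await queue.get()
    result = task["x"] + task["y"]
    if future is not None:
        future.set_result(result)


async def main():
    queue = asyncio.Queue()
    publisher = InMemoryTaskPublisher(queue)
    worker_task = asyncio.create_task(worker(queue))
    # First await: the task has been handed over to the queue
    future = await publisher.task_send({"x": 2, "y": 3})
    # Second await: the subscriber has produced a result
    result = await future
    await worker_task
    return result
```

Running `asyncio.run(main())` sends one task and waits for its result. The point of the sketch is only that sending and receiving the result are separate await points.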
- class kiwipy.rmq.tasks.RmqTaskQueue(connection, exchange_name='kiwipy.messages', queue_name='kiwipy.tasks', decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0, testing_mode=False)
Bases:
object
Combines a task publisher and a task subscriber to create a work queue on which you can both send tasks and process them.
- class kiwipy.rmq.tasks.RmqTaskSubscriber(connection: aio_pika.connection.Connection, exchange_name: str = 'kiwipy.messages', queue_name: str = 'kiwipy.tasks', testing_mode=False, decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0)
Bases:
BaseConnectionWithExchange
Listens for tasks coming in on the RMQ task queue.
- TASK_QUEUE_ARGUMENTS = {'x-message-ttl': 604800000}
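RabbitMQ interprets the `x-message-ttl` queue argument in milliseconds, so the value above is easier to read once converted. A small sketch of that arithmetic (the variable name `ttl` is illustrative only):

```python
from datetime import timedelta

# Value copied from the class attribute documented above
TASK_QUEUE_ARGUMENTS = {'x-message-ttl': 604800000}

# x-message-ttl is expressed in milliseconds
ttl = timedelta(milliseconds=TASK_QUEUE_ARGUMENTS['x-message-ttl'])
```

Here `ttl` equals `timedelta(days=7)`, i.e. a one-week time-to-live for messages in the queue.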
- _build_response_message(body, incoming_message)
Create an aio_pika Message as a response to a task being dealt with.
- Parameters
body (dict) – The message body dictionary
incoming_message (aio_pika.IncomingMessage) – The original message we are responding to
- Returns
The response message
- Return type
aio_pika.Message
- _channel: Optional[aio_pika.Channel]
- _exchange: Optional[aio_pika.Exchange]
- async _on_task(message: IncomingMessage)
- Parameters
message – The aio_pika RMQ message
- _task_queue: Optional[aio_pika.Queue]