kiwipy.rmq package

class kiwipy.rmq.RmqCommunicator(connection: ~aio_pika.connection.Connection, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange: str = 'kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False)[source]

Bases: object

An asynchronous communicator that relies on aio_pika to make a connection to a RabbitMQ server and uses an asyncio event loop for scheduling coroutines and callbacks.

_connection = None
_default_task_queue: RmqTaskQueue | None = None
_ensure_connected()[source]
_message_publisher = None
_message_subscriber = None
async add_broadcast_subscriber(subscriber, identifier=None)[source]
add_close_callback(callback: Callable[[AbstractConnection, BaseException | None], Any], weak: bool = False) None[source]

Add a callable to be called each time (after) the connection is closed.

Parameters:

weak – If True, the callback will be added to a WeakSet

async add_rpc_subscriber(subscriber, identifier=None)[source]
async add_task_subscriber(subscriber, identifier=None)[source]
async broadcast_send(body, sender=None, subject=None, correlation_id=None)[source]
async connect()[source]

Establish a connection if not already connected.

connected() bool[source]
async disconnect()[source]

Disconnect from the connection if connected.

async get_default_task_queue() RmqTaskQueue[source]

Get a default task queue.

If one doesn’t exist it will be created as part of this call.

async get_message_publisher() RmqPublisher[source]

Get a message publisher.

If one doesn’t exist it will be created as part of this call.

async get_message_subscriber() RmqSubscriber[source]

Get the message subscriber.

If one doesn’t exist it will be created as part of this call.

property loop

Get the event loop instance driving this communicator connection.

async remove_broadcast_subscriber(identifier)[source]
async remove_rpc_subscriber(identifier)[source]
async remove_task_subscriber(identifier)[source]
async rpc_send(recipient_id, msg)[source]

Initiate a remote procedure call on a recipient.

Parameters:
  • recipient_id – The recipient identifier

  • msg – The body of the message

Returns:

A future corresponding to the outcome of the call

property server_properties: Dict

A dictionary containing server properties as returned by the RMQ server at connection time.

The details are defined by the RMQ standard and can be found here:

https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties

The protocol states that this dictionary SHOULD contain at least:

‘host’ - specifying the server host name or address ‘product’ - giving the name of the server product ‘version’ - giving the name of the server version ‘platform’ - giving the name of the operating system ‘copyright’ - if appropriate, and, ‘information’ - giving other general information

Note

In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.

Returns:

the server properties dictionary

async task_queue(queue_name: str, prefetch_size=0, prefetch_count=0) RmqTaskQueue[source]

Create a new task queue.

async task_send(task, no_reply=False)[source]
class kiwipy.rmq.RmqIncomingTask(subscriber: RmqTaskSubscriber, message: IncomingMessage)[source]

Bases: object

_finalise()[source]
_on_task_done(outcome)[source]

Schedule a task to call _task_done when the outcome is done.

_outcome_destroyed(outcome_ref)[source]
async _task_done(outcome: Future)[source]
property body: str
property no_reply: bool
process() Future[source]
processing() Generator[Future, None, None][source]

Processing context. The task should be done at the end otherwise it’s assumed the caller doesn’t want to process it, and it’s sent back to the queue

async requeue()[source]
property state: str
class kiwipy.rmq.RmqTaskPublisher(connection, queue_name='kiwipy.tasks', exchange_name='kiwipy.messages', exchange_params=None, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), confirm_deliveries=True, testing_mode=False)[source]

Bases: BasePublisherWithReplyQueue

Publishes messages to the RMQ task queue and gets the response

async task_send(task, no_reply: bool = False) Future[source]

Send a task for processing by a task subscriber.

All task messages will be set to be persistent by setting delivery_mode=2.

Parameters:
  • task – The task payload

  • no_reply – Don’t send a reply containing the result of the task

Returns:

A future representing the result of the task

class kiwipy.rmq.RmqTaskQueue(connection, exchange_name='kiwipy.messages', queue_name='kiwipy.tasks', decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0, testing_mode=False)[source]

Bases: object

Combines a task publisher and subscriber to create a work queue where you can do both

async add_task_subscriber(subscriber, identifier=None)[source]
async connect()[source]
async disconnect()[source]
next_task(no_ack=False, fail=True, timeout=5.0)[source]
async remove_task_subscriber(identifier)[source]
async task_send(task, no_reply: bool = False)[source]

Send a task to the queue

class kiwipy.rmq.RmqTaskSubscriber(connection: ~aio_pika.connection.Connection, exchange_name: str = 'kiwipy.messages', queue_name: str = 'kiwipy.tasks', testing_mode=False, decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0)[source]

Bases: BaseConnectionWithExchange

Listens for tasks coming in on the RMQ task queue

TASK_QUEUE_ARGUMENTS = {'x-message-ttl': 604800000}
_build_response_message(body, incoming_message)[source]

Create a aio_pika Message as a response to a task being deal with.

Parameters:
Returns:

The response message

Return type:

aio_pika.Message

async _create_task_queue()[source]

Create and bind the task queue

async _on_task(message: IncomingMessage)[source]
Parameters:

message – The aio_pika RMQ message

async _send_response(msg_body, incoming_message)[source]
async add_task_subscriber(subscriber, identifier=None)[source]
async connect()[source]
next_task(no_ack=False, fail=True, timeout=5.0) Generator[RmqIncomingTask, None, None][source]

Get the next task from the queue.

raises:

kiwipy.exceptions.QueueEmpty: When the queue has no tasks within the timeout

async remove_task_subscriber(identifier)[source]
class kiwipy.rmq.RmqThreadCommunicator(connection_params: str | dict | None = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange='kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)[source]

Bases: Communicator

RabbitMQ communicator that runs an event loop on a separate thread to do communication. This also means that heartbeats are not missed and the main program is free to block for as long as it wants.

TASK_TIMEOUT = 5.0
_ensure_open()[source]
_wrap_future(kiwi_future: Future)[source]
_wrap_subscriber(subscriber)[source]

” We need to convert any kiwipy.Futures we get from a subscriber call into asyncio ones for the event loop based communicator. Do this by wrapping any subscriber methods and intercepting the return values.

add_broadcast_subscriber(subscriber, identifier=None)[source]

Add a broadcast subscriber that will receive all broadcast messages

Parameters:
  • subscriber – the subscriber function to be called

  • identifier – an optional identifier for the subscriber

Returns:

an identifier for the subscriber and can be subsequently used to remove it

add_close_callback(callback: Callable[[AbstractConnection, BaseException | None], Any], weak: bool = False) None[source]

Add a callable to be called each time (after) the connection is closed.

Parameters:

weak – If True, the callback will be added to a WeakSet

add_rpc_subscriber(subscriber, identifier=None)[source]

Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.

add_task_subscriber(subscriber, identifier=None)[source]

Add a task subscriber to the communicator’s default queue. Returns the identifier.

Parameters:
  • subscriber – The task callback function

  • identifier – the subscriber identifier

broadcast_send(body, sender=None, subject=None, correlation_id=None)[source]

Broadcast a message to all subscribers

close()[source]

Close down the communicator and clean up all resources.

After this call it cannot be used again.

classmethod connect(connection_params: str | dict | None = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)[source]
is_closed() bool[source]

Return True if the communicator was closed

loop()[source]
remove_broadcast_subscriber(identifier)[source]

Remove a broadcast subscriber

Parameters:

identifier – the identifier of the subscriber to remove

remove_rpc_subscriber(identifier)[source]

Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.

Parameters:

identifier – The RPC subscriber identifier

remove_task_subscriber(identifier)[source]

Remove a task subscriber from the communicator’s default queue.

Parameters:

identifier – the subscriber to remove

Raises:

ValueError if identifier does not correspond to a known subscriber

rpc_send(recipient_id, msg)[source]

Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.

Parameters:
  • recipient_id – The recipient identifier

  • msg – The body of the message

Returns:

A future corresponding to the outcome of the call

Return type:

kiwipy.Future

property server_properties: Dict

A dictionary containing server properties as returned by the RMQ server at connection time.

The details are defined by the RMQ standard and can be found here:

https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties

The protocol states that this dictionary SHOULD contain at least:

‘host’ - specifying the server host name or address ‘product’ - giving the name of the server product ‘version’ - giving the name of the server version ‘platform’ - giving the name of the operating system ‘copyright’ - if appropriate, and, ‘information’ - giving other general information

Note

In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.

Returns:

the server properties dictionary

task_queue(queue_name: str, prefetch_size=0, prefetch_count=0)[source]
task_send(task, no_reply=False)[source]

Send a task messages, this will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.

Parameters:
  • task – The task message

  • no_reply (bool) – Do not send a reply containing the result of the task

Returns:

A future corresponding to the outcome of the task

class kiwipy.rmq.RmqThreadTaskQueue(task_queue: RmqTaskQueue, loop_scheduler: LoopScheduler, wrap_subscriber)[source]

Bases: object

Thread task queue.

add_task_subscriber(subscriber)[source]
next_task(timeout=5.0, fail=True)[source]
remove_task_subscriber(subscriber)[source]
task_send(task, no_reply=False)[source]
async kiwipy.rmq.async_connect(connection_params: str | dict | None = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange: str = 'kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) RmqCommunicator[source]

Convenience method that returns a connected communicator.

Parameters:
  • connection_params – parameters that will be passed to the connection factory to create the connection

  • connection_factory – the factory method to open the aio_pika connection with

  • message_exchange – The name of the RMQ message exchange to use

  • queue_expires – the expiry time for standard queues in milliseconds. This is the time after which, if there are no subscribers, a queue will automatically be deleted by RabbitMQ.

  • task_exchange – The name of the RMQ task exchange to use

  • task_queue – The name of the task queue to use

  • task_prefetch_count – the number of tasks this communicator can fetch simultaneously

  • task_prefetch_size – the total size of the messages that the default queue can fetch simultaneously

  • encoder – The encoder to call for encoding a message

  • decoder – The decoder to call for decoding a message

  • testing_mode – Run in testing mode: all queues and exchanges will be temporary

kiwipy.rmq.connect(connection_params: str | dict | None = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) RmqThreadCommunicator[source]

Establish a RabbitMQ communicator connection

Submodules