kiwipy.rmq.communicator module¶
- class kiwipy.rmq.communicator.RmqCommunicator(connection: ~aio_pika.connection.Connection, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange: str = 'kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False)[source]¶
Bases:
object
An asynchronous communicator that relies on aio_pika to make a connection to a RabbitMQ server and uses an asyncio event loop for scheduling coroutines and callbacks.
- _connection = None¶
- _default_task_queue: RmqTaskQueue | None = None¶
- _message_publisher = None¶
- _message_subscriber = None¶
- add_close_callback(callback: Callable[[AbstractConnection, BaseException | None], Any], weak: bool = False) None [source]¶
Add a callable to be called each time (after) the connection is closed.
- Parameters:
weak – If True, the callback will be added to a WeakSet
- async get_default_task_queue() RmqTaskQueue [source]¶
Get a default task queue.
If one doesn’t exist it will be created as part of this call.
- async get_message_publisher() RmqPublisher [source]¶
Get a message publisher.
If one doesn’t exist it will be created as part of this call.
- async get_message_subscriber() RmqSubscriber [source]¶
Get the message subscriber.
If one doesn’t exist it will be created as part of this call.
- property loop¶
Get the event loop instance driving this communicator connection.
- async rpc_send(recipient_id, msg)[source]¶
Initiate a remote procedure call on a recipient.
- Parameters:
recipient_id – The recipient identifier
msg – The body of the message
- Returns:
A future corresponding to the outcome of the call
- property server_properties: Dict¶
A dictionary containing server properties as returned by the RMQ server at connection time.
The details are defined by the RMQ standard and can be found here:
https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties
- The protocol states that this dictionary SHOULD contain at least:
‘host’ - specifying the server host name or address ‘product’ - giving the name of the server product ‘version’ - giving the name of the server version ‘platform’ - giving the name of the operating system ‘copyright’ - if appropriate, and, ‘information’ - giving other general information
Note
In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.
- Returns:
the server properties dictionary
- async task_queue(queue_name: str, prefetch_size=0, prefetch_count=0) RmqTaskQueue [source]¶
Create a new task queue.
- async kiwipy.rmq.communicator.async_connect(connection_params: str | dict | None = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange: str = 'kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) RmqCommunicator [source]¶
Convenience method that returns a connected communicator.
- Parameters:
connection_params – parameters that will be passed to the connection factory to create the connection
connection_factory – the factory method to open the aio_pika connection with
message_exchange – The name of the RMQ message exchange to use
queue_expires – the expiry time for standard queues in milliseconds. This is the time after which, if there are no subscribers, a queue will automatically be deleted by RabbitMQ.
task_exchange – The name of the RMQ task exchange to use
task_queue – The name of the task queue to use
task_prefetch_count – the number of tasks this communicator can fetch simultaneously
task_prefetch_size – the total size of the messages that the default queue can fetch simultaneously
encoder – The encoder to call for encoding a message
decoder – The decoder to call for decoding a message
testing_mode – Run in testing mode: all queues and exchanges will be temporary