kiwipy.rmq package
- class kiwipy.rmq.RmqCommunicator(connection: ~aio_pika.connection.Connection, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange: str = 'kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False)
Bases:
object
An asynchronous communicator that relies on aio_pika to make a connection to a RabbitMQ server and uses an asyncio event loop for scheduling coroutines and callbacks.
- _connection = None
- _default_task_queue: RmqTaskQueue | None = None
- _message_publisher = None
- _message_subscriber = None
- add_close_callback(callback: Callable[[AbstractConnection, BaseException | None], Any], weak: bool = False) → None
Add a callable to be invoked each time after the connection is closed.
- Parameters:
weak – If True, the callback will be added to a WeakSet
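The effect of weak can be illustrated without a broker. The following is a minimal sketch that only models the storage behaviour described above using the standard library; CloseLogger, strong_callbacks and weak_callbacks are hypothetical names and not part of kiwipy.

```python
import gc
import weakref


class CloseLogger:
    """Hypothetical callable with the (connection, exception) callback shape."""

    def __init__(self):
        self.events = []

    def __call__(self, connection, exc):
        self.events.append(exc)


# Stand-ins for the two storage modes (assumption: weak=False keeps a strong
# reference, weak=True stores the callback in a WeakSet as documented above).
strong_callbacks = []
weak_callbacks = weakref.WeakSet()

kept = CloseLogger()
dropped = CloseLogger()
strong_callbacks.append(kept)
weak_callbacks.add(dropped)

# Drop the caller's own references to both callbacks.
del kept, dropped
gc.collect()

# Simulate a close event: only the strongly held callback is still around.
for callback in list(strong_callbacks) + list(weak_callbacks):
    callback(None, None)

remaining = len(strong_callbacks) + len(weak_callbacks)
print(remaining)
```

In other words, a caller that passes weak=True presumably has to keep its own reference to the callback for as long as it should fire. Bound methods deserve extra care here, because Python creates a fresh bound-method object on each attribute access.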
- async get_default_task_queue() → RmqTaskQueue
Get a default task queue.
If one doesn’t exist it will be created as part of this call.
- async get_message_publisher() → RmqPublisher
Get a message publisher.
If one doesn’t exist it will be created as part of this call.
- async get_message_subscriber() → RmqSubscriber
Get the message subscriber.
If one doesn’t exist it will be created as part of this call.
- property loop
Get the event loop instance driving this communicator connection.
- async rpc_send(recipient_id, msg)
Initiate a remote procedure call on a recipient.
- Parameters:
recipient_id – The recipient identifier
msg – The body of the message
- Returns:
A future corresponding to the outcome of the call
- property server_properties: Dict
A dictionary containing server properties as returned by the RMQ server at connection time.
The details are defined by the RMQ standard and can be found here:
https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties
- The protocol states that this dictionary SHOULD contain at least:
‘host’ - specifying the server host name or address
‘product’ - giving the name of the server product
‘version’ - giving the name of the server version
‘platform’ - giving the name of the operating system
‘copyright’ - if appropriate, and,
‘information’ - giving other general information
Note
In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.
- Returns:
the server properties dictionary
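Because the listed keys are only recommended and ‘host’ may be missing, callers are probably best served by reading the dictionary defensively. A minimal sketch follows; server_host and server_summary are hypothetical helpers, and the sample dictionary is made up for illustration rather than captured from a real server.

```python
from typing import Any, Dict, Optional


def server_host(props: Dict[str, Any]) -> Optional[str]:
    """Return 'host' if present, else fall back to 'cluster_name' (best effort)."""
    host = props.get("host")
    if host is None:
        # Fallback only; the note above warns against relying on this key.
        host = props.get("cluster_name")
    return host


def server_summary(props: Dict[str, Any]) -> str:
    """Build a short description without assuming any key is present."""
    product = props.get("product", "unknown product")
    version = props.get("version", "unknown version")
    return f"{product} {version}"


# Made-up sample resembling a reply that lacks 'host'.
sample = {"product": "RabbitMQ", "version": "3.12.0", "cluster_name": "rabbit@node1"}
print(server_summary(sample), "on", server_host(sample))
```

With a connected communicator the helpers would be called as server_summary(communicator.server_properties).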
- async task_queue(queue_name: str, prefetch_size=0, prefetch_count=0) → RmqTaskQueue
Create a new task queue.
- class kiwipy.rmq.RmqIncomingTask(subscriber: RmqTaskSubscriber, message: IncomingMessage)
Bases:
object
- class kiwipy.rmq.RmqTaskPublisher(connection, queue_name='kiwipy.tasks', exchange_name='kiwipy.messages', exchange_params=None, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), confirm_deliveries=True, testing_mode=False)
Bases:
BasePublisherWithReplyQueue
Publishes messages to the RMQ task queue and gets the response
- async task_send(task, no_reply: bool = False) → Future
Send a task for processing by a task subscriber.
All task messages will be set to be persistent by setting delivery_mode=2.
- Parameters:
task – The task payload
no_reply – Don’t send a reply containing the result of the task
- Returns:
A future representing the result of the task
- class kiwipy.rmq.RmqTaskQueue(connection, exchange_name='kiwipy.messages', queue_name='kiwipy.tasks', decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0, testing_mode=False)
Bases:
object
Combines a task publisher and a task subscriber into a single work queue that can both send and receive tasks.
- class kiwipy.rmq.RmqTaskSubscriber(connection: ~aio_pika.connection.Connection, exchange_name: str = 'kiwipy.messages', queue_name: str = 'kiwipy.tasks', testing_mode=False, decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0)
Bases:
BaseConnectionWithExchange
Listens for tasks coming in on the RMQ task queue
- TASK_QUEUE_ARGUMENTS = {'x-message-ttl': 604800000}
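RabbitMQ interprets x-message-ttl in milliseconds, so the raw number is easier to read once converted. The short check below is only an arithmetic illustration using the value shown above and the documented default of queue_expires; it does not talk to a broker.

```python
from datetime import timedelta

# Value copied from TASK_QUEUE_ARGUMENTS above (milliseconds).
TASK_QUEUE_ARGUMENTS = {"x-message-ttl": 604800000}

ttl = timedelta(milliseconds=TASK_QUEUE_ARGUMENTS["x-message-ttl"])
print(ttl)  # prints: 7 days, 0:00:00

# The default queue_expires of 60000 ms corresponds to one minute.
queue_expires = timedelta(milliseconds=60000)
print(queue_expires.total_seconds())  # prints: 60.0
```

Unconsumed task messages are therefore kept for up to seven days under this setting.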
- _build_response_message(body, incoming_message)
Create an aio_pika Message as a response to a task being dealt with.
- Parameters:
body (dict) – The message body dictionary
incoming_message (aio_pika.IncomingMessage) – The original message we are responding to
- Returns:
The response message
- Return type:
aio_pika.Message
- async _on_task(message: IncomingMessage)
- Parameters:
message – The aio_pika RMQ message
- class kiwipy.rmq.RmqThreadCommunicator(connection_params: str | dict | None = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange='kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)
Bases:
Communicator
RabbitMQ communicator that runs an event loop on a separate thread to do communication. This also means that heartbeats are not missed and the main program is free to block for as long as it wants.
- TASK_TIMEOUT = 5.0
- _wrap_subscriber(subscriber)
Convert any kiwipy.Futures returned by a subscriber call into asyncio futures for the event-loop-based communicator. This is done by wrapping the subscriber's methods and intercepting their return values.
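The general idea of intercepting a subscriber's return value can be sketched with the standard library alone. This is not the library's implementation: it assumes that a kiwipy.Future behaves like a concurrent.futures.Future, and wrap_subscriber and shout are hypothetical names. For brevity the sketch awaits the converted future, whereas the real wrapper may simply hand the asyncio future back to the loop.

```python
import asyncio
import concurrent.futures
import functools


def wrap_subscriber(subscriber):
    """Return a coroutine function that bridges thread-style futures."""

    @functools.wraps(subscriber)
    async def wrapper(*args, **kwargs):
        result = subscriber(*args, **kwargs)
        if isinstance(result, concurrent.futures.Future):
            # Convert the thread-style future into one bound to the running
            # event loop, then wait for its outcome.
            result = await asyncio.wrap_future(result)
        return result

    return wrapper


def shout(_comm, msg):
    """Hypothetical subscriber that answers through a thread-style future."""
    future = concurrent.futures.Future()
    future.set_result(msg.upper())
    return future


answer = asyncio.run(wrap_subscriber(shout)(None, "ping"))
print(answer)  # prints: PING
```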
- add_broadcast_subscriber(subscriber, identifier=None)
Add a broadcast subscriber that will receive all broadcast messages
- Parameters:
subscriber – the subscriber function to be called
identifier – an optional identifier for the subscriber
- Returns:
an identifier for the subscriber, which can subsequently be used to remove it
- add_close_callback(callback: Callable[[AbstractConnection, BaseException | None], Any], weak: bool = False) → None
Add a callable to be invoked each time after the connection is closed.
- Parameters:
weak – If True, the callback will be added to a WeakSet
- add_rpc_subscriber(subscriber, identifier=None)
Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.
- add_task_subscriber(subscriber, identifier=None)
Add a task subscriber to the communicator’s default queue. Returns the identifier.
- Parameters:
subscriber – The task callback function
identifier – the subscriber identifier
- broadcast_send(body, sender=None, subject=None, correlation_id=None)
Broadcast a message to all subscribers
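A broadcast round trip might look like the sketch below. It assumes that broadcast subscribers are called with (communicator, body, sender, subject, correlation_id), that a RabbitMQ server is reachable at the example URL, and that a short sleep is enough for delivery. make_subject_filter and run_demo are hypothetical names.

```python
import time


def make_subject_filter(wanted_subject, received):
    """Build a broadcast subscriber that records bodies with a matching subject."""

    def on_broadcast(_comm, body, sender, subject, correlation_id):
        if subject == wanted_subject:
            received.append((sender, body))

    return on_broadcast


def run_demo(url="amqp://127.0.0.1"):
    """Needs a reachable RabbitMQ server; the URL is an assumption."""
    # Imported lazily so the filter above can be tried without kiwipy installed.
    from kiwipy import rmq

    received = []
    comm = rmq.connect(url, testing_mode=True)
    try:
        identifier = comm.add_broadcast_subscriber(
            make_subject_filter("news", received)
        )
        comm.broadcast_send("hello", sender="docs-example", subject="news")
        time.sleep(1.0)  # delivery is asynchronous; wait briefly (best effort)
        comm.remove_broadcast_subscriber(identifier)
    finally:
        comm.close()
    return received
```

Filtering inside the subscriber keeps the sketch independent of any filter helpers the library may offer.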
- close()
Close down the communicator and clean up all resources.
After this call it cannot be used again.
- classmethod connect(connection_params: str | dict | None = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)
- remove_broadcast_subscriber(identifier)
Remove a broadcast subscriber
- Parameters:
identifier – the identifier of the subscriber to remove
- remove_rpc_subscriber(identifier)
Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.
- Parameters:
identifier – The RPC subscriber identifier
- remove_task_subscriber(identifier)
Remove a task subscriber from the communicator’s default queue.
- Parameters:
identifier – the subscriber to remove
- Raises:
ValueError if identifier does not correspond to a known subscriber
- rpc_send(recipient_id, msg)
Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.
- Parameters:
recipient_id – The recipient identifier
msg – The body of the message
- Returns:
A future corresponding to the outcome of the call
- Return type:
kiwipy.Future
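As an illustration, an RPC round trip on a single thread communicator could be sketched as follows. It assumes that RPC subscribers are called with (communicator, message), that the returned kiwipy.Future supports result(timeout=...) like a concurrent.futures.Future, and that a RabbitMQ server is reachable at the example URL. fib and serve_and_call are hypothetical names.

```python
def fib(_comm, num):
    """RPC subscriber: return the num-th Fibonacci number."""
    a, b = 0, 1
    for _ in range(num):
        a, b = b, a + b
    return a


def serve_and_call(url="amqp://127.0.0.1"):
    """Needs a reachable RabbitMQ server; the URL is an assumption."""
    # Imported lazily so fib() can be exercised without kiwipy installed.
    from kiwipy import rmq

    comm = rmq.connect(url, testing_mode=True)
    try:
        # Register the subscriber under an explicit identifier...
        identifier = comm.add_rpc_subscriber(fib, "fib")
        # ...then address it by that identifier and wait for the reply.
        future = comm.rpc_send("fib", 10)
        result = future.result(timeout=5.0)
        comm.remove_rpc_subscriber(identifier)
    finally:
        comm.close()
    return result
```

In practice the subscriber and the caller would usually live in different processes, each with its own communicator.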
- property server_properties: Dict
A dictionary containing server properties as returned by the RMQ server at connection time.
The details are defined by the RMQ standard and can be found here:
https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties
- The protocol states that this dictionary SHOULD contain at least:
‘host’ - specifying the server host name or address
‘product’ - giving the name of the server product
‘version’ - giving the name of the server version
‘platform’ - giving the name of the operating system
‘copyright’ - if appropriate, and,
‘information’ - giving other general information
Note
In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.
- Returns:
the server properties dictionary
- task_send(task, no_reply=False)
Send a task message. It will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.
- Parameters:
task – The task message
no_reply (bool) – Do not send a reply containing the result of the task
- Returns:
A future corresponding to the outcome of the task
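A hedged sketch of submitting a task and waiting for its outcome is shown below. It assumes that task subscribers are called with (communicator, task) and that their return value becomes the task's result, that the returned future supports result(timeout=...), and that a RabbitMQ server is reachable at the example URL. count_words and submit are hypothetical names.

```python
def count_words(_comm, task):
    """Task subscriber: the returned value is treated as the task's result."""
    return len(str(task).split())


def submit(url="amqp://127.0.0.1"):
    """Needs a reachable RabbitMQ server; the URL is an assumption."""
    # Imported lazily so count_words() can be tried without kiwipy installed.
    from kiwipy import rmq

    # testing_mode keeps the queues temporary so the demo leaves nothing behind.
    comm = rmq.connect(url, testing_mode=True)
    try:
        identifier = comm.add_task_subscriber(count_words)
        # Block the calling thread until a worker has produced the outcome.
        outcome = comm.task_send("three word task").result(timeout=10.0)
        comm.remove_task_subscriber(identifier)
    finally:
        comm.close()
    return outcome
```

Passing no_reply=True instead suits fire-and-forget work where the sender does not need the result.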
- class kiwipy.rmq.RmqThreadTaskQueue(task_queue: RmqTaskQueue, loop_scheduler: LoopScheduler, wrap_subscriber)
Bases:
object
Thread task queue.
- async kiwipy.rmq.async_connect(connection_params: str | dict | None = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange: str = 'kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) → RmqCommunicator
Convenience function that returns a connected communicator.
- Parameters:
connection_params – parameters that will be passed to the connection factory to create the connection
connection_factory – the factory method to open the aio_pika connection with
message_exchange – The name of the RMQ message exchange to use
queue_expires – the expiry time for standard queues in milliseconds. This is the time after which, if there are no subscribers, a queue will automatically be deleted by RabbitMQ.
task_exchange – The name of the RMQ task exchange to use
task_queue – The name of the task queue to use
task_prefetch_count – the number of tasks this communicator can fetch simultaneously
task_prefetch_size – the total size of the messages that the default queue can fetch simultaneously
encoder – The encoder to call for encoding a message
decoder – The decoder to call for decoding a message
testing_mode – Run in testing mode: all queues and exchanges will be temporary
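The parameters above could be combined as in the following sketch. The option values and the broker URL are illustrative assumptions, and awaiting disconnect() assumes that this RmqCommunicator member is a coroutine; main and OPTIONS are hypothetical names.

```python
import asyncio

# Keyword arguments documented above; the values are illustrative only.
OPTIONS = {
    "queue_expires": 30_000,    # delete idle standard queues after 30 seconds
    "task_prefetch_count": 1,   # let this communicator fetch one task at a time
    "testing_mode": True,       # temporary queues and exchanges
}


async def main(url="amqp://127.0.0.1"):
    """Needs a reachable RabbitMQ server; the URL is an assumption."""
    # Imported lazily so OPTIONS can be inspected without kiwipy installed.
    from kiwipy import rmq

    comm = await rmq.async_connect(url, **OPTIONS)
    try:
        # Any communicator call could go here; reading the server properties
        # is a cheap way to confirm that the connection is up.
        return comm.server_properties
    finally:
        await comm.disconnect()
```

The coroutine would be driven with asyncio.run(main()) from synchronous code.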
- kiwipy.rmq.connect(connection_params: str | dict | None = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) → RmqThreadCommunicator
Establish a RabbitMQ communicator connection
Submodules
- kiwipy.rmq.communicator module
RmqCommunicator
RmqCommunicator._connection
RmqCommunicator._default_task_queue
RmqCommunicator._ensure_connected()
RmqCommunicator._message_publisher
RmqCommunicator._message_subscriber
RmqCommunicator.add_broadcast_subscriber()
RmqCommunicator.add_close_callback()
RmqCommunicator.add_rpc_subscriber()
RmqCommunicator.add_task_subscriber()
RmqCommunicator.broadcast_send()
RmqCommunicator.connect()
RmqCommunicator.connected()
RmqCommunicator.disconnect()
RmqCommunicator.get_default_task_queue()
RmqCommunicator.get_message_publisher()
RmqCommunicator.get_message_subscriber()
RmqCommunicator.loop
RmqCommunicator.remove_broadcast_subscriber()
RmqCommunicator.remove_rpc_subscriber()
RmqCommunicator.remove_task_subscriber()
RmqCommunicator.rpc_send()
RmqCommunicator.server_properties
RmqCommunicator.task_queue()
RmqCommunicator.task_send()
async_connect()
- kiwipy.rmq.configure module
- kiwipy.rmq.defaults module
- kiwipy.rmq.messages module
BaseConnectionWithExchange
BasePublisherWithReplyQueue
BasePublisherWithReplyQueue.DEFAULT_EXCHANGE_PARAMS
BasePublisherWithReplyQueue._on_channel_close()
BasePublisherWithReplyQueue._on_response()
BasePublisherWithReplyQueue.action_message()
BasePublisherWithReplyQueue.channel()
BasePublisherWithReplyQueue.connect()
BasePublisherWithReplyQueue.disconnect()
BasePublisherWithReplyQueue.get_exchange_name()
BasePublisherWithReplyQueue.is_closing
BasePublisherWithReplyQueue.is_connected
BasePublisherWithReplyQueue.publish()
BasePublisherWithReplyQueue.publish_expect_response()
BroadcastMessage
- kiwipy.rmq.tasks module
RmqIncomingTask
RmqTaskPublisher
RmqTaskQueue
RmqTaskSubscriber
RmqTaskSubscriber.TASK_QUEUE_ARGUMENTS
RmqTaskSubscriber._build_response_message()
RmqTaskSubscriber._channel
RmqTaskSubscriber._create_task_queue()
RmqTaskSubscriber._exchange
RmqTaskSubscriber._on_task()
RmqTaskSubscriber._send_response()
RmqTaskSubscriber._task_queue
RmqTaskSubscriber.add_task_subscriber()
RmqTaskSubscriber.connect()
RmqTaskSubscriber.next_task()
RmqTaskSubscriber.remove_task_subscriber()
- kiwipy.rmq.threadcomms module
RmqThreadCommunicator
RmqThreadCommunicator.TASK_TIMEOUT
RmqThreadCommunicator._communicator
RmqThreadCommunicator._ensure_open()
RmqThreadCommunicator._wrap_future()
RmqThreadCommunicator._wrap_subscriber()
RmqThreadCommunicator.add_broadcast_subscriber()
RmqThreadCommunicator.add_close_callback()
RmqThreadCommunicator.add_rpc_subscriber()
RmqThreadCommunicator.add_task_subscriber()
RmqThreadCommunicator.broadcast_send()
RmqThreadCommunicator.close()
RmqThreadCommunicator.connect()
RmqThreadCommunicator.is_closed()
RmqThreadCommunicator.loop()
RmqThreadCommunicator.remove_broadcast_subscriber()
RmqThreadCommunicator.remove_rpc_subscriber()
RmqThreadCommunicator.remove_task_subscriber()
RmqThreadCommunicator.rpc_send()
RmqThreadCommunicator.server_properties
RmqThreadCommunicator.task_queue()
RmqThreadCommunicator.task_send()
RmqThreadTaskQueue
connect()
- kiwipy.rmq.utils module