kiwipy.rmq package
- class kiwipy.rmq.RmqCommunicator(connection: aio_pika.connection.Connection, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange: str = 'kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False)
Bases: object
An asynchronous communicator that relies on aio_pika to make a connection to a RabbitMQ server and uses an asyncio event loop for scheduling coroutines and callbacks.
- _connection = None
- _default_task_queue: Optional[RmqTaskQueue] = None
- _message_publisher = None
- _message_subscriber = None
- add_close_callback(callback: Callable[[AbstractConnection, Optional[BaseException]], Any], weak: bool = False) -> None
Add a callable to be called each time (after) the connection is closed.
- Parameters
weak – If True, the callback will be added to a WeakSet
- async get_default_task_queue() -> RmqTaskQueue
Get a default task queue.
If one doesn’t exist it will be created as part of this call.
- async get_message_publisher() -> RmqPublisher
Get a message publisher.
If one doesn’t exist it will be created as part of this call.
- async get_message_subscriber() -> RmqSubscriber
Get the message subscriber.
If one doesn’t exist it will be created as part of this call.
- property loop
Get the event loop instance driving this communicator connection.
- async rpc_send(recipient_id, msg)
Initiate a remote procedure call on a recipient.
- Parameters
recipient_id – The recipient identifier
msg – The body of the message
- Returns
A future corresponding to the outcome of the call
- property server_properties: Dict
A dictionary containing server properties as returned by the RMQ server at connection time.
The details are defined by the RMQ standard and can be found here:
https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties
The protocol states that this dictionary SHOULD contain at least:
- ‘host’: the server host name or address
- ‘product’: the name of the server product
- ‘version’: the server version
- ‘platform’: the name of the operating system
- ‘copyright’: if appropriate
- ‘information’: other general information
Note
In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.
- Returns
the server properties dictionary
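Because ‘host’ is not always present, code that reads this dictionary is safer when it supplies defaults. The following is a minimal sketch, assuming the properties arrive as a plain dict with string keys and values. The `describe_server` helper and the sample values are hypothetical and not part of kiwipy.

```python
from typing import Any, Dict


def describe_server(props: Dict[str, Any]) -> str:
    """Summarise server properties without assuming any key is present."""
    product = props.get("product", "unknown product")
    version = props.get("version", "unknown version")
    # 'host' is not always returned; 'cluster_name' is only a best-effort
    # fallback and should not be relied upon.
    host = props.get("host") or props.get("cluster_name") or "unknown host"
    return f"{product} {version} on {host}"


# Hypothetical sample values, for illustration only.
sample = {"product": "RabbitMQ", "version": "3.8.0", "cluster_name": "rabbit@node1"}
print(describe_server(sample))
```

With a live communicator, the same helper could be given `communicator.server_properties` instead of the sample dictionary.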
- async task_queue(queue_name: str, prefetch_size=0, prefetch_count=0) -> RmqTaskQueue
Create a new task queue.
- class kiwipy.rmq.RmqIncomingTask(subscriber: RmqTaskSubscriber, message: IncomingMessage)
Bases: object
- class kiwipy.rmq.RmqTaskPublisher(connection, queue_name='kiwipy.tasks', exchange_name='kiwipy.messages', exchange_params=None, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), confirm_deliveries=True, testing_mode=False)
Bases: BasePublisherWithReplyQueue
Publishes messages to the RMQ task queue and gets the response.
- async task_send(task, no_reply: bool = False) -> Future
Send a task for processing by a task subscriber.
All task messages will be set to be persistent by setting delivery_mode=2.
- Parameters
task – The task payload
no_reply – Don’t send a reply containing the result of the task
- Returns
A future representing the result of the task
- class kiwipy.rmq.RmqTaskQueue(connection, exchange_name='kiwipy.messages', queue_name='kiwipy.tasks', decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0, testing_mode=False)
Bases: object
Combines a task publisher and subscriber to create a work queue where you can do both.
- class kiwipy.rmq.RmqTaskSubscriber(connection: aio_pika.connection.Connection, exchange_name: str = 'kiwipy.messages', queue_name: str = 'kiwipy.tasks', testing_mode=False, decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0)
Bases: BaseConnectionWithExchange
Listens for tasks coming in on the RMQ task queue.
- TASK_QUEUE_ARGUMENTS = {'x-message-ttl': 604800000}
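RabbitMQ expresses the `x-message-ttl` queue argument in milliseconds, so the value above corresponds to seven days. A quick check of the arithmetic, using only the standard library:

```python
from datetime import timedelta

# Copied from the class attribute above.
TASK_QUEUE_ARGUMENTS = {"x-message-ttl": 604800000}

# Interpret the TTL as milliseconds and express it as a timedelta.
ttl = timedelta(milliseconds=TASK_QUEUE_ARGUMENTS["x-message-ttl"])
print(ttl.days)
```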
- _build_response_message(body, incoming_message)
Create an aio_pika Message as a response to a task being dealt with.
- Parameters
body (dict) – The message body dictionary
incoming_message (aio_pika.IncomingMessage) – The original message we are responding to
- Returns
The response message
- Return type
- async _on_task(message: IncomingMessage)
- Parameters
message – The aio_pika RMQ message
- class kiwipy.rmq.RmqThreadCommunicator(connection_params: typing.Optional[typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange='kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)
Bases: Communicator
RabbitMQ communicator that runs an event loop on a separate thread to do communication. This also means that heartbeats are not missed and the main program is free to block for as long as it wants.
- TASK_TIMEOUT = 5.0
- _wrap_subscriber(subscriber)
We need to convert any kiwipy.Futures we get from a subscriber call into asyncio ones for the event-loop-based communicator. This is done by wrapping the subscriber's methods and intercepting their return values.
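The idea of intercepting a subscriber's return value can be sketched with the standard library alone. This is not kiwipy's implementation: it assumes that a kiwipy.Future behaves like a `concurrent.futures.Future`, and the names `wrap_subscriber`, `shout` and `demo` are hypothetical.

```python
import asyncio
import concurrent.futures
import functools


def wrap_subscriber(subscriber):
    """Return a coroutine function that awaits thread-style futures."""

    @functools.wraps(subscriber)
    async def wrapper(*args, **kwargs):
        result = subscriber(*args, **kwargs)
        if isinstance(result, concurrent.futures.Future):
            # Bridge the thread-style future into the running event loop.
            result = await asyncio.wrap_future(result)
        return result

    return wrapper


def shout(_comm, msg):
    """Hypothetical subscriber that answers with an already-resolved future."""
    future = concurrent.futures.Future()
    future.set_result(msg.upper())
    return future


async def demo():
    return await wrap_subscriber(shout)(None, "hello")


print(asyncio.run(demo()))
```

Plain return values pass through the wrapper unchanged, so only subscribers that hand back a future pay for the conversion.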
- add_broadcast_subscriber(subscriber, identifier=None)
Add a broadcast subscriber that will receive all broadcast messages
- Parameters
subscriber – the subscriber function to be called
identifier – an optional identifier for the subscriber
- Returns
an identifier for the subscriber, which can subsequently be used to remove it
- add_close_callback(callback: Callable[[AbstractConnection, Optional[BaseException]], Any], weak: bool = False) -> None
Add a callable to be called each time (after) the connection is closed.
- Parameters
weak – If True, the callback will be added to a WeakSet
- add_rpc_subscriber(subscriber, identifier=None)
Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.
- add_task_subscriber(subscriber, identifier=None)
Add a task subscriber to the communicator’s default queue. Returns the identifier.
- Parameters
subscriber – The task callback function
identifier – the subscriber identifier
- broadcast_send(body, sender=None, subject=None, correlation_id=None)
Broadcast a message to all subscribers
- close()
Close down the communicator and clean up all resources.
After this call it cannot be used again.
- classmethod connect(connection_params: typing.Optional[typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)
- remove_broadcast_subscriber(identifier)
Remove a broadcast subscriber
- Parameters
identifier – the identifier of the subscriber to remove
- remove_rpc_subscriber(identifier)
Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.
- Parameters
identifier – The RPC subscriber identifier
- remove_task_subscriber(identifier)
Remove a task subscriber from the communicator’s default queue.
- Parameters
identifier – the subscriber to remove
- Raises
ValueError if identifier does not correspond to a known subscriber
- rpc_send(recipient_id, msg)
Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.
- Parameters
recipient_id – The recipient identifier
msg – The body of the message
- Returns
A future corresponding to the outcome of the call
- Return type
kiwipy.Future
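A hedged sketch of an RPC round trip with the thread communicator follows. It assumes that an RPC subscriber is called as `subscriber(communicator, msg)`, that its return value becomes the reply, and that a RabbitMQ broker is reachable at the placeholder URL. The names `fib` and `run_with_broker` are hypothetical, and `run_with_broker` is deliberately not called here because it needs kiwipy installed and a running broker.

```python
def fib(_comm, num):
    """RPC subscriber: return the num-th Fibonacci number (F(0) = 0)."""
    a, b = 0, 1
    for _ in range(num):
        a, b = b, a + b
    return a


def run_with_broker(url="amqp://127.0.0.1"):
    """Register `fib`, call it over RPC and return whatever the future yields."""
    import kiwipy.rmq  # imported lazily so the subscriber can be used without kiwipy

    comm = kiwipy.rmq.connect(url)
    try:
        identifier = comm.add_rpc_subscriber(fib, "fib")
        future = comm.rpc_send("fib", 10)
        # Block the calling thread until the reply arrives or the timeout expires.
        result = future.result(timeout=5.0)
        comm.remove_rpc_subscriber(identifier)
        return result
    finally:
        # close() is permanent; create a new communicator to reconnect.
        comm.close()


print(fib(None, 10))
```

Keeping the subscriber a plain function makes it easy to unit test without a broker, as the final line does.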
- property server_properties: Dict
A dictionary containing server properties as returned by the RMQ server at connection time.
The details are defined by the RMQ standard and can be found here:
https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties
The protocol states that this dictionary SHOULD contain at least:
- ‘host’: the server host name or address
- ‘product’: the name of the server product
- ‘version’: the server version
- ‘platform’: the name of the operating system
- ‘copyright’: if appropriate
- ‘information’: other general information
Note
In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.
- Returns
the server properties dictionary
- start()
Deprecated since version 0.6.0: This method will be removed in 0.7.0. There is no replacement; the class should be reinstantiated if it has been closed.
- stop()
Deprecated since version 0.6.0: This will be removed in 0.7.0. Use close() instead. Unlike stop(), close() is permanent and the class should not be used again after this.
- task_send(task, no_reply=False)
Send a task message. It will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.
- Parameters
task – The task message
no_reply (bool) – Do not send a reply containing the result of the task
- Returns
A future corresponding to the outcome of the task
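The worker side and the sender side of a task can be sketched as below. This assumes that a task subscriber is called as `subscriber(communicator, task)` and that its return value is delivered as the task result; the payload shape, the placeholder URL and the names `square` and `submit_square` are hypothetical. `submit_square` is not invoked here since it needs kiwipy and a reachable broker.

```python
def square(_comm, task):
    """Task subscriber: the return value is treated as the task's outcome."""
    return task["x"] ** 2


def submit_square(x, url="amqp://127.0.0.1"):
    """Queue one task on the default task queue and wait for its outcome."""
    import kiwipy.rmq  # imported lazily so `square` works without kiwipy

    comm = kiwipy.rmq.connect(url)
    try:
        # In a real deployment the subscriber would usually live in a
        # separate worker process; it is registered here for brevity.
        comm.add_task_subscriber(square)
        future = comm.task_send({"x": x})
        return future.result(timeout=5.0)
    finally:
        comm.close()


print(square(None, {"x": 4}))
```

Passing `no_reply=True` to `task_send` asks that no result be sent back, which suits fire-and-forget work.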
- class kiwipy.rmq.RmqThreadTaskQueue(task_queue: RmqTaskQueue, loop_scheduler: LoopScheduler, wrap_subscriber)
Bases: object
Thread task queue.
- async kiwipy.rmq.async_connect(connection_params: typing.Optional[typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange: str = 'kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) -> RmqCommunicator
Convenience method that returns a connected communicator.
- Parameters
connection_params – parameters that will be passed to the connection factory to create the connection
connection_factory – the factory method to open the aio_pika connection with
message_exchange – The name of the RMQ message exchange to use
queue_expires – the expiry time for standard queues in milliseconds. This is the time after which, if there are no subscribers, a queue will automatically be deleted by RabbitMQ.
task_exchange – The name of the RMQ task exchange to use
task_queue – The name of the task queue to use
task_prefetch_count – the number of tasks this communicator can fetch simultaneously
task_prefetch_size – the total size of the messages that the default queue can fetch simultaneously
encoder – The encoder to call for encoding a message
decoder – The decoder to call for decoding a message
testing_mode – Run in testing mode: all queues and exchanges will be temporary
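The parameters above can be combined as in the following sketch. It assumes a broker at the placeholder URL and that `RmqCommunicator.disconnect()` (listed under the communicator module) is a coroutine. The helpers `connect_kwargs` and `fetch_server_properties` are hypothetical, and the coroutine is only defined, not run, since it needs kiwipy and a reachable broker.

```python
def connect_kwargs(expiry_seconds: float, testing: bool = False) -> dict:
    """Build keyword arguments for async_connect.

    queue_expires is given in milliseconds, so seconds are converted here.
    """
    return {
        "queue_expires": int(expiry_seconds * 1000),
        "testing_mode": testing,
    }


async def fetch_server_properties(url: str = "amqp://127.0.0.1") -> dict:
    """Connect, read the server properties and disconnect again."""
    import kiwipy.rmq  # imported lazily so connect_kwargs works without kiwipy

    comm = await kiwipy.rmq.async_connect(url, **connect_kwargs(30, testing=True))
    try:
        return dict(comm.server_properties)
    finally:
        # Assumption: disconnect() is awaitable on RmqCommunicator.
        await comm.disconnect()


print(connect_kwargs(30, testing=True))
```

With a broker available, the coroutine could be driven with `asyncio.run(fetch_server_properties())`.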
- kiwipy.rmq.connect(connection_params: typing.Optional[typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) -> RmqThreadCommunicator
Establish a RabbitMQ communicator connection.
Submodules
- kiwipy.rmq.communicator module
RmqCommunicator, RmqCommunicator._connection, RmqCommunicator._default_task_queue, RmqCommunicator._ensure_connected(), RmqCommunicator._message_publisher, RmqCommunicator._message_subscriber, RmqCommunicator.add_broadcast_subscriber(), RmqCommunicator.add_close_callback(), RmqCommunicator.add_rpc_subscriber(), RmqCommunicator.add_task_subscriber(), RmqCommunicator.broadcast_send(), RmqCommunicator.connect(), RmqCommunicator.connected(), RmqCommunicator.disconnect(), RmqCommunicator.get_default_task_queue(), RmqCommunicator.get_message_publisher(), RmqCommunicator.get_message_subscriber(), RmqCommunicator.loop, RmqCommunicator.remove_broadcast_subscriber(), RmqCommunicator.remove_rpc_subscriber(), RmqCommunicator.remove_task_subscriber(), RmqCommunicator.rpc_send(), RmqCommunicator.server_properties, RmqCommunicator.task_queue(), RmqCommunicator.task_send()
async_connect()
- kiwipy.rmq.configure module
- kiwipy.rmq.defaults module
- kiwipy.rmq.messages module
BaseConnectionWithExchange, BasePublisherWithReplyQueue, BasePublisherWithReplyQueue.DEFAULT_EXCHANGE_PARAMS, BasePublisherWithReplyQueue._on_channel_close(), BasePublisherWithReplyQueue._on_response(), BasePublisherWithReplyQueue.action_message(), BasePublisherWithReplyQueue.channel(), BasePublisherWithReplyQueue.connect(), BasePublisherWithReplyQueue.disconnect(), BasePublisherWithReplyQueue.get_exchange_name(), BasePublisherWithReplyQueue.is_closing, BasePublisherWithReplyQueue.is_connected, BasePublisherWithReplyQueue.publish(), BasePublisherWithReplyQueue.publish_expect_response()
BroadcastMessage
- kiwipy.rmq.tasks module
RmqIncomingTask, RmqTaskPublisher, RmqTaskQueue, RmqTaskSubscriber, RmqTaskSubscriber.TASK_QUEUE_ARGUMENTS, RmqTaskSubscriber._build_response_message(), RmqTaskSubscriber._channel, RmqTaskSubscriber._create_task_queue(), RmqTaskSubscriber._exchange, RmqTaskSubscriber._on_task(), RmqTaskSubscriber._send_response(), RmqTaskSubscriber._task_queue, RmqTaskSubscriber.add_task_subscriber(), RmqTaskSubscriber.connect(), RmqTaskSubscriber.next_task(), RmqTaskSubscriber.remove_task_subscriber()
- kiwipy.rmq.threadcomms module
RmqThreadCommunicator, RmqThreadCommunicator.TASK_TIMEOUT, RmqThreadCommunicator._communicator, RmqThreadCommunicator._ensure_open(), RmqThreadCommunicator._wrap_future(), RmqThreadCommunicator._wrap_subscriber(), RmqThreadCommunicator.add_broadcast_subscriber(), RmqThreadCommunicator.add_close_callback(), RmqThreadCommunicator.add_rpc_subscriber(), RmqThreadCommunicator.add_task_subscriber(), RmqThreadCommunicator.broadcast_send(), RmqThreadCommunicator.close(), RmqThreadCommunicator.connect(), RmqThreadCommunicator.is_closed(), RmqThreadCommunicator.loop(), RmqThreadCommunicator.remove_broadcast_subscriber(), RmqThreadCommunicator.remove_rpc_subscriber(), RmqThreadCommunicator.remove_task_subscriber(), RmqThreadCommunicator.rpc_send(), RmqThreadCommunicator.server_properties, RmqThreadCommunicator.start(), RmqThreadCommunicator.stop(), RmqThreadCommunicator.task_queue(), RmqThreadCommunicator.task_send()
RmqThreadTaskQueue, connect()
- kiwipy.rmq.utils module