kiwipy.rmq package

class kiwipy.rmq.RmqCommunicator(connection: ~aio_pika.connection.Connection, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange: str = 'kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False)[source]

Bases: object

An asynchronous communicator that relies on aio_pika to make a connection to a RabbitMQ server and uses an asyncio event loop for scheduling coroutines and callbacks.

_connection = None
_default_task_queue: Optional[RmqTaskQueue] = None
_ensure_connected()[source]
_message_publisher = None
_message_subscriber = None
async add_broadcast_subscriber(subscriber, identifier=None)[source]
add_close_callback(callback: Callable[[AbstractConnection, Optional[BaseException]], Any], weak: bool = False) None[source]

Add a callable to be called each time (after) the connection is closed.

Parameters

weak – If True, the callback will be added to a WeakSet
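The weak flag matters for callbacks that nothing else references. The sketch below does not use kiwipy at all: it relies on a plain weakref.WeakSet (the container named above) to illustrate why a callback registered with weak=True should be kept alive by the caller. The names make_close_callback and registered are hypothetical, and the simulated close is an assumption about how callbacks are invoked.

```python
import gc
import weakref


def make_close_callback(log):
    """Build a callback with the (connection, exception) signature shown above."""

    def on_close(connection, exc):
        # exc is None for a clean close, otherwise the exception that caused it.
        log.append("closed cleanly" if exc is None else f"closed with {exc!r}")

    return on_close


log = []
registered = weakref.WeakSet()  # stand-in for a weak callback store

kept = make_close_callback(log)  # the caller holds a strong reference
registered.add(kept)
registered.add(make_close_callback(log))  # nothing else references this one

gc.collect()  # make collection deterministic on non-refcounting interpreters

for callback in list(registered):
    callback(None, None)  # simulate a clean close; no real connection involved
```

Only the callback that is still referenced by kept survives, so a weakly registered lambda or local function may silently never fire.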

async add_rpc_subscriber(subscriber, identifier=None)[source]
async add_task_subscriber(subscriber, identifier=None)[source]
async broadcast_send(body, sender=None, subject=None, correlation_id=None)[source]
async connect()[source]

Establish a connection if not already connected.

connected() bool[source]
async disconnect()[source]

Disconnect from the connection if connected.

async get_default_task_queue() RmqTaskQueue[source]

Get a default task queue.

If one doesn’t exist it will be created as part of this call.

async get_message_publisher() RmqPublisher[source]

Get a message publisher.

If one doesn’t exist it will be created as part of this call.

async get_message_subscriber() RmqSubscriber[source]

Get the message subscriber.

If one doesn’t exist it will be created as part of this call.

property loop

Get the event loop instance driving this communicator connection.

async remove_broadcast_subscriber(identifier)[source]
async remove_rpc_subscriber(identifier)[source]
async remove_task_subscriber(identifier)[source]
async rpc_send(recipient_id, msg)[source]

Initiate a remote procedure call on a recipient.

Parameters
  • recipient_id – The recipient identifier

  • msg – The body of the message

Returns

A future corresponding to the outcome of the call
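As an illustration of the two-step wait this implies, the sketch below awaits rpc_send() to obtain the future and then awaits the future itself for the reply. It assumes the communicator behaves as described above; EchoCommunicator and call_remote are hypothetical names that only exist so the example runs without a broker.

```python
import asyncio


async def call_remote(communicator, recipient_id, msg, timeout=5.0):
    """Send an RPC and wait for its outcome.

    Assumes awaiting rpc_send() yields a future, and awaiting that future
    yields the reply.
    """
    future = await communicator.rpc_send(recipient_id, msg)
    return await asyncio.wait_for(future, timeout=timeout)


class EchoCommunicator:
    """Hypothetical in-memory stand-in so the sketch runs without a broker."""

    async def rpc_send(self, recipient_id, msg):
        future = asyncio.get_running_loop().create_future()
        future.set_result({"recipient": recipient_id, "echo": msg})
        return future


reply = asyncio.run(call_remote(EchoCommunicator(), "worker-1", "ping"))
```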

property server_properties: Dict

A dictionary containing server properties as returned by the RMQ server at connection time.

The details are defined by the RMQ standard and can be found here:

https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties

The protocol states that this dictionary SHOULD contain at least:

  • ‘host’ – specifying the server host name or address

  • ‘product’ – giving the name of the server product

  • ‘version’ – giving the name of the server version

  • ‘platform’ – giving the name of the operating system

  • ‘copyright’ – if appropriate

  • ‘information’ – giving other general information

Note

In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.

Returns

the server properties dictionary
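Because the recommended keys are not guaranteed to be present, code reading this dictionary should probably not index it directly. The following is a minimal sketch under that assumption; summarise_server_properties and the sample dictionary are hypothetical, and the bytes handling is a guess about what some client libraries return.

```python
EXPECTED_KEYS = ("host", "product", "version", "platform", "copyright", "information")


def summarise_server_properties(properties):
    """Return the six recommended fields, using None for any that are absent."""
    summary = {}
    for key in EXPECTED_KEYS:
        value = properties.get(key)
        # Assumption: some client libraries hand back bytes rather than str.
        if isinstance(value, bytes):
            value = value.decode("utf-8", errors="replace")
        summary[key] = value
    return summary


# Hypothetical sample; a real dictionary would come from server_properties.
sample = {"product": b"RabbitMQ", "version": "3.8.2", "cluster_name": "rabbit@node"}
summary = summarise_server_properties(sample)
```

The sketch deliberately ignores ‘cluster_name’, in line with the note that clients should not rely on it.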

async task_queue(queue_name: str, prefetch_size=0, prefetch_count=0) RmqTaskQueue[source]

Create a new task queue.

async task_send(task, no_reply=False)[source]
class kiwipy.rmq.RmqIncomingTask(subscriber: RmqTaskSubscriber, message: IncomingMessage)[source]

Bases: object

_finalise()[source]
_on_task_done(outcome)[source]

Schedule a task to call _task_done when the outcome is done.

_outcome_destroyed(outcome_ref)[source]
async _task_done(outcome: Future)[source]
property body: str
property no_reply: bool
process() Future[source]
processing() Generator[Future, None, None][source]

Processing context. The task should be done by the end of the context; otherwise it is assumed that the caller does not want to process it, and it is sent back to the queue.
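The requeue rule can be sketched as follows. This assumes processing() is used as a context manager that yields the outcome future, as the signature above suggests. FakeTask is a hypothetical stand-in that mimics the rule in-process, and its state strings are invented for the example rather than taken from kiwipy.

```python
import concurrent.futures
import contextlib


def handle(task, work):
    """Process one task inside its processing context."""
    with task.processing() as outcome:
        outcome.set_result(work(task.body))


class FakeTask:
    """Hypothetical stand-in that mimics the documented requeue rule."""

    def __init__(self, body):
        self.body = body
        self.state = "pending"  # illustrative values, not kiwipy's
        self.result = None

    @contextlib.contextmanager
    def processing(self):
        outcome = concurrent.futures.Future()
        yield outcome
        if outcome.done():
            self.state = "finished"
            self.result = outcome.result()
        else:
            self.state = "requeued"  # the caller left without an outcome


done = FakeTask(3)
handle(done, lambda x: x + 1)

skipped = FakeTask(3)
with skipped.processing():
    pass  # leaving the block without setting the outcome
```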

async requeue()[source]
property state: str
class kiwipy.rmq.RmqTaskPublisher(connection, queue_name='kiwipy.tasks', exchange_name='kiwipy.messages', exchange_params=None, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), confirm_deliveries=True, testing_mode=False)[source]

Bases: BasePublisherWithReplyQueue

Publishes messages to the RMQ task queue and gets the response

async task_send(task, no_reply: bool = False) Future[source]

Send a task for processing by a task subscriber.

All task messages will be set to be persistent by setting delivery_mode=2.

Parameters
  • task – The task payload

  • no_reply – Don’t send a reply containing the result of the task

Returns

A future representing the result of the task

class kiwipy.rmq.RmqTaskQueue(connection, exchange_name='kiwipy.messages', queue_name='kiwipy.tasks', decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0, testing_mode=False)[source]

Bases: object

Combines a task publisher and subscriber to create a work queue where you can do both

async add_task_subscriber(subscriber, identifier=None)[source]
async connect()[source]
async disconnect()[source]
next_task(no_ack=False, fail=True, timeout=5.0)[source]
async remove_task_subscriber(identifier)[source]
async task_send(task, no_reply: bool = False)[source]

Send a task to the queue

class kiwipy.rmq.RmqTaskSubscriber(connection: ~aio_pika.connection.Connection, exchange_name: str = 'kiwipy.messages', queue_name: str = 'kiwipy.tasks', testing_mode=False, decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), encoder=functools.partial(<function dump>, encoding='utf-8'), exchange_params=None, prefetch_size=0, prefetch_count=0)[source]

Bases: BaseConnectionWithExchange

Listens for tasks coming in on the RMQ task queue

TASK_QUEUE_ARGUMENTS = {'x-message-ttl': 604800000}
_build_response_message(body, incoming_message)[source]

Create an aio_pika Message as a response to a task being dealt with.

Parameters
Returns

The response message

Return type

aio_pika.Message

async _create_task_queue()[source]

Create and bind the task queue

async _on_task(message: IncomingMessage)[source]
Parameters

message – The aio_pika RMQ message

async _send_response(msg_body, incoming_message)[source]
async add_task_subscriber(subscriber, identifier=None)[source]
async connect()[source]
next_task(no_ack=False, fail=True, timeout=5.0) Generator[RmqIncomingTask, None, None][source]

Get the next task from the queue.

Raises

kiwipy.exceptions.QueueEmpty when the queue has no tasks within the timeout

async remove_task_subscriber(identifier)[source]
class kiwipy.rmq.RmqThreadCommunicator(connection_params: ~typing.Optional[~typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange='kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)[source]

Bases: Communicator

RabbitMQ communicator that runs an event loop on a separate thread to do communication. This also means that heartbeats are not missed and the main program is free to block for as long as it wants.

TASK_TIMEOUT = 5.0
_ensure_open()[source]
_wrap_future(kiwi_future: Future)[source]
_wrap_subscriber(subscriber)[source]

We need to convert any kiwipy.Futures we get from a subscriber call into asyncio ones for the event-loop-based communicator. This is done by wrapping the subscriber's methods and intercepting their return values.

add_broadcast_subscriber(subscriber, identifier=None)[source]

Add a broadcast subscriber that will receive all broadcast messages

Parameters
  • subscriber – the subscriber function to be called

  • identifier – an optional identifier for the subscriber

Returns

an identifier for the subscriber, which can subsequently be used to remove it
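A broadcast subscriber might look like the sketch below. The five-argument callback signature (communicator, body, sender, subject, correlation_id) is an assumption that mirrors the parameters of broadcast_send, not something stated in this section. LocalBroadcaster is a hypothetical in-process stand-in used only to make the example runnable.

```python
def make_recorder(store):
    """Return a broadcast callback that appends every message to `store`."""

    def on_broadcast(communicator, body, sender, subject, correlation_id):
        store.append({"body": body, "sender": sender, "subject": subject})

    return on_broadcast


class LocalBroadcaster:
    """Hypothetical in-process stand-in for a communicator."""

    def __init__(self):
        self._subscribers = {}

    def add_broadcast_subscriber(self, subscriber, identifier=None):
        identifier = identifier or f"sub-{len(self._subscribers)}"
        self._subscribers[identifier] = subscriber
        return identifier

    def remove_broadcast_subscriber(self, identifier):
        del self._subscribers[identifier]

    def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
        for subscriber in list(self._subscribers.values()):
            subscriber(self, body, sender, subject, correlation_id)


received = []
communicator = LocalBroadcaster()
identifier = communicator.add_broadcast_subscriber(make_recorder(received))
communicator.broadcast_send("hello", sender="demo", subject="greeting")
communicator.remove_broadcast_subscriber(identifier)  # uses the returned identifier
communicator.broadcast_send("ignored")
```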

add_close_callback(callback: Callable[[AbstractConnection, Optional[BaseException]], Any], weak: bool = False) None[source]

Add a callable to be called each time (after) the connection is closed.

Parameters

weak – If True, the callback will be added to a WeakSet

add_rpc_subscriber(subscriber, identifier=None)[source]

Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.
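The identifier behaviour described here can be illustrated with a small stand-in. The sketch assumes an RPC subscriber is called as subscriber(communicator, msg) and that its return value becomes the reply. LocalRpc and the generated "rpc-N" identifiers are hypothetical and say nothing about the format kiwipy actually uses.

```python
import concurrent.futures


def square(communicator, msg):
    """RPC subscriber: the return value is assumed to become the reply."""
    return msg * msg


class LocalRpc:
    """Hypothetical in-process stand-in for a communicator."""

    def __init__(self):
        self._subscribers = {}
        self._counter = 0

    def add_rpc_subscriber(self, subscriber, identifier=None):
        if identifier is None:
            self._counter += 1
            identifier = f"rpc-{self._counter}"  # generated when none is given
        self._subscribers[identifier] = subscriber
        return identifier

    def rpc_send(self, recipient_id, msg):
        future = concurrent.futures.Future()
        future.set_result(self._subscribers[recipient_id](self, msg))
        return future


communicator = LocalRpc()
named = communicator.add_rpc_subscriber(square, "square")
generated = communicator.add_rpc_subscriber(square)
answer = communicator.rpc_send(named, 7).result(timeout=1.0)
```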

add_task_subscriber(subscriber, identifier=None)[source]

Add a task subscriber to the communicator’s default queue. Returns the identifier.

Parameters
  • subscriber – The task callback function

  • identifier – the subscriber identifier

broadcast_send(body, sender=None, subject=None, correlation_id=None)[source]

Broadcast a message to all subscribers

close()[source]

Close down the communicator and clean up all resources.

After this call it cannot be used again.

classmethod connect(connection_params: ~typing.Optional[~typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)[source]
is_closed() bool[source]

Return True if the communicator was closed
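Since a closed communicator cannot be reused, long-running code may want to check is_closed() and build a replacement when needed. The helper below is a minimal sketch of that pattern; usable_communicator and FakeCommunicator are hypothetical, and in real use the factory would be whatever call originally created the communicator.

```python
def usable_communicator(current, factory):
    """Return `current` if it is still open, otherwise build a replacement."""
    if current is not None and not current.is_closed():
        return current
    return factory()


class FakeCommunicator:
    """Hypothetical stand-in exposing only close() and is_closed()."""

    def __init__(self):
        self._closed = False

    def close(self):
        self._closed = True

    def is_closed(self):
        return self._closed


first = FakeCommunicator()
same = usable_communicator(first, FakeCommunicator)
first.close()
replacement = usable_communicator(first, FakeCommunicator)
```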

loop()[source]
remove_broadcast_subscriber(identifier)[source]

Remove a broadcast subscriber

Parameters

identifier – the identifier of the subscriber to remove

remove_rpc_subscriber(identifier)[source]

Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.

Parameters

identifier – The RPC subscriber identifier

remove_task_subscriber(identifier)[source]

Remove a task subscriber from the communicator’s default queue.

Parameters

identifier – the subscriber to remove

Raises

ValueError if identifier does not correspond to a known subscriber
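Callers that cannot be sure a subscriber is still registered may prefer to absorb the ValueError. The following sketch shows one way to do so; remove_if_present and LocalSubscribers are hypothetical names, and the stand-in class only reproduces the documented error behaviour.

```python
def remove_if_present(communicator, identifier):
    """Remove a task subscriber; return False instead of raising if unknown."""
    try:
        communicator.remove_task_subscriber(identifier)
    except ValueError:
        return False
    return True


class LocalSubscribers:
    """Hypothetical stand-in that raises ValueError for unknown identifiers."""

    def __init__(self):
        self._subscribers = {}

    def add_task_subscriber(self, subscriber, identifier=None):
        identifier = identifier or f"task-{len(self._subscribers)}"
        self._subscribers[identifier] = subscriber
        return identifier

    def remove_task_subscriber(self, identifier):
        if identifier not in self._subscribers:
            raise ValueError(f"Unknown subscriber '{identifier}'")
        del self._subscribers[identifier]


communicator = LocalSubscribers()
identifier = communicator.add_task_subscriber(lambda comm, task: task)
first = remove_if_present(communicator, identifier)
second = remove_if_present(communicator, identifier)
```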

rpc_send(recipient_id, msg)[source]

Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.

Parameters
  • recipient_id – The recipient identifier

  • msg – The body of the message

Returns

A future corresponding to the outcome of the call

Return type

kiwipy.Future

property server_properties: Dict

A dictionary containing server properties as returned by the RMQ server at connection time.

The details are defined by the RMQ standard and can be found here:

https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties

The protocol states that this dictionary SHOULD contain at least:

  • ‘host’ – specifying the server host name or address

  • ‘product’ – giving the name of the server product

  • ‘version’ – giving the name of the server version

  • ‘platform’ – giving the name of the operating system

  • ‘copyright’ – if appropriate

  • ‘information’ – giving other general information

Note

In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.

Returns

the server properties dictionary

start()[source]

Deprecated since version 0.6.0: This will be removed in 0.7.0. There is no replacement; the class should be reinstantiated if it has been closed.

stop()[source]

Deprecated since version 0.6.0: This will be removed in 0.7.0. Use close() instead. Unlike stop(), close() is permanent and the class should not be used again after this.

task_queue(queue_name: str, prefetch_size=0, prefetch_count=0)[source]
task_send(task, no_reply=False)[source]

Send a task message. It will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.

Parameters
  • task – The task message

  • no_reply (bool) – Do not send a reply containing the result of the task

Returns

A future corresponding to the outcome of the task
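The two calling styles can be sketched as below. The example assumes the returned future offers a blocking result(timeout=...) in the manner of concurrent.futures.Future. InlineCommunicator is a hypothetical stand-in that "runs" the task immediately, and what it returns when no_reply is set is an arbitrary choice for the example.

```python
import concurrent.futures


def submit_and_wait(communicator, task, timeout=10.0):
    """Send a task and block until a worker reports its outcome."""
    future = communicator.task_send(task)
    return future.result(timeout=timeout)


def submit_and_forget(communicator, task):
    """Send a task without asking for the result to be sent back."""
    communicator.task_send(task, no_reply=True)


class InlineCommunicator:
    """Hypothetical stand-in that 'runs' each task immediately in-process."""

    def __init__(self):
        self.sent = []

    def task_send(self, task, no_reply=False):
        self.sent.append((task, no_reply))
        if no_reply:
            return None
        future = concurrent.futures.Future()
        future.set_result(task["x"] * 2)
        return future


communicator = InlineCommunicator()
result = submit_and_wait(communicator, {"x": 21})
submit_and_forget(communicator, {"x": 1})
```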

class kiwipy.rmq.RmqThreadTaskQueue(task_queue: RmqTaskQueue, loop_scheduler: LoopScheduler, wrap_subscriber)[source]

Bases: object

Thread task queue.

add_task_subscriber(subscriber)[source]
next_task(timeout=5.0, fail=True)[source]
remove_task_subscriber(subscriber)[source]
task_send(task, no_reply=False)[source]
async kiwipy.rmq.async_connect(connection_params: ~typing.Optional[~typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange: str = 'kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) RmqCommunicator[source]

Convenience method that returns a connected communicator.

Parameters
  • connection_params – parameters that will be passed to the connection factory to create the connection

  • connection_factory – the factory method to open the aio_pika connection with

  • message_exchange – The name of the RMQ message exchange to use

  • queue_expires – the expiry time for standard queues in milliseconds. This is the time after which, if there are no subscribers, a queue will automatically be deleted by RabbitMQ.

  • task_exchange – The name of the RMQ task exchange to use

  • task_queue – The name of the task queue to use

  • task_prefetch_count – the number of tasks this communicator can fetch simultaneously

  • task_prefetch_size – the total size of the messages that the default queue can fetch simultaneously

  • encoder – The encoder to call for encoding a message

  • decoder – The decoder to call for decoding a message

  • testing_mode – Run in testing mode: all queues and exchanges will be temporary
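A typical call might pass a connection URL together with a few of these options and make sure the communicator is disconnected afterwards. The sketch below injects the connect function so that it runs without a broker; in real use that argument would presumably be kiwipy.rmq.async_connect. The URL, with_communicator, fake_connect and FakeCommunicator are all hypothetical.

```python
import asyncio


async def with_communicator(connect, url, action, **options):
    """Open a communicator, run `action` on it, and always disconnect."""
    communicator = await connect(connection_params=url, **options)
    try:
        return await action(communicator)
    finally:
        await communicator.disconnect()


class FakeCommunicator:
    """Hypothetical stand-in that records the options it was created with."""

    def __init__(self, options):
        self.options = options
        self.open = True

    async def disconnect(self):
        self.open = False


async def fake_connect(connection_params=None, **options):
    return FakeCommunicator({"connection_params": connection_params, **options})


async def report(communicator):
    return communicator.options


options = asyncio.run(
    with_communicator(
        fake_connect,
        "amqp://guest:guest@127.0.0.1/",  # assumed local broker URL
        report,
        testing_mode=True,  # temporary queues and exchanges
        task_prefetch_count=1,  # fetch one task at a time
        queue_expires=60000,  # milliseconds
    )
)
```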

kiwipy.rmq.connect(connection_params: ~typing.Optional[~typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) RmqThreadCommunicator[source]

Establish a RabbitMQ communicator connection

Submodules