kiwipy.rmq.threadcomms module

class kiwipy.rmq.threadcomms.RmqThreadCommunicator(connection_params: ~typing.Optional[~typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange='kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)[source]

Bases: Communicator

RabbitMQ communicator that runs an event loop on a separate thread to do communication. This also means that heartbeats are not missed and the main program is free to block for as long as it wants.

TASK_TIMEOUT = 5.0
_communicator: RmqCommunicator
_ensure_open()[source]
_wrap_future(kiwi_future: Future)[source]
_wrap_subscriber(subscriber)[source]

” We need to convert any kiwipy.Futures we get from a subscriber call into asyncio ones for the event loop based communicator. Do this by wrapping any subscriber methods and intercepting the return values.

add_broadcast_subscriber(subscriber, identifier=None)[source]

Add a broadcast subscriber that will receive all broadcast messages

Parameters
  • subscriber – the subscriber function to be called

  • identifier – an optional identifier for the subscriber

Returns

an identifier for the subscriber and can be subsequently used to remove it

add_close_callback(callback: Callable[[AbstractConnection, Optional[BaseException]], Any], weak: bool = False) None[source]

Add a callable to be called each time (after) the connection is closed.

Parameters

weak – If True, the callback will be added to a WeakSet

add_rpc_subscriber(subscriber, identifier=None)[source]

Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.

add_task_subscriber(subscriber, identifier=None)[source]

Add a task subscriber to the communicator’s default queue. Returns the identifier.

Parameters
  • subscriber – The task callback function

  • identifier – the subscriber identifier

broadcast_send(body, sender=None, subject=None, correlation_id=None)[source]

Broadcast a message to all subscribers

close()[source]

Close down the communicator and clean up all resources.

After this call it cannot be used again.

classmethod connect(connection_params: ~typing.Optional[~typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)[source]
is_closed() bool[source]

Return True if the communicator was closed

loop()[source]
remove_broadcast_subscriber(identifier)[source]

Remove a broadcast subscriber

Parameters

identifier – the identifier of the subscriber to remove

remove_rpc_subscriber(identifier)[source]

Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.

Parameters

identifier – The RPC subscriber identifier

remove_task_subscriber(identifier)[source]

Remove a task subscriber from the communicator’s default queue.

Parameters

identifier – the subscriber to remove

Raises

ValueError if identifier does not correspond to a known subscriber

rpc_send(recipient_id, msg)[source]

Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.

Parameters
  • recipient_id – The recipient identifier

  • msg – The body of the message

Returns

A future corresponding to the outcome of the call

Return type

kiwipy.Future

property server_properties: Dict

A dictionary containing server properties as returned by the RMQ server at connection time.

The details are defined by the RMQ standard and can be found here:

https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties

The protocol states that this dictionary SHOULD contain at least:

‘host’ - specifying the server host name or address ‘product’ - giving the name of the server product ‘version’ - giving the name of the server version ‘platform’ - giving the name of the operating system ‘copyright’ - if appropriate, and, ‘information’ - giving other general information

Note

In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.

Returns

the server properties dictionary

start()[source]

Deprecated since version 0.6.0: This will be removed in 0.7.0. This method was deprecated in v0.6.0 and will be removed in 0.7.0. There is no replacement and the class should be reinstantiated if it’s been closed.

stop()[source]

Deprecated since version 0.6.0: This will be removed in 0.7.0. Use close() instead. Unlike stop(), close() is permanent and the class should not be used again after this.

task_queue(queue_name: str, prefetch_size=0, prefetch_count=0)[source]
task_send(task, no_reply=False)[source]

Send a task messages, this will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.

Parameters
  • task – The task message

  • no_reply (bool) – Do not send a reply containing the result of the task

Returns

A future corresponding to the outcome of the task

class kiwipy.rmq.threadcomms.RmqThreadTaskQueue(task_queue: RmqTaskQueue, loop_scheduler: LoopScheduler, wrap_subscriber)[source]

Bases: object

Thread task queue.

add_task_subscriber(subscriber)[source]
next_task(timeout=5.0, fail=True)[source]
remove_task_subscriber(subscriber)[source]
task_send(task, no_reply=False)[source]
kiwipy.rmq.threadcomms.connect(connection_params: ~typing.Optional[~typing.Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) RmqThreadCommunicator[source]

Establish a RabbitMQ communicator connection