kiwipy.rmq.threadcomms module
- class kiwipy.rmq.threadcomms.RmqThreadCommunicator(connection_params: Optional[Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange: str = 'kiwipy.messages', queue_expires: int = 60000, task_exchange='kiwipy.tasks', task_queue: str = 'kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)
Bases: Communicator
RabbitMQ communicator that runs its event loop on a separate thread. Because all communication happens on that thread, heartbeats are not missed and the main program is free to block for as long as it wants.
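The threading model described above can be illustrated with the standard library alone. The following is a minimal sketch, not kiwipy's implementation: `LoopThread` and `heartbeat` are hypothetical names, and the only assumption is that background work is expressed as asyncio coroutines. It shows the main thread blocking while a coroutine keeps ticking on the loop thread.

```python
import asyncio
import threading
import time


class LoopThread:
    """Hypothetical helper that owns an asyncio event loop on a daemon thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def submit(self, coro):
        """Schedule a coroutine on the loop thread; returns a concurrent.futures.Future."""
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def close(self):
        """Stop the loop from the calling thread and wait for the loop thread to exit."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def heartbeat(beats, interval=0.01):
    """Stand-in for periodic background work such as connection heartbeats."""
    ticks = 0
    for _ in range(beats):
        await asyncio.sleep(interval)
        ticks += 1
    return ticks


loop_thread = LoopThread()
future = loop_thread.submit(heartbeat(5))
time.sleep(0.05)  # the main thread blocks; the loop thread keeps running
ticks = future.result(timeout=5)
loop_thread.close()
```

The main thread only touches the loop through thread-safe entry points (`run_coroutine_threadsafe` and `call_soon_threadsafe`), which is what makes blocking in the caller harmless.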
- TASK_TIMEOUT = 5.0
- _communicator: RmqCommunicator
- _wrap_subscriber(subscriber)
Convert any kiwipy.Future returned by a subscriber call into an asyncio future for the event-loop-based communicator. This is done by wrapping the subscriber and intercepting its return value.
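The wrapping idea can be sketched as follows. This is an illustration under stated assumptions, not the library's code: it assumes the thread-side future behaves like `concurrent.futures.Future`, and `wrap_subscriber`, `slow_subscriber`, and `deliver` are hypothetical names.

```python
import asyncio
import concurrent.futures
import functools


def wrap_subscriber(subscriber):
    """Hypothetical wrapper that translates thread-style futures into asyncio futures."""

    @functools.wraps(subscriber)
    def wrapper(*args, **kwargs):
        result = subscriber(*args, **kwargs)
        if isinstance(result, concurrent.futures.Future):
            # Must be called while an event loop is running in this thread
            return asyncio.wrap_future(result)
        return result

    return wrapper


def slow_subscriber(msg):
    """Example subscriber that answers with a thread-style future."""
    future = concurrent.futures.Future()
    future.set_result(msg.upper())
    return future


async def deliver(msg):
    """Call the wrapped subscriber from the loop and await the outcome if needed."""
    outcome = wrap_subscriber(slow_subscriber)(msg)
    if asyncio.isfuture(outcome):
        outcome = await outcome
    return outcome


delivered = asyncio.run(deliver("hello"))
```

Plain return values pass through untouched, so subscribers that do not return futures need no special handling.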
- add_broadcast_subscriber(subscriber, identifier=None)
Add a broadcast subscriber that will receive all broadcast messages
- Parameters
subscriber – the subscriber function to be called
identifier – an optional identifier for the subscriber
- Returns
an identifier for the subscriber, which can subsequently be used to remove it
- add_close_callback(callback: Callable[[AbstractConnection, Optional[BaseException]], Any], weak: bool = False) -> None
Add a callable to be called each time after the connection is closed.
- Parameters
callback – the callable to invoke; per the signature it receives the connection and an optional exception
weak – If True, the callback will be added to a WeakSet
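The practical effect of `weak=True` is that the registry does not keep the callback alive. The sketch below shows that behaviour with a hypothetical `CloseCallbacks` class built on `weakref.WeakSet`; it is not the library's implementation, and the immediate cleanup after `del` relies on CPython's reference counting.

```python
import gc
import weakref


class CloseCallbacks:
    """Hypothetical registry holding both strong and weak close callbacks."""

    def __init__(self):
        self._strong = set()
        self._weak = weakref.WeakSet()

    def add(self, callback, weak=False):
        (self._weak if weak else self._strong).add(callback)

    def fire(self, connection, exc=None):
        # Copy to lists first so the sets cannot change during iteration
        for callback in list(self._strong) + list(self._weak):
            callback(connection, exc)


calls = []


def make_callback(tag):
    def on_close(connection, exc):
        calls.append(tag)

    return on_close


registry = CloseCallbacks()
strong_cb = make_callback("strong")
weak_cb = make_callback("weak")
registry.add(strong_cb)
registry.add(weak_cb, weak=True)

registry.fire("conn")
first = sorted(calls)

# Dropping the last strong reference removes the weak callback from the registry
del weak_cb
gc.collect()
calls.clear()
registry.fire("conn")
second = list(calls)
```

A consequence worth noting: a lambda or bound method registered weakly with no other reference may disappear right away, so weak registration suits callbacks whose owner already keeps them alive.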
- add_rpc_subscriber(subscriber, identifier=None)
Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.
- add_task_subscriber(subscriber, identifier=None)
Add a task subscriber to the communicator’s default queue. Returns the identifier.
- Parameters
subscriber – The task callback function
identifier – the subscriber identifier
- broadcast_send(body, sender=None, subject=None, correlation_id=None)
Broadcast a message to all subscribers
- close()
Close down the communicator and clean up all resources.
After this call it cannot be used again.
- classmethod connect(connection_params: Optional[Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False, async_task_timeout=5.0)
- remove_broadcast_subscriber(identifier)
Remove a broadcast subscriber
- Parameters
identifier – the identifier of the subscriber to remove
- remove_rpc_subscriber(identifier)
Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.
- Parameters
identifier – The RPC subscriber identifier
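The identifier contract described for add_rpc_subscriber and remove_rpc_subscriber (an optional identifier, a generated one otherwise, always returned, and a ValueError on unknown removal) can be sketched with a small in-memory registry. `SubscriberRegistry` is a hypothetical class for illustration only, and using `uuid4` to generate identifiers is an assumption of this sketch.

```python
import uuid


class SubscriberRegistry:
    """Hypothetical in-memory registry mirroring the documented identifier contract."""

    def __init__(self):
        self._subscribers = {}

    def add(self, subscriber, identifier=None):
        # Generate a unique identifier when the caller does not supply one.
        # Duplicate handling is left out; the text above does not describe it.
        if identifier is None:
            identifier = uuid.uuid4().hex
        self._subscribers[identifier] = subscriber
        return identifier

    def remove(self, identifier):
        try:
            del self._subscribers[identifier]
        except KeyError:
            raise ValueError(f"Unknown subscriber '{identifier}'") from None


registry = SubscriberRegistry()
generated_id = registry.add(lambda comm, msg: msg)
named_id = registry.add(lambda comm, msg: msg, identifier="echo")
registry.remove(named_id)

try:
    registry.remove(named_id)
    removed_twice = True
except ValueError:
    removed_twice = False
```

Returning the identifier in every case lets callers treat generated and explicit identifiers the same way when they later unsubscribe.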
- remove_task_subscriber(identifier)
Remove a task subscriber from the communicator’s default queue.
- Parameters
identifier – the identifier of the subscriber to remove
- Raises
ValueError if identifier does not correspond to a known subscriber
- rpc_send(recipient_id, msg)
Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.
- Parameters
recipient_id – The recipient identifier
msg – The body of the message
- Returns
A future corresponding to the outcome of the call
- Return type
kiwipy.Future
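A caller typically consumes such a future by blocking with a timeout. The sketch below assumes the returned future follows the `concurrent.futures.Future` interface; `fake_rpc_send` is a made-up stand-in that resolves the future from another thread and does not talk to a broker.

```python
import concurrent.futures
import threading


def fake_rpc_send(recipient_id, msg):
    """Stand-in for an RPC call; resolves the future from a timer thread."""
    future = concurrent.futures.Future()

    def respond():
        future.set_result({"recipient": recipient_id, "echo": msg})

    threading.Timer(0.01, respond).start()
    return future


# Block until the reply arrives, but never longer than the timeout
reply = fake_rpc_send("worker-1", "ping").result(timeout=5)

# A future that is never resolved raises TimeoutError instead of hanging
unanswered = concurrent.futures.Future()
try:
    unanswered.result(timeout=0.01)
    timed_out = False
except concurrent.futures.TimeoutError:
    timed_out = True
```

Passing a timeout is a defensive habit: if the recipient never answers, the caller regains control instead of waiting indefinitely.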
- property server_properties: Dict
A dictionary containing server properties as returned by the RMQ server at connection time.
The details are defined by the RMQ standard and can be found here:
https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.start.server-properties
- The protocol states that this dictionary SHOULD contain at least:
‘host’ - specifying the server host name or address
‘product’ - giving the name of the server product
‘version’ - giving the name of the server version
‘platform’ - giving the name of the operating system
‘copyright’ - if appropriate, and,
‘information’ - giving other general information
Note
In testing it seems like ‘host’ is not always returned. Host information may be found in ‘cluster_name’ but clients shouldn’t rely on this.
- Returns
the server properties dictionary
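Because ‘host’ may be absent, it is safer to read the dictionary with fallbacks than with direct indexing. The helper below is a hypothetical sketch; the sample dictionary is invented for illustration, and the value types a real server returns may differ.

```python
def describe_server(props):
    """Summarise server properties without assuming any key is present."""
    # 'cluster_name' is only a best-effort fallback, as the note above warns
    host = props.get("host") or props.get("cluster_name") or "<unknown>"
    product = props.get("product", "<unknown>")
    version = props.get("version", "<unknown>")
    return f"{product} {version} on {host}"


# Invented sample values, not captured from a real server
sample = {"product": "RabbitMQ", "version": "3.x", "cluster_name": "rabbit@node"}
summary = describe_server(sample)
empty_summary = describe_server({})
```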
- start()
Deprecated since version 0.6.0: This will be removed in 0.7.0. There is no replacement; the class should be reinstantiated if it has been closed.
- stop()
Deprecated since version 0.6.0: This will be removed in 0.7.0. Use close() instead. Unlike stop(), close() is permanent and the class should not be used again after this.
- task_send(task, no_reply=False)
Send a task message. The task is queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.
- Parameters
task – The task message
no_reply (bool) – Do not send a reply containing the result of the task
- Returns
A future corresponding to the outcome of the task
- class kiwipy.rmq.threadcomms.RmqThreadTaskQueue(task_queue: RmqTaskQueue, loop_scheduler: LoopScheduler, wrap_subscriber)
Bases: object
Thread task queue.
- kiwipy.rmq.threadcomms.connect(connection_params: Optional[Union[str, dict]] = None, connection_factory=<function connect_robust>, message_exchange='kiwipy.messages', task_exchange='kiwipy.tasks', task_queue='kiwipy.tasks', task_prefetch_size=0, task_prefetch_count=0, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), testing_mode=False) -> RmqThreadCommunicator
Establish a RabbitMQ communicator connection
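Putting the pieces together, a round trip might look like the sketch below. It uses only calls documented on this page (connect, add_rpc_subscriber, rpc_send, close), but several details are assumptions: the broker URL is a placeholder, the subscriber is assumed to be called with the communicator and the message body, and the future is assumed to resolve directly to the subscriber's return value, which may vary between versions. Running `rpc_roundtrip` requires kiwipy and a reachable RabbitMQ server.

```python
def double(_comm, msg):
    """RPC subscriber; assumed to receive the communicator and the message body."""
    return msg * 2


def rpc_roundtrip(url="amqp://127.0.0.1"):
    """Connect, register a subscriber, call it over RPC, and always close."""
    # Imported here so this module loads without kiwipy installed
    from kiwipy.rmq import threadcomms

    comm = threadcomms.connect(connection_params=url)
    try:
        comm.add_rpc_subscriber(double, identifier="double")
        # Block with a timeout rather than waiting indefinitely for a reply
        return comm.rpc_send("double", 21).result(timeout=10)
    finally:
        # close() is permanent; create a new communicator to reconnect
        comm.close()
```

The `try`/`finally` ensures the communicator's background thread and connection are released even if the call fails or times out.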