kiwipy package
Robust, high-volume, message-based communication made easy.
- class kiwipy.BroadcastFilter(subscriber, subject=None, sender=None)
Bases: object
A filter that can be used to limit the subjects and/or senders that will be received
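The idea of wrapping a subscriber in a filter can be sketched with the standard library alone. This is not kiwipy's implementation: the class name `SketchBroadcastFilter`, the `fnmatch` wildcard matching for subjects, and the exact-match comparison for senders are assumptions made for illustration. The callback uses the five-argument broadcast subscriber signature shown for `add_broadcast_subscriber`.

```python
from fnmatch import fnmatch


class SketchBroadcastFilter:
    """Hypothetical stand-in: forward a broadcast only if subject and sender match."""

    def __init__(self, subscriber, subject=None, sender=None):
        self._subscriber = subscriber
        self._subject = subject  # assumed: a wildcard pattern such as "state.*"
        self._sender = sender    # assumed: compared for exact equality

    def __call__(self, communicator, body, sender=None, subject=None, correlation_id=None):
        # Drop the message if a subject filter is set and does not match
        if self._subject is not None and not fnmatch(str(subject), self._subject):
            return None
        # Drop the message if a sender filter is set and does not match
        if self._sender is not None and sender != self._sender:
            return None
        return self._subscriber(communicator, body, sender, subject, correlation_id)


received = []


def on_broadcast(_communicator, body, sender, subject, correlation_id):
    received.append((subject, body))


# Only subjects matching "state.*" get through
by_subject = SketchBroadcastFilter(on_broadcast, subject="state.*")
by_subject(None, "running", sender="proc-1", subject="state.changed")
by_subject(None, "ignored", sender="proc-1", subject="log.info")

# Only messages from "proc-2" get through
by_sender = SketchBroadcastFilter(on_broadcast, sender="proc-2")
by_sender(None, "done", sender="proc-2", subject="state.finished")
by_sender(None, "ignored", sender="proc-1", subject="state.finished")
```

With the real class, the filter object itself would be what gets registered as the broadcast subscriber.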
- class kiwipy.Communicator
Bases: object
The interface for a communicator used to both send and receive various types of message.
- abstract add_broadcast_subscriber(subscriber: Callable[[Communicator, Any, Any, Any, Any], Any], identifier=None) -> Any
Add a broadcast subscriber that will receive all broadcast messages
- Parameters:
subscriber – the subscriber function to be called
identifier – an optional identifier for the subscriber
- Returns:
an identifier for the subscriber, which can subsequently be used to remove it
- abstract add_rpc_subscriber(subscriber: Callable[[Communicator, Any], Any], identifier=None) -> Any
Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.
- abstract add_task_subscriber(subscriber: Callable[[Communicator, Any], Any], identifier=None) -> Any
Add a task subscriber to the communicator’s default queue. Returns the identifier.
- Parameters:
subscriber – The task callback function
identifier – the subscriber identifier
- abstract broadcast_send(body, sender=None, subject=None, correlation_id=None) -> bool
Broadcast a message to all subscribers
- abstract close()
Close the communicator, free up all resources and disallow any further operations
- abstract remove_broadcast_subscriber(identifier)
Remove a broadcast subscriber
- Parameters:
identifier – the identifier of the subscriber to remove
- abstract remove_rpc_subscriber(identifier)
Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.
- Parameters:
identifier – The RPC subscriber identifier
- abstract remove_task_subscriber(identifier)
Remove a task subscriber from the communicator’s default queue.
- Parameters:
identifier – the subscriber to remove
- Raises:
ValueError if identifier does not correspond to a known subscriber
- abstract rpc_send(recipient_id, msg)
Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.
- Parameters:
recipient_id – The recipient identifier
msg – The body of the message
- Returns:
A future corresponding to the outcome of the call
- Return type:
kiwipy.Future
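A minimal sketch of how the outcome of a call can be carried by a future, using only `concurrent.futures`. The function `sketch_rpc_send` and the plain dictionary of subscribers are hypothetical stand-ins, not kiwipy's dispatch code, and the communicator argument is passed as `None` for brevity.

```python
from concurrent.futures import Future


def sketch_rpc_send(subscribers, recipient_id, msg):
    """Hypothetical in-process dispatch: call the recipient and wrap the outcome."""
    subscriber = subscribers[recipient_id]  # an unknown recipient raises KeyError here
    future = Future()
    try:
        # RPC subscribers take (communicator, msg); no communicator in this sketch
        future.set_result(subscriber(None, msg))
    except Exception as exc:
        # A failure in the subscriber is reported through the future
        future.set_exception(exc)
    return future


def shout(_communicator, msg):
    return msg.upper()


def broken(_communicator, msg):
    raise RuntimeError("cannot handle " + msg)


subscribers = {"shout": shout, "broken": broken}

ok = sketch_rpc_send(subscribers, "shout", "hello")
failed = sketch_rpc_send(subscribers, "broken", "hello")

print(ok.result())                        # the value returned by shout()
print(type(failed.exception()).__name__)  # the exception raised by broken()
```

The caller sees success and failure through the same object: `result()` returns the value or re-raises the stored exception, while `exception()` returns it without raising.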
- abstract task_send(task, no_reply=False) -> Future
Send a task message. It will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.
- Parameters:
task – The task message
no_reply (bool) – Do not send a reply containing the result of the task
- Returns:
A future corresponding to the outcome of the task
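The queue-then-worker pattern described for `task_send` can be sketched with a `queue.Queue` and a worker thread. Everything here (`sketch_task_send`, `worker`, the `_STOP` sentinel) is hypothetical and only illustrates the pattern. Returning `None` when `no_reply` is set is an assumption of this sketch, not something the documentation above states.

```python
import queue
import threading
from concurrent.futures import Future

task_queue = queue.Queue()
_STOP = object()  # sentinel telling the worker to exit


def sketch_task_send(task, no_reply=False):
    """Queue a task; hand back a future unless the caller opted out of a reply."""
    future = None if no_reply else Future()  # assumption: no future when no_reply=True
    task_queue.put((task, future))
    return future


def worker(handle_task):
    """Pick up queued tasks one at a time and report outcomes via their futures."""
    while True:
        item = task_queue.get()
        if item is _STOP:
            return
        task, future = item
        try:
            result = handle_task(task)
            if future is not None:
                future.set_result(result)
        except Exception as exc:
            if future is not None:
                future.set_exception(exc)


handled = []


def square(task):
    handled.append(task)
    return task * task


thread = threading.Thread(target=worker, args=(square,), daemon=True)
thread.start()

outcome = sketch_task_send(7).result(timeout=5)        # blocks until the worker replies
fire_and_forget = sketch_task_send(3, no_reply=True)   # nothing to wait on

task_queue.put(_STOP)
thread.join(timeout=5)
```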
- exception kiwipy.CommunicatorClosed
Bases: Exception
Raised when an operation is attempted on a closed communicator
- class kiwipy.CommunicatorHelper
Bases: Communicator
- add_broadcast_subscriber(subscriber: Callable[[Communicator, Any, Any, Any, Any], Any], identifier=None) -> Any
Add a broadcast subscriber that will receive all broadcast messages
- Parameters:
subscriber – the subscriber function to be called
identifier – an optional identifier for the subscriber
- Returns:
an identifier for the subscriber, which can subsequently be used to remove it
- add_rpc_subscriber(subscriber, identifier=None) -> Any
Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.
- add_task_subscriber(subscriber, identifier=None)
Register a task subscriber
- Parameters:
subscriber – The task callback function
identifier – the subscriber identifier
- close()
Close the communicator, free up all resources and disallow any further operations
- remove_broadcast_subscriber(identifier)
Remove a broadcast subscriber
- Parameters:
identifier – the identifier of the subscriber to remove
- exception kiwipy.DuplicateSubscriberIdentifier
Bases: Exception
Failed to add a subscriber because the identifier supplied is already in use
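The identifier rules described for the `add_*_subscriber` and `remove_*_subscriber` methods (generate an identifier when none is given, always return it, reject duplicates, raise `ValueError` on unknown removals) can be sketched as a small registry. `SubscriberRegistry` and `DuplicateIdentifier` are hypothetical names used only for this example, and `uuid4` is an assumed way of producing unique identifiers.

```python
import uuid


class DuplicateIdentifier(Exception):
    """Local stand-in for the duplicate-identifier error described above."""


class SubscriberRegistry:
    """Hypothetical registry mirroring the documented identifier rules."""

    def __init__(self):
        self._subscribers = {}

    def add(self, subscriber, identifier=None):
        if identifier is None:
            identifier = uuid.uuid4().hex  # assumed scheme for unique identifiers
        if identifier in self._subscribers:
            raise DuplicateIdentifier(f"identifier {identifier!r} is already in use")
        self._subscribers[identifier] = subscriber
        return identifier  # returned in all cases, generated or supplied

    def remove(self, identifier):
        try:
            del self._subscribers[identifier]
        except KeyError:
            raise ValueError(f"unknown subscriber {identifier!r}") from None


registry = SubscriberRegistry()
generated = registry.add(print)                   # identifier generated for us
named = registry.add(print, identifier="logger")  # identifier supplied by the caller

try:
    registry.add(print, identifier="logger")
    duplicate_rejected = False
except DuplicateIdentifier:
    duplicate_rejected = True

registry.remove("logger")  # a second removal would raise ValueError
```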
- exception kiwipy.InvalidStateError
Bases: Exception
Raised when an object is in an invalid state to perform the desired action
- class kiwipy.LocalCommunicator
Bases: CommunicatorHelper
- broadcast_send(body, sender=None, subject=None, correlation_id=None)
Broadcast a message to all subscribers
- rpc_send(recipient_id, msg)
Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.
- Parameters:
recipient_id – The recipient identifier
msg – The body of the message
- Returns:
A future corresponding to the outcome of the call
- Return type:
kiwipy.Future
- task_send(task, no_reply=False)
Send a task message. It will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.
- Parameters:
task – The task message
no_reply (bool) – Do not send a reply containing the result of the task
- Returns:
A future corresponding to the outcome of the task
- exception kiwipy.QueueEmpty
Bases: Exception
Could not get the next message from the queue because it is empty
- exception kiwipy.RemoteException
Bases: Exception
An exception occurred at the remote end of the call
- exception kiwipy.UnroutableError
Bases: DeliveryFailed
The message was unroutable
- kiwipy.capture_exceptions(future, ignore=())
Capture any exceptions raised in the context and set them on the given future
- Parameters:
future (kiwipy.Future) – The future to set the exception on
ignore – An optional list of exception types to ignore; these will be raised and not set on the future
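A context manager with this behaviour can be sketched with `contextlib` and `concurrent.futures`. `capture_exceptions_sketch` is a hypothetical name and this is an illustration of the described semantics, not the library's source.

```python
import contextlib
from concurrent.futures import Future


@contextlib.contextmanager
def capture_exceptions_sketch(future, ignore=()):
    """Route exceptions raised in the with-block onto `future`."""
    try:
        yield
    except tuple(ignore):
        # Ignored types propagate to the caller and are not stored
        raise
    except Exception as exc:
        # Everything else is swallowed here and stored on the future
        future.set_exception(exc)


# A captured exception ends up on the future instead of propagating
future = Future()
with capture_exceptions_sketch(future):
    raise ValueError("bad input")

# An ignored exception type propagates and leaves the future untouched
other = Future()
try:
    with capture_exceptions_sketch(other, ignore=[KeyError]):
        raise KeyError("missing")
except KeyError:
    propagated = True
else:
    propagated = False
```

This keeps the `try`/`except`/`set_exception` boilerplate out of code that fulfils futures: the body of the `with` block can simply compute a value and call `set_result`.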
- kiwipy.chain(source, target)
Chain two futures together so that when one completes, so does the other.
The result (success or failure) of source will be copied to target, unless target has already been completed or cancelled by the time source finishes.
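Chaining of this kind is typically built on a done-callback. The sketch below uses `concurrent.futures.Future.add_done_callback`; `chain_sketch` is a hypothetical name and the code only illustrates the behaviour described above.

```python
from concurrent.futures import Future


def chain_sketch(source, target):
    """When `source` finishes, mirror its outcome onto `target` if still pending."""

    def on_done(finished):
        if target.done():
            return  # target was completed or cancelled first; leave it alone
        if finished.cancelled():
            target.cancel()
        elif finished.exception() is not None:
            target.set_exception(finished.exception())
        else:
            target.set_result(finished.result())

    # Runs when source completes, or immediately if it already has
    source.add_done_callback(on_done)


# Normal case: completing `a` completes `b`
a, b = Future(), Future()
chain_sketch(a, b)
a.set_result("done")

# Target finished first: the later source result is not copied
c, d = Future(), Future()
chain_sketch(c, d)
d.set_result("early")
c.set_result("late")

# Source already cancelled before chaining: the target is cancelled straight away
e, f = Future(), Future()
e.cancel()
chain_sketch(e, f)
```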
- kiwipy.connect(uri: str = 'amqp://guest:guest@127.0.0.1/', **kwargs)
Create a connection using a URI
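To see what the default URI encodes, it can be taken apart with `urllib.parse`. This snippet only parses the string and opens no connection. The fallback to port 5672 (the IANA-registered AMQP port) is an assumption of the sketch, not something stated above about `connect`.

```python
from urllib.parse import urlsplit

DEFAULT_URI = "amqp://guest:guest@127.0.0.1/"

parts = urlsplit(DEFAULT_URI)
settings = {
    "scheme": parts.scheme,
    "username": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port or 5672,  # assumed fallback: no port is given in the URI
    "path": parts.path,
}

for key, value in settings.items():
    print(f"{key}: {value}")
```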
- kiwipy.copy_future(source, target)
Copy the status of the source future to the target future, unless the target is already done, in which case return without copying
- Parameters:
source (kiwipy.Future) – The source future
target (kiwipy.Future) – The target future
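The four cases a status copy has to handle (result, exception, cancellation, target already done) can be sketched as follows. `copy_future_sketch` is a hypothetical name, and the sketch assumes the source future has already finished when it is called.

```python
from concurrent.futures import Future


def copy_future_sketch(source, target):
    """Copy the outcome of a finished `source` onto `target` unless target is done."""
    if target.done():
        return
    if source.cancelled():
        target.cancel()
    elif source.exception() is not None:
        target.set_exception(source.exception())
    else:
        target.set_result(source.result())


# 1. A result is copied
src, dst = Future(), Future()
src.set_result(42)
copy_future_sketch(src, dst)

# 2. An exception is copied
failed_src, failed_dst = Future(), Future()
failed_src.set_exception(RuntimeError("boom"))
copy_future_sketch(failed_src, failed_dst)

# 3. Cancellation is copied
cancelled_src, cancelled_dst = Future(), Future()
cancelled_src.cancel()
copy_future_sketch(cancelled_src, cancelled_dst)

# 4. A target that is already done keeps its own result
done_dst = Future()
done_dst.set_result("kept")
copy_future_sketch(src, done_dst)
```

Cancellation is checked before `exception()` because calling `exception()` on a cancelled future raises `CancelledError`.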
Subpackages
- kiwipy.rmq package
  - RmqCommunicator
    - RmqCommunicator._connection
    - RmqCommunicator._default_task_queue
    - RmqCommunicator._ensure_connected()
    - RmqCommunicator._message_publisher
    - RmqCommunicator._message_subscriber
    - RmqCommunicator.add_broadcast_subscriber()
    - RmqCommunicator.add_close_callback()
    - RmqCommunicator.add_rpc_subscriber()
    - RmqCommunicator.add_task_subscriber()
    - RmqCommunicator.broadcast_send()
    - RmqCommunicator.connect()
    - RmqCommunicator.connected()
    - RmqCommunicator.disconnect()
    - RmqCommunicator.get_default_task_queue()
    - RmqCommunicator.get_message_publisher()
    - RmqCommunicator.get_message_subscriber()
    - RmqCommunicator.loop
    - RmqCommunicator.remove_broadcast_subscriber()
    - RmqCommunicator.remove_rpc_subscriber()
    - RmqCommunicator.remove_task_subscriber()
    - RmqCommunicator.rpc_send()
    - RmqCommunicator.server_properties
    - RmqCommunicator.task_queue()
    - RmqCommunicator.task_send()
  - RmqIncomingTask
  - RmqTaskPublisher
  - RmqTaskQueue
  - RmqTaskSubscriber
    - RmqTaskSubscriber.TASK_QUEUE_ARGUMENTS
    - RmqTaskSubscriber._build_response_message()
    - RmqTaskSubscriber._create_task_queue()
    - RmqTaskSubscriber._on_task()
    - RmqTaskSubscriber._send_response()
    - RmqTaskSubscriber.add_task_subscriber()
    - RmqTaskSubscriber.connect()
    - RmqTaskSubscriber.next_task()
    - RmqTaskSubscriber.remove_task_subscriber()
  - RmqThreadCommunicator
    - RmqThreadCommunicator.TASK_TIMEOUT
    - RmqThreadCommunicator._ensure_open()
    - RmqThreadCommunicator._wrap_future()
    - RmqThreadCommunicator._wrap_subscriber()
    - RmqThreadCommunicator.add_broadcast_subscriber()
    - RmqThreadCommunicator.add_close_callback()
    - RmqThreadCommunicator.add_rpc_subscriber()
    - RmqThreadCommunicator.add_task_subscriber()
    - RmqThreadCommunicator.broadcast_send()
    - RmqThreadCommunicator.close()
    - RmqThreadCommunicator.connect()
    - RmqThreadCommunicator.is_closed()
    - RmqThreadCommunicator.loop()
    - RmqThreadCommunicator.remove_broadcast_subscriber()
    - RmqThreadCommunicator.remove_rpc_subscriber()
    - RmqThreadCommunicator.remove_task_subscriber()
    - RmqThreadCommunicator.rpc_send()
    - RmqThreadCommunicator.server_properties
    - RmqThreadCommunicator.task_queue()
    - RmqThreadCommunicator.task_send()
  - RmqThreadTaskQueue
  - async_connect()
  - connect()
  - Submodules
Submodules
- kiwipy.communicate module
- kiwipy.communications module
  - Communicator
    - Communicator.add_broadcast_subscriber()
    - Communicator.add_rpc_subscriber()
    - Communicator.add_task_subscriber()
    - Communicator.broadcast_send()
    - Communicator.close()
    - Communicator.is_closed()
    - Communicator.remove_broadcast_subscriber()
    - Communicator.remove_rpc_subscriber()
    - Communicator.remove_task_subscriber()
    - Communicator.rpc_send()
    - Communicator.task_send()
  - CommunicatorHelper
    - CommunicatorHelper._ensure_open()
    - CommunicatorHelper.add_broadcast_subscriber()
    - CommunicatorHelper.add_rpc_subscriber()
    - CommunicatorHelper.add_task_subscriber()
    - CommunicatorHelper.close()
    - CommunicatorHelper.fire_broadcast()
    - CommunicatorHelper.fire_rpc()
    - CommunicatorHelper.fire_task()
    - CommunicatorHelper.is_closed()
    - CommunicatorHelper.remove_broadcast_subscriber()
    - CommunicatorHelper.remove_rpc_subscriber()
    - CommunicatorHelper.remove_task_subscriber()
- kiwipy.exceptions module
- kiwipy.filters module
- kiwipy.futures module
- kiwipy.local module
- kiwipy.utils module