kiwipy package

Robust, high-volume, message-based communication made easy.

class kiwipy.BroadcastFilter(subscriber, subject=None, sender=None)

Bases: object

A filter that can be used to limit the subjects and/or senders of the broadcast messages that a subscriber will receive.

classmethod _ensure_filter(filter_value)
classmethod _make_regex(filter_str)
Parameters:

filter_str (str) – The filter string

Returns:

The regular expression object

add_sender_filter(sender_filter)
add_subject_filter(subject_filter)
is_filtered(sender, subject) → bool
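
The filtering idea behind BroadcastFilter can be sketched with the standard library alone. This is a hypothetical stand-in, not kiwipy's implementation: the class name SketchBroadcastFilter, the treatment of '*' as a wildcard, and the convention that is_filtered returns True when a message should be dropped are all assumptions made for illustration.

```python
import re


class SketchBroadcastFilter:
    """Hypothetical stand-in illustrating subject/sender filtering."""

    def __init__(self, subscriber, subject=None, sender=None):
        self._subscriber = subscriber
        self._subject_filters = []
        self._sender_filters = []
        if subject is not None:
            self.add_subject_filter(subject)
        if sender is not None:
            self.add_sender_filter(sender)

    @staticmethod
    def _make_regex(filter_str):
        # Assumption: '*' is a wildcard and every other character is literal.
        pattern = ".*".join(re.escape(part) for part in filter_str.split("*"))
        return re.compile(pattern)

    def add_subject_filter(self, subject_filter):
        self._subject_filters.append(self._make_regex(subject_filter))

    def add_sender_filter(self, sender_filter):
        self._sender_filters.append(self._make_regex(sender_filter))

    def is_filtered(self, sender, subject):
        """Return True when the message should NOT reach the subscriber."""
        checks = ((self._subject_filters, subject), (self._sender_filters, sender))
        for filters, value in checks:
            if filters and not any(
                value is not None and regex.fullmatch(value) for regex in filters
            ):
                return True
        return False

    def __call__(self, communicator, body, sender=None, subject=None, correlation_id=None):
        # Forward the message only if it passes every configured filter.
        if self.is_filtered(sender, subject):
            return None
        return self._subscriber(communicator, body, sender, subject, correlation_id)


received = []
filtered = SketchBroadcastFilter(
    lambda _comm, body, sender, subject, corr_id: received.append(body),
    subject="purchase.*",
)
filtered(None, "order 1", sender="shop", subject="purchase.created")
filtered(None, "ping", sender="shop", subject="heartbeat")
```

Only the first message reaches the wrapped subscriber here, because the second subject does not match the pattern.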
class kiwipy.Communicator

Bases: object

The interface for a communicator used to both send and receive various types of message.

abstract add_broadcast_subscriber(subscriber: Callable[[Communicator, Any, Any, Any, Any], Any], identifier=None) → Any

Add a broadcast subscriber that will receive all broadcast messages

Parameters:
  • subscriber – the subscriber function to be called

  • identifier – an optional identifier for the subscriber

Returns:

an identifier for the subscriber, which can subsequently be used to remove it

abstract add_rpc_subscriber(subscriber: Callable[[Communicator, Any], Any], identifier=None) → Any

Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.
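
The identifier contract described above (use the supplied identifier, otherwise generate a unique one, and always return it) can be sketched as a small registry. The names SubscriberRegistry and DuplicateIdentifier are hypothetical, and generating identifiers with uuid is an assumption rather than a statement about how kiwipy does it.

```python
import uuid


class DuplicateIdentifier(Exception):
    """Hypothetical stand-in for a duplicate-identifier error."""


class SubscriberRegistry:
    """Hypothetical registry mapping identifiers to subscriber callables."""

    def __init__(self):
        self._subscribers = {}

    def add(self, subscriber, identifier=None):
        # Generate a unique identifier when the caller does not supply one.
        if identifier is None:
            identifier = uuid.uuid4().hex
        if identifier in self._subscribers:
            raise DuplicateIdentifier(f"identifier {identifier!r} is already in use")
        self._subscribers[identifier] = subscriber
        return identifier  # returned in all cases

    def remove(self, identifier):
        # Unknown identifiers are reported as a ValueError.
        try:
            del self._subscribers[identifier]
        except KeyError:
            raise ValueError(f"unknown subscriber {identifier!r}") from None


registry = SubscriberRegistry()
generated_id = registry.add(lambda comm, msg: msg)
explicit_id = registry.add(lambda comm, msg: msg, identifier="adder")
registry.remove(generated_id)
```

Keeping the returned identifier is what makes later removal possible, whether or not the caller chose it.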

abstract add_task_subscriber(subscriber: Callable[[Communicator, Any], Any], identifier=None) → Any

Add a task subscriber to the communicator’s default queue. Returns the identifier.

Parameters:
  • subscriber – The task callback function

  • identifier – the subscriber identifier

abstract broadcast_send(body, sender=None, subject=None, correlation_id=None) → bool

Broadcast a message to all subscribers
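
As a rough illustration of the broadcast methods above, the following in-process sketch delivers one message to every registered subscriber. BroadcastHub is a hypothetical class, not part of kiwipy; the argument order passed to each subscriber (communicator, body, sender, subject, correlation_id) is an assumption inferred from the five-argument callable in the signature.

```python
import uuid


class BroadcastHub:
    """Hypothetical in-process hub; not kiwipy's implementation."""

    def __init__(self):
        self._subscribers = {}

    def add_broadcast_subscriber(self, subscriber, identifier=None):
        if identifier is None:
            identifier = uuid.uuid4().hex
        self._subscribers[identifier] = subscriber
        return identifier

    def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
        # Every subscriber receives the same five arguments.
        for subscriber in list(self._subscribers.values()):
            subscriber(self, body, sender, subject, correlation_id)
        return True


log = []
hub = BroadcastHub()
hub.add_broadcast_subscriber(
    lambda comm, body, sender, subject, corr_id: log.append(("a", body, subject))
)
hub.add_broadcast_subscriber(
    lambda comm, body, sender, subject, corr_id: log.append(("b", body, subject))
)
sent = hub.broadcast_send("disk full", sender="monitor", subject="alerts.disk")
```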

abstract close()

Close a communicator, free up all resources and do not allow any further operations

abstract is_closed() → bool

Return True if the communicator was closed

abstract remove_broadcast_subscriber(identifier)

Remove a broadcast subscriber

Parameters:

identifier – the identifier of the subscriber to remove

abstract remove_rpc_subscriber(identifier)

Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.

Parameters:

identifier – The RPC subscriber identifier

abstract remove_task_subscriber(identifier)

Remove a task subscriber from the communicator’s default queue.

Parameters:

identifier – the subscriber to remove

Raises:

ValueError if identifier does not correspond to a known subscriber

abstract rpc_send(recipient_id, msg)

Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.

Parameters:
  • recipient_id – The recipient identifier

  • msg – The body of the message

Returns:

A future corresponding to the outcome of the call

Return type:

kiwipy.Future
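
The future-based RPC flow described above can be sketched in-process with concurrent.futures.Future. RpcHub is a hypothetical class, and reporting an unknown recipient by setting a LookupError on the future is a choice made for this sketch, not documented kiwipy behaviour.

```python
from concurrent.futures import Future


class RpcHub:
    """Hypothetical in-process RPC dispatcher; not kiwipy's implementation."""

    def __init__(self):
        self._rpc_subscribers = {}

    def add_rpc_subscriber(self, subscriber, identifier):
        self._rpc_subscribers[identifier] = subscriber
        return identifier

    def rpc_send(self, recipient_id, msg):
        future = Future()
        subscriber = self._rpc_subscribers.get(recipient_id)
        if subscriber is None:
            # Sketch-only choice: surface the failure through the future.
            future.set_exception(LookupError(f"unknown recipient {recipient_id!r}"))
            return future
        try:
            future.set_result(subscriber(self, msg))
        except Exception as exc:
            # An error raised by the recipient becomes the future's outcome.
            future.set_exception(exc)
        return future


hub = RpcHub()
hub.add_rpc_subscriber(lambda comm, msg: msg["a"] + msg["b"], "adder")
total = hub.rpc_send("adder", {"a": 2, "b": 3}).result(timeout=1)
missing = hub.rpc_send("nobody", {})
```

The caller only ever handles the future, so success and failure are read back the same way.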

abstract task_send(task, no_reply=False) → Future

Send a task message. It will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.

Parameters:
  • task – The task message

  • no_reply (bool) – Do not send a reply containing the result of the task

Returns:

A future corresponding to the outcome of the task
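
The queue-and-worker pattern described above can be sketched with a thread and a standard queue. The module-level function names here mirror the documented method only for readability; the sketch is hypothetical, and returning None when no_reply is set is an assumption.

```python
import queue
import threading
from concurrent.futures import Future

task_queue = queue.Queue()


def task_send(task, no_reply=False):
    """Queue a task; return a future unless no_reply is set (assumption)."""
    future = None if no_reply else Future()
    task_queue.put((task, future))
    return future


def worker(handler):
    """Process queued tasks until a None sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is None:
            break
        task, future = item
        try:
            result = handler(task)
        except Exception as exc:
            if future is not None:
                future.set_exception(exc)
        else:
            if future is not None:
                future.set_result(result)


thread = threading.Thread(target=worker, args=(lambda task: task * 2,))
thread.start()
reply = task_send(21)
silent = task_send(1, no_reply=True)
answer = reply.result(timeout=5)
task_queue.put(None)  # stop the worker
thread.join()
```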

exception kiwipy.CommunicatorClosed

Bases: Exception

Raised when an operation is attempted on a closed communicator
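
A closed-state guard of this kind is commonly written as a small check that runs before every operation. The sketch below is hypothetical: ClosedError and Closable are illustrative names, and the guard method is only modelled on the _ensure_open name that appears in this reference.

```python
class ClosedError(Exception):
    """Hypothetical stand-in for a 'communicator closed' error."""


class Closable:
    """Hypothetical object that refuses operations once closed."""

    def __init__(self):
        self._closed = False

    def close(self):
        self._closed = True

    def is_closed(self):
        return self._closed

    def _ensure_open(self):
        if self._closed:
            raise ClosedError("operation attempted on a closed communicator")

    def send(self, body):
        # Guard every public operation before doing any work.
        self._ensure_open()
        return body


comm = Closable()
echoed = comm.send("hello")
comm.close()
```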

class kiwipy.CommunicatorHelper

Bases: Communicator

_ensure_open()
add_broadcast_subscriber(subscriber: Callable[[Communicator, Any, Any, Any, Any], Any], identifier=None) → Any

Add a broadcast subscriber that will receive all broadcast messages

Parameters:
  • subscriber – the subscriber function to be called

  • identifier – an optional identifier for the subscriber

Returns:

an identifier for the subscriber, which can subsequently be used to remove it

add_rpc_subscriber(subscriber, identifier=None) → Any

Add an RPC subscriber to the communicator with an optional identifier. If an identifier is not provided the communicator will generate a unique one. In all cases the identifier will be returned.

add_task_subscriber(subscriber, identifier=None)

Register a task subscriber

Parameters:
  • subscriber – The task callback function

  • identifier – the subscriber identifier

close()

Close a communicator, free up all resources and do not allow any further operations

fire_broadcast(body, sender=None, subject=None, correlation_id=None)
fire_rpc(recipient_id, msg)
fire_task(msg, no_reply=False)
is_closed() → bool

Return True if the communicator was closed

remove_broadcast_subscriber(identifier)

Remove a broadcast subscriber

Parameters:

identifier – the identifier of the subscriber to remove

remove_rpc_subscriber(identifier)

Remove an RPC subscriber given the identifier. Raises a ValueError if there is no such subscriber.

Parameters:

identifier – The RPC subscriber identifier

remove_task_subscriber(identifier)

Remove a task subscriber

Parameters:

identifier – the subscriber to remove

Raises:

ValueError if identifier does not correspond to a known subscriber

exception kiwipy.DeliveryFailed

Bases: Exception

Failed to deliver a message

exception kiwipy.DuplicateSubscriberIdentifier

Bases: Exception

Failed to add a subscriber because the identifier supplied is already in use

class kiwipy.EventHelper(listener_type)

Bases: object

add_listener(listener)
fire_event(event_function, *args, **kwargs)
property listeners
remove_all_listeners()
remove_listener(listener)
exception kiwipy.InvalidStateError

Bases: Exception

Raised when an object is in an invalid state to perform the desired action

class kiwipy.LocalCommunicator

Bases: CommunicatorHelper

broadcast_send(body, sender=None, subject=None, correlation_id=None)

Broadcast a message to all subscribers

rpc_send(recipient_id, msg)

Initiate a remote procedure call on a recipient. This method returns a future representing the outcome of the call.

Parameters:
  • recipient_id – The recipient identifier

  • msg – The body of the message

Returns:

A future corresponding to the outcome of the call

Return type:

kiwipy.Future

task_send(task, no_reply=False)

Send a task message. It will be queued and picked up by a worker at some point in the future. The method returns a future representing the outcome of the task.

Parameters:
  • task – The task message

  • no_reply (bool) – Do not send a reply containing the result of the task

Returns:

A future corresponding to the outcome of the task

exception kiwipy.QueueEmpty

Bases: Exception

Could not get the next message from the queue because it is empty

exception kiwipy.RemoteException

Bases: Exception

An exception occurred at the remote end of the call

exception kiwipy.TaskRejected

Bases: Exception

A task was rejected at the remote end

exception kiwipy.UnroutableError

Bases: DeliveryFailed

The message was unroutable

kiwipy.capture_exceptions(future, ignore=())

Capture any exceptions in the context and set them as the result of the given future

Parameters:
  • future (kiwipy.Future) – The future to set the exception on

  • ignore – An optional list of exception types to ignore; these will be raised and not set on the future
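
The behaviour described above can be approximated with a context manager built on contextlib. This is a minimal sketch under stated assumptions: capture_exceptions_sketch is a hypothetical name, the future is a plain concurrent.futures.Future, and only Exception subclasses are captured.

```python
import contextlib
from concurrent.futures import Future


@contextlib.contextmanager
def capture_exceptions_sketch(future, ignore=()):
    """Hypothetical re-creation: record exceptions from the block on `future`."""
    ignored = tuple(ignore)  # accept a list or a tuple of exception types
    try:
        yield
    except ignored:
        # Ignored types propagate to the caller and are not set on the future.
        raise
    except Exception as exc:
        future.set_exception(exc)


failed = Future()
with capture_exceptions_sketch(failed):
    raise RuntimeError("boom")

ignored_future = Future()
propagated = False
try:
    with capture_exceptions_sketch(ignored_future, ignore=[KeyError]):
        raise KeyError("skip me")
except KeyError:
    propagated = True
```

In the first block the error is swallowed and stored on the future; in the second it propagates and the future is left pending.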

kiwipy.chain(source, target)

Chain two futures together so that when one completes, so does the other.

The result (success or failure) of source will be copied to target, unless target has already been completed or cancelled by the time source finishes.

kiwipy.connect(uri: str = 'amqp://guest:guest@127.0.0.1/', **kwargs)

Create a connection using a URI

kiwipy.copy_future(source, target)

Copy the status of the source future to the target future, unless the target is already done, in which case return without changing it

Parameters:
  • source (kiwipy.Future) – The source future

  • target (kiwipy.Future) – The target future
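
The copy and chain semantics described in this section can be sketched with plain concurrent.futures.Future objects. The functions copy_future_sketch and chain_sketch are hypothetical re-creations for illustration; the copy helper assumes the source future has already finished when it is called.

```python
from concurrent.futures import Future


def copy_future_sketch(source, target):
    """Copy the outcome of a finished `source` to `target` unless target is done."""
    if target.done():
        return
    if source.cancelled():
        target.cancel()
        return
    exc = source.exception()
    if exc is not None:
        target.set_exception(exc)
    else:
        target.set_result(source.result())


def chain_sketch(source, target):
    """When `source` finishes, propagate its outcome to `target`."""
    source.add_done_callback(lambda done: copy_future_sketch(done, target))


# Normal case: the target completes as soon as the source does.
source, target = Future(), Future()
chain_sketch(source, target)
source.set_result("done")

# The target was completed first, so the later source result is not copied.
early, late = Future(), Future()
chain_sketch(early, late)
late.set_result("already set")
early.set_result("ignored")

# Failures are propagated in the same way as results.
bad_source, bad_target = Future(), Future()
chain_sketch(bad_source, bad_target)
bad_source.set_exception(ValueError("failed"))
```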

Subpackages

Submodules