kiwipy.rmq.messages module

class kiwipy.rmq.messages.BaseConnectionWithExchange(connection, exchange_name='kiwipy.messages', exchange_params=None, testing_mode=False)

Bases: object

An RMQ connection with a channel and exchange

DEFAULT_EXCHANGE_PARAMS = {'type': ExchangeType.TOPIC}
channel()
async connect()
async disconnect()
get_exchange_name()
property is_closing
loop() → BaseEventLoop

class kiwipy.rmq.messages.BasePublisherWithReplyQueue(connection, exchange_name='kiwipy.messages', exchange_params=None, encoder=functools.partial(<function dump>, encoding='utf-8'), decoder=functools.partial(<function load>, Loader=<class 'yaml.loader.FullLoader'>), confirm_deliveries=True, testing_mode=False)

Bases: object

A base class for any object that needs to be able to publish a message and to potentially expect a reply.
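The encoder and decoder defaults in the signature above are functools.partial objects, which appear to wrap YAML dump and load functions with their options pre-bound. The same pattern can be used to build a custom pair. The following is a minimal sketch using the standard-library json module in place of YAML; dump_bytes and load_bytes are hypothetical helpers, and it is an assumption (not confirmed by this page) that the publisher calls encoder(obj) to get bytes and decoder(payload) to get the object back.

```python
import functools
import json


def dump_bytes(obj, encoding="utf-8", **json_kwargs):
    """Hypothetical helper: serialise obj to JSON and return it as bytes."""
    return json.dumps(obj, **json_kwargs).encode(encoding)


def load_bytes(data, encoding="utf-8"):
    """Hypothetical helper: accept bytes or str and parse it as JSON."""
    if isinstance(data, (bytes, bytearray)):
        data = data.decode(encoding)
    return json.loads(data)


# Bind the options once, mirroring the functools.partial style of the defaults.
encoder = functools.partial(dump_bytes, encoding="utf-8", sort_keys=True)
decoder = functools.partial(load_bytes, encoding="utf-8")

payload = {"task": "add", "args": [1, 2]}
wire = encoder(payload)          # bytes suitable for a message body
assert decoder(wire) == payload  # the pair round-trips
```

Whether such a pair can be passed as encoder= and decoder= depends on how the class uses them, so check the source before relying on it.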

DEFAULT_EXCHANGE_PARAMS = {'type': ExchangeType.TOPIC}
_on_channel_close(_closing_future, *_, **__)

Reset all channel specific members

async _on_response(message)

Called when we get a message on our response queue

Parameters

message (aio_pika.IncomingMessage) – The response message

action_message(message)

Execute a message that involves communication. This could mean that the message gets queued first and then sent as soon as the connection is open. In any case the method returns a future for the message.

Parameters

message – The message to execute

Returns

A future corresponding to the action

Return type

kiwipy.Future
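The queue-then-send behaviour described for action_message can be illustrated with a small, self-contained sketch. This is not the library's implementation: QueueingSender and on_connected are hypothetical names, the "message" is modelled as a zero-argument callable, and concurrent.futures.Future stands in for kiwipy.Future on the assumption that the two behave alike.

```python
from collections import deque
from concurrent.futures import Future


class QueueingSender:
    """Toy stand-in: queues actions while disconnected, runs them once connected."""

    def __init__(self):
        self._connected = False
        self._pending = deque()  # (action, future) pairs awaiting a connection

    def action_message(self, action):
        """Schedule a zero-argument callable and return a future for its result."""
        future = Future()
        if self._connected:
            self._run(action, future)
        else:
            self._pending.append((action, future))
        return future

    def on_connected(self):
        """Mark the connection as open and flush the queue in FIFO order."""
        self._connected = True
        while self._pending:
            action, future = self._pending.popleft()
            self._run(action, future)

    @staticmethod
    def _run(action, future):
        # Deliver either the result or the exception through the future.
        try:
            future.set_result(action())
        except Exception as exc:
            future.set_exception(exc)


sender = QueueingSender()
fut = sender.action_message(lambda: "sent")  # queued: not connected yet
assert not fut.done()
sender.on_connected()                        # queue is flushed here
assert fut.result() == "sent"
```

In either case the caller gets a future back immediately, which matches the contract stated above: the caller does not need to know whether the connection was already open.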

channel()
async connect()
async disconnect()
get_exchange_name()
property is_closing
property is_connected
async publish(message, routing_key, mandatory=True)

Send a fire-and-forget message, i.e. one for which no response is expected.

Parameters
  • message – The message to send

  • routing_key – The routing key

  • mandatory – If True and the message cannot be routed, an UnroutableException is raised

Returns

async publish_expect_response(message, routing_key, mandatory=True)

class kiwipy.rmq.messages.BroadcastMessage

Bases: object

BODY = 'body'
CORRELATION_ID = 'correlation_id'
SENDER = 'sender'
SUBJECT = 'subject'
static create(body, sender=None, subject=None, correlation_id=None)
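The four class constants suggest that a broadcast message is a mapping keyed by 'body', 'sender', 'subject' and 'correlation_id'. The sketch below shows what create might look like under that assumption; BroadcastMessageSketch is a hypothetical stand-in, and the return type (a plain dict) is not confirmed by this page.

```python
class BroadcastMessageSketch:
    """Hypothetical stand-in for BroadcastMessage, assuming a dict-based layout."""

    BODY = "body"
    SENDER = "sender"
    SUBJECT = "subject"
    CORRELATION_ID = "correlation_id"

    @staticmethod
    def create(body, sender=None, subject=None, correlation_id=None):
        # Assumption: every field is always present, with None for unset values.
        return {
            BroadcastMessageSketch.BODY: body,
            BroadcastMessageSketch.SENDER: sender,
            BroadcastMessageSketch.SUBJECT: subject,
            BroadcastMessageSketch.CORRELATION_ID: correlation_id,
        }


msg = BroadcastMessageSketch.create(
    "job finished", sender="worker-1", subject="state.changed"
)
# Receivers can read fields through the constants rather than string literals.
assert msg[BroadcastMessageSketch.SUBJECT] == "state.changed"
assert msg[BroadcastMessageSketch.CORRELATION_ID] is None
```

Using the constants as keys on both the sending and receiving side keeps the field names in one place.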