Source code for kiwipy.rmq.messages

# -*- coding: utf-8 -*-
import asyncio
from collections import deque
import copy
import logging
import traceback
import typing
import uuid

import aio_pika

from . import defaults, utils

_LOGGER = logging.getLogger(__name__)


[docs]class BroadcastMessage: BODY = 'body' SENDER = 'sender' SUBJECT = 'subject' CORRELATION_ID = 'correlation_id'
[docs] @staticmethod def create(body, sender=None, subject=None, correlation_id=None): message_dict = { BroadcastMessage.BODY: body, BroadcastMessage.SENDER: sender, BroadcastMessage.SUBJECT: subject, BroadcastMessage.CORRELATION_ID: correlation_id, } return message_dict
[docs]class BaseConnectionWithExchange: """ An RMQ connection with a channel and exchange """ DEFAULT_EXCHANGE_PARAMS = {'type': aio_pika.ExchangeType.TOPIC} def __init__(self, connection, exchange_name=defaults.MESSAGE_EXCHANGE, exchange_params=None, testing_mode=False): """ :type connection: :class:`aio_pika.Connection` :type exchange_name: str :type exchange_params: dict or NoneType """ super().__init__() if exchange_params is None: exchange_params = copy.copy(self.DEFAULT_EXCHANGE_PARAMS) if testing_mode: exchange_params.setdefault('auto_delete', testing_mode) self._connection = connection self._exchange_name = exchange_name self._exchange_params = exchange_params self._loop = self._connection.loop self._channel = None # type: typing.Optional[aio_pika.Channel] self._exchange = None # type: typing.Optional[aio_pika.Exchange] self._is_closing = False @property def is_closing(self): return self._is_closing
[docs] def loop(self) -> asyncio.BaseEventLoop: return self._loop
[docs] def get_exchange_name(self): return self._exchange_name
[docs] def channel(self): return self._channel
[docs] async def connect(self): if self._channel: return # Create the channel self._channel = await self._connection.channel() # Create the exchange self._exchange = await self._channel.declare_exchange(name=self.get_exchange_name(), **self._exchange_params)
[docs] async def disconnect(self): if not self.is_closing: self._is_closing = True if self.channel() is not None: await self.channel().close() self._channel = None
[docs]class BasePublisherWithReplyQueue: """ A base class for any object that needs to be able to publish a message and to potentially expect a reply. """ # pylint: disable=too-many-instance-attributes DEFAULT_EXCHANGE_PARAMS = {'type': aio_pika.ExchangeType.TOPIC} def __init__( self, connection, exchange_name=defaults.MESSAGE_EXCHANGE, exchange_params=None, encoder=defaults.ENCODER, decoder=defaults.DECODER, confirm_deliveries=True, testing_mode=False ): # pylint: disable=too-many-arguments """ :param connection: The aio_pika RMQ connection :type connection: :class:`aio_pika.connection.Connection` :param exchange_name: :param exchange_params: :param encoder: :param decoder: :param confirm_deliveries: """ super().__init__() if exchange_params is None: exchange_params = copy.copy(self.DEFAULT_EXCHANGE_PARAMS) if testing_mode: exchange_params.setdefault('auto_delete', testing_mode) self._exchange_name = exchange_name self._exchange_params = exchange_params self._encode = encoder self._response_decode = decoder self._confirm_deliveries = confirm_deliveries if self._confirm_deliveries: self._num_published = 0 self._delivery_info = deque() self._testing_mode = testing_mode self._awaiting_response = {} self._connection = connection self._channel = None # type: typing.Optional[aio_pika.Channel] self._exchange = None # type: typing.Optional[aio_pika.Exchange] self._reply_queue = None # type: typing.Optional[aio_pika.Queue] self._is_closing = False @property def is_closing(self): return self._is_closing @property def is_connected(self): return self._channel
[docs] async def connect(self): if self.is_connected: return self._channel = await self._connection.channel( publisher_confirms=self._confirm_deliveries, on_return_raises=True ) self._channel.close_callbacks.add(self._on_channel_close) self._exchange = await self._channel.declare_exchange(name=self.get_exchange_name(), **self._exchange_params) # Declare the reply queue reply_queue_name = f'{self._exchange_name}-reply-{str(uuid.uuid4())}' self._reply_queue = await self._channel.declare_queue( name=reply_queue_name, exclusive=True, auto_delete=self._testing_mode, arguments={'x-expires': defaults.REPLY_QUEUE_EXPIRES} ) await self._reply_queue.bind(self._exchange, routing_key=reply_queue_name) await self._reply_queue.consume(self._on_response, no_ack=True)
[docs] async def disconnect(self): if not self.is_closing: self._is_closing = True if self.channel() is not None and not self.channel().is_closed: await self._channel.close() self._channel = None
[docs] def action_message(self, message): """ Execute a message that involves communication. This could mean that the message gets queued first and then sent as soon as the connection is open. In any case the method returns a future for the message. :param message: The message to execute :return: A future corresponding to action :rtype: :class:`kiwipy.Future` """ message.send(self) return message.future
[docs] async def publish(self, message, routing_key, mandatory=True): """ Send a fire-and-forget message i.e. no response expected. :param message: The message to send :param routing_key: The routing key :param mandatory: If the message cannot be routed this will raise an UnroutableException :return: """ result = await self._exchange.publish(message, routing_key=routing_key, mandatory=mandatory) return result
[docs] async def publish_expect_response(self, message, routing_key, mandatory=True): # If there is no correlation id we have to set on so that we know what the response will be to if not message.correlation_id: message.correlation_id = str(uuid.uuid4()) correlation_id = message.correlation_id response_future = asyncio.Future() self._awaiting_response[correlation_id] = response_future result = await self.publish(message, routing_key=routing_key, mandatory=mandatory) return result, response_future
[docs] def get_exchange_name(self): return self._exchange_name
[docs] def channel(self): return self._channel
# region RMQ communications
[docs] async def _on_response(self, message): """ Called when we get a message on our response queue :param message: The response message :type message: :class:`aio_pika.IncomingMessage` """ correlation_id = message.correlation_id try: response_future = self._awaiting_response.pop(correlation_id) except KeyError: _LOGGER.error("Got a response for an unknown id '%s':\n%s", correlation_id, message) else: try: response = self._response_decode(message.body) except Exception: _LOGGER.error('Failed to decode message body:\n%s%s', message.body, traceback.format_exc()) raise else: utils.response_to_future(response, response_future) try: # If the response was a future it means we should get another message that # resolves that future if asyncio.isfuture(response_future.result()): self._awaiting_response[correlation_id] = response_future.result() except Exception: # pylint: disable=broad-except pass
[docs] def _on_channel_close(self, _closing_future, *_, **__): """ Reset all channel specific members """ if self._confirm_deliveries: self._num_published = 0 self._delivery_info = deque()
# endregion