Source code for kiwipy.filters

# -*- coding: utf-8 -*-
import re
import typing

__all__ = ('BroadcastFilter',)


[docs]class BroadcastFilter: """A filter that can be used to limit the subjects and/or senders that will be received""" def __init__(self, subscriber, subject=None, sender=None): self._subscriber = subscriber self._subject_filters = [] self._sender_filters = [] if subject is not None: self.add_subject_filter(subject) if sender is not None: self.add_sender_filter(sender) @property def __name__(self): return 'BroadcastFilter' def __call__(self, communicator, body, sender=None, subject=None, correlation_id=None): if self.is_filtered(sender, subject): return None return self._subscriber(communicator, body, sender, subject, correlation_id)
[docs] def is_filtered(self, sender, subject) -> bool: if subject is not None and self._subject_filters and \ not any(check(subject) for check in self._subject_filters): return True if sender is not None and self._sender_filters and \ not any(check(sender) for check in self._sender_filters): return True return False
[docs] def add_subject_filter(self, subject_filter): self._subject_filters.append(self._ensure_filter(subject_filter))
[docs] def add_sender_filter(self, sender_filter): self._sender_filters.append(self._ensure_filter(sender_filter))
[docs] @classmethod def _ensure_filter(cls, filter_value): if isinstance(filter_value, str): return re.compile(filter_value.replace('.', '[.]').replace('*', '.*')).match if isinstance(filter_value, typing.Pattern): # pylint: disable=isinstance-second-argument-not-valid-type return filter_value.match return lambda val: val == filter_value
[docs] @classmethod def _make_regex(cls, filter_str): """ :param filter_str: The filter string :type filter_str: str :return: The regular expression object """ return re.compile(filter_str.replace('.', '[.]'))