Source code for kiwipy.local

# -*- coding: utf-8 -*-
from . import communications

__all__ = ('LocalCommunicator',)


[docs]class LocalCommunicator(communications.CommunicatorHelper):
[docs] def task_send(self, task, no_reply=False): return self.fire_task(task, no_reply)
[docs] def rpc_send(self, recipient_id, msg): return self.fire_rpc(recipient_id, msg)
[docs] def broadcast_send(self, body, sender=None, subject=None, correlation_id=None): return self.fire_broadcast(body, sender=sender, subject=subject, correlation_id=correlation_id)