# Source code for kiwipy.local
# -*- coding: utf-8 -*-
from . import communications
# Names exported by a star-import of this module: only the communicator class.
__all__ = ('LocalCommunicator',)
class LocalCommunicator(communications.CommunicatorHelper):
    """Communicator whose send methods dispatch on this same object.

    Each ``*_send`` method forwards its arguments unchanged to the matching
    ``fire_*`` method on ``self`` and returns whatever that call returns.
    The ``fire_*`` methods are not defined in this class; they are expected
    to be provided by :class:`communications.CommunicatorHelper`.
    """

    def task_send(self, task, no_reply=False):
        """Send a task by handing it to ``fire_task``.

        :param task: the task message, forwarded as-is
        :param no_reply: forwarded positionally as the second argument of
            ``fire_task``; presumably signals that the sender does not want
            a result back -- confirm against ``fire_task``
        :return: the return value of ``fire_task``
        """
        return self.fire_task(task, no_reply)

    def rpc_send(self, recipient_id, msg):
        """Send an RPC message by handing it to ``fire_rpc``.

        :param recipient_id: identifier of the intended recipient, forwarded
            as-is
        :param msg: the message body, forwarded as-is
        :return: the return value of ``fire_rpc``
        """
        return self.fire_rpc(recipient_id, msg)

    def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
        """Send a broadcast by handing it to ``fire_broadcast``.

        ``body`` is passed positionally; the three optional values are passed
        on by keyword under the same names.

        :param body: the broadcast body, forwarded as-is
        :param sender: optional sender value, forwarded as-is
        :param subject: optional subject value, forwarded as-is
        :param correlation_id: optional correlation identifier, forwarded
            as-is
        :return: the return value of ``fire_broadcast``
        """
        return self.fire_broadcast(
            body,
            sender=sender,
            subject=subject,
            correlation_id=correlation_id,
        )