Source code for kiwipy.communicate
# -*- coding: utf-8 -*-
__all__ = ('connect',)
DEFAULT_COMM_URI = 'amqp://guest:guest@127.0.0.1/'
[docs]def connect(uri: str = DEFAULT_COMM_URI, **kwargs):
"""Create a connection using a URI"""
if uri.startswith('amqp'):
from . import rmq # Avoid circular ref: pylint: disable=import-outside-toplevel
return rmq.connect(connection_params=uri, **kwargs)
raise ValueError(f"Uknown communicator uri '{uri}'")