Coming from Pika

KiwiPy comes natively with a RabbitMQ communicator (others can be added by extending the Communicator interface) and thus it may be useful to see how to achieve the same things in Pika (the standard library used in the RabbitMQ tutorials) and kiwiPy. This also shows some of the differences, particularly where kiwiPy is less verbose by setting sensible defaults for the user.

Work Queues

Let’s start with RabbitMQ’s work queues example.

Pika

The code for sending a task:

import sys

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or 'Hello World!'
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    )
)
print(f' [x] Sent {message!r}')
connection.close()

The code for running a worker:

import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(f' [x] Received {body!r}')
    time.sleep(body.count(b'.'))
    print(' [x] Done')
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

KiwiPy

And now, in kiwiPy. The code for sending the same task:

import sys

import kiwipy

message = ' '.join(sys.argv[1:]) or 'Hello World!'

with kiwipy.connect('amqp://localhost') as comm:
    queue = comm.task_queue('task_queue')  # Durable by default
    queue.task_send(message)
    print(f' [x] Sent {message!r}')

Here, compared to the Pika snippet, we see that we don’t have to decide or know about an exchange, routing_key, properties, channel or durability of the message. Task queues in kiwiPy are always durable and the corresponding messages persistent. The routing_key is simply the name of the queue that the user declared and an exchange is selected automatically for the user (set to kiwipy.tasks by default). The exchange and some other defaults can be changed when constructing a RmqThreadCommunicator.

Here we have explicitly created a task queue called task_queue, however, even this has a default version if the user doesn’t need multiple queues. In which case the code would simply be:

with kiwipy.connect('amqp://localhost') as comm:
    comm.task_send(message)
    print(" [x] Sent %r" % message)

Now to run a worker it’s just:

import threading
import time

import kiwipy


def callback(_comm, body):
    print(f' [x] Received {body!r}')
    time.sleep(body.count('.'))
    print(' [x] Done')
    return True


with kiwipy.connect('amqp://localhost') as comm:
    queue = comm.task_queue('task_queue', prefetch_count=1)
    queue.add_task_subscriber(callback)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    threading.Event().wait()  # Wait for incoming messages

This is fairly straightforward and the only decision the user has to make is about the prefetch count (the number of tasks and worker can be working on simultaneously).

Topics

Moving on to the topics example (known as broadcasts with filters in kiwiPy).

Pika

The Pika code to emit a log topic is:

import sys

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(f' [x] Sent {routing_key!r}:{message!r}')
connection.close()

And to receive, the code is:

import sys

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write(f'Usage: {sys.argv[0]} [binding_key]...\n')
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(f' [x] {method.routing_key!r}:{body!r}')


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

KiwiPy

Emitting in kiwiPy:

import sys

import kiwipy

message = ' '.join(sys.argv[2:]) or 'Hello World!'

with kiwipy.connect('amqp://localhost') as comm:
    subject = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
    comm.broadcast_send(message, subject=subject)
    print(f' [x] Sent {subject!r}:{message!r}')

As we’ve come to see in kiwiPy there’s no need to worry about the channel, exchange, exchange type while the routing key is known as the subject of the message.

And to receive the log:

import sys
import threading

import kiwipy


def callback(_comm, body, _sender, subject, _msg_id):
    print(f' [x] {subject!r}:{body!r}')


with kiwipy.connect('amqp://localhost') as comm:
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write(f'Usage: {sys.argv[0]} [binding_key]...\n')
        sys.exit(1)

    for binding_key in binding_keys:
        comm.add_broadcast_subscriber(kiwipy.BroadcastFilter(callback, binding_key))

    print(' [*] Waiting for logs. To exit press CTRL+C')
    threading.Event().wait()

Here we can see that the filtering is performed by using a BroadcastFilter which can match a string, that optionally includes wildcards *.

RPC

Finally, let’s end with the RPC example.

Pika

The code to run an RPC server is:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(f' [.] fib({n})')
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id= \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(' [x] Awaiting RPC requests')
channel.start_consuming()

The client side is:

import uuid

import pika


class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n)
        )
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(' [x] Requesting fib(30)')
response = fibonacci_rpc.call(30)
print(f' [.] Got {response!r}')

KiwiPy

Now, in kiwiPy the server becomes:

import threading

import kiwipy


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(_comm, body):
    n = int(body)

    print(f' [.] fib({n})')
    return fib(n)


with kiwipy.connect('amqp://localhost') as comm:
    comm.add_rpc_subscriber(on_request, identifier='rpc_queue')
    print(' [x] Awaiting RPC requests')
    threading.Event().wait()

As usual, the routing of messages is automatically set up by kiwiPy and the identifier of the RPC subscriber is the only thing needed to link the client and server.

The client code is:

import kiwipy

with kiwipy.connect('amqp://localhost') as comm:
    print(' [x] Requesting fib(30)')
    response = comm.rpc_send('rpc_queue', 30).result()
    print(f' [.] Got {response!r}')

Here, we send the RPC request to the identifier we used before. The rpc_send call gives us back a future from which we can get the result once it has been received from the server.

Roundup

As these examples demonstrate, kiwiPy tries to use sensible defaults and keep many of the routing details hidden from the user. This has the advantage of bringing the power of a dedicated message broker such as RabbitMQ to less expert users. Additionally, it allows messaging to be done using a simple interface which can be implemented by other messaging protocols, allowing clients to be protocol agnostic. The trade-off, of course, is reduced control over the finer details of how the messaging is performed. For users that need such fine-grained control, pika and the excellent aio-pika may be better alternatives.