Coming from Pika¶
KiwiPy comes natively with a RabbitMQ communicator (others can be added by extending the Communicator
interface) and thus it may be useful to see how to achieve the same things in Pika (the standard library used in the RabbitMQ tutorials) and kiwiPy. This also shows some of the differences, particularly where kiwiPy is less verbose by setting sensible defaults for the user.
Work Queues¶
Let’s start with RabbitMQ’s work queues example.
Pika¶
The code for sending a task:
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or 'Hello World!'
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(f' [x] Sent {message!r}')
connection.close()
The code for running a worker:
import time
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f' [x] Received {body!r}')
time.sleep(body.count(b'.'))
print(' [x] Done')
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
KiwiPy¶
And now, in kiwiPy. The code for sending the same task:
import sys
import kiwipy
message = ' '.join(sys.argv[1:]) or 'Hello World!'
with kiwipy.connect('amqp://localhost') as comm:
queue = comm.task_queue('task_queue') # Durable by default
queue.task_send(message)
print(f' [x] Sent {message!r}')
Here, compared to the Pika snippet, we see that we don’t have to decide or know about an exchange
, routing_key
, properties
, channel
or durability of the message.
Task queues in kiwiPy are always durable and the corresponding messages persistent.
The routing_key
is simply the name of the queue that the user declared and an exchange
is selected automatically for the user (set to kiwipy.tasks
by default).
The exchange
and some other defaults can be changed when constructing a RmqThreadCommunicator
.
Here we have explicitly created a task queue called task_queue
, however, even this has a default version if the user doesn’t need multiple queues. In which case the code would simply be:
with kiwipy.connect('amqp://localhost') as comm:
comm.task_send(message)
print(" [x] Sent %r" % message)
Now to run a worker it’s just:
import threading
import time
import kiwipy
def callback(_comm, body):
print(f' [x] Received {body!r}')
time.sleep(body.count('.'))
print(' [x] Done')
return True
with kiwipy.connect('amqp://localhost') as comm:
queue = comm.task_queue('task_queue', prefetch_count=1)
queue.add_task_subscriber(callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
threading.Event().wait() # Wait for incoming messages
This is fairly straightforward and the only decision the user has to make is about the prefetch count (the number of tasks and worker can be working on simultaneously).
Topics¶
Moving on to the topics example (known as broadcasts with filters in kiwiPy).
Pika¶
The Pika code to emit a log topic is:
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(f' [x] Sent {routing_key!r}:{message!r}')
connection.close()
And to receive, the code is:
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write(f'Usage: {sys.argv[0]} [binding_key]...\n')
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f' [x] {method.routing_key!r}:{body!r}')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
KiwiPy¶
Emitting in kiwiPy:
import sys
import kiwipy
message = ' '.join(sys.argv[2:]) or 'Hello World!'
with kiwipy.connect('amqp://localhost') as comm:
subject = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
comm.broadcast_send(message, subject=subject)
print(f' [x] Sent {subject!r}:{message!r}')
As we’ve come to see in kiwiPy there’s no need to worry about the channel, exchange, exchange type while the routing key is known as the subject of the message.
And to receive the log:
import sys
import threading
import kiwipy
def callback(_comm, body, _sender, subject, _msg_id):
print(f' [x] {subject!r}:{body!r}')
with kiwipy.connect('amqp://localhost') as comm:
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write(f'Usage: {sys.argv[0]} [binding_key]...\n')
sys.exit(1)
for binding_key in binding_keys:
comm.add_broadcast_subscriber(kiwipy.BroadcastFilter(callback, binding_key))
print(' [*] Waiting for logs. To exit press CTRL+C')
threading.Event().wait()
Here we can see that the filtering is performed by using a BroadcastFilter
which can match a string, that optionally includes wildcards *
.
RPC¶
Finally, let’s end with the RPC example.
Pika¶
The code to run an RPC server is:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(f' [.] fib({n})')
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id= \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(' [x] Awaiting RPC requests')
channel.start_consuming()
The client side is:
import uuid
import pika
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n)
)
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(' [x] Requesting fib(30)')
response = fibonacci_rpc.call(30)
print(f' [.] Got {response!r}')
KiwiPy¶
Now, in kiwiPy the server becomes:
import threading
import kiwipy
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(_comm, body):
n = int(body)
print(f' [.] fib({n})')
return fib(n)
with kiwipy.connect('amqp://localhost') as comm:
comm.add_rpc_subscriber(on_request, identifier='rpc_queue')
print(' [x] Awaiting RPC requests')
threading.Event().wait()
As usual, the routing of messages is automatically set up by kiwiPy and the identifier
of the RPC subscriber is the only thing needed to link the client and server.
The client code is:
import kiwipy
with kiwipy.connect('amqp://localhost') as comm:
print(' [x] Requesting fib(30)')
response = comm.rpc_send('rpc_queue', 30).result()
print(f' [.] Got {response!r}')
Here, we send the RPC request to the identifier we used before. The rpc_send
call gives us back a future from which we can get the result once it has been received from the server.
Roundup¶
As these examples demonstrate, kiwiPy tries to use sensible defaults and keep many of the routing details hidden from the user. This has the advantage of bringing the power of a dedicated message broker such as RabbitMQ to less expert users. Additionally, it allows messaging to be done using a simple interface which can be implemented by other messaging protocols, allowing clients to be protocol agnostic. The trade-off, of course, is reduced control over the finer details of how the messaging is performed. For users that need such fine-grained control, pika and the excellent aio-pika may be better alternatives.