Tasks Example
=============

Let's start off as usual, by connecting to the broker.

In [1]:
import kiwipy
comm = kiwipy.connect('amqp://127.0.0.1')

Ok, now let's create our worker and subscribe it to incoming tasks:

In [2]:
import time

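def worker(_comm, task):
    # Called for each incoming task; the return value is sent back as the result
    print(" [x] Received %r" % task)
    result = task**0.5
    time.sleep(1.)  # Simulate a long-running calculation
    print(" [x] Done")
    return result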

worker_id = comm.add_task_subscriber(worker)

Fantastic, now let's square root some numbers!

In [3]:
future = comm.task_send(1024**2)

 [x] Received 1048576
 [x] Done


*Ok...so where's my square root?*

Whoa, you're impatient! It may take a long time to calculate, which is why you were given a future. But if you insist:

In [4]:
print(future.result())

1024.0
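
If you'd rather not block forever, a future can usually be polled or given a timeout. The sketch below uses only the standard library's `concurrent.futures` to show the idea; it assumes the future returned by `task_send` behaves like a `concurrent.futures.Future`, and `slow_sqrt` is a hypothetical stand-in for the remote worker.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def slow_sqrt(value):
    # Hypothetical stand-in for a worker that takes a while to answer
    time.sleep(0.2)
    return value ** 0.5


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_sqrt, 1024 ** 2)

    # Ask for the result, but give up waiting after 10 ms
    try:
        future.result(timeout=0.01)
        timed_out = False
    except FutureTimeoutError:
        timed_out = True

    # The task keeps running; a later call blocks until the result arrives
    answer = future.result()

print(timed_out, answer)
```

A timeout only stops the waiting, not the task, so asking again later still works.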


*Nice, nice. But what if something goes wrong?*

Well, let's have a look:

In [5]:
fut = comm.task_send('1024')
fut.result()

 [x] Received '1024'


RemoteException: unsupported operand type(s) for ** or pow(): 'str' and 'float'
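
If you'd rather handle the failure than let it propagate, wrap the call to `result()` in a `try`/`except`. Here is a minimal sketch with a standard-library `Future`, assuming the KiwiPy future re-raises the worker's error in the same way; `safe_result` is a hypothetical helper and the `TypeError` stands in for the `RemoteException` shown above.

```python
from concurrent.futures import Future


def safe_result(future, default=None):
    """Hypothetical helper: return the result, or `default` if the task failed."""
    try:
        return future.result(timeout=1.0)
    except Exception as exc:  # with KiwiPy you would likely catch the remote error type
        print(" [!] Task failed: %s" % exc)
        return default


# Simulate a task that failed on the worker side
failed = Future()
failed.set_exception(
    TypeError("unsupported operand type(s) for ** or pow(): 'str' and 'float'")
)

fallback = safe_result(failed, default=-1.0)

# The exception can also be inspected without raising it
error = failed.exception()
```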

*Exceptional... And what if I don't expect a result back?*

Got you covered:

In [6]:
comm.task_send(1024, no_reply=True)

 [x] Received 1024
 [x] Done


*Bangin'. How 'bout multiple workers?*

No problem, add as many as you need. If one is busy, another will be given the task.

In [7]:
from concurrent.futures import ThreadPoolExecutor, wait
from functools import partial

comm.remove_task_subscriber(worker_id)

# Let's set up some parallel workers
with ThreadPoolExecutor(max_workers=2) as executor:
    comm.add_task_subscriber(partial(executor.submit, worker))
    comm.add_task_subscriber(partial(executor.submit, worker))

    # Submit and wait for them all to finish
    futs = [comm.task_send(idx ** 2) for idx in range(4)]
    wait(futs)

 [x] Received 0
 [x] Received 1
 [x] Done
 [x] Received 4
 [x] Done
 [x] Received 9
 [x] Done
 [x] Done
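
To get a feel for how tasks spread across workers, here is a purely local analogy built on `queue.Queue` and threads. It is not how KiwiPy or the broker works internally, only a sketch of the same pattern: whichever worker is free takes the next task. The names `worker_loop` and `handled_by` are made up for this illustration.

```python
import queue
import threading
import time

tasks = queue.Queue()
handled_by = {}  # task -> (worker name, result)
lock = threading.Lock()


def worker_loop(name):
    while True:
        task = tasks.get()  # blocks until a task is available
        if task is None:  # sentinel: no more work for this worker
            tasks.task_done()
            break
        time.sleep(0.05)  # pretend the calculation is slow
        with lock:
            handled_by[task] = (name, task ** 0.5)
        tasks.task_done()


threads = [
    threading.Thread(target=worker_loop, args=("worker-%d" % idx,))
    for idx in range(2)
]
for thread in threads:
    thread.start()

for idx in range(4):
    tasks.put(idx ** 2)
for _ in threads:
    tasks.put(None)  # one sentinel per worker

tasks.join()  # wait until every queued item has been processed
for thread in threads:
    thread.join()

for task in sorted(handled_by):
    print(task, handled_by[task])
```

Which worker picks up which task depends on timing, so the worker names in the printout can differ from run to run.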


And there you have it. Finally, if any of your workers fail or lose their connection, no problem. KiwiPy will just
requeue the message and deliver it to the next available worker, even if one isn't online for a long time!
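
The requeue idea can be sketched without a broker. The toy loop below is only an illustration under stated assumptions, not KiwiPy's implementation: `WorkerLost`, `flaky_worker`, `healthy_worker` and `run` are all hypothetical names, and a lost worker is modelled as a raised exception.

```python
from collections import deque


class WorkerLost(Exception):
    """Hypothetical signal that a worker died before finishing its task."""


def flaky_worker(task):
    raise WorkerLost("connection dropped")


def healthy_worker(task):
    return task ** 0.5


def run(tasks, workers):
    pending = deque(tasks)
    results = {}
    attempts = 0
    # Note: this loops forever if no worker can ever complete a task
    while pending:
        task = pending.popleft()
        worker = workers[attempts % len(workers)]  # take turns between workers
        attempts += 1
        try:
            results[task] = worker(task)
        except WorkerLost:
            pending.append(task)  # put the task back for another worker
    return results, attempts


results, attempts = run([4, 9], [flaky_worker, healthy_worker])
print(results, attempts)
```

No task is dropped here: a task that hits the flaky worker simply goes back on the queue until a healthy worker takes it.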

In [8]:
comm.close()