Tasks Example

Let’s start off as usual.

import kiwipy
comm = kiwipy.connect('amqp://')

Ok, now let’s create our worker and subscribe it:

import time

def worker(_comm, task):
    # A task subscriber is called with the communicator and the task body
    print(" [x] Received %r" % task)
    result = task**0.5
    print(" [x] Done")
    return result

worker_id = comm.add_task_subscriber(worker)

Fantastic, now let’s square root some numbers!

future = comm.task_send(1024**2)
 [x] Received 1048576
 [x] Done

Ok…so where’s my square root?

Woah, you’re impatient! It may take a long time to calculate, which is why you were given a future. But if you insist:

print(future.result())
1024.0

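If blocking is not what you want, a future can also notify you when it is done. The traceback further down shows `result()` coming from `concurrent/futures/_base.py`, so the standard-library future methods should apply. The sketch below assumes exactly that and uses a local thread pool as a stand-in for the broker so it runs without RabbitMQ; `slow_sqrt` and `collected` are hypothetical names used only for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_sqrt(value):
    """Stand-in for a remote worker; the sleep mimics a slow calculation."""
    time.sleep(0.1)
    return value ** 0.5


collected = []

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_sqrt, 1024 ** 2)

    # Non-blocking: the callback fires once the result is available
    future.add_done_callback(lambda done: collected.append(done.result()))

    # Blocking but bounded: raises a timeout error if nothing arrives in 5 seconds
    answer = future.result(timeout=5)

print(answer)
```

With a real communicator the same two calls would be made on the object returned by `comm.task_send(...)`, assuming it is a standard future as the traceback suggests.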
Nice, nice. But what if something goes wrong?

Well, let’s have a look:

fut = comm.task_send('1024')
fut.result()
 [x] Received '1024'
RemoteException                           Traceback (most recent call last)
<ipython-input-5-eb2d01cb4f6b> in <module>
      1 fut = comm.task_send('1024')
----> 2 fut.result()

/usr/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    434             self._condition.wait(timeout)

/usr/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/.virtualenvs/kiwi/lib/python3.8/site-packages/pytray/aiothreads.py in done(done_future)
     33         # Copy over the future
     34         try:
---> 35             result = done_future.result()
     36             if asyncio.isfuture(result):
     37                 # Change the aio future to a thread future

~/src/kiwipy/kiwipy/rmq/messages.py in _on_response(self, message)
    246                     # If the response was a future it means we should get another message that
    247                     # resolves that future
--> 248                     if asyncio.isfuture(response_future.result()):
    249                         self._awaiting_response[correlation_id] = response_future.result()
    250                 except Exception:  # pylint: disable=broad-except

RemoteException: unsupported operand type(s) for ** or pow(): 'str' and 'float'
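The message above is just Python's own complaint about `'1024' ** 0.5`, relayed to the sender. One way to make such failures easier to read is to validate the task inside the worker before doing any work. The following is a minimal sketch of that idea, not part of KiwiPy; `checked_worker` is a hypothetical name and it is called directly here, with `None` in place of the communicator, so it runs without a broker.

```python
def checked_worker(_comm, task):
    """Square-root worker that rejects non-numeric tasks with a clear message."""
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(task, bool) or not isinstance(task, (int, float)):
        raise TypeError(
            "task must be a number, got %s: %r" % (type(task).__name__, task)
        )
    return task ** 0.5


# A valid task behaves exactly like the original worker
root = checked_worker(None, 1024 ** 2)

# An invalid task now fails with a message that names the offending value
try:
    checked_worker(None, '1024')
except TypeError as exc:
    message = str(exc)

print(root)
print(message)
```

On the sending side the failure still only surfaces when `result()` is called on the future, so that call is the place to put a `try`/`except` if you want to recover.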

Exceptional… And what if I don’t expect a result back?

Got you covered:

comm.task_send(1024, no_reply=True)
 [x] Received 1024
 [x] Done

Bangin’. How ‘bout multiple workers?

No problem, add as many as you need. If one is busy, another will be given the task.

from concurrent.futures import ThreadPoolExecutor, wait
from functools import partial


# Let's set up some parallel workers
with ThreadPoolExecutor(max_workers=2) as executor:
    comm.add_task_subscriber(partial(executor.submit, worker))
    comm.add_task_subscriber(partial(executor.submit, worker))

    # Submit and wait for them all to finish
    futs = [comm.task_send(idx ** 2) for idx in range(4)]
    wait(futs)
 [x] Received 0
 [x] Received 1
 [x] Done
 [x] Received 4
 [x] Done
 [x] Received 9
 [x] Done
 [x] Done
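The `partial(executor.submit, worker)` trick deserves a closer look. Since the worker above takes `(_comm, task)`, the subscriber is presumably called the same way, which turns the call into `executor.submit(worker, comm, task)`: the work is handed to the pool and a future comes back straight away. The sketch below reproduces only that mechanism locally, with `None` standing in for the communicator (an assumption made so that it runs without a broker); `subscriber`, `pending` and `roots` are illustrative names.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def worker(_comm, task):
    """Same calculation as the tutorial worker, minus the printing."""
    return task ** 0.5


with ThreadPoolExecutor(max_workers=2) as executor:
    # Calling subscriber(comm, task) is the same as executor.submit(worker, comm, task)
    subscriber = partial(executor.submit, worker)

    # Each call returns a future immediately instead of the finished result
    pending = [subscriber(None, idx ** 2) for idx in range(4)]

    # Collect the results once the pool has worked through the tasks
    roots = [fut.result() for fut in pending]

print(roots)
```

Because the subscriber returns as soon as the task is submitted, it is free to pick up the next task while the pool is still busy, which is why the "Received" and "Done" lines interleave in the output above.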

And there you have it. Finally, if any of your workers fail or lose their connection, no problem: KiwiPy will just requeue the message and deliver it to the next available worker, even if one isn’t online for a long time!
