Tasks Example

Let’s start off as usual.

[1]:
import kiwipy
comm = kiwipy.connect('amqp://127.0.0.1')

Ok, now let’s create our worker and subscribe

[2]:
import time

def worker(_comm, task):
    print((" [x] Received %r" % task))
    result = task**0.5
    time.sleep(1.)
    print(" [x] Done")
    return result

worker_id = comm.add_task_subscriber(worker)

Fantastic, now let’s square root some numbers!

[3]:
future = comm.task_send(1024**2)
 [x] Received 1048576
 [x] Done

Ok…so where’s my square root?

Woah, you’re impatient! It may take a long time to calculate, that’s why you were given a future. But if you insist:

[4]:
print(future.result())
1024.0

Nice, nice. But what if something goes wrong?

Well, let’s have a look

[5]:
fut = comm.task_send('1024')
fut.result()
 [x] Received '1024'
---------------------------------------------------------------------------
RemoteException                           Traceback (most recent call last)
<ipython-input-5-eb2d01cb4f6b> in <module>
      1 fut = comm.task_send('1024')
----> 2 fut.result()
      3

/usr/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433
    434             self._condition.wait(timeout)

/usr/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/.virtualenvs/kiwi/lib/python3.8/site-packages/pytray/aiothreads.py in done(done_future)
     33         # Copy over the future
     34         try:
---> 35             result = done_future.result()
     36             if asyncio.isfuture(result):
     37                 # Change the aio future to a thread future

~/src/kiwipy/kiwipy/rmq/messages.py in _on_response(self, message)
    246                     # If the response was a future it means we should get another message that
    247                     # resolves that future
--> 248                     if asyncio.isfuture(response_future.result()):
    249                         self._awaiting_response[correlation_id] = response_future.result()
    250                 except Exception:  # pylint: disable=broad-except

RemoteException: unsupported operand type(s) for ** or pow(): 'str' and 'float'

Exceptional… And what if I don’t expect a result back?

Got you covered:

[6]:
comm.task_send(1024, no_reply=True)
 [x] Received 1024
 [x] Done

Bangin’. How ‘bout multiple workers?

No problem, add as many as you need. If one is busy, another will be given the task.

[7]:
from concurrent.futures import ThreadPoolExecutor, wait
from functools import partial

comm.remove_task_subscriber(worker_id)

# Let's set up some parallel workers
with ThreadPoolExecutor(max_workers=2) as executor:
    comm.add_task_subscriber(partial(executor.submit, worker))
    comm.add_task_subscriber(partial(executor.submit, worker))

    # Submit and wait for them all to finish
    futs = list((comm.task_send(idx ** 2)) for idx in range(4))
    wait(futs)
 [x] Received 0
 [x] Received 1
 [x] Done
 [x] Received 4
 [x] Done
 [x] Received 9
 [x] Done
 [x] Done

And there you have it. Finally, if any of your workers fail or loose their connection - no problem. KiwiPy will just requeue the message and deliver it to the next available worker - even if one isn’t online for a long time!’

[8]:
comm.close()