Tasks Example
Let’s start off as usual.
[1]:
import kiwipy
comm = kiwipy.connect('amqp://127.0.0.1')
Ok, now let’s create our worker and subscribe it:
[2]:
import time
def worker(_comm, task):
    print(" [x] Received %r" % task)
    result = task**0.5
    time.sleep(1.)
    print(" [x] Done")
    return result
worker_id = comm.add_task_subscriber(worker)
Fantastic, now let’s square root some numbers!
[3]:
future = comm.task_send(1024**2)
[x] Received 1048576
[x] Done
Ok…so where’s my square root?
Whoa, you’re impatient! It may take a long time to calculate, which is why you were given a future. But if you insist:
[4]:
print(future.result())
1024.0
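If blocking indefinitely is not an option, a future can also be waited on with a timeout or given a callback. The sketch below is only an illustration: it uses a standard-library thread pool in place of a broker, on the assumption (suggested by the traceback further down) that the object returned by task_send behaves like a concurrent.futures.Future. The helper slow_sqrt is hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def slow_sqrt(value):
    """Hypothetical stand-in for a remote worker: a slow square root."""
    time.sleep(0.2)
    return value ** 0.5


completed = []  # filled in by the callback once the result is ready

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_sqrt, 1024 ** 2)
    future.add_done_callback(lambda fut: completed.append(fut.result()))

    try:
        # Give up waiting after 10 ms; the task itself keeps running.
        future.result(timeout=0.01)
        timed_out = False
    except FutureTimeout:
        timed_out = True

    # Block until the value is actually available.
    answer = future.result()
```

The timeout only bounds how long the caller waits. It does not cancel the work, so a later call to result() still returns the value.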
Nice, nice. But what if something goes wrong?
Well, let’s have a look:
[5]:
fut = comm.task_send('1024')
fut.result()
[x] Received '1024'
---------------------------------------------------------------------------
RemoteException Traceback (most recent call last)
<ipython-input-5-eb2d01cb4f6b> in <module>
1 fut = comm.task_send('1024')
----> 2 fut.result()
3
/usr/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
430 raise CancelledError()
431 elif self._state == FINISHED:
--> 432 return self.__get_result()
433
434 self._condition.wait(timeout)
/usr/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
386 def __get_result(self):
387 if self._exception:
--> 388 raise self._exception
389 else:
390 return self._result
~/.virtualenvs/kiwi/lib/python3.8/site-packages/pytray/aiothreads.py in done(done_future)
33 # Copy over the future
34 try:
---> 35 result = done_future.result()
36 if asyncio.isfuture(result):
37 # Change the aio future to a thread future
~/src/kiwipy/kiwipy/rmq/messages.py in _on_response(self, message)
246 # If the response was a future it means we should get another message that
247 # resolves that future
--> 248 if asyncio.isfuture(response_future.result()):
249 self._awaiting_response[correlation_id] = response_future.result()
250 except Exception: # pylint: disable=broad-except
RemoteException: unsupported operand type(s) for ** or pow(): 'str' and 'float'
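The failure above comes from the worker applying ** to a string. One way to make such errors easier to read is to validate the task inside the worker before computing anything. This is a minimal sketch, assuming (as the traceback suggests) that an exception raised in the worker is reported back to the sender. The name checked_worker is hypothetical, and it is called directly here rather than through a communicator.

```python
import numbers


def checked_worker(_comm, task):
    """Hypothetical variant of the worker that validates its input first."""
    # bool is a subclass of numbers.Real, so reject it explicitly.
    if isinstance(task, bool) or not isinstance(task, numbers.Real):
        raise TypeError("expected a real number, got %r" % (task,))
    if task < 0:
        raise ValueError("cannot take the real square root of %r" % (task,))
    return task ** 0.5


# Called directly, without a broker, to show the behaviour.
ok = checked_worker(None, 1024 ** 2)
try:
    checked_worker(None, '1024')
except TypeError as exc:
    message = str(exc)
```

On the sending side, the call to fut.result() can be wrapped in a try/except for the RemoteException shown in the traceback if the caller wants to recover instead of crashing.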
Exceptional… And what if I don’t expect a result back?
Got you covered:
[6]:
comm.task_send(1024, no_reply=True)
[x] Received 1024
[x] Done
Bangin’. How ‘bout multiple workers?
No problem, add as many as you need. If one is busy, another will be given the task.
[7]:
from concurrent.futures import ThreadPoolExecutor, wait
from functools import partial
comm.remove_task_subscriber(worker_id)
# Let's set up some parallel workers
with ThreadPoolExecutor(max_workers=2) as executor:
    comm.add_task_subscriber(partial(executor.submit, worker))
    comm.add_task_subscriber(partial(executor.submit, worker))

    # Submit and wait for them all to finish
    futs = [comm.task_send(idx ** 2) for idx in range(4)]
    wait(futs)
[x] Received 0
[x] Received 1
[x] Done
[x] Received 4
[x] Done
[x] Received 9
[x] Done
[x] Done
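It may help to see what partial(executor.submit, worker) does on its own. A subscriber is called with the communicator and the task, so the partial turns that call into executor.submit(worker, comm, task), which returns a future immediately while the pool runs the worker in the background. The sketch below shows this with the standard library only. None stands in for the communicator, and the worker is a trimmed copy without the sleep or printing. Kiwipy appears to wait for the returned future to resolve before replying, but that reading is an assumption based on the traceback comments above.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def worker(_comm, task):
    """Same computation as the worker above, without the sleep or printing."""
    return task ** 0.5


with ThreadPoolExecutor(max_workers=2) as executor:
    subscriber = partial(executor.submit, worker)
    # Equivalent to executor.submit(worker, None, idx ** 2) for each task;
    # None stands in for the communicator since no broker is involved.
    pending = [subscriber(None, idx ** 2) for idx in range(4)]
    results = [fut.result() for fut in pending]
```

Because submit returns straight away, neither subscriber blocks while a task is running, which is what lets two tasks overlap in the output above.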
And there you have it. Finally, if any of your workers fail or lose their connection, no problem: KiwiPy will just requeue the message and deliver it to the next available worker, even if one isn’t online for a long time!
[8]:
comm.close()