{ "cells": [ { "cell_type": "markdown", "metadata": { "collapsed": true, "pycharm": { "name": "#%% md\n" } }, "source": [ "Tasks Example\n", "=============\n", "\n", "Let's start off as usual." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import kiwipy\n", "comm = kiwipy.connect('amqp://127.0.0.1')" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "Ok, now let's create our worker and subscribe" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import time\n", "\n", "def worker(_comm, task):\n", " print((\" [x] Received %r\" % task))\n", " result = task**0.5\n", " time.sleep(1.)\n", " print(\" [x] Done\")\n", " return result\n", "\n", "worker_id = comm.add_task_subscriber(worker)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Fantastic, now let's square root some numbers!" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " [x] Received 1048576\n", " [x] Done\n" ] } ], "source": [ "future = comm.task_send(1024**2)" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "*Ok...so where's my square root?*\n", "\n", "Woah, you're impatient! It may take a long time to calculate, that's why you were given a future. But if you insist:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1024.0\n" ] } ], "source": [ "print(future.result())" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "*Nice, nice. 
But what if something goes wrong?*\n", "\n", "Well, let's have a look" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " [x] Received '1024'\n" ] }, { "ename": "RemoteException", "evalue": "unsupported operand type(s) for ** or pow(): 'str' and 'float'", "output_type": "error", "traceback": [ "\u001B[0;31m---------------------------------------------------------------------------\u001B[0m", "\u001B[0;31mRemoteException\u001B[0m Traceback (most recent call last)", "\u001B[0;32m\u001B[0m in \u001B[0;36m\u001B[0;34m\u001B[0m\n\u001B[1;32m 1\u001B[0m \u001B[0mfut\u001B[0m \u001B[0;34m=\u001B[0m \u001B[0mcomm\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0mtask_send\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0;34m'1024'\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0;32m----> 2\u001B[0;31m \u001B[0mfut\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0mresult\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0m\u001B[1;32m 3\u001B[0m \u001B[0;34m\u001B[0m\u001B[0m\n", "\u001B[0;32m/usr/lib/python3.8/concurrent/futures/_base.py\u001B[0m in \u001B[0;36mresult\u001B[0;34m(self, timeout)\u001B[0m\n\u001B[1;32m 430\u001B[0m \u001B[0;32mraise\u001B[0m \u001B[0mCancelledError\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[1;32m 431\u001B[0m \u001B[0;32melif\u001B[0m \u001B[0mself\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0m_state\u001B[0m \u001B[0;34m==\u001B[0m \u001B[0mFINISHED\u001B[0m\u001B[0;34m:\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0;32m--> 432\u001B[0;31m \u001B[0;32mreturn\u001B[0m \u001B[0mself\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0m__get_result\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0m\u001B[1;32m 433\u001B[0m 
\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[1;32m 434\u001B[0m \u001B[0mself\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0m_condition\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0mwait\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0mtimeout\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n", "\u001B[0;32m/usr/lib/python3.8/concurrent/futures/_base.py\u001B[0m in \u001B[0;36m__get_result\u001B[0;34m(self)\u001B[0m\n\u001B[1;32m 386\u001B[0m \u001B[0;32mdef\u001B[0m \u001B[0m__get_result\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0mself\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m:\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[1;32m 387\u001B[0m \u001B[0;32mif\u001B[0m \u001B[0mself\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0m_exception\u001B[0m\u001B[0;34m:\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0;32m--> 388\u001B[0;31m \u001B[0;32mraise\u001B[0m \u001B[0mself\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0m_exception\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0m\u001B[1;32m 389\u001B[0m \u001B[0;32melse\u001B[0m\u001B[0;34m:\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[1;32m 390\u001B[0m \u001B[0;32mreturn\u001B[0m \u001B[0mself\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0m_result\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n", "\u001B[0;32m~/.virtualenvs/kiwi/lib/python3.8/site-packages/pytray/aiothreads.py\u001B[0m in \u001B[0;36mdone\u001B[0;34m(done_future)\u001B[0m\n\u001B[1;32m 33\u001B[0m \u001B[0;31m# Copy over the future\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[1;32m 34\u001B[0m \u001B[0;32mtry\u001B[0m\u001B[0;34m:\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0;32m---> 35\u001B[0;31m \u001B[0mresult\u001B[0m \u001B[0;34m=\u001B[0m 
\u001B[0mdone_future\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0mresult\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0m\u001B[1;32m 36\u001B[0m \u001B[0;32mif\u001B[0m \u001B[0masyncio\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0misfuture\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0mresult\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m:\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[1;32m 37\u001B[0m \u001B[0;31m# Change the aio future to a thread future\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n", "\u001B[0;32m~/src/kiwipy/kiwipy/rmq/messages.py\u001B[0m in \u001B[0;36m_on_response\u001B[0;34m(self, message)\u001B[0m\n\u001B[1;32m 246\u001B[0m \u001B[0;31m# If the response was a future it means we should get another message that\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[1;32m 247\u001B[0m \u001B[0;31m# resolves that future\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0;32m--> 248\u001B[0;31m \u001B[0;32mif\u001B[0m \u001B[0masyncio\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0misfuture\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0mresponse_future\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0mresult\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m:\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0m\u001B[1;32m 249\u001B[0m \u001B[0mself\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0m_awaiting_response\u001B[0m\u001B[0;34m[\u001B[0m\u001B[0mcorrelation_id\u001B[0m\u001B[0;34m]\u001B[0m \u001B[0;34m=\u001B[0m \u001B[0mresponse_future\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0mresult\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[1;32m 250\u001B[0m \u001B[0;32mexcept\u001B[0m \u001B[0mException\u001B[0m\u001B[0;34m:\u001B[0m \u001B[0;31m# pylint: 
disable=broad-except\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n", "\u001B[0;31mRemoteException\u001B[0m: unsupported operand type(s) for ** or pow(): 'str' and 'float'" ] } ], "source": [ "fut = comm.task_send('1024')\n", "fut.result()" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "*Exceptional... And what if I don't expect a result back?*\n", "\n", "Got you covered:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " [x] Received 1024\n", " [x] Done\n" ] } ], "source": [ "comm.task_send(1024, no_reply=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*Bangin'. How 'bout multiple workers?*\n", "\n", "No problem, add as many as you need. If one is busy, another will be given the task." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " [x] Received 0\n", " [x] Received 1\n", " [x] Done\n", " [x] Received 4\n", " [x] Done\n", " [x] Received 9\n", " [x] Done\n", " [x] Done\n" ] } ], "source": [ "from concurrent.futures import ThreadPoolExecutor, wait\n", "from functools import partial\n", "\n", "comm.remove_task_subscriber(worker_id)\n", "\n", "# Let's set up some parallel workers\n", "with ThreadPoolExecutor(max_workers=2) as executor:\n", " comm.add_task_subscriber(partial(executor.submit, worker))\n", " comm.add_task_subscriber(partial(executor.submit, worker))\n", "\n", " # Submit and wait for them all to finish\n", " futs = list((comm.task_send(idx ** 2)) for idx in range(4))\n", " wait(futs)" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "And there you have it. Finally, if any of your workers fail or lose their connection - no problem. 
KiwiPy will just\n", "requeue the message and deliver it to the next available worker - even if one isn't online for a long time!" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "comm.close()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.5" } }, "nbformat": 4, "nbformat_minor": 1 }