Skip to content

Instantly share code, notes, and snippets.

@thanakijwanavit
Last active May 11, 2021 17:02
Show Gist options
  • Save thanakijwanavit/d473c84a498a8b37dfdff0a5991a9710 to your computer and use it in GitHub Desktop.
Save thanakijwanavit/d473c84a498a8b37dfdff0a5991a9710 to your computer and use it in GitHub Desktop.
async demo
Display the source blob
Display the rendered blob
Raw
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "async demo",
"private_outputs": true,
"provenance": [],
"collapsed_sections": [],
"authorship_tag": "ABX9TyPcsjr1l75LGTZBqErN4YO+",
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/thanakijwanavit/d473c84a498a8b37dfdff0a5991a9710/async-demo.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "code",
"metadata": {
"id": "8PhepkTsbfqf"
},
"source": [
"!pip install -q --upgrade nicHelper==0.0.61"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "I2Jtm22SLMGZ"
},
"source": [
"from time import sleep"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "QXxXObKnb6U_"
},
"source": [
"def aSlowFunc(input_:str):\n",
" sleep(2)\n",
" return input_"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "Od-I_V45cHz2"
},
"source": [
"# 1. asyncMap (threading backend)"
]
},
{
"cell_type": "code",
"metadata": {
"id": "OF10fuS0cFAK"
},
"source": [
"from nicHelper.asyncUtil import asyncThreadMap\n",
"%time r = asyncThreadMap(aSlowFunc, range(100), threads=100)\n",
"print(list(r)[:10])"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "1AsbzuqPkY_A"
},
"source": [
"%%time\n",
"### multiple inputs\n",
"input_ = list(zip(range(10), range(1,11)))\n",
"print(input_)\n",
"\n",
"list(asyncThreadMap(lambda x: (lambda x,y: x+y )(x[0],x[1]), input_))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "WvMINT9g3vxO"
},
"source": [
"# 2. multiProcessing (use multiple CPU), unlock GIL"
]
},
{
"cell_type": "code",
"metadata": {
"id": "2GxDjBoz34IG"
},
"source": [
"from nicHelper.asyncUtil import asyncProcessMap\n",
"%time r = asyncProcessMap(aSlowFunc, range(100), threads=100)\n",
"print(list(r)[:10])"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "ZFAzbIh3ezGi"
},
"source": [
"# 3. use async await"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "GNdPTmUIkEEE"
},
"source": [
"# async await helper (cheating)\n",
"exactly the same as asyncMap but under the hood its using async await instead\n",
"\n",
"> its not as fast as you can see, but threads are automatically determined \\n\n",
"> Note that this thread is subjected to GIl and will only run on 1 core"
]
},
{
"cell_type": "code",
"metadata": {
"id": "um-mfH2IoCZ_"
},
"source": [
"from nicHelper.asyncUtil import asyncAwaitMap\n",
"import nest_asyncio\n",
"nest_asyncio.apply() # unlock nested event loop"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "URVLB_zQkLGP"
},
"source": [
"%time r = asyncAwaitMap(aSlowFunc, range(100))\n",
"print(r[:10])"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "npYZrkpF5nUH"
},
"source": [
"# doing things from scratch"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "rn5Kk7JOkLyU"
},
"source": [
"## async await from scratch"
]
},
{
"cell_type": "code",
"metadata": {
"id": "fj8WTLyEe14J"
},
"source": [
"from nicHelper.asyncUtil import async_wrap\n",
"import asyncio, requests, time, nest_asyncio"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "vm_iboT1f5dV"
},
"source": [
"## unlock the nested event loop"
]
},
{
"cell_type": "code",
"metadata": {
"id": "w7J8VJtKO6Yg"
},
"source": [
"nest_asyncio.apply()"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "i_o9MtQlgBXA"
},
"source": [
"if you have a non-async function, need to decorate it to convert it into an async function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "DPF_Kq4SLUqg"
},
"source": [
"@async_wrap\n",
"def aSlowFunc(input_:str):\n",
" time.sleep(2)\n",
" return input_"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "bgFGt-hugYPn"
},
"source": [
"Otherwise you can just write it like this"
]
},
{
"cell_type": "code",
"metadata": {
"id": "X7QV7PbQL76F"
},
"source": [
"async def asyncSlowFunction(input_):\n",
" r = await aSlowFunc(input_)\n",
" return r"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "2IWT4a6Hgnkx"
},
"source": [
"set up a run loop function (or can call it main)"
]
},
{
"cell_type": "code",
"metadata": {
"id": "9QfrrkH6MdXu"
},
"source": [
"async def runLoop():\n",
" rtup = (aSlowFunc(i) for i in range (10))\n",
" r = await asyncio.gather(*rtup)\n",
" return r"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "nwvp4fM-NBnm"
},
"source": [
"%%time \n",
"asyncio.run(runLoop())"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "Y06Fg1VY5tG6"
},
"source": [
"# ThreadPool Executor"
]
},
{
"cell_type": "code",
"metadata": {
"id": "KgNqoP5wU_cj"
},
"source": [
"%%time\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"def aSlowFunc(input_:str):\n",
" time.sleep(2)\n",
" return input\n",
"with ThreadPoolExecutor(100) as tr:\n",
" tr.map(aSlowFunc, range(100))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "u7dA32NF5wnN"
},
"source": [
"# ProcessPoolExecutor"
]
},
{
"cell_type": "code",
"metadata": {
"id": "mDueZ8XtzXkQ"
},
"source": [
"%%time\n",
"from concurrent.futures import ProcessPoolExecutor\n",
"def aSlowFunc(input_:str):\n",
" time.sleep(2)\n",
" return input\n",
"with ProcessPoolExecutor(100) as tr:\n",
" tr.map(aSlowFunc, range(100))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "OXrES_Rfz-HM"
},
"source": [
""
],
"execution_count": null,
"outputs": []
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment