Skip to content

Instantly share code, notes, and snippets.

@Hoeze
Created February 20, 2021 17:26
Show Gist options
  • Save Hoeze/26430689bb3252ebb71b9ae08242d09a to your computer and use it in GitHub Desktop.
Save Hoeze/26430689bb3252ebb71b9ae08242d09a to your computer and use it in GitHub Desktop.
dask on ray memory test
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "arranged-thesaurus",
"metadata": {},
"outputs": [],
"source": [
"import os"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "fewer-coupon",
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"\n",
"import psutil\n",
"\n",
"__all__ = (\"memory_limit\", \"MEMORY_LIMIT\")\n",
"\n",
"# Files that may hold a container memory limit: cgroup v1 first, then the\n",
"# unified cgroup v2 hierarchy (where the literal \"max\" means \"no limit\").\n",
"_CGROUP_LIMIT_FILES = (\n",
"    \"/sys/fs/cgroup/memory/memory.limit_in_bytes\",\n",
"    \"/sys/fs/cgroup/memory.max\",\n",
")\n",
"\n",
"\n",
"def memory_limit():\n",
"    \"\"\"Get the memory limit (in bytes) for this system.\n",
"\n",
"    Takes the minimum value from the following locations:\n",
"\n",
"    - Total system host memory\n",
"    - Cgroups limit (if set; cgroup v1 or v2)\n",
"    - RSS rlimit (if set)\n",
"    \"\"\"\n",
"    limit = psutil.virtual_memory().total\n",
"\n",
"    # Check cgroups if available. Best effort: missing/unreadable files and\n",
"    # non-numeric contents (such as cgroup v2's \"max\") are skipped.\n",
"    if sys.platform == \"linux\":\n",
"        for path in _CGROUP_LIMIT_FILES:\n",
"            try:\n",
"                with open(path) as f:\n",
"                    cgroups_limit = int(f.read())\n",
"            except (OSError, ValueError):\n",
"                continue\n",
"            if cgroups_limit > 0:\n",
"                limit = min(limit, cgroups_limit)\n",
"\n",
"    # Check rlimit if available; non-positive values (e.g. the \"unlimited\"\n",
"    # sentinel) are ignored.\n",
"    try:\n",
"        import resource\n",
"\n",
"        hard_limit = resource.getrlimit(resource.RLIMIT_RSS)[1]\n",
"        if hard_limit > 0:\n",
"            limit = min(limit, hard_limit)\n",
"    except (ImportError, OSError):\n",
"        # The resource module is Unix-only.\n",
"        pass\n",
"\n",
"    return limit\n",
"\n",
"\n",
"MEMORY_LIMIT = memory_limit()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "blind-emphasis",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"16777216000"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"MEMORY_LIMIT"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "another-contractor",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import joblib\n",
"joblib.cpu_count()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "suffering-excuse",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2021-02-20 03:51:37,157\tINFO services.py:1174 -- View the Ray dashboard at \u001b[1m\u001b[32mhttp://127.0.0.1:8265\u001b[39m\u001b[22m\n"
]
},
{
"data": {
"text/plain": [
"<dask.config.set at 0x2b7594f12d90>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import json\n",
"import tempfile\n",
"\n",
"import dask\n",
"import joblib\n",
"import ray\n",
"\n",
"# Start Ray.\n",
"# Tip: If you're connecting to an existing cluster, use ray.init(address=\"auto\").\n",
"\n",
"# tempfile.gettempdir() honours $TMPDIR first and falls back to the platform\n",
"# default instead of raising KeyError when TMPDIR is unset.\n",
"tmp_dir = tempfile.gettempdir()\n",
"spill_dir = os.path.join(tmp_dir, \"ray_spill\")\n",
"# Create the spill directory (and parents) if needed; real errors such as a\n",
"# plain file in the way are raised instead of being silently swallowed.\n",
"os.makedirs(spill_dir, exist_ok=True)\n",
"\n",
"ray.init(\n",
"    # Split MEMORY_LIMIT 70/30 between Ray's `memory` resource and the object\n",
"    # store; int() because MEMORY_LIMIT * fraction is a float.\n",
"    _memory=int(MEMORY_LIMIT * 0.7),\n",
"    object_store_memory=int(MEMORY_LIMIT * 0.3),\n",
"    num_cpus=joblib.cpu_count(),\n",
"    _temp_dir=tmp_dir,\n",
"    _system_config={\n",
"        \"automatic_object_spilling_enabled\": True,\n",
"        \"object_spilling_config\": json.dumps(\n",
"            {\"type\": \"filesystem\", \"params\": {\"directory_path\": spill_dir}},\n",
"        )\n",
"    },\n",
")\n",
"\n",
"from ray.util.dask import ray_dask_get\n",
"from ray.util.joblib import register_ray\n",
"\n",
"# Register Ray as a joblib parallel backend.\n",
"register_ray()\n",
"\n",
"# Make Ray the default dask scheduler: task graphs are submitted to Ray.\n",
"# Per-call alternative: z.compute(scheduler=ray_dask_get)\n",
"dask.config.set(scheduler=ray_dask_get)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "referenced-december",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'node:192.168.16.13': 1.0,\n",
" 'memory': 224.0,\n",
" 'CPU': 4.0,\n",
" 'object_store_memory': 66.0}"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ray.cluster_resources()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "temporal-topic",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import dask\n",
"import dask.array as da"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "useful-bernard",
"metadata": {},
"outputs": [],
"source": [
"x = np.random.randint(0, 100, [1024, 1024, 1024], dtype=\"int32\")"
]
},
{
"cell_type": "code",
"execution_count": 40,
"id": "weighted-dealing",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"array size: 4GB\n"
]
}
],
"source": [
"print(f\"array size: {x.nbytes // 1024**3}GB\")"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "streaming-medicare",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
"<tr>\n",
"<td>\n",
"<table>\n",
" <thead>\n",
" <tr><td> </td><th> Array </th><th> Chunk </th></tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr><th> Bytes </th><td> 4.29 GB </td> <td> 67.11 MB </td></tr>\n",
" <tr><th> Shape </th><td> (1024, 1024, 1024) </td> <td> (256, 256, 256) </td></tr>\n",
" <tr><th> Count </th><td> 64 Tasks </td><td> 64 Chunks </td></tr>\n",
" <tr><th> Type </th><td> int32 </td><td> numpy.ndarray </td></tr>\n",
" </tbody>\n",
"</table>\n",
"</td>\n",
"<td>\n",
"<svg width=\"250\" height=\"240\" style=\"stroke:rgb(0,0,0);stroke-width:1\" >\n",
"\n",
" <!-- Horizontal lines -->\n",
" <line x1=\"10\" y1=\"0\" x2=\"80\" y2=\"70\" style=\"stroke-width:2\" />\n",
" <line x1=\"10\" y1=\"30\" x2=\"80\" y2=\"100\" />\n",
" <line x1=\"10\" y1=\"60\" x2=\"80\" y2=\"130\" />\n",
" <line x1=\"10\" y1=\"90\" x2=\"80\" y2=\"160\" />\n",
" <line x1=\"10\" y1=\"120\" x2=\"80\" y2=\"190\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Vertical lines -->\n",
" <line x1=\"10\" y1=\"0\" x2=\"10\" y2=\"120\" style=\"stroke-width:2\" />\n",
" <line x1=\"27\" y1=\"17\" x2=\"27\" y2=\"137\" />\n",
" <line x1=\"45\" y1=\"35\" x2=\"45\" y2=\"155\" />\n",
" <line x1=\"62\" y1=\"52\" x2=\"62\" y2=\"172\" />\n",
" <line x1=\"80\" y1=\"70\" x2=\"80\" y2=\"190\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Colored Rectangle -->\n",
" <polygon points=\"10.0,0.0 80.58823529411765,70.58823529411765 80.58823529411765,190.58823529411765 10.0,120.0\" style=\"fill:#ECB172A0;stroke-width:0\"/>\n",
"\n",
" <!-- Horizontal lines -->\n",
" <line x1=\"10\" y1=\"0\" x2=\"130\" y2=\"0\" style=\"stroke-width:2\" />\n",
" <line x1=\"27\" y1=\"17\" x2=\"147\" y2=\"17\" />\n",
" <line x1=\"45\" y1=\"35\" x2=\"165\" y2=\"35\" />\n",
" <line x1=\"62\" y1=\"52\" x2=\"182\" y2=\"52\" />\n",
" <line x1=\"80\" y1=\"70\" x2=\"200\" y2=\"70\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Vertical lines -->\n",
" <line x1=\"10\" y1=\"0\" x2=\"80\" y2=\"70\" style=\"stroke-width:2\" />\n",
" <line x1=\"40\" y1=\"0\" x2=\"110\" y2=\"70\" />\n",
" <line x1=\"70\" y1=\"0\" x2=\"140\" y2=\"70\" />\n",
" <line x1=\"100\" y1=\"0\" x2=\"170\" y2=\"70\" />\n",
" <line x1=\"130\" y1=\"0\" x2=\"200\" y2=\"70\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Colored Rectangle -->\n",
" <polygon points=\"10.0,0.0 130.0,0.0 200.58823529411765,70.58823529411765 80.58823529411765,70.58823529411765\" style=\"fill:#ECB172A0;stroke-width:0\"/>\n",
"\n",
" <!-- Horizontal lines -->\n",
" <line x1=\"80\" y1=\"70\" x2=\"200\" y2=\"70\" style=\"stroke-width:2\" />\n",
" <line x1=\"80\" y1=\"100\" x2=\"200\" y2=\"100\" />\n",
" <line x1=\"80\" y1=\"130\" x2=\"200\" y2=\"130\" />\n",
" <line x1=\"80\" y1=\"160\" x2=\"200\" y2=\"160\" />\n",
" <line x1=\"80\" y1=\"190\" x2=\"200\" y2=\"190\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Vertical lines -->\n",
" <line x1=\"80\" y1=\"70\" x2=\"80\" y2=\"190\" style=\"stroke-width:2\" />\n",
" <line x1=\"110\" y1=\"70\" x2=\"110\" y2=\"190\" />\n",
" <line x1=\"140\" y1=\"70\" x2=\"140\" y2=\"190\" />\n",
" <line x1=\"170\" y1=\"70\" x2=\"170\" y2=\"190\" />\n",
" <line x1=\"200\" y1=\"70\" x2=\"200\" y2=\"190\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Colored Rectangle -->\n",
" <polygon points=\"80.58823529411765,70.58823529411765 200.58823529411765,70.58823529411765 200.58823529411765,190.58823529411765 80.58823529411765,190.58823529411765\" style=\"fill:#ECB172A0;stroke-width:0\"/>\n",
"\n",
" <!-- Text -->\n",
" <text x=\"140.588235\" y=\"210.588235\" font-size=\"1.0rem\" font-weight=\"100\" text-anchor=\"middle\" >1024</text>\n",
" <text x=\"220.588235\" y=\"130.588235\" font-size=\"1.0rem\" font-weight=\"100\" text-anchor=\"middle\" transform=\"rotate(-90,220.588235,130.588235)\">1024</text>\n",
" <text x=\"35.294118\" y=\"175.294118\" font-size=\"1.0rem\" font-weight=\"100\" text-anchor=\"middle\" transform=\"rotate(45,35.294118,175.294118)\">1024</text>\n",
"</svg>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"dask.array<array, shape=(1024, 1024, 1024), dtype=int32, chunksize=(256, 256, 256), chunktype=numpy.ndarray>"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"x = da.from_array(x)\n",
"x"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "fabulous-princeton",
"metadata": {},
"outputs": [],
"source": [
"persisted = x.rechunk([128, 1024, 1024]).persist()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "common-shepherd",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
"<tr>\n",
"<td>\n",
"<table>\n",
" <thead>\n",
" <tr><td> </td><th> Array </th><th> Chunk </th></tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr><th> Bytes </th><td> 4.29 GB </td> <td> 536.87 MB </td></tr>\n",
" <tr><th> Shape </th><td> (1024, 1024, 1024) </td> <td> (128, 1024, 1024) </td></tr>\n",
" <tr><th> Count </th><td> 8 Tasks </td><td> 8 Chunks </td></tr>\n",
" <tr><th> Type </th><td> int32 </td><td> numpy.ndarray </td></tr>\n",
" </tbody>\n",
"</table>\n",
"</td>\n",
"<td>\n",
"<svg width=\"250\" height=\"240\" style=\"stroke:rgb(0,0,0);stroke-width:1\" >\n",
"\n",
" <!-- Horizontal lines -->\n",
" <line x1=\"10\" y1=\"0\" x2=\"80\" y2=\"70\" style=\"stroke-width:2\" />\n",
" <line x1=\"10\" y1=\"120\" x2=\"80\" y2=\"190\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Vertical lines -->\n",
" <line x1=\"10\" y1=\"0\" x2=\"10\" y2=\"120\" style=\"stroke-width:2\" />\n",
" <line x1=\"18\" y1=\"8\" x2=\"18\" y2=\"128\" />\n",
" <line x1=\"27\" y1=\"17\" x2=\"27\" y2=\"137\" />\n",
" <line x1=\"36\" y1=\"26\" x2=\"36\" y2=\"146\" />\n",
" <line x1=\"45\" y1=\"35\" x2=\"45\" y2=\"155\" />\n",
" <line x1=\"54\" y1=\"44\" x2=\"54\" y2=\"164\" />\n",
" <line x1=\"62\" y1=\"52\" x2=\"62\" y2=\"172\" />\n",
" <line x1=\"71\" y1=\"61\" x2=\"71\" y2=\"181\" />\n",
" <line x1=\"80\" y1=\"70\" x2=\"80\" y2=\"190\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Colored Rectangle -->\n",
" <polygon points=\"10.0,0.0 80.58823529411765,70.58823529411765 80.58823529411765,190.58823529411765 10.0,120.0\" style=\"fill:#ECB172A0;stroke-width:0\"/>\n",
"\n",
" <!-- Horizontal lines -->\n",
" <line x1=\"10\" y1=\"0\" x2=\"130\" y2=\"0\" style=\"stroke-width:2\" />\n",
" <line x1=\"18\" y1=\"8\" x2=\"138\" y2=\"8\" />\n",
" <line x1=\"27\" y1=\"17\" x2=\"147\" y2=\"17\" />\n",
" <line x1=\"36\" y1=\"26\" x2=\"156\" y2=\"26\" />\n",
" <line x1=\"45\" y1=\"35\" x2=\"165\" y2=\"35\" />\n",
" <line x1=\"54\" y1=\"44\" x2=\"174\" y2=\"44\" />\n",
" <line x1=\"62\" y1=\"52\" x2=\"182\" y2=\"52\" />\n",
" <line x1=\"71\" y1=\"61\" x2=\"191\" y2=\"61\" />\n",
" <line x1=\"80\" y1=\"70\" x2=\"200\" y2=\"70\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Vertical lines -->\n",
" <line x1=\"10\" y1=\"0\" x2=\"80\" y2=\"70\" style=\"stroke-width:2\" />\n",
" <line x1=\"130\" y1=\"0\" x2=\"200\" y2=\"70\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Colored Rectangle -->\n",
" <polygon points=\"10.0,0.0 130.0,0.0 200.58823529411765,70.58823529411765 80.58823529411765,70.58823529411765\" style=\"fill:#ECB172A0;stroke-width:0\"/>\n",
"\n",
" <!-- Horizontal lines -->\n",
" <line x1=\"80\" y1=\"70\" x2=\"200\" y2=\"70\" style=\"stroke-width:2\" />\n",
" <line x1=\"80\" y1=\"190\" x2=\"200\" y2=\"190\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Vertical lines -->\n",
" <line x1=\"80\" y1=\"70\" x2=\"80\" y2=\"190\" style=\"stroke-width:2\" />\n",
" <line x1=\"200\" y1=\"70\" x2=\"200\" y2=\"190\" style=\"stroke-width:2\" />\n",
"\n",
" <!-- Colored Rectangle -->\n",
" <polygon points=\"80.58823529411765,70.58823529411765 200.58823529411765,70.58823529411765 200.58823529411765,190.58823529411765 80.58823529411765,190.58823529411765\" style=\"fill:#ECB172A0;stroke-width:0\"/>\n",
"\n",
" <!-- Text -->\n",
" <text x=\"140.588235\" y=\"210.588235\" font-size=\"1.0rem\" font-weight=\"100\" text-anchor=\"middle\" >1024</text>\n",
" <text x=\"220.588235\" y=\"130.588235\" font-size=\"1.0rem\" font-weight=\"100\" text-anchor=\"middle\" transform=\"rotate(-90,220.588235,130.588235)\">1024</text>\n",
" <text x=\"35.294118\" y=\"175.294118\" font-size=\"1.0rem\" font-weight=\"100\" text-anchor=\"middle\" transform=\"rotate(45,35.294118,175.294118)\">1024</text>\n",
"</svg>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"dask.array<rechunk-merge, shape=(1024, 1024, 1024), dtype=int32, chunksize=(128, 1024, 1024), chunktype=numpy.ndarray>"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"persisted"
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "russian-profession",
"metadata": {},
"outputs": [],
"source": [
"def ranges(N, nb):\n",
"    \"\"\"Split ``range(N)`` into ``nb`` contiguous slices of near-equal length.\n",
"\n",
"    Boundary ``k`` is ``round(k * N / nb)`` (Python's round-half-to-even);\n",
"    consecutive slices share a boundary, so there are no gaps or overlaps.\n",
"    \"\"\"\n",
"    width = N / nb\n",
"    bounds = [round(width * k) for k in range(nb + 1)]\n",
"    return [slice(lo, hi) for lo, hi in zip(bounds, bounds[1:])]"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "heavy-insurance",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[slice(0, 128, None),\n",
" slice(128, 256, None),\n",
" slice(256, 384, None),\n",
" slice(384, 512, None),\n",
" slice(512, 640, None),\n",
" slice(640, 768, None),\n",
" slice(768, 896, None),\n",
" slice(896, 1024, None)]"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"idx = ranges(1024, 8)\n",
"idx"
]
},
{
"cell_type": "code",
"execution_count": 33,
"id": "distant-methodology",
"metadata": {},
"outputs": [],
"source": [
"def get_chunk(data, idxs):\n",
"    \"\"\"Lazily yield ``data[idx]`` for each index in ``idxs``.\"\"\"\n",
"    for idx in idxs:\n",
"        yield data[idx]"
]
},
{
"cell_type": "code",
"execution_count": 37,
"id": "physical-correction",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[dask.array<getitem, shape=(128, 1024, 1024), dtype=int32, chunksize=(128, 1024, 1024), chunktype=numpy.ndarray>,\n",
" dask.array<getitem, shape=(128, 1024, 1024), dtype=int32, chunksize=(128, 1024, 1024), chunktype=numpy.ndarray>,\n",
" dask.array<getitem, shape=(128, 1024, 1024), dtype=int32, chunksize=(128, 1024, 1024), chunktype=numpy.ndarray>,\n",
" dask.array<getitem, shape=(128, 1024, 1024), dtype=int32, chunksize=(128, 1024, 1024), chunktype=numpy.ndarray>,\n",
" dask.array<getitem, shape=(128, 1024, 1024), dtype=int32, chunksize=(128, 1024, 1024), chunktype=numpy.ndarray>,\n",
" dask.array<getitem, shape=(128, 1024, 1024), dtype=int32, chunksize=(128, 1024, 1024), chunktype=numpy.ndarray>,\n",
" dask.array<getitem, shape=(128, 1024, 1024), dtype=int32, chunksize=(128, 1024, 1024), chunktype=numpy.ndarray>,\n",
" dask.array<getitem, shape=(128, 1024, 1024), dtype=int32, chunksize=(128, 1024, 1024), chunktype=numpy.ndarray>]"
]
},
"execution_count": 37,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"chunk_list = list(get_chunk(persisted, idx))\n",
"chunk_list"
]
},
{
"cell_type": "code",
"execution_count": 49,
"id": "unlikely-allah",
"metadata": {},
"outputs": [
{
"ename": "ObjectStoreFullError",
"evalue": "Failed to put object ffffffffffffffffffffffffffffffffffffffff0100000004000000 in object store because it is full. Object size is 4294968945 bytes.\nThe local object store is full of objects that are still in scope and cannot be evicted. Tip: Use the `ray memory` command to list active objects in the cluster.",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mObjectStoreFullError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-49-d3e9eb661909>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mit\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mray\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mutil\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0miter\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfrom_items\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mchunk_list\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m4\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/opt/anaconda/envs/ray/lib/python3.7/site-packages/ray/util/iter.py\u001b[0m in \u001b[0;36mfrom_items\u001b[0;34m(items, num_shards, repeat)\u001b[0m\n\u001b[1;32m 31\u001b[0m \u001b[0mitems\u001b[0m \u001b[0;32mand\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mitems\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__name__\u001b[0m \u001b[0;32mor\u001b[0m \u001b[0;34m\"None\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mlen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mitems\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mnum_shards\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 32\u001b[0m \", repeat=True\" if repeat else \"\")\n\u001b[0;32m---> 33\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mfrom_iterators\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mshards\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mrepeat\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mrepeat\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mname\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mname\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 34\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 35\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/opt/anaconda/envs/ray/lib/python3.7/site-packages/ray/util/iter.py\u001b[0m in \u001b[0;36mfrom_iterators\u001b[0;34m(generators, repeat, name)\u001b[0m\n\u001b[1;32m 93\u001b[0m \"\"\"\n\u001b[1;32m 94\u001b[0m \u001b[0mworker_cls\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mray\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mremote\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mParallelIteratorWorker\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 95\u001b[0;31m \u001b[0mactors\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mworker_cls\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mremote\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mg\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mrepeat\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mgenerators\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 96\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mname\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 97\u001b[0m name = \"from_iterators[shards={}{}]\".format(\n",
"\u001b[0;32m/opt/anaconda/envs/ray/lib/python3.7/site-packages/ray/util/iter.py\u001b[0m in \u001b[0;36m<listcomp>\u001b[0;34m(.0)\u001b[0m\n\u001b[1;32m 93\u001b[0m \"\"\"\n\u001b[1;32m 94\u001b[0m \u001b[0mworker_cls\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mray\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mremote\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mParallelIteratorWorker\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 95\u001b[0;31m \u001b[0mactors\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mworker_cls\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mremote\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mg\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mrepeat\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mgenerators\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 96\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mname\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 97\u001b[0m name = \"from_iterators[shards={}{}]\".format(\n",
"\u001b[0;32m/opt/anaconda/envs/ray/lib/python3.7/site-packages/ray/actor.py\u001b[0m in \u001b[0;36mremote\u001b[0;34m(self, *args, **kwargs)\u001b[0m\n\u001b[1;32m 408\u001b[0m \u001b[0mA\u001b[0m \u001b[0mhandle\u001b[0m \u001b[0mto\u001b[0m \u001b[0mthe\u001b[0m \u001b[0mnewly\u001b[0m \u001b[0mcreated\u001b[0m \u001b[0mactor\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 409\u001b[0m \"\"\"\n\u001b[0;32m--> 410\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_remote\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 411\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 412\u001b[0m def options(self,\n",
"\u001b[0;32m/opt/anaconda/envs/ray/lib/python3.7/site-packages/ray/actor.py\u001b[0m in \u001b[0;36m_remote\u001b[0;34m(self, args, kwargs, num_cpus, num_gpus, memory, object_store_memory, resources, accelerator_type, max_concurrency, max_restarts, max_task_retries, name, lifetime, placement_group, placement_group_bundle_index, placement_group_capture_child_tasks, override_environment_variables)\u001b[0m\n\u001b[1;32m 678\u001b[0m \u001b[0mextension_data\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mstr\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mactor_method_cpu\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 679\u001b[0m \u001b[0moverride_environment_variables\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0moverride_environment_variables\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 680\u001b[0;31m or dict())\n\u001b[0m\u001b[1;32m 681\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 682\u001b[0m actor_handle = ActorHandle(\n",
"\u001b[0;32mpython/ray/_raylet.pyx\u001b[0m in \u001b[0;36mray._raylet.CoreWorker.create_actor\u001b[0;34m()\u001b[0m\n",
"\u001b[0;32mpython/ray/_raylet.pyx\u001b[0m in \u001b[0;36mray._raylet.CoreWorker.create_actor\u001b[0;34m()\u001b[0m\n",
"\u001b[0;32mpython/ray/_raylet.pyx\u001b[0m in \u001b[0;36mray._raylet.prepare_args\u001b[0;34m()\u001b[0m\n",
"\u001b[0;32mpython/ray/_raylet.pyx\u001b[0m in \u001b[0;36mray._raylet.CoreWorker.put_serialized_object\u001b[0;34m()\u001b[0m\n",
"\u001b[0;32mpython/ray/_raylet.pyx\u001b[0m in \u001b[0;36mray._raylet.CoreWorker._create_put_buffer\u001b[0;34m()\u001b[0m\n",
"\u001b[0;32mpython/ray/_raylet.pyx\u001b[0m in \u001b[0;36mray._raylet.check_status\u001b[0;34m()\u001b[0m\n",
"\u001b[0;31mObjectStoreFullError\u001b[0m: Failed to put object ffffffffffffffffffffffffffffffffffffffff0100000004000000 in object store because it is full. Object size is 4294968945 bytes.\nThe local object store is full of objects that are still in scope and cannot be evicted. Tip: Use the `ray memory` command to list active objects in the cluster."
]
}
],
"source": [
"it = ray.util.iter.from_items(chunk_list, 4)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "scheduled-township",
"metadata": {},
"outputs": [],
"source": [
"# The items are dask array slices; np.sum on a dask array returns another\n",
"# lazy dask array, so materialise each block first to get a concrete scalar.\n",
"data_it = it.for_each(lambda data: np.asarray(data).sum())\n",
"data_it = data_it.gather_sync()\n",
"\n",
"data_list = list(data_it)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "center-gothic",
"metadata": {},
"outputs": [],
"source": [
"data_list"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "graphic-dictionary",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:anaconda-ray]",
"language": "python",
"name": "conda-env-anaconda-ray-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment