Created
May 11, 2023 22:43
-
-
Save simon-mo/cda85194f4bcfcfaaf0779851aef440f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# s3://feature-store-datasets/wikipedia/diffs/\n", | |
"\n", | |
"# NOTE: this doesn't deal with nested prefix right now (i.e. no / in the name). It assume a flat layout.\n", | |
"# You can use https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#AmazonS3-ListObjectsV2-response-CommonPrefixes to find\n", | |
"# the next / delimiter." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import boto3\n", | |
"import string\n", | |
"\n", | |
"# can we use regex that enumerate. If we do alphanumeric the search space blows up quickly\n", | |
"# another approach is preioritize the pattern we have seem before using some sort of episilon greedy style exploration\n", | |
"alphanumeric = string.digits\n", | |
"bucket_name = \"feature-store-datasets\"\n", | |
"search_prefix = \"wikipedia/diffs/\"\n", | |
"\n", | |
"s3 = boto3.client(\"s3\")\n", | |
"\n", | |
"\n", | |
"def probe_once(prefix):\n", | |
" resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, MaxKeys=1)\n", | |
" return {\"prefix_searched\": prefix, \"has_objects\": resp[\"KeyCount\"] > 0}" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Searching 10 prefixes\n", | |
"Found 1 prefixes\n", | |
"Searching 10 prefixes\n", | |
"Found 1 prefixes\n", | |
"Searching 10 prefixes\n", | |
"Found 4 prefixes\n", | |
"Searching 40 prefixes\n", | |
"Found 14 prefixes\n", | |
"Searching 140 prefixes\n", | |
"Found 66 prefixes\n", | |
"Searching 660 prefixes\n", | |
"Found 497 prefixes\n", | |
"CPU times: user 5.62 s, sys: 3.91 s, total: 9.53 s\n", | |
"Wall time: 2.67 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"\n", | |
"from concurrent.futures import ThreadPoolExecutor, as_completed\n", | |
"\n", | |
"search_round = 10 # we will search [0-9]{search_length} for hits\n", | |
"stop_at_num_found = 400 # stop searching when we found this many prefixes\n", | |
"\n", | |
"thread_pool = ThreadPoolExecutor(max_workers=100)\n", | |
"\n", | |
"to_search_tasks = [search_prefix + char for char in alphanumeric]\n", | |
"found_prefix = []\n", | |
"\n", | |
"for _ in range(search_round):\n", | |
" found_prefix.clear()\n", | |
"\n", | |
" # submit all the tasks\n", | |
" print(f\"Searching {len(to_search_tasks)} prefixes\")\n", | |
" tasks = [thread_pool.submit(probe_once, prefix) for prefix in to_search_tasks]\n", | |
"\n", | |
" # iterate over the task as they complete\n", | |
" for future in as_completed(tasks):\n", | |
" assert future.done()\n", | |
" if future.exception():\n", | |
" print(f\"Exception {future.exception()}\")\n", | |
" raise future.exception()\n", | |
" elif future.result() and future.result()[\"has_objects\"]:\n", | |
" found_prefix.append(future.result()[\"prefix_searched\"])\n", | |
" print(f\"Found {len(found_prefix)} prefixes\")\n", | |
" if len(found_prefix) > stop_at_num_found:\n", | |
" break\n", | |
"\n", | |
" # update the search space\n", | |
" to_search_tasks = [prefix + char for prefix in found_prefix for char in alphanumeric]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Working on 497 prefixes\n", | |
"Number of objects found 0\n", | |
"Number of objects found 9476\n", | |
"Number of objects found 29973\n", | |
"Number of objects found 58030\n", | |
"CPU times: user 5.61 s, sys: 2.63 s, total: 8.23 s\n", | |
"Wall time: 4.8 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"\n", | |
"from concurrent.futures import ThreadPoolExecutor, as_completed\n", | |
"from queue import Queue\n", | |
"import time\n", | |
"\n", | |
"# full listing!\n", | |
"print(f\"Working on {len(found_prefix)} prefixes\")\n", | |
"\n", | |
"thread_pool = ThreadPoolExecutor(max_workers=100)\n", | |
"queue = Queue()\n", | |
"total_obj_count = 0\n", | |
"\n", | |
"\n", | |
"def list_objects_full(prefix):\n", | |
" continuation_token = None\n", | |
"\n", | |
" while True:\n", | |
" resp = s3.list_objects_v2(\n", | |
" Bucket=bucket_name, Prefix=prefix, MaxKeys=1000, **({\"ContinuationToken\": continuation_token} if continuation_token else {})\n", | |
" )\n", | |
" # print(f\"Found {len(resp['Contents'])} objects for {prefix}\")\n", | |
"\n", | |
" # put the content in the queue\n", | |
" queue.put(resp[\"Contents\"])\n", | |
"\n", | |
" global total_obj_count\n", | |
" total_obj_count += len(resp[\"Contents\"])\n", | |
"\n", | |
" # check if we need to continue\n", | |
" if not resp[\"IsTruncated\"]:\n", | |
" break\n", | |
"\n", | |
" continuation_token = resp[\"NextContinuationToken\"]\n", | |
"\n", | |
"\n", | |
"tasks = [thread_pool.submit(list_objects_full, prefix) for prefix in found_prefix]\n", | |
"while not all(task.done() for task in tasks):\n", | |
" print(f\"Number of objects found {total_obj_count}\")\n", | |
" time.sleep(1)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"66102" | |
] | |
}, | |
"execution_count": 5, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"full_result = []\n", | |
"while not queue.empty():\n", | |
" full_result.extend(queue.get())\n", | |
"len(full_result)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "base", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.9.13" | |
}, | |
"orig_nbformat": 4 | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment