Created
February 2, 2022 21:08
Trying to crawl the entire CMIP6 cloud catalog and check for retractions
{ | |
"cells": [ | |
{
"cell_type": "markdown",
"id": "5655c97c",
"metadata": {},
"source": [
"# Identify retracted CMIP6 datasets in the Pangeo cloud bucket\n",
"\n",
"Prompted by [this issue](https://github.com/pangeo-data/pangeo-cmip6-cloud/issues/30), I experimented with how we could identify datasets in our current holdings that have been retracted.\n",
"\n",
"TL;DR: Querying a web server for all our zarr stores (~500k) and the associated tracking_ids (~3 million) is not feasible. It would be much more efficient to get a regular (weekly?) dump of all the retracted instances/datasets and compare our holdings with that."
] | |
}, | |
{
"cell_type": "code",
"execution_count": 1,
"id": "e1c06cd2",
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import json\n",
"import pathlib\n",
"import time\n",
"\n",
"import aiohttp\n",
"import dask.bag as db\n",
"import zarr\n",
"from cmip6_preprocessing.utils import google_cmip_col\n",
"from tqdm.notebook import tqdm"
] | |
}, | |
{
"cell_type": "markdown",
"id": "38f186cb",
"metadata": {},
"source": [
"## Steps\n",
"- Crawl catalog for store locations (each store is uniquely mapped to an `instance_id`).\n",
" - Grab `tracking_id` from each store. Each store can be associated with one or more ids (some have 700!)\n",
" - Match each `tracking_id` to the respective `instance_id`\n",
"- Make an http request for each id found\n",
"- Check if valid and print a list of definitely retracted datasets"
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "061181e9", | |
"metadata": {}, | |
"source": [ | |
"### Get all handle ids from stores\n", | |
"\n", | |
"This can be parallelized but still takes a lot of time. In future catalog versions we should write the 'tracking_id' attribute into the csv." | |
] | |
}, | |
{
"cell_type": "code",
"execution_count": 2,
"id": "bb4f6067",
"metadata": {},
"outputs": [],
"source": [
"CMIP6_TRACKING_PREFIX = \"21.14100/\"\n",
"HANDLE_FORMAT = \"https://hdl.handle.net/api/handles/{}\"\n",
"\n",
"def instance_id_from_store(store):\n",
"    \"\"\"Derive the instance_id from the path components of the store url.\"\"\"\n",
"    store = str(store)\n",
"    instance_id = \".\".join(store.split(\"/\")[3:-1])\n",
"    return instance_id\n",
"\n",
"def extract_tracking_id(store):\n",
"    \"\"\"Read the 'tracking_id' attribute of a zarr store (an 'error:...' string on failure).\"\"\"\n",
"    try:\n",
"        tracking_id = zarr.open(store, mode='r').attrs.get('tracking_id')\n",
"    except Exception as e:\n",
"        tracking_id = f'error:{e}'\n",
"        print(e)\n",
"    return tracking_id\n",
"\n",
"def store_to_ids(store):\n",
"    instance_id = instance_id_from_store(store)\n",
"    tracking_id = extract_tracking_id(store)\n",
"    return instance_id, tracking_id\n",
"\n",
"def url_from_tracking_id(tracking_id: str) -> str:\n",
"    \"\"\"Formats a url to request actual ERRATA Info from tracking id\"\"\"\n",
"    # accept ids with or without the leading 'hdl:'\n",
"    # (str.lstrip removes a set of characters, not a prefix, so it is not used here)\n",
"    if tracking_id.startswith(\"hdl:\"):\n",
"        tracking_id = tracking_id[len(\"hdl:\"):]\n",
"    if not tracking_id.startswith(CMIP6_TRACKING_PREFIX):\n",
"        raise ValueError(\"Tracking ID must begin with {}\".format(CMIP6_TRACKING_PREFIX))\n",
"    return HANDLE_FORMAT.format(tracking_id)\n",
"\n",
"def convert_handle_dict(handle_dict):\n",
"    \"\"\"Summarize a handle record and flag it as valid (True), retracted (False) or unknown (None).\"\"\"\n",
"    summary_dict = {i[\"type\"]: i[\"data\"][\"value\"] for i in handle_dict[\"values\"]}\n",
"\n",
"    if \"HOSTING_NODE\" in summary_dict and \"UNPUBLISHED_ORIGINALS\" not in summary_dict:\n",
"        valid = True\n",
"    elif \"HOSTING_NODE\" not in summary_dict and \"UNPUBLISHED_ORIGINALS\" in summary_dict:\n",
"        valid = False\n",
"    else:\n",
"        # Cover case where HOSTING_NODE and UNPUBLISHED_ORIGINALS are both present or both missing\n",
"        valid = None  # Cannot determine validity\n",
"    return valid, summary_dict"
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"id": "62fcdc4f", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"col = google_cmip_col()\n", | |
"# get all stores currently in the catalog\n", | |
"stores = col.df['zstore'].tolist()#[:3000] #TODO: there are still issues with going to the full catalog. This is just a proof of concept\n", | |
"\n", | |
"# cut to a subset for testing\n", | |
"stores = stores[:500]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"id": "6f997d2d", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Mean time per request: 110.84087337999517 ms\n" | |
] | |
} | |
],
"source": [
"# OK, let's benchmark this in sequential code (HELLA SLOW, we have about 5e5 stores...)\n",
"s = time.perf_counter()\n", | |
"tracking_ids = [store_to_ids(store) for store in stores]\n", | |
"elapsed = time.perf_counter() - s\n", | |
"print(f\"Mean time per request: {elapsed/len(stores)*1000} ms\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"id": "4fa73fca", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"application/vnd.jupyter.widget-view+json": { | |
"model_id": "48993da33c0a4bc5be754ff42925e758", | |
"version_major": 2, | |
"version_minor": 0 | |
}, | |
"text/plain": [ | |
" 0%| | 0/500 [00:00<?, ?it/s]" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
} | |
],
"source": [
"# instance_ids can have more than one handle id associated (due to our time concatenation)\n",
"tracking_ids_split = {k: v.split('\\n') for k, v in tracking_ids}\n",
"\n",
"# Loop over all tracking ids.\n",
"# 1. Parse out all linked tracking_ids into a list of properly formatted urls\n",
"# 2. Create a reverse mapping that maps each unique handle_id to the parent instance_id\n",
"tracking_reverse = {}\n",
"urls = []\n",
"for instance, handle_list in tqdm(tracking_ids_split.items()):\n",
"    for handle in handle_list:\n",
"        tracking_reverse[handle] = instance\n",
"        urls.append(url_from_tracking_id(handle.replace('hdl:', '')))  # this is just going back and forth, simplify"
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"id": "70c6544c", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"['hdl:21.14100/257db72c-ea48-46c3-b44c-2f94fcdfa97d',\n", | |
" 'hdl:21.14100/25af452a-570a-45cf-8c4d-10a0d260c3dc',\n", | |
" 'hdl:21.14100/33d58d15-fbf0-485e-a99d-50d015568e29',\n", | |
" 'hdl:21.14100/4c4083a0-385c-4567-af22-609602c52df3',\n", | |
" 'hdl:21.14100/d2395f8f-fe74-4aec-936c-836baae5efaf',\n", | |
" 'hdl:21.14100/bd25f537-d1f5-4a42-a26d-26423c430aea',\n", | |
" 'hdl:21.14100/10f7de51-973c-4c68-950f-0d44e4e57ec3',\n", | |
" 'hdl:21.14100/e0f1bcb1-8275-49f1-a311-4904d27c9333',\n", | |
" 'hdl:21.14100/086e6be8-3f5f-446f-a018-659ab4e0c4ca',\n", | |
" 'hdl:21.14100/5dfad613-2169-410d-99b7-6591fb5072dd',\n", | |
" 'hdl:21.14100/3edf886d-7e36-4138-982e-bdf95e4db9f8',\n", | |
" 'hdl:21.14100/7b0810e9-4fa4-416d-9bec-c157cc31f06f',\n", | |
" 'hdl:21.14100/b3b9fd53-1ac0-4a95-b71f-f38a0f622478',\n", | |
" 'hdl:21.14100/ba2d8fe0-d681-4cc3-9a0e-1f04696e4f60',\n", | |
" 'hdl:21.14100/333a76d9-4b62-4f83-b7b1-77eddb56d842',\n", | |
" 'hdl:21.14100/9732c51e-badf-43ce-915e-27f62bad3568',\n", | |
" 'hdl:21.14100/a0878828-894e-46e6-96f6-eb2210425e05',\n", | |
" 'hdl:21.14100/85ded04c-fb27-4b42-a0f9-d66adef2498c',\n", | |
" 'hdl:21.14100/ef644fb6-048f-49da-8489-857cb11522a5',\n", | |
" 'hdl:21.14100/19247e66-a6aa-4e6f-8e0d-dc0959e87c90',\n", | |
" 'hdl:21.14100/cf0d0f17-b11d-4db3-b844-962bb868b295',\n", | |
" 'hdl:21.14100/72a60e86-65cd-45d9-9d5b-0940cf7d8b40',\n", | |
" 'hdl:21.14100/bfb6d3b6-9430-4b5d-8918-9afc7360df2b',\n", | |
" 'hdl:21.14100/f61a8447-cd46-4304-97a3-09030b2e66ef',\n", | |
" 'hdl:21.14100/767601ab-4a8a-4099-a758-542dee5e30a9',\n", | |
" 'hdl:21.14100/e717f419-cabe-4d11-a7b2-465a09a15004',\n", | |
" 'hdl:21.14100/8c7f754d-0426-449c-b999-75680ab4680c',\n", | |
" 'hdl:21.14100/df256905-9453-41aa-b82f-d0f4afeffd5e',\n", | |
" 'hdl:21.14100/d143f217-40b9-456a-9891-2479b098911b',\n", | |
" 'hdl:21.14100/2ac6ce5e-071f-4ca1-8982-6c29cded5129',\n", | |
" 'hdl:21.14100/f17b512f-07a4-4d7e-a5d3-7b58dc028114',\n", | |
" 'hdl:21.14100/3fc1bbb6-d5f4-4fa0-aba6-ac756db263b0',\n", | |
" 'hdl:21.14100/7696afdd-cac5-431b-afec-af243718eb0c',\n", | |
" 'hdl:21.14100/27e60797-4c09-4d8f-bb92-b929ab1e8cc3',\n", | |
" 'hdl:21.14100/c6ab7679-430b-40fa-b60e-224474c830d4',\n", | |
" 'hdl:21.14100/507bf3dd-798f-49d1-abe1-890e6d83fa6c',\n", | |
" 'hdl:21.14100/1dd497ad-83b4-4806-8f09-e9395f74f058',\n", | |
" 'hdl:21.14100/9f00f294-83b8-475a-b808-102cb9edfe31',\n", | |
" 'hdl:21.14100/7434c618-3c5d-4146-aee0-6c9647d596f2',\n", | |
" 'hdl:21.14100/68bd44e7-2313-449e-8bf5-d900eb4ffc3e',\n", | |
" 'hdl:21.14100/75722914-4513-438d-9816-301987696f82',\n", | |
" 'hdl:21.14100/218b09fb-724f-4ccf-bab9-22944201ff57',\n", | |
" 'hdl:21.14100/bf430a94-38f5-4905-a261-417b146d07d3',\n", | |
" 'hdl:21.14100/ddee9897-100c-4e03-b75f-71038f6d3f42',\n", | |
" 'hdl:21.14100/8e4df9ed-03ff-4364-81dc-35feb26be594',\n", | |
" 'hdl:21.14100/7aa71b19-5b4a-46a5-be56-ce3bb51ba562',\n", | |
" 'hdl:21.14100/be1094d0-0611-477a-a9f9-903b9b026b77',\n", | |
" 'hdl:21.14100/fa2d2b8e-5576-40cf-ada7-773f2587d524',\n", | |
" 'hdl:21.14100/c98ad982-8941-4a11-ab89-7c8f3e4f07c6',\n", | |
" 'hdl:21.14100/e18d7726-9c39-4bd4-bf81-317c3434139f',\n", | |
" 'hdl:21.14100/e499d364-3408-4695-bdbd-a999b1ad017e',\n", | |
" 'hdl:21.14100/478de0ea-f563-4a94-a8c0-1e5ffb549945',\n", | |
" 'hdl:21.14100/156f0fa1-0b61-46f2-b551-fd961b34e5c1',\n", | |
" 'hdl:21.14100/1017f003-abfa-4c3d-8cde-4b373f2d0c9c',\n", | |
" 'hdl:21.14100/bd18e3f4-edac-4ec9-b592-787564c4aba4',\n", | |
" 'hdl:21.14100/edd56ce4-01f2-48b9-81ee-fe1e8ef1a2bb',\n", | |
" 'hdl:21.14100/7ca5d10d-f52f-4d07-a1a7-1616049b30f5',\n", | |
" 'hdl:21.14100/daeb01fa-ba62-4d4e-91a7-acf2ee70a8ca',\n", | |
" 'hdl:21.14100/d658ef10-3d96-4d15-b141-fdc47e7c9ea9',\n", | |
" 'hdl:21.14100/318a3e5b-7c9a-4e5e-873c-df0b92b37341',\n", | |
" 'hdl:21.14100/7dbae013-efc6-48dd-8869-93f2dfef9401',\n", | |
" 'hdl:21.14100/0ee6330f-4630-48d4-8fff-8f4ad3b16b1f',\n", | |
" 'hdl:21.14100/bf47a4c9-f267-49e4-826e-bda601366920',\n", | |
" 'hdl:21.14100/6f11d7df-038e-4140-a58d-dae2dd73fb43',\n", | |
" 'hdl:21.14100/b693d840-7ba9-4e69-b7d0-0b701a0a6359',\n", | |
" 'hdl:21.14100/285a8c5b-9f8b-412f-b00e-80f96c4efb35',\n", | |
" 'hdl:21.14100/177c4ad0-cf63-401c-b5ca-deddb1dc12b9']" | |
] | |
}, | |
"execution_count": 6, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"tracking_ids_split['CMIP6.HighResMIP.CMCC.CMCC-CM2-HR4.highresSST-present.r1i1p1f1.6hrPlev.psl.gn.v20170706']" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"id": "1f68cb09", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"500\n", | |
"37912\n" | |
] | |
} | |
], | |
"source": [ | |
"print(len(stores))\n", | |
"print(len(urls))" | |
] | |
}, | |
{
"cell_type": "markdown",
"id": "b18847e4",
"metadata": {},
"source": [
"Note that the number of urls (37,912) is much higher than the number of stores (500), i.e. about 76 tracking ids per store on average."
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "2561ff5c", | |
"metadata": {}, | |
"source": [ | |
"### Make http requests for all urls found" | |
] | |
}, | |
{
"cell_type": "code",
"execution_count": 11,
"id": "2c61c2ce",
"metadata": {},
"outputs": [],
"source": [
"# Yet another approach, taken from here: https://blog.jonlu.ca/posts/async-python-http\n",
"import aiohttp\n",
"import asyncio\n",
"import time\n",
"\n",
"results = {}\n",
"parallel_requests = 100  # 300 was working well. TODO: Tune the number of concurrent tasks for github actions\n",
"\n",
"conn = aiohttp.TCPConnector(limit=parallel_requests, limit_per_host=parallel_requests)  # , ttl_dns_cache=300\n",
"session = aiohttp.ClientSession(connector=conn)\n",
"\n",
"async def gather_with_concurrency(n, *tasks):\n",
"    # run all tasks, but let at most n of them proceed at the same time\n",
"    semaphore = asyncio.Semaphore(n)\n",
"\n",
"    async def sem_task(task):\n",
"        async with semaphore:\n",
"            return await task\n",
"\n",
"    return await asyncio.gather(*(sem_task(task) for task in tasks))\n",
"\n",
"async def get(url):\n",
"    try:\n",
"        async with session.get(url) as response:  # , ssl=False\n",
"            if response.status == 200:\n",
"                # obj = await response.read()\n",
"                obj = await response.json()\n",
"                results[url] = obj\n",
"            else:\n",
"                print(response.status)\n",
"    except aiohttp.ClientConnectorError as e:\n",
"        # NOTE: other aiohttp errors (e.g. ServerDisconnectedError) are not caught here and propagate\n",
"        print('Connection Error', str(e))"
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"id": "4c2c8f3e", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"application/vnd.jupyter.widget-view+json": { | |
"model_id": "02caceaf7cda4839b65dc0f3f87e4e99", | |
"version_major": 2, | |
"version_minor": 0 | |
}, | |
"text/plain": [ | |
" 0%| | 0/38 [00:00<?, ?it/s]" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Mean time per request: 185.64117020599952 ms\n", | |
"Mean time per request: 107.03361297799711 ms\n", | |
"Mean time per request: 169.87958212900048 ms\n", | |
"Mean time per request: 103.578841413997 ms\n", | |
"Mean time per request: 104.63531202499871 ms\n", | |
"Mean time per request: 165.029803919002 ms\n", | |
"Mean time per request: 108.45666708099816 ms\n", | |
"Mean time per request: 125.51988407200044 ms\n", | |
"Mean time per request: 100.57504723300008 ms\n", | |
"Mean time per request: 99.84848651200082 ms\n", | |
"Mean time per request: 128.0989212930035 ms\n", | |
"Mean time per request: 100.96519103799801 ms\n", | |
"Mean time per request: 114.25268802600476 ms\n" | |
] | |
}, | |
{ | |
"ename": "ServerDisconnectedError", | |
"evalue": "Server disconnected", | |
"output_type": "error", | |
"traceback": [ | |
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", | |
"\u001b[0;31mServerDisconnectedError\u001b[0m Traceback (most recent call last)", | |
"\u001b[0;32m/tmp/ipykernel_2684/953761995.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 6\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mtime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mperf_counter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[0;31m# TODO: Tune the number of concurrent tasks for github actions\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 8\u001b[0;31m \u001b[0;32mawait\u001b[0m \u001b[0mgather_with_concurrency\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mparallel_requests\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0murl\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0murl\u001b[0m \u001b[0;32min\u001b[0m \u001b[0murls\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 9\u001b[0m \u001b[0melapsed\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mtime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mperf_counter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m-\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 10\u001b[0m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34mf\"Mean time per request: {elapsed/len(batch_urls)*1000} ms\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;32m/tmp/ipykernel_2684/3837418122.py\u001b[0m in \u001b[0;36mgather_with_concurrency\u001b[0;34m(n, *tasks)\u001b[0m\n\u001b[1;32m 17\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0;32mawait\u001b[0m \u001b[0mtask\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 18\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 19\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0;32mawait\u001b[0m \u001b[0masyncio\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgather\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msem_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtask\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtask\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtasks\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 20\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 21\u001b[0m \u001b[0;32masync\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0murl\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;32m/tmp/ipykernel_2684/3837418122.py\u001b[0m in \u001b[0;36msem_task\u001b[0;34m(task)\u001b[0m\n\u001b[1;32m 15\u001b[0m \u001b[0;32masync\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0msem_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtask\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 16\u001b[0m \u001b[0;32masync\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0msemaphore\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 17\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0;32mawait\u001b[0m \u001b[0mtask\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 18\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 19\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0;32mawait\u001b[0m \u001b[0masyncio\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgather\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msem_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtask\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtask\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtasks\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;32m/tmp/ipykernel_2684/3837418122.py\u001b[0m in \u001b[0;36mget\u001b[0;34m(url)\u001b[0m\n\u001b[1;32m 21\u001b[0m \u001b[0;32masync\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0murl\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 22\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 23\u001b[0;31m \u001b[0;32masync\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0msession\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0murl\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mresponse\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;31m#, ssl=False\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 24\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mresponse\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstatus\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;36m200\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 25\u001b[0m \u001b[0;31m# obj = await response.read()\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.8/site-packages/aiohttp/client.py\u001b[0m in \u001b[0;36m__aenter__\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1115\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1116\u001b[0m \u001b[0;32masync\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__aenter__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m->\u001b[0m \u001b[0m_RetType\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1117\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_resp\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mawait\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_coro\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1118\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_resp\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1119\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.8/site-packages/aiohttp/client.py\u001b[0m in \u001b[0;36m_request\u001b[0;34m(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize)\u001b[0m\n\u001b[1;32m 542\u001b[0m \u001b[0mresp\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mawait\u001b[0m \u001b[0mreq\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconn\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 543\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 544\u001b[0;31m \u001b[0;32mawait\u001b[0m \u001b[0mresp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstart\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconn\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 545\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mBaseException\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 546\u001b[0m \u001b[0mresp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mclose\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.8/site-packages/aiohttp/client_reqrep.py\u001b[0m in \u001b[0;36mstart\u001b[0;34m(self, connection)\u001b[0m\n\u001b[1;32m 888\u001b[0m \u001b[0;31m# read response\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 889\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 890\u001b[0;31m \u001b[0mmessage\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mpayload\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mawait\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_protocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;31m# type: ignore\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 891\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mhttp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mHttpProcessingError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mexc\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 892\u001b[0m raise ClientResponseError(\n", | |
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.8/site-packages/aiohttp/streams.py\u001b[0m in \u001b[0;36mread\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 602\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_waiter\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_loop\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcreate_future\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 603\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 604\u001b[0;31m \u001b[0;32mawait\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_waiter\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 605\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0masyncio\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mCancelledError\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0masyncio\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mTimeoutError\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 606\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_waiter\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
"\u001b[0;31mServerDisconnectedError\u001b[0m: Server disconnected" | |
] | |
} | |
],
"source": [
"batchsize = 1000\n",
"\n",
"for i in tqdm(range(0, len(urls), batchsize)):\n",
"    # would it help to open a new session for each batch?\n",
"    batch_urls = urls[i:i+batchsize]\n",
"    s = time.perf_counter()\n",
"    # TODO: Tune the number of concurrent tasks for github actions\n",
"    # request only the urls of the current batch (not the full `urls` list)\n",
"    await gather_with_concurrency(parallel_requests, *[get(url) for url in batch_urls])\n",
"    elapsed = time.perf_counter() - s\n",
"    print(f\"Mean time per request: {elapsed/len(batch_urls)*1000} ms\")"
] | |
}, | |
{
"cell_type": "code",
"execution_count": 13,
"id": "7507c93d",
"metadata": {},
"outputs": [],
"source": [
"retracted_instance_ids = []\n",
"failed_handles = []\n",
"\n",
"for k, single_dict in results.items():\n",
"    valid, summary_dict = convert_handle_dict(single_dict)\n",
"    handle_id = 'hdl:' + '/'.join(summary_dict['URL'].split('/')[-2:])\n",
"    if valid is False:\n",
"        # several handle ids can map to the same instance_id in the reverse dict\n",
"        instance_id = tracking_reverse.get(handle_id)\n",
"        if instance_id is None:\n",
"            print(summary_dict)\n",
"            failed_handles.append(handle_id)\n",
"        else:\n",
"            retracted_instance_ids.append(instance_id)\n",
"assert len(failed_handles) == 0"
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"id": "e3253331", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"['CMIP6.HighResMIP.CMCC.CMCC-CM2-HR4.highresSST-present.r1i1p1f1.Amon.prw.gn.v20170706',\n", | |
" 'CMIP6.HighResMIP.CMCC.CMCC-CM2-HR4.highresSST-present.r1i1p1f1.Amon.tauu.gn.v20170706',\n", | |
" 'CMIP6.HighResMIP.CMCC.CMCC-CM2-HR4.highresSST-present.r1i1p1f1.Amon.tauv.gn.v20170706',\n", | |
" 'CMIP6.HighResMIP.CMCC.CMCC-CM2-VHR4.highresSST-present.r1i1p1f1.Amon.tauu.gn.v20170927',\n", | |
" 'CMIP6.HighResMIP.CMCC.CMCC-CM2-VHR4.highresSST-present.r1i1p1f1.Amon.tauv.gn.v20170927',\n", | |
" 'CMIP6.HighResMIP.CMCC.CMCC-CM2-VHR4.highresSST-present.r1i1p1f1.Amon.prw.gn.v20170927']" | |
] | |
}, | |
"execution_count": 17, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"retracted_instance_ids" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 18, | |
"id": "24d98a92", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"69.44444444444444" | |
] | |
}, | |
"execution_count": 18, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
],
"source": [
"# hours needed for ~2.5e6 sequential requests at ~100 ms each\n",
"2.5e6*100/1000/60/60"
] | |
}, | |
{
"cell_type": "markdown",
"id": "9f8c1e1d",
"metadata": {},
"source": [
"I hit several snags when I tried to scale this up to the full catalog (~2.5 million tracking ids). Access time varies significantly, and having that many concurrent connections probably leads to being blocked.\n",
"\n",
"Even if we get this to work, assuming an average of 100 ms per request (I have seen higher values), we would need about 69 hours (roughly three days) to query all tracking ids one after another. This seems excessive."
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "4a9b523f", | |
"metadata": {}, | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3 (ipykernel)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.8.12" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
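
The comparison proposed in the TL;DR could look roughly like the following. This is a minimal sketch and not part of the notebook above: the function name `find_retracted` and the dump format (an iterable of lines with one `instance_id` per line) are assumptions, since the notebook does not specify what such a dump would look like. The two example ids are copied from the notebook outputs purely for illustration.

```python
from typing import Iterable, List


def find_retracted(holdings: Iterable[str], retracted_dump: Iterable[str]) -> List[str]:
    """Return the instance_ids in `holdings` that also appear in `retracted_dump`.

    `retracted_dump` is assumed (hypothetically) to yield one instance_id per
    line; blank lines and surrounding whitespace are ignored.
    """
    retracted = {line.strip() for line in retracted_dump if line.strip()}
    # set membership makes each lookup cheap, no web request per tracking_id needed
    return sorted(iid for iid in set(holdings) if iid in retracted)


# Illustrative data only: two instance_ids that appear in the notebook outputs
holdings = [
    "CMIP6.HighResMIP.CMCC.CMCC-CM2-HR4.highresSST-present.r1i1p1f1.Amon.prw.gn.v20170706",
    "CMIP6.HighResMIP.CMCC.CMCC-CM2-HR4.highresSST-present.r1i1p1f1.6hrPlev.psl.gn.v20170706",
]
# Hypothetical dump content (e.g. the lines of a text file)
retracted_dump = [
    "CMIP6.HighResMIP.CMCC.CMCC-CM2-HR4.highresSST-present.r1i1p1f1.Amon.prw.gn.v20170706\n",
    "\n",
]

print(find_retracted(holdings, retracted_dump))
```

With the `instance_id` values already derivable from the catalog's store paths, a comparison like this would avoid opening every zarr store and querying every tracking id individually.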