{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import gspread\n",
"from oauth2client.service_account import ServiceAccountCredentials"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\"\"\"ESGF API Search Results to Pandas Dataframes\n",
"\"\"\"\n",
"\n",
"from __future__ import print_function\n",
"\n",
"import warnings\n",
"from datetime import datetime\n",
"import dask\n",
"import requests\n",
"import pandas as pd\n",
"from collections import OrderedDict\n",
"\n",
"# API AT: https://github.com/ESGF/esgf.github.io/wiki/ESGF_Search_REST_API\n",
"\n",
"def _check_doc_for_malformed_id(d):\n",
" source_id = d['source_id'][0]\n",
" expt_id = d['experiment_id'][0]\n",
" if not f\"{source_id}_{expt_id}\" in d['id']:\n",
" raise ValueError(f\"Dataset id {d['id']} is malformed\")\n",
"\n",
"def _maybe_squeze_values(d):\n",
" def _maybe_squeeze(value):\n",
" if isinstance(value, str):\n",
" return value\n",
" try:\n",
" if len(value)==1:\n",
" return value[0]\n",
" except TypeError:\n",
" return(value)\n",
" return {k: _maybe_squeeze(v) for k, v in d.items()}\n",
"\n",
"def _get_request(server, verbose=False, **payload):\n",
" client = requests.session()\n",
" url_keys = []\n",
" url_keys = [\"{}={}\".format(k, payload[k]) for k in payload]\n",
" url = \"{}/?{}\".format(server, \"&\".join(url_keys))\n",
" if verbose:\n",
" print(url)\n",
" r = client.get(url)\n",
" r.raise_for_status()\n",
" resp = r.json()[\"response\"]\n",
" return resp\n",
"\n",
"def _get_page_dataframe(server, expected_size, offset=0,\n",
" filter_server_url=None, verbose=False,\n",
" **payload):\n",
"\n",
" resp = _get_request(server, offset=offset, verbose=verbose, **payload)\n",
"\n",
" docs = resp[\"docs\"]\n",
" assert len(docs) == expected_size\n",
"\n",
" all_files = []\n",
" for d in docs:\n",
" try:\n",
" _check_doc_for_malformed_id(d)\n",
" except ValueError:\n",
" continue\n",
" dataset_id = d['dataset_id']\n",
" item = OrderedDict(dataset_id=dataset_id, id=d['id'])\n",
" target_urls = d.pop('url')\n",
" item.update(_maybe_squeze_values(d))\n",
" for f in target_urls:\n",
" access_url, mime_type, service_type = f.split(\"|\")\n",
" if service_type == 'OPENDAP':\n",
" access_url = access_url.replace('.html', '')\n",
" if filter_server_url is None or filter_server_url in access_url:\n",
" item.update({f'{service_type}_url': access_url})\n",
" all_files.append(item)\n",
"\n",
" return pd.DataFrame(all_files)\n",
"\n",
"\n",
"_get_page_dataframe_d = dask.delayed(_get_page_dataframe)\n",
"\n",
"\n",
"def _get_csrf_token(server):\n",
" client = requests.session()\n",
" client.get(server)\n",
" if 'csrftoken' in client.cookies:\n",
" # Django 1.6 and up\n",
" csrftoken = client.cookies['csrftoken']\n",
" else:\n",
" # older versions\n",
" csrftoken = client.cookies['csrf']\n",
" return csrftoken\n",
"\n",
"\n",
"def esgf_search(server=\"https://esgf-node.llnl.gov/esg-search/search\",\n",
" project=\"CMIP6\", page_size=10,\n",
" # this option should not be necessary with local_node=True\n",
" filter_server_url=None, local_node=True,\n",
" verbose=False, format=\"application%2Fsolr%2Bjson\",\n",
" use_csrf=False, delayed=False, **search):\n",
"\n",
"\n",
" payload = search\n",
" #payload[\"project\"] = project\n",
" payload[\"type\"]= \"File\"\n",
"\n",
" if local_node:\n",
" payload[\"distrib\"] = \"false\"\n",
"\n",
" if use_csrf:\n",
" payload[\"csrfmiddlewaretoken\"] = _get_csrf_token(server)\n",
"\n",
" payload[\"format\"] = format\n",
"\n",
" init_resp = _get_request(server, offset=0, limit=page_size,\n",
" verbose=verbose, **payload)\n",
" \n",
" num_found = int(init_resp[\"numFound\"])\n",
" \n",
" if delayed:\n",
" page_function = _get_page_dataframe_d\n",
" else:\n",
" page_function = _get_page_dataframe\n",
"\n",
" all_frames = []\n",
" for offset in range(0, num_found, page_size):\n",
"\n",
" expected_size = (page_size if offset <= (num_found - page_size)\n",
" else (num_found - offset))\n",
" df_d = page_function(server, expected_size, limit=page_size, offset=offset,\n",
" verbose=verbose,\n",
" filter_server_url=filter_server_url,\n",
" **payload)\n",
"\n",
" all_frames.append(df_d)\n",
"\n",
" if delayed:\n",
" all_frames = dask.compute(*all_frames)\n",
" \n",
" dfa = pd.concat(all_frames,sort=True)\n",
" \n",
" # dropping duplicates on checksum removes all identical files\n",
" return dfa.drop_duplicates(subset='checksum')"
]
},
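{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check of `esgf_search` before the full request loop. This is a minimal sketch: the facet values below (`tas`, `Amon`, `historical`, `GFDL-CM4`) are illustrative rather than taken from the spreadsheet, and the call needs network access to the LLNL node."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Smoke test with illustrative facet values (requires network access)\n",
"test_files = esgf_search(server=\"https://esgf-node.llnl.gov/esg-search/search\",\n",
"                         mip_era='CMIP6', variable_id='tas', table_id='Amon',\n",
"                         experiment_id='historical', source_id='GFDL-CM4',\n",
"                         page_size=100, verbose=True)\n",
"test_files[['id', 'HTTPServer_url']].head()"
]
},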
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Download latest google spreadsheet of requests:\n",
"\n",
"scope = ['https://spreadsheets.google.com/feeds',\n",
" 'https://www.googleapis.com/auth/drive']\n",
"\n",
"credentials = ServiceAccountCredentials.from_json_keyfile_name(\n",
" 'json/Pangeo Hackathon-e48a41b13c91.json', scope) # Your json file here\n",
"\n",
"gc = gspread.authorize(credentials)\n",
"\n",
"wks = gc.open(\"CMIP6 Hackathon Data Request (Responses)\").sheet1\n",
"\n",
"data = wks.get_all_values()\n",
"headers = data.pop(0)\n",
"\n",
"df = pd.DataFrame(data, columns=headers)\n",
"\n",
"df['experiments'] = [s.replace('*','').replace(' ','').split(',') for s in df.experiment_ids.values]\n",
"df['models'] = [s.replace('All Available','All').replace(' ','').split(',') for s in df.source_ids.values]\n",
"df['variables'] = [s.replace(' ','').split(',') for s in df['variable_ids (comma separated list)'].values]\n",
"df['table'] = [s.replace(' ','').split(':')[0] for s in df.table_id.values]\n",
"df['requester'] = df['Your name'] \n",
"df['status'] = df['LDEO status'] \n",
"\n",
"df_req = df.drop(['Your name', 'Science Question/Motivation',\n",
" 'Have you verified the existence of the data you will request?','table_id', 'source_ids', 'experiment_ids',\n",
" 'variable_ids (comma separated list)', 'Questions and comments', 'status'],1)\n",
"df_req"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"server = \"https://esgf-node.llnl.gov/esg-search/search\"\n",
"df_list = []\n",
"for index, row in df_req.iterrows():\n",
" timestamp = row['Timestamp']\n",
" name = row['requester']\n",
" email = row['E-mail']\n",
" experiment_ids = row['experiments']\n",
" source_ids = row['models']\n",
" variable_ids = row['variables']\n",
" table_id = row['table']\n",
"\n",
" if name == 'Test':\n",
" continue\n",
"\n",
" for experiment_id in experiment_ids:\n",
" for variable_id in variable_ids:\n",
" print(experiment_id, variable_id, table_id, source_ids)\n",
" if source_ids[0] == 'All':\n",
" try:\n",
" files= esgf_search(server=server, mip_era='CMIP6', variable_id=variable_id, \n",
" table_id=table_id, experiment_id=experiment_id, page_size=500, verbose=True)\n",
" print('got-em')\n",
" except:\n",
" print(experiment_id, table_id,variable_id,'nothing doing')\n",
" continue\n",
"\n",
" files.loc[:,'version'] = [str.split('/')[-2] for str in files['HTTPServer_url']]\n",
" files.loc[:,'file_name'] = [str.split('/')[-1] for str in files['HTTPServer_url']]\n",
" # might need to set activity_id to activity_drs for some files (see old versions)\n",
" files.loc[:,'activity_id'] = files.activity_drs\n",
"\n",
" df_list += [files.drop_duplicates(subset =[\"file_name\",\"version\",\"checksum\"]) ]\n",
"\n",
" else:\n",
" for source_id in source_ids:\n",
" try:\n",
" files= esgf_search(server=server, mip_era='CMIP6', variable_id=variable_id, \n",
" table_id=table_id, experiment_id=experiment_id, source_id = source_id, page_size=500, verbose=True)\n",
" print('got-em')\n",
" except:\n",
" print(experiment_id, table_id,variable_id,'nothing doing')\n",
" continue\n",
" files.loc[:,'version'] = [str.split('/')[-2] for str in files['HTTPServer_url']]\n",
" files.loc[:,'file_name'] = [str.split('/')[-1] for str in files['HTTPServer_url']]\n",
" # might need to set activity_id to activity_drs for some files (see old versions)\n",
" files.loc[:,'activity_id'] = files.activity_drs\n",
"\n",
" df_list += [files.drop_duplicates(subset =[\"file_name\",\"version\",\"checksum\"]) ]\n",
"\n",
"dESGF = pd.concat(df_list,sort=False)\n",
"dESGF = dESGF.drop_duplicates(subset =[\"file_name\",\"version\",\"checksum\"])\n",
"dESGF.to_csv('csv/ESGF_requests.csv',index=False)"
]
},
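{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal check of the saved request list, assuming the cell above wrote `csv/ESGF_requests.csv`: read it back and count matched files per experiment/table/variable (these facet columns come from the ESGF file records)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Summarize the saved request list: file counts per facet combination\n",
"dcheck = pd.read_csv('csv/ESGF_requests.csv')\n",
"dcheck.groupby(['experiment_id', 'table_id', 'variable_id']).size()"
]
}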
],
"metadata": {
"kernelspec": {
"display_name": "pangeo-latest",
"language": "python",
"name": "pangeo-latest"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}