Skip to content

Instantly share code, notes, and snippets.

@buswedg
Created August 15, 2020 20:47
Show Gist options
  • Save buswedg/6859cb95d42d4486cd0c80acdb42ceed to your computer and use it in GitHub Desktop.
Save buswedg/6859cb95d42d4486cd0c80acdb42ceed to your computer and use it in GitHub Desktop.
having_fun_with_docker_and_digital_ocean_api\digital_ocean_class
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"from urllib.request import Request, urlopen\n",
"\n",
"\n",
"class DigitalOcean:\n",
"    \"\"\"Small best-effort client for the DigitalOcean v2 REST API.\n",
"\n",
"    When opening a request or decoding its response fails, the error is\n",
"    printed and None is returned instead of raising, so callers must be\n",
"    prepared for a None result from every request method.\n",
"    \"\"\"\n",
"\n",
"    metadata_url = 'http://{}/metadata/v1.json'\n",
"    images_url = 'https://api.digitalocean.com/v2/images'\n",
"    droplets_url = 'https://api.digitalocean.com/v2/droplets'\n",
"    databases_url = 'https://api.digitalocean.com/v2/databases'\n",
"\n",
"    def __init__(self, api_key):\n",
"        \"\"\"Store the JSON content type and bearer token sent with API calls.\"\"\"\n",
"        self.headers = {\n",
"            'content-type': 'application/json',\n",
"            'authorization': 'Bearer {}'.format(api_key)\n",
"        }\n",
"\n",
"    @staticmethod\n",
"    def _read_json(response):\n",
"        \"\"\"Decode the response body as UTF-8 and parse it as JSON.\"\"\"\n",
"        return json.loads(response.read().decode('utf8'))\n",
"\n",
"    @staticmethod\n",
"    def _read_text(response):\n",
"        \"\"\"Return the response body decoded as UTF-8 text.\"\"\"\n",
"        return response.read().decode('utf8')\n",
"\n",
"    @staticmethod\n",
"    def _read_status(response):\n",
"        \"\"\"Return the HTTP status code of the response.\"\"\"\n",
"        return response.getcode()\n",
"\n",
"    def _api_request(self, url, data=None, method=None):\n",
"        \"\"\"Build a request for url that carries the API headers.\"\"\"\n",
"        return Request(url=url, data=data, headers=self.headers, method=method)\n",
"\n",
"    def _send(self, req, handler):\n",
"        \"\"\"Open req and return handler(response).\n",
"\n",
"        Any exception raised while opening the request or handling the\n",
"        response is printed, and None is returned.\n",
"        \"\"\"\n",
"        try:\n",
"            with urlopen(req) as response:\n",
"                return handler(response)\n",
"        except Exception as e:\n",
"            # Best-effort by design: report the failure instead of raising.\n",
"            print(e)\n",
"            return None\n",
"\n",
"    def get_metadata(self, host='169.254.169.254'):\n",
"        \"\"\"Fetch the Droplet metadata document as parsed JSON.\n",
"\n",
"        host fills the placeholder in metadata_url; the default is the\n",
"        link-local address of the Droplet metadata service.\n",
"        \"\"\"\n",
"        # Deliberately sent without self.headers so the API token is not\n",
"        # forwarded to the metadata endpoint.\n",
"        req = Request(self.metadata_url.format(host))\n",
"        return self._send(req, self._read_json)\n",
"\n",
"    def get_image_list(self):\n",
"        \"\"\"Return the parsed JSON response of the images endpoint.\"\"\"\n",
"        return self._send(self._api_request(self.images_url), self._read_json)\n",
"\n",
"    def get_database_list(self):\n",
"        \"\"\"Return the parsed JSON response of the databases endpoint.\"\"\"\n",
"        return self._send(self._api_request(self.databases_url), self._read_json)\n",
"\n",
"    def get_database(self, database_id):\n",
"        \"\"\"Return the parsed JSON response for one database.\"\"\"\n",
"        url = '{}/{}'.format(self.databases_url, database_id)\n",
"        return self._send(self._api_request(url), self._read_json)\n",
"\n",
"    def update_db_firewall(self, database_id, entries):\n",
"        \"\"\"PUT entries as the firewall rules of a database.\n",
"\n",
"        Returns the raw response body as text (it is not parsed as JSON).\n",
"        \"\"\"\n",
"        data = json.dumps({'rules': entries}).encode('utf8')\n",
"        url = '{}/{}/firewall'.format(self.databases_url, database_id)\n",
"        req = self._api_request(url, data=data, method='PUT')\n",
"        return self._send(req, self._read_text)\n",
"\n",
"    def get_droplet_list(self):\n",
"        \"\"\"Return the parsed JSON response of the droplets endpoint.\"\"\"\n",
"        return self._send(self._api_request(self.droplets_url), self._read_json)\n",
"\n",
"    def create_droplet(self, name, region, size, image, ssh_keys, private_networking, backups, user_data):\n",
"        \"\"\"POST a new droplet definition and return the parsed JSON response.\"\"\"\n",
"        values = {\n",
"            'name': name,\n",
"            'region': region,\n",
"            'size': size,\n",
"            'image': image,\n",
"            'ssh_keys': ssh_keys,\n",
"            'private_networking': private_networking,\n",
"            'backups': backups,\n",
"            'user_data': user_data,\n",
"        }\n",
"        data = json.dumps(values).encode('utf8')\n",
"        # POST is stated explicitly rather than implied by the presence of data.\n",
"        req = self._api_request(self.droplets_url, data=data, method='POST')\n",
"        return self._send(req, self._read_json)\n",
"\n",
"    def delete_droplet(self, droplet_id):\n",
"        \"\"\"DELETE one droplet and return the HTTP status code of the response.\"\"\"\n",
"        url = '{}/{}'.format(self.droplets_url, droplet_id)\n",
"        return self._send(self._api_request(url, method='DELETE'), self._read_status)\n",
"\n",
"    def simplify(self, result):\n",
"        \"\"\"Reduce a droplet listing to dicts holding only id, name and status.\n",
"\n",
"        Returns an empty list when result is None (what the request methods\n",
"        return on failure) or when it has no 'droplets' entries.\n",
"        \"\"\"\n",
"        droplets = (result or {}).get('droplets') or []\n",
"        return [\n",
"            {\n",
"                'id': entry.get('id'),\n",
"                'name': entry.get('name'),\n",
"                'status': entry.get('status')\n",
"            }\n",
"            for entry in droplets\n",
"        ]"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment