Last active
May 6, 2023 11:10
-
-
Save xan7r/b0775e4ec23715f846e54223c4959011 to your computer and use it in GitHub Desktop.
Script for basic management of Digital Ocean droplets and DNS records
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
import argparse
import datetime
import ipaddress
import os
import re
import sys
import time
from urllib import request

import digitalocean
# NOTE: REQUIRES python-digitalocean
#
# examples:
# ./digitalOceanAPI.py --create-droplet "ubuntu 18.04"
# ./digitalOceanAPI.py --destroy-droplet <ubuntu18.04-20180723.185051>
# ./digitalOceanAPI.py --list-droplets
# ./digitalOceanAPI.py --list-images
# ./digitalOceanAPI.py --list-domains
# ./digitalOceanAPI.py --list-records --domain <example.local>
# ./digitalOceanAPI.py --create-record --domain <example.local> --record-name www --record-data 127.0.0.1
# ./digitalOceanAPI.py --destroy-record www --domain <example.local>
# ./digitalOceanAPI.py --dynamic-dns dns --domain <example.local>
# Digital Ocean API token. Read from the DIGITALOCEAN_ACCESS_TOKEN environment
# variable when it is set, so the secret does not have to live in the script;
# otherwise REPLACE THE EMPTY FALLBACK WITH YOUR DIGITAL OCEAN API KEY.
apiKey = os.environ.get("DIGITALOCEAN_ACCESS_TOKEN", "")
# REPLACE THIS WITH DEFAULT SSH KEY ID (INTEGER); used when --key is not given.
sshKeyID = 0000000
# API client shared by the droplet/image/key/domain listing and droplet helpers.
manager = digitalocean.Manager(token=apiKey)
def listDroplets():
    """Print the name, IP address and status of every droplet on the account.

    Values are formatted rather than concatenated so that a field which is
    not a string does not abort the whole listing with a TypeError.
    NOTE(review): ip_address is presumably None for a droplet that has no
    public IPv4 address yet -- confirm against python-digitalocean.
    """
    for droplet in manager.get_all_droplets():
        print("\nName : {}\nIP Address : {}\nStatus : {}".format(
            droplet.name, droplet.ip_address, droplet.status))
def listImages():
    """Print every image returned by the API client, one per line."""
    available = manager.get_all_images()
    for image in available:
        print(image)
def listRegions():
    """Print the string form of every region returned by the API client."""
    regions = manager.get_all_regions()
    for region in regions:
        print(str(region))
def listSizes():
    """Print the string form of every droplet size returned by the API client."""
    sizes = manager.get_all_sizes()
    for size in sizes:
        print(str(size))
def listKeys():
    """Print a blank line followed by every SSH key known to the account."""
    print("")
    sshKeys = manager.get_all_sshkeys()
    for sshKey in sshKeys:
        print(str(sshKey))
def listDomains():
    """Print a blank line followed by every domain known to the account."""
    print("")
    domains = manager.get_all_domains()
    for entry in domains:
        print(str(entry))
def listDomainRecords(domain):
    """Print a tab-separated table of the DNS records of *domain*.

    Args:
        domain: Domain name to list (e.g. "example.local"), or None.

    Exits the process with status 1 when *domain* is None or when the
    records cannot be fetched from the API.
    """
    if domain is None:
        print("ERROR: Domain required. Use --list-domains to view all domains")
        sys.exit(1)
    domainObj = digitalocean.Domain(name=domain, token=apiKey)
    # Only the API call is guarded: a failure while printing must not be
    # reported as "domain not found".
    try:
        records = domainObj.get_records()
    except Exception as e:
        print("ERROR: Domain " + domain + " not found. Use --list-domains to view all domains")
        print(e)
        sys.exit(1)
    print("\nID\t\tTYPE\tNAME\t\tDATA\t\t\t\tTTL")
    for record in records:
        # format() tolerates non-string fields, unlike "+" concatenation.
        print("{}\t{}\t{}\t\t{}\t\t{}".format(
            record.id, record.type, record.name, record.data, record.ttl))
def createDomainRecord(domain, recordType, recordName, recordData):
    """Create a DNS record on *domain* and print the resulting record table.

    Args:
        domain: Domain name the record belongs to; required.
        recordType: Record type such as "A" or "CNAME"; None means "A".
        recordName: Record (subdomain) name such as "@" or "www"; required.
        recordData: Record value such as "127.0.0.1"; required.

    MX records are created with a fixed priority of 10.
    Exits the process with status 1 when a required argument is None or when
    the API call fails.
    """
    # Validate everything before building any API object.
    if domain is None:
        print("ERROR: Domain required. Use --list-domains to view all domains")
        sys.exit(1)
    if recordType is None:
        recordType = "A"
    if recordName is None:
        print("ERROR: Domain subdomain name required to add record")
        sys.exit(1)
    if recordData is None:
        print("ERROR: Domain subdomain data required to add record")
        sys.exit(1)

    domainObj = digitalocean.Domain(name=domain, token=apiKey)
    recordArgs = {"type": recordType, "name": recordName, "data": recordData}
    if recordType == "MX":
        recordArgs["priority"] = 10
    # Guard only the create call so that a later listing problem is not
    # misreported as a failed creation.
    try:
        domainObj.create_new_domain_record(**recordArgs)
    except Exception as e:
        print("ERROR: Unable to create new record\n" + str(e))
        sys.exit(1)
    print("\nCreated Record Successfully.")
    listDomainRecords(domain)
def destroyDomainRecord(domain, recordID, bypassConfirmation=False):
    """Destroy every record of *domain* whose ID or name equals *recordID*.

    Args:
        domain: Domain name the record belongs to; required.
        recordID: Record ID or record name to match (compared as a string).
        bypassConfirmation: When True, destroy without prompting; otherwise
            the user must type CONFIRM for each matching record.

    After each destroyed record the remaining records are listed.
    Exits the process with status 1 when *domain* or *recordID* is None, or
    when the user declines the confirmation. Prints an error (without
    exiting) when nothing matches.
    """
    if domain is None:
        print("ERROR: Domain required. Use --list-domains to view all domains")
        sys.exit(1)
    if recordID is None:
        print("ERROR: Record ID required.")
        listDomainRecords(domain)
        # Nothing can match a missing ID, so stop here like the other guards.
        sys.exit(1)
    # Accept integer IDs as well; record IDs are compared in string form.
    recordID = str(recordID)

    domainObj = digitalocean.Domain(name=domain, token=apiKey)
    matched = False
    for record in domainObj.get_records():
        if str(record.id) != recordID and str(record.name) != recordID:
            continue
        matched = True
        print("\nDNS Record:\n{}\t{}\t{}\t\t{}\t\t{}".format(
            record.id, record.type, record.name, record.data, record.ttl))
        if not bypassConfirmation:
            text = input("\nType CONFIRM to destroy record: ")
            if text != "CONFIRM":
                print("Operation aborted. Record not destroyed")
                sys.exit(1)
        record.destroy()
        print("\nDestroyed Record Successfully.")
        listDomainRecords(domain)
    if not matched:
        print("ERROR: Unable to find record " + recordID)
def createDroplet(imageName, imageSize, imageRegion, keyID, userDataFile):
    """Create a droplet from the first image whose description matches.

    Args:
        imageName: Text to look for (case-insensitively) in the string form
            of each available image; the first match is used.
        imageSize: Size slug; None means '512mb'.
        imageRegion: Region slug; None means 'nyc1'.
        keyID: ID of the SSH key to install; None means the module-level
            sshKeyID. Must be convertible to int.
        userDataFile: Optional path of a file whose contents are passed as
            the droplet's user data.

    The droplet is named after *imageName* (spaces removed) plus a local
    timestamp. The function polls the first droplet action every 2 seconds
    until it completes, then prints a summary.

    Exits the process with status 1 when the key ID is not an integer, when
    no SSH key or image matches, or when the creation action reports an
    error.
    NOTE(review): confirm the '512mb' and 'nyc1' defaults are still offered
    by the API.
    """
    if imageSize is None:
        imageSize = '512mb'
    if imageRegion is None:
        imageRegion = 'nyc1'
    if keyID is None:
        keyID = sshKeyID
    # Convert once, up front, so a bad --key value fails with a clear message
    # before any API request is made.
    try:
        wantedKeyID = int(keyID)
    except (TypeError, ValueError):
        print("ERROR: SSH key ID must be an integer: " + str(keyID))
        sys.exit(1)

    userDataContents = ""
    if userDataFile:
        with open(userDataFile, 'r') as myfile:
            userDataContents = myfile.read()

    myKey = None
    for candidate in manager.get_all_sshkeys():
        if candidate.id == wantedKeyID:
            myKey = candidate
            break
    if myKey is None:
        # Without this check the Droplet call below failed with a NameError.
        print("ERROR: Unable to find SSH key with ID " + str(wantedKeyID)
              + ". Use --list-keys to view all keys")
        sys.exit(1)

    DOimageName = None
    print("")
    wantedImage = imageName.upper()
    for image in manager.get_all_images():
        if wantedImage in str(image).upper():
            DOimageName = str(image.id)
            print("Creating droplet from image: " + str(image))
            break
    if DOimageName is None:
        print("Unable to find images matching: " + imageName)
        sys.exit(1)

    ts = datetime.datetime.now().strftime('%Y%m%d.%H%M%S')
    boxName = imageName.replace(' ', '') + "-" + ts
    droplet = digitalocean.Droplet(token=apiKey,
                                   name=boxName,
                                   region=imageRegion,
                                   image=DOimageName,
                                   ssh_keys=[myKey],
                                   user_data=userDataContents,
                                   size_slug=imageSize)
    droplet.create()

    currentStatus = droplet.get_actions()[0]
    while currentStatus.status != 'completed':
        # NOTE(review): 'errored' is the failure status documented for
        # DigitalOcean actions -- confirm. Without this the loop never ends.
        if currentStatus.status == 'errored':
            print("ERROR: Creation of droplet " + boxName + " failed")
            sys.exit(1)
        time.sleep(2)
        currentStatus.load()
    droplet.load()

    print("Created Host: " + boxName)
    print("In Region: " + imageRegion)
    print("With Size: " + imageSize)
    print("With SSH Key: " + str(myKey))
    # str() keeps the summary from crashing if no address is assigned yet.
    print("With Public IP: " + str(droplet.ip_address))
def destroyDroplet(droplet):
    """Destroy the first droplet whose name or IP address equals *droplet*.

    Args:
        droplet: Droplet name (compared case-insensitively) or IP address
            (compared exactly).

    The matching droplet is shown and destroyed only after the user types
    CONFIRM; any other answer aborts. Prints an error when nothing matches.
    """
    for candidate in manager.get_all_droplets():
        if candidate.name.upper() != droplet.upper() and candidate.ip_address != droplet:
            continue
        # format() tolerates a non-string ip_address, unlike "+" concatenation.
        print("\nName : {}\nIP Address : {}\nStatus : {}".format(
            candidate.name, candidate.ip_address, candidate.status))
        text = input("\nType CONFIRM to destroy droplet: ")
        if text == "CONFIRM":
            candidate.destroy()
            print("Destroyed droplet " + droplet)
        else:
            print("Operation aborted. Droplet not destroyed")
        return
    print("ERROR: Unable to destroy droplet " + droplet)
def checkIP(url, timeout=30):
    """Fetch *url* and return its body as text with trailing whitespace removed.

    Args:
        url: Address of a service that answers with the caller's IP address.
        timeout: Seconds to wait for the request before giving up, so that an
            unresponsive service cannot hang the script.

    Returns:
        The UTF-8 decoded response body, right-stripped.
    """
    # The context manager closes the response even if reading or decoding fails.
    with request.urlopen(url, timeout=timeout) as response:
        return response.read().decode('utf-8').rstrip()
def DynamicDNS(url, domain, recordName):
    """Point the A record *recordName* of *domain* at the current public IP.

    Args:
        url: Address queried through checkIP() to learn the current IP.
        domain: Domain that owns the record; required.
        recordName: Name of the existing A record to keep up to date; required.

    Only A records named *recordName* are considered, and stale ones are
    destroyed by record ID, so other record types sharing the name are left
    alone. A replacement A record is created unless one with the current IP
    already exists.

    Exits the process with status 1 when an argument is missing, when the
    looked-up value is not an IPv4 address, when the records cannot be
    fetched, or when no matching A record exists; exits with status 0 when
    the record is already correct.
    """
    if domain is None:
        print("ERROR: Domain required (e.g. example.local). Use --list-domains to view all domains")
        sys.exit(1)
    if recordName is None:
        print("ERROR: Record Name required. (e.g. dynamicdns)")
        sys.exit(1)

    currentIP = str(checkIP(url))
    # Validate before touching DNS: the old record must not be destroyed when
    # the service answered with something that cannot go into an A record.
    try:
        ipaddress.IPv4Address(currentIP)
    except ValueError:
        print("ERROR: " + url + " did not return a valid IPv4 address: " + repr(currentIP[:64]))
        sys.exit(1)

    domainObj = digitalocean.Domain(name=domain, token=apiKey)
    try:
        records = domainObj.get_records()
    except Exception as e:
        print("ERROR: Domain " + domain + " not found. Use --list-domains to view all domains")
        print(e)
        sys.exit(1)

    matches = [r for r in records
               if str(r.type) == "A" and str(r.name) == recordName]
    if not matches:
        print("ERROR: Existing record not found. Please set DNS once manually before using Dynamic DNS mode.")
        sys.exit(1)

    stale = [r for r in matches if str(r.data) != currentIP]
    if not stale:
        print("[*] DNS already set correctly, nothing to do")
        sys.exit(0)

    for record in stale:
        # Destroy by ID so that only this exact record is removed.
        destroyDomainRecord(domain, str(record.id), True)
    if len(stale) == len(matches):
        # No surviving record carries the current IP yet.
        createDomainRecord(domain, "A", recordName, currentIP)
    print("[*] DNS record updated to " + currentIP)
def parseArgs(argv=None):
    """Build the command-line parser and parse the arguments.

    Args:
        argv: Optional list of argument strings to parse. None (the default)
            makes argparse read the process command line, as before.

    Returns:
        The argparse namespace. Exactly one primary command (create/destroy
        droplet, create/destroy record, one of the list commands, check-ip or
        dynamic-dns) must be given; the remaining options modify it.
    """
    parser = argparse.ArgumentParser(description="Script to interface with Digital Ocean API")
    primaryCommand = parser.add_mutually_exclusive_group(required=True)

    # Droplet commands and their modifiers.
    primaryCommand.add_argument('-cd', '--create-droplet', help='create droplet based on DO image name (fuzzy match)')
    parser.add_argument('-r', '--region', help='specify region when creating droplet (default: nyc1)')
    parser.add_argument('-s', '--size', help='specify size when creating droplet (default: 512mb)')
    parser.add_argument('-k', '--key', help='specify ssh key ID to use when creating droplet')
    parser.add_argument('-d', '--data', help='specify User Data file (bash script) to apply when creating new droplet')
    primaryCommand.add_argument('-dd', '--destroy-droplet', help='destroy droplet based on name or IP (exact match)')

    # DNS record commands and their modifiers.
    parser.add_argument('-D', '--domain', help='specify domain to manipulate (exact match)')
    primaryCommand.add_argument('-cr', '--create-record', help='create new subdomain record', action='store_true')
    primaryCommand.add_argument('-dr', '--destroy-record', help='destroy subdomain record by ID or Name')
    parser.add_argument('-rt', '--record-type', help='specify subdomain record type (e.g. A, CNAME)')
    parser.add_argument('-rn', '--record-name', help='specify subdomain record name (e.g. @, www)')
    parser.add_argument('-rd', '--record-data', help='specify subdomain record data (e.g. 127.0.0.1)')

    # Listing commands.
    primaryCommand.add_argument('-li', '--list-images', help='list all available images ', action='store_true')
    primaryCommand.add_argument('-ld', '--list-droplets', help='list all available droplets', action='store_true')
    primaryCommand.add_argument('-lk', '--list-keys', help='list all available ssh keys', action='store_true')
    primaryCommand.add_argument('-lr', '--list-regions', help='list all available regions', action='store_true')
    primaryCommand.add_argument('-ls', '--list-sizes', help='list all available sizes', action='store_true')
    primaryCommand.add_argument('-lD', '--list-domains', help='list all available domain names', action='store_true')
    primaryCommand.add_argument('-lR', '--list-records', help='list records from specified domain', action='store_true')

    # IP lookup / dynamic DNS commands.
    primaryCommand.add_argument('-CI', '--check-ip', help='check current ip using specified url', action='store_true')
    primaryCommand.add_argument('-DD', '--dynamic-dns', help='Check IP and update Dynamic DNS if required')
    parser.add_argument('-u', '--url', help='url to use when checking IP (only used in --check-ip and --dynamic-dns)', nargs='?', const='https://ipinfo.io/ip', default='https://ipinfo.io/ip')

    return parser.parse_args(argv)
def main():
    """Parse the command line, run each selected command, then print a blank line."""
    args = parseArgs()
    # (selector, action) pairs, checked in the same order as the options are
    # handled on the command line; every truthy selector runs its action.
    commands = (
        (args.list_images, listImages),
        (args.list_droplets, listDroplets),
        (args.list_keys, listKeys),
        (args.list_regions, listRegions),
        (args.list_sizes, listSizes),
        (args.list_domains, listDomains),
        (args.list_records,
         lambda: listDomainRecords(args.domain)),
        (args.create_record,
         lambda: createDomainRecord(args.domain, args.record_type, args.record_name, args.record_data)),
        (args.destroy_record,
         lambda: destroyDomainRecord(args.domain, args.destroy_record)),
        (args.create_droplet,
         lambda: createDroplet(args.create_droplet, args.size, args.region, args.key, args.data)),
        (args.destroy_droplet,
         lambda: destroyDroplet(args.destroy_droplet)),
        (args.dynamic_dns,
         lambda: DynamicDNS(args.url, args.domain, args.dynamic_dns)),
        (args.check_ip,
         lambda: print(checkIP(args.url))),
    )
    for selected, action in commands:
        if selected:
            action()
    print("")
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment