Skip to content

Instantly share code, notes, and snippets.

@jay0lee
Last active April 25, 2024 14:16
Show Gist options
  • Save jay0lee/86518fe82ffb3dfd90a962a14678478b to your computer and use it in GitHub Desktop.
Save jay0lee/86518fe82ffb3dfd90a962a14678478b to your computer and use it in GitHub Desktop.
Delete old, unused Google Cloud Workstation Disks
#!/usr/bin/env python3
'''
Delete old Cloud Workstation Disks
WARNING: This script will delete disks and all data on them. It should be
considered "proof of concept" quality and should be customized to fit your
environmental needs, checked for logic errors and monitored. I am not
responsible for data loss by using this script.
This script will iterate over Compute Disks associated to the same
project as the GCE instance the script is run on:
- disks that are NOT associated to Google Cloud Workstations (do not have a label
named goog-drz-workstation-uuid) will be skipped.
- disks currently in use (attached to some GCE / Workstation instance)
will be skipped.
- disks associated to Cloud Workstations that still exist (but may be
powered off and detached) will be skipped.
- remaining disks will be considered for deletion based on last detach
date. If disk has not been detached then we'll use attach date. If
disk has not been attached then we'll use create date.
Setup Steps:
1. Create a service account in the Cloud Workstations project.
2. Grant the service account the following GCP IAM roles at a project
level:
- Compute Storage Admin
- Cloud Workstations Admin
3. Attach the service account to a GCE VM or Workstation.
4. Install the following packages on the VM:
$ sudo apt install python3-google-auth python3-googleapi \
python3-dateutil
5. Run the script. The script won't actually delete anything unless you
specify --doit. So try:
$ ./delete_old_workstation_disks.py --older_than 30
to simulate looking for unused disks older than 30 days.
Arguments:
--older_than <days> - delete disks older than <days>
--doit - actually delete unused and old disks.
--debug - enable API debugging output.
'''
import argparse
import datetime
from google.auth import default
from googleapiclient.discovery import build
import httplib2
def parse_args():
    '''
    Parse command line arguments.

    Returns:
        argparse.Namespace with older_than (int), doit (bool) and
        debug (bool) attributes.
    '''
    arg_parser = argparse.ArgumentParser(
        description='Delete old, unused Cloud Workstations Disks')
    # Age threshold is mandatory so the script never runs with an
    # implicit default.
    arg_parser.add_argument(
        '--older_than',
        type=int,
        required=True,
        help=('number of days disk must be unused to be '
              'considered old and ready for deletion.'))
    # Deletion is opt-in: without --doit the script only simulates.
    arg_parser.add_argument(
        '--doit',
        action='store_true',
        help='actually perform deletion of old, unused disks.')
    arg_parser.add_argument(
        '--debug',
        action='store_true',
        help='output HTTP debug logging of API calls.')
    return arg_parser.parse_args()
def get_adc_and_project():
    '''
    Return Application Default Credentials and their associated project.

    See:
    https://cloud.google.com/docs/authentication/application-default-credentials
    https://google-auth.readthedocs.io/en/master/reference/google.auth.html

    Returns:
        (credentials, project_id) tuple as produced by google.auth.default().
    '''
    creds_and_project = default()
    return creds_and_project
def build_compute_svc():
    '''
    Build a discovery-based Compute Engine API service object.

    https://cloud.google.com/compute/docs/reference/rest/v1

    Returns:
        (service, project_id) tuple; project_id comes from the
        Application Default Credentials environment.
    '''
    creds, proj = get_adc_and_project()
    compute = build('compute', 'v1', credentials=creds)
    return (compute, proj)
def build_workstations_svc():
    '''
    Build a discovery-based Cloud Workstations API service object.

    https://cloud.google.com/workstations/docs/reference/rest

    Returns:
        (service, project_id) tuple; project_id comes from the
        Application Default Credentials environment.
    '''
    creds, proj = get_adc_and_project()
    workstations = build('workstations', 'v1', credentials=creds)
    return (workstations, proj)
def get_pages(resource, method, parameters, items='items'):
    '''
    Call a paginated discovery API method and return items from all pages.

    Args:
        resource: discovery API resource object (e.g. svc.zones()).
        method: name of the list-style method to invoke on the resource.
        parameters: dict of keyword arguments passed to the method.
        items: response key under which each page stores its results.

    Returns:
        list of every item across all pages (empty list if none).
    '''
    all_items = []
    request = getattr(resource, method)(**parameters)
    # The discovery client generates a "<method>_next" pagination helper
    # for every paginated method. Look it up from the method name instead
    # of hard-coding list_next, so methods such as aggregatedList work too.
    next_page = getattr(resource, f'{method}_next')
    while request is not None:
        page = request.execute()
        all_items.extend(page.get(items, []))
        request = next_page(request, page)
    return all_items
def get_zones(svc, project):
    '''
    Return the names of all compute zones for the given project.

    https://cloud.google.com/compute/docs/reference/rest/v1/zones/list

    Args:
        svc: Compute Engine discovery service object.
        project: project ID to list zones for.

    Returns:
        list of zone name strings.
    '''
    zone_params = {
        'project': project,
        # only name is needed; nextPageToken keeps pagination working.
        'fields': 'items/name,nextPageToken',
    }
    zone_items = get_pages(svc.zones(), 'list', zone_params)
    return [item['name'] for item in zone_items if 'name' in item]
def get_disks(svc, project):
    '''
    Return all disks in the project, aggregated across zones and regions.

    https://cloud.google.com/compute/docs/reference/rest/v1/disks/aggregatedList

    Args:
        svc: Compute Engine discovery service object.
        project: project ID to list disks for.

    Returns:
        list of disk resource dicts.
    '''
    disks = []
    print('Getting all Compute disks...')
    # Request full disk resources for every scope. The previous field
    # filter (name,labels,lastAttachTimestamp,lastDetachTimestamp) omitted
    # id, users, region and creationTimestamp, all of which are read later
    # by main() and delete_old_disk() -- so the in-use check never fired
    # and regional deletion crashed on a missing 'region'. nextPageToken
    # is requested so pagination works for projects with many disks.
    fields = 'items/*/disks,nextPageToken'
    request = svc.disks().aggregatedList(project=project,
                                         fields=fields,
                                         returnPartialSuccess=True,
                                         includeAllScopes=True)
    # The original code fetched only the first page; loop with
    # aggregatedList_next to collect every page.
    while request is not None:
        results = request.execute()
        for scope in results.get('items', {}).values():
            if scoped_disks := scope.get('disks'):
                disks.extend(scoped_disks)
        request = svc.disks().aggregatedList_next(request, results)
    print(f'Got {len(disks)} disks.\n')
    return disks
def delete_disk(svc, project, region, name):
    '''
    Start an asynchronous deletion of a regional disk.

    https://cloud.google.com/compute/docs/reference/rest/v1/regionDisks/delete

    Args:
        svc: Compute Engine discovery service object.
        project: project ID owning the disk.
        region: short region name (e.g. "us-central1").
        name: disk name to delete.
    '''
    request = svc.regionDisks().delete(project=project,
                                       region=region,
                                       disk=name)
    # delete returns a long-running operation; we only report its id and
    # do not wait for completion.
    operation = request.execute()
    print(f' began operation {operation["id"]} to delete the disk.')
def get_ws_locations(svc, project):
    '''
    Return full resource names of all Workstation locations in a project.

    https://cloud.google.com/workstations/docs/reference/rest/v1/projects.locations/list

    Args:
        svc: Workstations discovery service object.
        project: project ID to list locations for.

    Returns:
        list of location resource names (projects/<p>/locations/<l>).
    '''
    print('Getting all Workstation locations...')
    location_items = get_pages(
        svc.projects().locations(),
        'list',
        parameters={
            'name': f'projects/{project}',
            'fields': 'locations/name,nextPageToken',
        },
        items='locations')
    print(f' got {len(location_items)} locations.')
    print()
    return [item['name'] for item in location_items if 'name' in item]
def get_ws_clusters(svc, locations):
    '''
    Return full resource names of all clusters in the given locations.

    https://cloud.google.com/workstations/docs/reference/rest/v1/projects.locations.workstationClusters/list

    Args:
        svc: Workstations discovery service object.
        locations: list of location resource names to scan.

    Returns:
        list of cluster resource names.
    '''
    total = len(locations)
    cluster_names = []
    for idx, location in enumerate(locations, start=1):
        print(f' Getting Workstation Clusters in location {location} ({idx}/{total})...')
        page_items = get_pages(
            svc.projects().locations().workstationClusters(),
            'list',
            {
                'fields': 'workstationClusters/name,nextPageToken',
                'parent': location,
            },
            items='workstationClusters')
        print(f' got {len(page_items)} clusters.')
        cluster_names.extend(
            cluster['name'] for cluster in page_items if 'name' in cluster)
    print()
    return cluster_names
def get_ws_configs(svc, clusters):
    '''
    Return full resource names of all configs in the given clusters.

    https://cloud.google.com/workstations/docs/reference/rest/v1/projects.locations.workstationClusters.workstationConfigs/list

    Args:
        svc: Workstations discovery service object.
        clusters: list of cluster resource names to scan.

    Returns:
        list of workstation config resource names.
    '''
    total = len(clusters)
    config_names = []
    for idx, cluster in enumerate(clusters, start=1):
        print(f' Getting Workstation Configs for cluster {cluster} ({idx}/{total})...')
        page_items = get_pages(
            svc.projects().locations().workstationClusters().workstationConfigs(),
            'list',
            {
                'fields': 'workstationConfigs/name,nextPageToken',
                'parent': cluster,
            },
            items='workstationConfigs')
        print(f' got {len(page_items)} configs.')
        config_names.extend(
            config['name'] for config in page_items if 'name' in config)
    print()
    return config_names
def get_ws_workstations(svc, configs):
    '''
    Return the UIDs of all Workstations under the given configs.

    https://cloud.google.com/workstations/docs/reference/rest/v1/projects.locations.workstationClusters.workstationConfigs.workstations/list

    Args:
        svc: Workstations discovery service object.
        configs: list of workstation config resource names to scan.

    Returns:
        list of workstation UID strings. These UIDs are matched against
        the goog-drz-workstation-uuid disk label by the caller.
    '''
    total = len(configs)
    uids = []
    for idx, config in enumerate(configs, start=1):
        print(f' Getting Workstations for config {config} ({idx}/{total})...')
        ws_items = get_pages(
            svc.projects().locations().workstationClusters()
            .workstationConfigs().workstations(),
            'list',
            {
                'fields': '*',
                'parent': config,
            },
            items='workstations')
        print(f' got {len(ws_items)} workstations.')
        uids.extend(ws['uid'] for ws in ws_items if 'uid' in ws)
    print()
    return uids
def delete_old_disk(doit, svc, project, delete_before_date, disk):
'''
takes a disk and deletes it if it's older than
delete_before_date
'''
# Get the create date, if not set use 1970.
never_date = '1970-01-01T00:00:00+00:00'
create_date = disk.get('creationTimestamp', never_date)
# Get date disk was last attached to an instance, if not set use create_date.
last_attach = disk.get('lastAttachTimestamp', create_date)
# Get date disk was last used, if last detach not set use last_attach.
last_use = datetime.datetime.fromisoformat(disk.get('lastDetachTimestamp', last_attach))
disk_name = disk.get('name')
print(f' considering deleting {disk_name} last used {last_use}.')
if delete_before_date > last_use:
print(f' disk last detached {last_use} and subject to deletion.')
if doit:
print(' doing it.\n')
name = disk.get('name')
region = disk.get('region').split('/')[-1]
delete_disk(svc, project, region, name)
else:
print(' running in simulated mode. Add --doit to actually delete disks.\n')
else:
print(' disk isn\'t considered old yet. Skipping.\n')
def main():
    '''
    Entry point: enumerate disks and Workstations, then report or delete
    disks whose owning Workstation no longer exists and that have been
    unused longer than --older_than days.
    '''
    args = parse_args()
    if args.debug:
        # httplib2 debug output shows raw API requests/responses.
        httplib2.debuglevel = 4
    cutoff = (datetime.datetime.now(datetime.timezone.utc)
              - datetime.timedelta(days=args.older_than))
    print(f'Will delete unused disks last used before {cutoff}\n')
    gce_svc, project = build_compute_svc()
    disks = get_disks(gce_svc, project)
    # Walk the Workstations hierarchy: locations -> clusters -> configs
    # -> workstation UIDs.
    ws_svc, _ = build_workstations_svc()
    locations = get_ws_locations(ws_svc, project)
    clusters = get_ws_clusters(ws_svc, locations)
    configs = get_ws_configs(ws_svc, clusters)
    workstations = get_ws_workstations(ws_svc, configs)
    for disk in disks:
        print(f'Analyzing disk {disk.get("name")} with ID {disk.get("id")}...')
        ws_uuid = disk.get('labels', {}).get('goog-drz-workstation-uuid')
        if not ws_uuid:
            # Not a Workstations-managed disk; never touch it.
            print(' disk not associated by label to Cloud Workstations, skipping.\n')
            continue
        users = disk.get('users')
        if users:
            # Currently attached to one or more instances.
            print(f' disk in use by {", ".join(users)} instance(s), skipping.\n')
            continue
        if ws_uuid in workstations:
            # Workstation still exists (possibly just powered off).
            print((' disk associated to existing Cloud Workstation '
                   f'{ws_uuid}, skipping.\n'))
            continue
        # NOTE(review): original message has no space before "which";
        # preserved byte-for-byte.
        print((f' disk associated to Cloud Workstation {ws_uuid}'
               'which no longer seems to exist.'))
        delete_old_disk(args.doit, gce_svc, project, cutoff, disk)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment