Skip to content

Instantly share code, notes, and snippets.

@b10n1k
Created December 13, 2021 14:13
Show Gist options
  • Save b10n1k/14c82c374d3d345aaf6f4a682666d5c7 to your computer and use it in GitHub Desktop.
Save b10n1k/14c82c374d3d345aaf6f4a682666d5c7 to your computer and use it in GitHub Desktop.
from .vault import GCECredential
from .provider import Provider, Image
import googleapiclient.discovery
from google.oauth2 import service_account
from dateutil.parser import parse
from .decorators import filterService
from googleapiclient.errors import HttpError
import re
class GCE(Provider):
    """Google Cloud provider that cleans up vault-created IAM service accounts.

    One instance is kept per vault namespace: constructing ``GCE`` again with
    the same namespace returns the same object. Note that ``__init__`` still
    runs on every construction and resets the cached clients.
    """

    # Registry (namespace -> instance) backing the per-namespace singleton.
    __instances = {}

    def __new__(cls, vault_namespace, *args, **kwargs):
        """Return the instance registered for *vault_namespace*, creating it on first use."""
        if vault_namespace not in GCE.__instances:
            GCE.__instances[vault_namespace] = object.__new__(cls)
        return GCE.__instances[vault_namespace]

    def __init__(self, namespace):
        """Create the credential holder for *namespace* and reset the client caches."""
        self.__credentials = GCECredential(namespace)
        self.__compute_client = None
        self.__iam_client = None
        self.__project = None
        self.credentials = None
        super().__init__(namespace)

    def iam_client(self):
        """Return the cached IAM v1 API client, rebuilding it when credentials were renewed.

        As a side effect the project id is (re)read from the credential's
        private key data, so ``self.__project`` is valid after every call.
        """
        if self.__credentials.isExpired():
            self.__credentials.renew()
            # Drop everything derived from the old key so it is rebuilt below.
            self.__iam_client = None
            self.credentials = None
        # Set unconditionally: otherwise the project stays None whenever the
        # credential has not expired yet.
        self.__project = self.__credentials.getPrivateKeyData()["project_id"]
        if self.__iam_client is None:
            self.credentials = service_account.Credentials.from_service_account_info(
                self.__credentials.getPrivateKeyData()
            )
            self.__iam_client = googleapiclient.discovery.build(
                "iam", "v1", credentials=self.credentials, cache_discovery=True
            )
        return self.__iam_client

    @staticmethod
    def url_to_name(url):
        """Return the text after the last ``/`` of *url* (all of *url* if it has none)."""
        return url.rsplit("/", 1)[-1]

    def cleanup_all(self):
        """Run the cleanup jobs; only the service-account cleanup is present here."""
        # removed code ......
        self.cleanup_vaultopenqa_serviceaccounts()

    def cleanup_vaultopenqa_serviceaccounts(self):
        """List the service accounts, keep those with an old key, and delete them."""
        # Build the client first: iam_client() is what populates
        # self.__project, which the log line below reports.
        service = self.iam_client()
        self.log_info(f"## GCE Vault key cleaning - {self.__project} ##")
        # Get service account emails
        emails = self.parser_service_account_list(service)
        # Keep only the emails that own at least one key older than the limit
        stale_emails = self.filter_email_service_accounts(service, emails)
        self.delete_vaultopenqa_service_accounts(service, stale_emails)

    @staticmethod
    def _key_created_at(key):
        """Parse ``key['validAfterTime']`` into a timezone-aware datetime.

        A trailing ``Z`` is rewritten to ``+00:00`` so ``fromisoformat`` accepts
        it on every supported Python version; a timestamp without any offset is
        assumed to be UTC.
        """
        from datetime import datetime, timezone

        stamp = key['validAfterTime']
        if stamp.endswith('Z'):
            stamp = stamp[:-1] + '+00:00'
        created = datetime.fromisoformat(stamp)
        if created.tzinfo is None:
            created = created.replace(tzinfo=timezone.utc)
        return created

    def filter_email_service_accounts(self, service, vault_account_tuple, time_limit=24):
        """Return the emails of the Service Accounts owning a key older than *time_limit*.

        For every email a request lists the keys of that Service Account.
        The response is a JSON object of the form::

            {'keys': [
                {'name': 'projects/suse-sle-qa/serviceAccounts/{ACCOUNT}/keys/{KEY_HASH}',
                 'validAfterTime': '2021-11-15T12:13:43Z',   # age of the key
                 'validBeforeTime': '2021-12-02T12:13:43Z',
                 'keyAlgorithm': 'KEY_ALG_RSA_2048',
                 'keyOrigin': 'GOOGLE_PROVIDED',
                 'keyType': 'USER_MANAGED'}
            ]}

        ``validAfterTime`` decides whether an account is old enough to be
        deleted. The delete request takes the account email, so emails (not
        key names) are returned, in the order they appear in
        *vault_account_tuple*.

        Parameters
        ----------
        service : googleapiclient.discovery.Resource, required
            The IAM Service Account instance
        vault_account_tuple : iterable of str, required
            Emails of the Service Accounts to inspect
        time_limit : int
            Age in hours below which a key is considered fresh; accounts
            whose keys are all fresh are left out of the result

        Returns
        -------
        list of str
            The emails having at least one key older than *time_limit* hours
        """
        from datetime import datetime, timedelta, timezone

        # validAfterTime is expressed in UTC, so the cutoff has to be UTC as
        # well; a naive local "now" would shift it by the host's UTC offset.
        cutoff = datetime.now(timezone.utc) - timedelta(hours=time_limit)
        stale_emails = []
        for email in vault_account_tuple:
            response = service.projects().serviceAccounts().keys().list(
                name='projects/-/serviceAccounts/%s' % email).execute()
            # Tolerate a response without 'keys', the same way the account
            # listing tolerates a response without 'accounts'.
            keys = response.get('keys', [])
            if any(self._key_created_at(key) < cutoff for key in keys):
                stale_emails.append(email)
        return stale_emails

    @filterService(name='vaultopenqa')
    def parser_service_account_list(self, iam_service):
        """Return the emails of all Service Accounts of the project.

        The request uses the resource name of the project, so the answer also
        contains accounts that are not vault related. Results arrive in pages
        linked by a ``nextPageToken``; every page is fetched and merged.
        NOTE(review): the ``filterService`` decorator presumably narrows the
        result to the vault accounts - confirm against ``.decorators``.

        Parameters
        ----------
        iam_service : googleapiclient.discovery.Resource, required
            The IAM Service Account instance

        Returns
        -------
        tuple of str
            The distinct account emails, in no particular order
        """
        accounts_api = iam_service.projects().serviceAccounts()
        emails = set()
        # NOTE(review): the project is hard-coded; presumably it should follow
        # the credential's project id - confirm before changing.
        request = accounts_api.list(name='projects/suse-sle-qa')
        while request is not None:
            response = request.execute()
            emails.update(account['email'] for account in response.get('accounts', []))
            # list_next() returns None once the last page has been consumed.
            request = accounts_api.list_next(
                previous_request=request, previous_response=response)
        return tuple(emails)

    def delete_vaultopenqa_service_accounts(self, iam_service, vault_accounts):
        """Delete the given vault Service Accounts, or only report them in dry run mode.

        From the documentation: deleting a service account key does not revoke
        short-lived credentials that have been issued based on that key.

        ``TypeError`` and ``HttpError`` raised by a delete request are caught
        and logged, so one failing account does not stop the remaining ones.

        Parameters
        ----------
        iam_service : googleapiclient.discovery.Resource, required
            The IAM Service Account instance
        vault_accounts : list, required
            Emails of the Service Accounts to delete, as produced by
            *filter_email_service_accounts*
        """
        if not vault_accounts:
            self.log_info("Nothing to delete")
            return
        for account_email in vault_accounts:
            if self.dry_run:
                self.log_warn(
                    f"Deletion of vault Service Account {account_email} skipped due to dry run mode")
                continue
            try:
                # TODO: check whether the keys have to be removed before the
                # account itself can be deleted.
                iam_service.projects().serviceAccounts().delete(
                    name='projects/-/serviceAccounts/%s' % account_email).execute()
            except (TypeError, HttpError) as err:
                self.log_err(f"Fail to delete Service Account {account_email} \n{err}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment