# Source: GitHub gist b10n1k/aed84ac73608da3633d1b41160010c22
# (author @b10n1k, created December 13, 2021 21:04).
# The lines below the imports are the gist's Python test module; the original
# web-page chrome ("Skip to content", "Instantly share code, notes, and
# snippets.", "Show Gist options", "Save ... to your computer and use it in
# GitHub Desktop.") is kept here only as a comment.
from ocw.lib.gce import GCE
from webui.settings import PCWConfig
from tests.generators import min_image_age_hours, max_image_age_hours, gce_max_service_account_age_hours, gce_min_service_account_age_hours
from tests.generators import mock_get_feature_property
from tests import generators
from datetime import datetime, timezone, timedelta
import googleapiclient.discovery
from googleapiclient.http import HttpMock
from googleapiclient.http import HttpMockSequence
from googleapiclient.discovery import build
import pprint
import json
def test_parse_image_name(monkeypatch):
    """Verify ``GCE.parse_image_name`` on several SLES image names.

    Each recognised name must be parsed into a dict with a ``key`` and a
    ``build`` entry; a name that does not match must yield ``None``.
    """
    # Every feature-property lookup returns a dummy value for this test.
    monkeypatch.setattr(PCWConfig, 'get_feature_property', lambda *args, **kwargs: "FOOF")
    gce = GCE('fake')

    # (image name, expected key, expected build)
    expectations = (
        ('sles12-sp5-gce-x8664-0-9-1-byos-build1-56', '12-sp5-gce-byos-x8664', '0-9-1-1-56'),
        ('sles15-sp2-byos-x8664-0-9-3-gce-build1-10', '15-sp2-gce-byos-x8664', '0-9-3-1-10'),
        ('sles15-sp2-x8664-0-9-3-gce-build1-10', '15-sp2-gce-x8664', '0-9-3-1-10'),
        ('sles15-sp2-chost-byos-x8664-0-9-3-gce-build1-11', '15-sp2-gce-chost-byos-x8664', '0-9-3-1-11'),
    )
    for image_name, expected_key, expected_build in expectations:
        parsed = gce.parse_image_name(image_name)
        assert parsed == {'key': expected_key, 'build': expected_build}

    assert gce.parse_image_name('do not match') is None
class FakeRequest:
    """Minimal stand-in for an API request object.

    The canned ``response`` given at construction time is handed back
    unchanged by :meth:`execute`.
    """

    def __init__(self, response=None):
        """Store the canned response; a fresh empty dict is used when omitted."""
        # A literal ``{}`` default would be a single dict shared by every
        # instance, so one test mutating it would leak into the others.
        self.response = {} if response is None else response

    def execute(self):
        """Return the canned response."""
        return self.response
class FakeMockImages:
    """Fake of the ``images()`` resource driven by a queue of canned responses.

    Every call to ``list``, ``list_next`` or ``delete`` consumes the response
    at the front of ``responses``; ``delete`` additionally records the value of
    its ``image`` keyword argument in ``deleted``.
    """

    def __init__(self, responses):
        self.responses = responses
        self.deleted = []

    def _next_response(self):
        """Remove and return the response at the front of the queue."""
        return self.responses.pop(0)

    def list(self, *args, **kwargs):
        """Return the next canned response; arguments are ignored."""
        return self._next_response()

    def list_next(self, *args, **kwargs):
        """Return the next canned response; arguments are ignored."""
        return self._next_response()

    def delete(self, *args, **kwargs):
        """Record the ``image`` keyword and return the next canned response."""
        image = kwargs['image']
        self.deleted.append(image)
        return self._next_response()
class FakeServiceAccounts:
    """Fake of the IAM ``serviceAccounts()`` resource driven by canned responses.

    ``list`` and ``delete`` consume the response at the front of ``responses``.
    ``delete`` also records which resource was targeted in ``deleted``.
    """

    def __init__(self, responses):
        self.deleted = []
        self.responses = responses

    def list(self, *args, **kwargs):
        """Return the next canned response; arguments are ignored."""
        return self.responses.pop(0)

    def list_next(self, *args, **kwargs):
        """Always return ``None``, i.e. there is never a further page."""
        return None

    def delete(self, *args, **kwargs):
        """Record the deleted resource and return the next canned response.

        The resource is taken from the ``name`` keyword; ``image`` is still
        accepted as a fallback for callers written against the images fake.
        Raises ``KeyError`` when neither keyword is supplied.
        """
        # The original read ``kwargs['image']`` only, which raised KeyError
        # for the ``name=`` keyword the IAM delete calls are expected to use.
        target = kwargs['name'] if 'name' in kwargs else kwargs['image']
        self.deleted.append(target)
        return self.responses.pop(0)

    def keys(self, *args, **kwargs):
        """Return the remaining response queue itself.

        NOTE(review): the real API presumably returns a keys resource with its
        own ``list``/``delete`` methods -- confirm what callers expect here.
        """
        return self.responses
class FakeResource:
    """Fake of the IAM client root, supporting ``.projects().serviceAccounts()``.

    ``responses`` is the queue of canned responses handed to the
    ``FakeServiceAccounts`` created by :meth:`serviceAccounts`.
    """

    def __init__(self, responses=None):
        self.responses = [] if responses is None else responses

    def projects(self):
        """Return this object so that ``.serviceAccounts()`` can be chained."""
        # The original returned the bound method ``self.serviceAccounts``,
        # which has no ``serviceAccounts`` attribute, so the chain failed.
        return self

    def serviceAccounts(self):
        """Return a ``FakeServiceAccounts`` fed from this resource's responses."""
        # ``FakeServiceAccounts`` requires its responses argument; calling it
        # with no arguments (as before) raised TypeError.
        return FakeServiceAccounts(self.responses)
def test_cleanup_all(monkeypatch):
    """Run ``GCE.cleanup_all`` against a faked images resource.

    The fake serves two pages of images followed by one response per delete
    call; the test asserts which image names were passed to ``delete``. A
    second run against an empty listing must delete nothing.
    """
    now = datetime.now(timezone.utc)
    newer_than_min_age = now.isoformat()
    older_than_min_age = (now - timedelta(hours=min_image_age_hours + 1)).isoformat()
    older_than_max_age = (now - timedelta(hours=max_image_age_hours + 1)).isoformat()

    fmi = FakeMockImages([
        FakeRequest({   # on images().list()
            'items': [
                {'name': 'I will not be parsed', 'creationTimestamp': older_than_max_age},
                {'name': 'sles12-sp5-gce-x8664-0-9-1-byos-build1-54', 'creationTimestamp': newer_than_min_age},
                {'name': 'sles12-sp5-gce-x8664-0-9-1-byos-build1-56', 'creationTimestamp': older_than_min_age}
            ]
        }),
        FakeRequest({   # on images().list_next()
            'items': [
                {'name': 'sles12-sp5-gce-x8664-0-9-1-byos-build1-57', 'creationTimestamp': older_than_min_age},
                {'name': 'sles12-sp5-gce-x8664-0-9-1-byos-build1-58', 'creationTimestamp': older_than_max_age}
            ]
        }),
        None,   # on images().list_next()
        # Consumed by the first images().delete(): a response carrying an error and a warning.
        FakeRequest({'error': {'errors': [{'message': 'err message'}]}, 'warnings': [{'message': 'warning message'}]}),
        FakeRequest(),  # on images().delete()
    ])

    def mocked_compute_client():
        """Namespace object standing in for the compute client; never called."""

    # The lambda looks ``fmi`` up at call time, so rebinding ``fmi`` further
    # down swaps the fake that the client hands out.
    mocked_compute_client.images = lambda *args, **kwargs: fmi
    monkeypatch.setattr(GCE, 'compute_client', lambda self: mocked_compute_client)
    monkeypatch.setattr(GCE, 'cleanup_vaultopenqa_serviceaccounts', lambda self: True)
    monkeypatch.setattr(PCWConfig, 'get_feature_property', mock_get_feature_property)

    # Go through monkeypatch so the module-level setting is restored after the
    # test; a plain assignment would leak into every test that runs later.
    monkeypatch.setattr(generators, 'max_images_per_flavor', 2, raising=False)

    gce = GCE('fake')
    gce.cleanup_all()
    assert fmi.deleted == ['sles12-sp5-gce-x8664-0-9-1-byos-build1-56', 'sles12-sp5-gce-x8664-0-9-1-byos-build1-58']

    # An empty listing must not trigger any deletion.
    fmi = FakeMockImages([FakeRequest({})])
    gce.cleanup_all()
    assert fmi.deleted == []
def test_cleanup_serviceAccounts(monkeypatch):
    """Run ``GCE.cleanup_vaultopenqa_serviceaccounts`` against a faked IAM client.

    The fake client serves one listing of service accounts and one listing of
    service-account keys whose ``validAfterTime`` lies beyond the configured
    minimum or maximum age.

    NOTE(review): the test makes no assertion yet; it only checks that the
    call completes. The single keys response may also be too few if the code
    under test lists keys once per account -- confirm against the implementation.
    """
    now = datetime.now(timezone.utc)
    service_account_older_than_max_age = (now - timedelta(hours=gce_max_service_account_age_hours + 1)).isoformat()
    service_account_older_than_min_age = (now - timedelta(hours=gce_min_service_account_age_hours + 1)).isoformat()

    fake_vault_service_accounts = FakeServiceAccounts([
        FakeRequest({   # on serviceAccounts().list()
            "accounts": [
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]",
                 "email": "[email protected]"},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]",
                 "email": "[email protected]"},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]",
                 "email": "[email protected]"},
            ]
        })
    ])
    fake_vault_service_accounts_keys = FakeServiceAccounts([
        FakeRequest({   # on serviceAccounts().keys().list()
            "keys": [
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_max_age},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_max_age},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_min_age},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_max_age},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_max_age},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_min_age}
            ]
        })
    ])

    def mocked_iam_client():
        """Namespace object standing in for the IAM client; never called."""

    monkeypatch.setattr(PCWConfig, 'get_feature_property', mock_get_feature_property)
    monkeypatch.setattr(GCE, 'iam_client', lambda self: mocked_iam_client)

    # Mirror the call chain iam.projects().serviceAccounts().list(...).
    mocked_iam_client.projects = lambda *args, **kwargs: mocked_iam_client
    mocked_iam_client.serviceAccounts = lambda *args, **kwargs: fake_vault_service_accounts
    mocked_iam_client.keys = lambda *args, **kwargs: fake_vault_service_accounts_keys
    # keys() is reached through serviceAccounts() in the real API, so it must
    # also be wired onto the object serviceAccounts() returns; otherwise
    # FakeServiceAccounts.keys() hands back a plain list with no list()/delete().
    fake_vault_service_accounts.keys = lambda *args, **kwargs: fake_vault_service_accounts_keys

    gce = GCE("fake")
    gce.cleanup_vaultopenqa_serviceaccounts()
# (End of gist. Original page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")