Created
December 13, 2021 21:04
-
-
Save b10n1k/aed84ac73608da3633d1b41160010c22 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ocw.lib.gce import GCE | |
from webui.settings import PCWConfig | |
from tests.generators import min_image_age_hours, max_image_age_hours, gce_max_service_account_age_hours, gce_min_service_account_age_hours | |
from tests.generators import mock_get_feature_property | |
from tests import generators | |
from datetime import datetime, timezone, timedelta | |
import googleapiclient.discovery | |
from googleapiclient.http import HttpMock | |
from googleapiclient.http import HttpMockSequence | |
from googleapiclient.discovery import build | |
import pprint | |
import json | |
def test_parse_image_name(monkeypatch):
    """Check GCE.parse_image_name against known image names.

    PCWConfig.get_feature_property is replaced by a stub that always returns
    "FOOF". Each recognised name must parse into the expected ``key`` /
    ``build`` pair, and a name that does not match must yield None.
    """
    monkeypatch.setattr(PCWConfig, 'get_feature_property', lambda *args, **kwargs: "FOOF")
    gce = GCE('fake')

    # (image name, expected parse result), checked in this order.
    expectations = [
        (
            'sles12-sp5-gce-x8664-0-9-1-byos-build1-56',
            {'key': '12-sp5-gce-byos-x8664', 'build': '0-9-1-1-56'},
        ),
        (
            'sles15-sp2-byos-x8664-0-9-3-gce-build1-10',
            {'key': '15-sp2-gce-byos-x8664', 'build': '0-9-3-1-10'},
        ),
        (
            'sles15-sp2-x8664-0-9-3-gce-build1-10',
            {'key': '15-sp2-gce-x8664', 'build': '0-9-3-1-10'},
        ),
        (
            'sles15-sp2-chost-byos-x8664-0-9-3-gce-build1-11',
            {'key': '15-sp2-gce-chost-byos-x8664', 'build': '0-9-3-1-11'},
        ),
    ]
    for image_name, parsed in expectations:
        assert gce.parse_image_name(image_name) == parsed

    assert gce.parse_image_name('do not match') is None
class FakeRequest:
    """Fake request object whose execute() hands back a canned response."""

    def __init__(self, response=None):
        """Store the payload that execute() will return.

        Args:
            response: Object to return from execute(). When omitted (or
                None), a fresh empty dict is created for this instance.
        """
        # A literal `{}` default would be evaluated once and shared by every
        # instance built without arguments, so a mutation made through one
        # instance would leak into all the others.
        self.response = {} if response is None else response

    def execute(self):
        """Return the canned response supplied at construction time."""
        return self.response
class FakeMockImages:
    """Fake images resource driven by a scripted queue of responses.

    Every call to list(), list_next() or delete() removes and returns the
    front entry of ``responses``, so the entries must be supplied in the
    order in which the code under test makes its calls. The ``image`` keyword
    of each delete() call is recorded in ``deleted``.
    """

    def __init__(self, responses):
        self.responses = responses
        self.deleted = []

    def _next_response(self):
        """Remove and return the front entry of the scripted queue."""
        return self.responses.pop(0)

    def list(self, *args, **kwargs):
        """Return the next scripted response; arguments are ignored."""
        return self._next_response()

    def list_next(self, *args, **kwargs):
        """Return the next scripted response; arguments are ignored."""
        return self._next_response()

    def delete(self, *args, **kwargs):
        """Record the ``image`` keyword, then return the next scripted response."""
        self.deleted.append(kwargs['image'])
        return self._next_response()
class FakeServiceAccounts:
    """Fake IAM service-accounts resource driven by a scripted queue.

    list() and delete() remove and return the front entry of ``responses``;
    list_next() always reports that there is no further page. The resource
    identifier of each delete() call is recorded in ``deleted``.
    """

    def __init__(self, responses):
        self.deleted = []
        self.responses = responses

    def list(self, *args, **kwargs):
        """Return the next scripted response; arguments are ignored."""
        return self.responses.pop(0)

    def list_next(self, *args, **kwargs):
        """Return None, i.e. there is never a second page."""
        return None

    def delete(self, *args, **kwargs):
        """Record the deleted resource and return the next scripted response.

        IAM delete calls identify the resource with the ``name`` keyword, so
        that is preferred; ``image`` is still accepted for callers written
        against the images fake.

        Raises:
            KeyError: if neither ``name`` nor ``image`` is passed.
        """
        target = kwargs['name'] if 'name' in kwargs else kwargs['image']
        self.deleted.append(target)
        return self.responses.pop(0)

    def keys(self, *args, **kwargs):
        """Return the raw response queue itself.

        NOTE(review): this returns the list, not an object exposing list();
        confirm this is what the code under test expects from keys().
        """
        return self.responses
class FakeResource:
    """Fake client root supporting the ``projects().serviceAccounts()`` chain."""

    def __init__(self, service_accounts=None):
        """Create the fake root.

        Args:
            service_accounts: Object to return from serviceAccounts(). When
                omitted, a FakeServiceAccounts with an empty response queue
                is created on first use.
        """
        self._service_accounts = service_accounts

    def projects(self):
        """Return this object so that ``.serviceAccounts()`` can be chained."""
        # Returning the bound method here (as before) made the chained
        # `.serviceAccounts()` attribute lookup fail.
        return self

    def serviceAccounts(self):
        """Return the service-accounts fake, creating an empty one if needed."""
        if self._service_accounts is None:
            # FakeServiceAccounts requires its response queue argument.
            self._service_accounts = FakeServiceAccounts([])
        return self._service_accounts
def test_cleanup_all(monkeypatch):
    """Run GCE.cleanup_all against a scripted fake images resource.

    The first pass serves two pages of images and expects exactly the images
    ending in ``-56`` and ``-58`` to be deleted. The second pass serves an
    empty listing and expects no deletions.
    """
    now = datetime.now(timezone.utc)
    newer_than_min_age = now.isoformat()
    older_than_min_age = (now - timedelta(hours=min_image_age_hours + 1)).isoformat()
    older_than_max_age = (now - timedelta(hours=max_image_age_hours + 1)).isoformat()

    # Responses are consumed strictly in order, one per call on the fake.
    fmi = FakeMockImages([
        FakeRequest({   # on images().list()
            'items': [
                {'name': 'I will not be parsed', 'creationTimestamp': older_than_max_age},
                {'name': 'sles12-sp5-gce-x8664-0-9-1-byos-build1-54', 'creationTimestamp': newer_than_min_age},
                {'name': 'sles12-sp5-gce-x8664-0-9-1-byos-build1-56', 'creationTimestamp': older_than_min_age},
            ]
        }),
        FakeRequest({   # on images().list_next()
            'items': [
                {'name': 'sles12-sp5-gce-x8664-0-9-1-byos-build1-57', 'creationTimestamp': older_than_min_age},
                {'name': 'sles12-sp5-gce-x8664-0-9-1-byos-build1-58', 'creationTimestamp': older_than_max_age},
            ]
        }),
        None,   # on images().list_next(): no further page
        # The two remaining entries are consumed by the two delete() calls.
        FakeRequest({'error': {'errors': [{'message': 'err message'}]}, 'warnings': [{'message': 'warning message'}]}),
        FakeRequest(),   # on images().delete()
    ])

    def mocked_compute_client():
        pass

    # The lambda looks up `fmi` when called, so rebinding `fmi` below swaps
    # the fake that the second cleanup_all() run sees.
    mocked_compute_client.images = lambda *args, **kwargs: fmi
    monkeypatch.setattr(GCE, 'compute_client', lambda self: mocked_compute_client)
    monkeypatch.setattr(GCE, 'cleanup_vaultopenqa_serviceaccounts', lambda self: True)
    monkeypatch.setattr(PCWConfig, 'get_feature_property', mock_get_feature_property)
    # Patch through monkeypatch so the module-level value is restored after
    # this test instead of leaking into other tests. raising=False keeps the
    # behaviour of the former plain assignment if the attribute is missing.
    monkeypatch.setattr(generators, 'max_images_per_flavor', 2, raising=False)

    gce = GCE('fake')
    gce.cleanup_all()
    assert fmi.deleted == ['sles12-sp5-gce-x8664-0-9-1-byos-build1-56', 'sles12-sp5-gce-x8664-0-9-1-byos-build1-58']

    fmi = FakeMockImages([FakeRequest({})])
    gce.cleanup_all()
    assert fmi.deleted == []
def test_cleanup_serviceAccounts(monkeypatch):
    """Run GCE.cleanup_vaultopenqa_serviceaccounts against a faked IAM client.

    The fake client serves one listing of three service accounts and one
    listing of six keys whose ``validAfterTime`` is older than either the
    minimum or the maximum service-account age.

    NOTE(review): the test makes no assertions, so it only checks that the
    call completes without raising. TODO: assert on the ``deleted`` lists of
    the fakes once the expected deletions are settled.
    """
    now = datetime.now(timezone.utc)
    service_account_older_than_max_age = (now - timedelta(hours=gce_max_service_account_age_hours + 1)).isoformat()
    service_account_older_than_min_age = (now - timedelta(hours=gce_min_service_account_age_hours + 1)).isoformat()

    # NOTE(review): the names/emails below look like scrubbed placeholders,
    # which makes all entries identical - confirm against the real fixture.
    fake_vault_service_accounts = FakeServiceAccounts([
        FakeRequest({   # on serviceAccounts().list()
            "accounts": [
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]",
                 "email": "[email protected]"},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]",
                 "email": "[email protected]"},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]",
                 "email": "[email protected]"},
            ]
        })
    ])
    fake_vault_service_accounts_keys = FakeServiceAccounts([
        FakeRequest({   # on serviceAccounts().keys().list()
            "keys": [
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_max_age},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_max_age},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_min_age},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_max_age},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_max_age},
                {"name": "projects/suse-sle-qa/serviceAccounts/[email protected]/keys/c718317203fdacf1833db761b45c0a8beb6f7bbd",
                 "validAfterTime": service_account_older_than_min_age},
            ]
        })
    ])

    def mocked_iam_client():
        # Used only as an attribute holder (same pattern as the compute
        # client stub in test_cleanup_all); it is never meant to be called.
        pass

    # Mirrors the call chain iam_client().projects().serviceAccounts().
    mocked_iam_client.projects = lambda *args, **kwargs: mocked_iam_client
    mocked_iam_client.serviceAccounts = lambda *args, **kwargs: fake_vault_service_accounts
    mocked_iam_client.keys = lambda *args, **kwargs: fake_vault_service_accounts_keys

    monkeypatch.setattr(PCWConfig, 'get_feature_property', mock_get_feature_property)
    monkeypatch.setattr(GCE, 'iam_client', lambda self: mocked_iam_client)

    gce = GCE("fake")
    gce.cleanup_vaultopenqa_serviceaccounts()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment