Created
May 28, 2024 22:05
-
-
Save fsultan/eabfee073509b2e4f2bce9872700540c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest
from datetime import timedelta
from unittest.mock import MagicMock, patch

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID, SignatureAlgorithmOID

from lambda_function import upload_secret
from lambda_function import write_cert
from lambda_function import create_cert
class TestUploadSecret(unittest.TestCase):
    """Tests for ``lambda_function.upload_secret`` using a mocked client.

    Each test configures what ``client.describe_secret`` returns (or raises)
    and then checks which client methods were called and which message was
    logged. The expected calls and messages are the ones these tests assert;
    the implementation of ``upload_secret`` is not visible from this file.
    """

    def setUp(self):
        self.client = MagicMock()
        self.secret = MagicMock()
        self.secret_name = 'your_secret_name'
        self.type = 'private-key'
        # NOTE(review): the assertions compare against ``self.secret_string``,
        # which was never defined (AttributeError). The only secret payload
        # handed to ``upload_secret`` is ``self.secret``, so it is used as the
        # expected value -- confirm against what upload_secret really sends.
        self.secret_string = self.secret

        # ``logger`` is not a name in this test module; replace the logger
        # that lives in ``lambda_function`` with a mock so the tests can
        # inspect what was logged. The patch is undone after every test.
        logger_patcher = patch("lambda_function.logger")
        self.logger = logger_patcher.start()
        self.addCleanup(logger_patcher.stop)

    def test_upload_secret_private_key(self):
        """An empty ``describe_secret`` response leads to a fresh upload."""
        self.client.describe_secret.return_value = {}

        upload_secret(self.client, self.secret, self.secret_name, self.type)

        self.client.upload_secret.assert_called_once()
        self.client.restore_secret.assert_not_called()
        self.client.put_secret_value.assert_called_once_with(
            secret_name=self.secret_name, secret_string=self.secret_string
        )
        self.assertEqual(
            self.logger.info.call_args[0][0], "CREATED private-key SECRET"
        )

    def test_upload_secret_existing_deleted(self):
        """A secret with a ``DeletedDate`` is restored and then updated."""
        self.client.describe_secret.return_value = {'DeletedDate': '2022-01-01'}

        upload_secret(self.client, self.secret, self.secret_name, self.type)

        self.client.upload_secret.assert_not_called()
        self.client.restore_secret.assert_called_once_with(
            secret_name=self.secret_name
        )
        self.client.put_secret_value.assert_called_once_with(
            secret_name=self.secret_name, secret_string=self.secret_string
        )
        self.assertEqual(
            self.logger.info.call_args[0][0],
            "RESTORED AND UPDATED private-key SECRET",
        )

    def test_upload_secret_existing_not_deleted(self):
        """A secret whose ``DeletedDate`` is ``None`` is updated in place."""
        self.client.describe_secret.return_value = {'DeletedDate': None}

        upload_secret(self.client, self.secret, self.secret_name, self.type)

        self.client.upload_secret.assert_not_called()
        self.client.restore_secret.assert_not_called()
        self.client.put_secret_value.assert_called_once_with(
            secret_name=self.secret_name, secret_string=self.secret_string
        )
        self.assertEqual(
            self.logger.info.call_args[0][0], "UPDATED private-key SECRET"
        )

    def test_upload_secret_exception(self):
        """A failing ``describe_secret`` call is logged as an error."""
        self.client.describe_secret.side_effect = Exception("Some error")

        upload_secret(self.client, self.secret, self.secret_name, self.type)

        self.client.upload_secret.assert_called_once()
        self.client.restore_secret.assert_not_called()
        self.client.put_secret_value.assert_not_called()
        self.assertEqual(
            self.logger.error.call_args[0][0],
            "Could not upload certificate due to Some error",
        )
class TestWriteCert(unittest.TestCase):
    """Tests for ``lambda_function.write_cert`` with file I/O mocked out.

    ``builtins.open`` and ``shutil.copy`` are patched, so nothing is written
    to disk; the tests only check which paths were requested and what was
    logged on failure.
    """

    def setUp(self):
        self.cert = MagicMock()
        self.key = MagicMock()

        # ``logger`` is not defined in this test module; patch the one in
        # ``lambda_function`` so the failure test can inspect it.
        logger_patcher = patch("lambda_function.logger")
        self.logger = logger_patcher.start()
        self.addCleanup(logger_patcher.stop)

    def test_write_cert_success(self):
        """Both PEM files are opened for text writing and the chain is copied."""
        with patch("builtins.open", create=True) as mock_open, \
                patch("shutil.copy") as mock_copy:
            write_cert(self.cert, self.key)

        mock_open.assert_any_call("/tmp/certificateChain.pem", "wt")
        mock_open.assert_any_call("/tmp/privateKey.pem", "wt")
        mock_copy.assert_called_once_with(
            "/tmp/certificateChain.pem", "/tmp/trustedCertificates.pem"
        )

    def test_write_cert_exception(self):
        """An error from ``open`` propagates and is logged."""
        with patch("builtins.open", side_effect=Exception("Some error")), \
                self.assertRaises(Exception) as context:
            write_cert(self.cert, self.key)

        # These checks must sit outside the ``assertRaises`` block: anything
        # placed after the raising call inside it would never execute.
        self.assertEqual(str(context.exception), "Some error")
        self.assertEqual(
            self.logger.error.call_args[0][0],
            "COULD NOT WRITE FILE DUE TO : Some error",
        )
# A ``unittest.main()`` guard used to sit here, ahead of this class. Because
# ``unittest.main()`` exits the interpreter once the run finishes, running the
# file as a script never reached ``TestCreateCert``. The guard at the end of
# the file is the single entry point.
class TestCreateCert(unittest.TestCase):
    """Tests for ``lambda_function.create_cert``.

    The success test inspects the returned certificate through the
    ``cryptography`` x509 API; the failure test checks that an error raised
    while using the key is logged and propagated.
    """

    def setUp(self):
        self.key = MagicMock()

    def _subject_value(self, cert, oid):
        """Return the single subject attribute value stored under *oid*."""
        attributes = cert.subject.get_attributes_for_oid(oid)
        self.assertEqual(len(attributes), 1, f"expected one {oid} attribute")
        return attributes[0].value

    def test_create_cert_success(self):
        """A real RSA key yields a self-signed certificate with fixed fields."""
        # Imported here so a missing ``CNAME`` only fails this test.
        # NOTE(review): the original referenced a bare, undefined ``CNAME``;
        # it is assumed to be a module-level name in lambda_function -- confirm.
        import lambda_function

        # A MagicMock cannot be used to build and sign a genuine
        # ``x509.Certificate``, so this test uses a freshly generated key.
        # NOTE(review): assumes create_cert expects an RSA private key,
        # matching the RSA/SHA-256 signature algorithm asserted below.
        key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

        cert = create_cert(key)

        self.assertIsInstance(cert, x509.Certificate)

        # ``x509.Name`` has no ``country_name``-style attributes; values are
        # looked up by OID.
        expected_subject = (
            (NameOID.COUNTRY_NAME, "US"),
            (NameOID.STATE_OR_PROVINCE_NAME, "New Jersey"),
            (NameOID.LOCALITY_NAME, "Jersey City"),
            (NameOID.ORGANIZATION_NAME, "JP Morgan Chase"),
            (NameOID.ORGANIZATIONAL_UNIT_NAME, "EFP"),
            (NameOID.COMMON_NAME, lambda_function.CNAME),
        )
        for oid, expected in expected_subject:
            self.assertEqual(self._subject_value(cert, oid), expected)

        self.assertEqual(cert.serial_number, 2002)
        self.assertEqual(cert.issuer, cert.subject)
        # ``public_key`` is a method; compare the key material rather than
        # the bound method object.
        self.assertEqual(
            cert.public_key().public_numbers(),
            key.public_key().public_numbers(),
        )
        # The constant is named RSA_WITH_SHA256 (there is no SHA256_WITH_RSA).
        self.assertEqual(
            cert.signature_algorithm_oid,
            SignatureAlgorithmOID.RSA_WITH_SHA256,
        )

    def test_create_cert_exception(self):
        """A failure while using the key is logged and re-raised."""
        with patch("lambda_function.logger.error") as mock_logger_error:
            self.key.side_effect = Exception("Some error")
            # ``side_effect`` on the mock itself only fires if the key object
            # is *called*. NOTE(review): create_cert presumably calls
            # ``key.public_key()``; make that raise too -- confirm.
            self.key.public_key.side_effect = Exception("Some error")
            with self.assertRaises(Exception) as context:
                create_cert(self.key)

        self.assertEqual(str(context.exception), "Some error")
        mock_logger_error.assert_called_once_with(
            "CERTIFICATE CREATION FAILED DUE TO Some error"
        )
# Run every test case in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment