Last active
February 21, 2019 21:19
-
-
Save jimmyislive/c5d12acc128dab0df8b9 to your computer and use it in GitHub Desktop.
aws kms
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Define AWS_KMS_ARN, KMS_REGION, KMS_AWS_ACCESS_KEY_ID and KMS_AWS_SECRET_ACCESS_KEY somewhere (e.g. a settings module) before using this class.
import base64
import logging
import os

import boto3
from Crypto.Cipher import AES
class AwsKms(object):
    """Envelope-encryption helper: KMS-wrapped data keys plus local AES-CFB.

    The caller stores the base64-encoded *encrypted* data key returned by
    ``generate_data_key`` and passes it to ``encrypt`` / ``decrypt``; each of
    those calls asks KMS to unwrap the key before using it locally.

    Ciphertext layout produced by ``encrypt`` (before base64 encoding)::

        IV (AES.block_size bytes) || AES-CFB(plaintext)

    NOTE: a fresh random IV is generated per message and stored with it.
    Ciphertexts written by an earlier version of this class that relied on
    the crypto library's implicit IV cannot be read by ``decrypt``.
    """

    def __init__(self):
        """Create the KMS client from the module-level AWS settings."""
        self.key_id = AWS_KMS_ARN
        self.client = boto3.client(
            'kms',
            region_name=KMS_REGION,
            aws_access_key_id=KMS_AWS_ACCESS_KEY_ID,
            aws_secret_access_key=KMS_AWS_SECRET_ACCESS_KEY,
        )

    def generate_data_key(self, key_spec='AES_256'):
        """Request a new data key and return its encrypted form.

        Only the ``CiphertextBlob`` of the KMS response is returned,
        base64-encoded; store that value and hand it to ``encrypt`` /
        ``decrypt``. The plaintext key is never returned from here.

        :param key_spec: value passed to KMS as ``KeySpec``.
        :returns: base64-encoded encrypted data key.
        :raises Exception: if the response status code is not 200.
        """
        response = self.client.generate_data_key(KeyId=self.key_id, KeySpec=key_spec)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            return base64.b64encode(response['CiphertextBlob'])

        # if you cannot generate the symmetric key itself, something is wrong
        # with your credentials...bail out
        raise Exception('Error while generating data key: {0}'.format(response))

    def get_plaintext_symmetric_key(self, cipherkey):
        """Ask KMS to unwrap a data key produced by ``generate_data_key``.

        :param cipherkey: base64-encoded encrypted data key.
        :returns: the ``Plaintext`` field of the KMS response, or ``None``
            when the response status code is not 200.
        """
        response = self.client.decrypt(CiphertextBlob=base64.b64decode(cipherkey))
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            return response['Plaintext']
        return None

    def encrypt(self, plaintext, cipherkey):
        """Encrypt ``plaintext`` with the data key wrapped in ``cipherkey``.

        :param plaintext: bytes to encrypt.
        :param cipherkey: base64-encoded encrypted data key.
        :returns: base64 of ``IV || ciphertext``, or ``None`` when the data
            key could not be obtained (an error is logged in that case).
        """
        symmetric_key = self.get_plaintext_symmetric_key(cipherkey)
        if not symmetric_key:
            logging.getLogger(__name__).error(
                'Could not obtain the plaintext data key; nothing was encrypted')
            return None

        # A new IV per message: the same data key is reused across calls, so
        # a repeated IV would repeat the CFB keystream.
        iv = os.urandom(AES.block_size)
        # IV passed positionally: the keyword is spelled differently across
        # PyCrypto / PyCryptodome.
        cipher = AES.new(symmetric_key, AES.MODE_CFB, iv)
        return base64.b64encode(iv + cipher.encrypt(plaintext))

    def decrypt(self, ciphertext, cipherkey):
        """Decrypt a value produced by ``encrypt``.

        :param ciphertext: base64 of ``IV || ciphertext``.
        :param cipherkey: base64-encoded encrypted data key.
        :returns: the decrypted bytes, or ``None`` when the data key could
            not be obtained (an error is logged in that case).
        :raises ValueError: if the decoded payload is shorter than one IV.
        """
        symmetric_key = self.get_plaintext_symmetric_key(cipherkey)
        if not symmetric_key:
            logging.getLogger(__name__).error(
                'Could not obtain the plaintext data key; nothing was decrypted')
            return None

        payload = base64.b64decode(ciphertext)
        if len(payload) < AES.block_size:
            raise ValueError('ciphertext is too short to contain an IV')

        iv = payload[:AES.block_size]
        body = payload[AES.block_size:]
        return AES.new(symmetric_key, AES.MODE_CFB, iv).decrypt(body)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment