[AWS] S3 and SQS boto3 clients
from __future__ import unicode_literals

"""
S3 bucket CRUD operations core module
"""

import logging
import time

import boto3
import botocore
from botocore.client import Config


class S3Client:  # pragma: no cover
    """
    S3 class encapsulates uploading, downloading & other S3 file ops
    and error handling.
    This is not covered by unit tests, but by integration tests,
    since it talks to an external service.
    """
    S3_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.000Z'  # Not used
    RECONNECT_SLEEP_SECS = 0.5
    CONN_RETRIES = 10
    CONN_CONFIG = Config(connect_timeout=5, retries={'max_attempts': 0})

    def __init__(self, config, reconnect_sleep_secs=RECONNECT_SLEEP_SECS, conn_retries=CONN_RETRIES):
        """
        Load config from passed params or override with defaults
        :param config: dict, config with access_key_id, secret_access_key, aws_region, bucket_name
        :return: None
        """
        self.config = config
        self.bucket_name = self.config['bucket_name']
        self.access_key_id = self.config['access_key_id']
        self.secret_access_key = self.config['secret_access_key']
        self.aws_region = self.config['aws_region']
        self.RECONNECT_SLEEP_SECS = reconnect_sleep_secs
        self.CONN_RETRIES = conn_retries
        self.connection_attempt = 0
        self.connection = None
        self.bucket = None
        self.connect()

    def connect(self):
        """
        Creates the S3 resource connection in the configured region (self.aws_region).
        The connection is established on the first call for this instance (lazy) and cached.
        :return: None
        """
        try:
            self.connection_attempt += 1
            self.connection = boto3.resource('s3', region_name=self.aws_region,
                                             aws_access_key_id=self.access_key_id,
                                             aws_secret_access_key=self.secret_access_key,
                                             config=self.CONN_CONFIG)
            self._get_bucket()
        except Exception as e:
            logging.exception("S3Client.connect failed with params {}, error {}".format(self.config, e))
            if self.connection_attempt >= self.CONN_RETRIES:
                raise

    def _get_bucket(self):
        """
        Uses the S3 connection to get a handle on the bucket.
        The bucket holds the listing files referenced by SQS messages.
        :return: None
        """
        try:
            self.bucket = self.connection.Bucket(name=self.bucket_name)
        except Exception as e:
            # E.g. gaierror: [Errno -2] Name or service not known
            logging.exception("S3Client.get_bucket unable to get bucket {}, error {}".format(self.bucket_name, e))
            raise

    def list(self):
        """
        List contents of the bucket
        :return: list of s3.ObjectSummary
        """
        return list(self.bucket.objects.all())

    def read(self, key):
        """
        Get contents of a file from S3
        :param key: str, bucket key filename
        :return: str, contents of key
        """
        try:
            obj = self.connection.Object(self.bucket_name, key)
            contents = obj.get()['Body'].read().decode('utf-8')
        except Exception as e:  # Retry in case we have a connection error
            logging.exception("S3Client.read failed for key {}, error {}".format(key, e))
            time.sleep(self.RECONNECT_SLEEP_SECS)
            self.connect()
            contents = self.read(key)
        return contents

    def write(self, key, contents):
        """
        Create bucket key from string
        Write content to a file in S3
        :param contents: str, contents to save to a file
        :param key: str, bucket key filename
        :return: dict, output
        """
        output = response = None
        try:
            response = self.connection.Object(self.bucket_name, key).put(Body=contents)
            output = {
                'file_name': key,
                # 'is_new': not k.exists(),
            }
        except Exception as e:
            logging.exception("S3Client.write failed for key {}, error {}, response {}".format(key, e, response))
        return output

    def upload(self, key, origin_path):
        """
        Create bucket key from filename
        Upload a file to S3 from origin file
        :param origin_path: str, path to origin filename
        :param key: str, bucket key filename
        :return: bool, success
        """
        try:
            with open(origin_path, 'rb') as file_body:
                self.connection.Bucket(self.bucket_name).put_object(Key=key, Body=file_body)
        except Exception as e:
            logging.exception("S3Client.upload failed for key {}, error {}".format(key, e))
            return False
        return True

    def download(self, key, destination):
        """
        Download a file from S3 to destination
        :param destination: str, path to local file name
        :param key: str, bucket key filename
        :return: bool, success
        """
        result = True
        try:
            self.bucket.download_file(key, destination)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                logging.error("S3Client.download bucket missing key file {}".format(key))
                result = False
            else:
                raise
        except Exception as e:
            logging.warning("S3Client.download failed for key {} to {}, error {}, retrying".format(key, destination, e))
            time.sleep(self.RECONNECT_SLEEP_SECS)
            self.connect()
            result = self.download(key, destination)
        return result

    def remove(self, keys):
        """
        Deletes the given keys from the bucket.
        :param keys: list, list of key names
        :return: bool, success
        """
        logging.warning("S3Client.remove deleting keys {}".format(keys))
        objects = [{'Key': key} for key in keys]
        self.bucket.delete_objects(Delete={'Objects': objects})
        return True


if __name__ == "__main__":
    # TODO: turn into tests
    conf = {'access_key_id': '<test key id>',
            'secret_access_key': '<test access key>',
            'aws_region': 'ca-central-1',
            'bucket_name': 'aws-web-distro'}
    s3_client = S3Client(config=conf)
    print(s3_client.list())
    # s3_client.upload('test2.jpg', '/Users/rad/Desktop/test.jpg')
    print(s3_client.read('readonly-access/readonly.txt'))
    # print(s3_client.write('new-test-key.txt', 'this is some data'))
    print(s3_client.remove(keys=['test.jpg', 'test2.jpg']))
    # s3_client.download('new-test-key.txt', 'my_local_image.jpg')
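
Since the __main__ block above is only a TODO for tests, here is a minimal sketch of what a self-contained test could look like. It assumes the class above is saved as a module named s3_client (a hypothetical name) and that the moto library is installed to fake S3 locally (moto 5.x exposes mock_aws; older releases expose mock_s3 instead); the bucket name and credentials below are placeholders.

import boto3
from moto import mock_aws  # on moto < 5, use: from moto import mock_s3

from s3_client import S3Client  # hypothetical module name for the class above


@mock_aws
def test_write_read_roundtrip():
    conf = {'access_key_id': 'testing', 'secret_access_key': 'testing',
            'aws_region': 'us-east-1', 'bucket_name': 'test-bucket'}
    # moto intercepts the boto3 calls, so the bucket exists only in memory
    boto3.resource('s3', region_name='us-east-1').create_bucket(Bucket='test-bucket')
    client = S3Client(config=conf)
    client.write('hello.txt', 'hello world')
    assert client.read('hello.txt') == 'hello world'
    assert [obj.key for obj in client.list()] == ['hello.txt']

The second file is the matching SQS client.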

from __future__ import unicode_literals

"""
SQS feed queue core class

Sample SQS message from an S3 file-created trigger:
{
    "Records": [
        {
            "eventVersion": "2.0",
            "eventSource": "aws:s3",
            "awsRegion": "us-west-2",
            "eventTime": "2009-03-21T03:24:48.558Z",
            "eventName": "ObjectCreated:Put",
            "userIdentity": {
                "principalId": "AWS:someid"
            },
            "requestParameters": {
                "sourceIPAddress": "1.2.3.4"
            },
            "responseElements": {
                "x-amz-request-id": "some-request-id",
                "x-amz-id-2": "some-id"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "triggerName",
                "bucket": {
                    "name": "us-west-2-feeds-uat",
                    "ownerIdentity": {
                        "principalId": "some-id"
                    },
                    "arn": "arn:aws:s3:::us-west-2-bucket"
                },
                "object": {
                    "key": "some file",
                    "size": 1319506,
                    "eTag": "some-tag",
                    "sequencer": "some-sequence"
                }
            }
        }
    ]
}
"""

import logging
import time

import boto3
from botocore import exceptions

# Try to get ujson if available
try:
    import ujson as json
except ImportError:
    import json


class SQSClient:
    """
    SQS class encapsulates queue operations.
    This is not covered by unit tests, but by integration tests,
    since it talks to an external service.
    """
    # Request timeout to poll for a msg, must be 0 to 20 (long-poll seconds)
    MSG_WAIT_SECONDS = 20
    # Make a received message invisible to other consumers; SQS default is 30 (visibility timeout)
    MSG_INVISIBLE_SECONDS = 14
    RECONNECT_SLEEP_SECS = 0.5
    CONN_RETRIES = 20

    def __init__(self, config, msg_wait_seconds=MSG_WAIT_SECONDS,
                 msg_invisible_seconds=MSG_INVISIBLE_SECONDS, reconnect_sleep_secs=RECONNECT_SLEEP_SECS,
                 conn_retries=CONN_RETRIES):
        """
        Load config from passed params or override with defaults
        :param config: dict with access_key_id, secret_access_key, aws_region, queue_url
        :return: None
        """
        # Load from passed params or override with defaults
        try:
            self.config = config
            self.access_key_id = self.config['access_key_id']
            self.secret_access_key = self.config['secret_access_key']
            self.aws_region = self.config['aws_region']
            self.queue_url = self.config['queue_url']
            self.MSG_WAIT_SECONDS = msg_wait_seconds
            self.MSG_INVISIBLE_SECONDS = msg_invisible_seconds
            self.RECONNECT_SLEEP_SECS = reconnect_sleep_secs
            self.CONN_RETRIES = conn_retries
        except Exception as e:
            logging.exception("SQSClient.__init__ configuration error {}".format(e))
            self.access_key_id = None
            self.secret_access_key = None
            self.aws_region = None
            self.queue_url = None
            self.config = None
        self.connection_attempt = 0
        self.client = None
        self.connect()

    def connect(self):
        """
        Establish SQS connection
        """
        try:
            self.client = boto3.client('sqs',
                                       region_name=self.aws_region,
                                       aws_access_key_id=self.access_key_id,
                                       aws_secret_access_key=self.secret_access_key)
            self.connection_attempt = 0  # Got queue connection, reset retries
        except Exception as e:
            logging.exception("SQSClient.connect failed with params {}, error {}".format(self.config, e))
            self.connection_attempt += 1
            if self.connection_attempt >= self.CONN_RETRIES:
                raise

    def get_messages(self, num_messages=1, msg_invisible_seconds=None, msg_wait_seconds=None):
        """
        Get messages from the SQS feed queue
        :param num_messages: int, number of messages to get, max is 10
        :param msg_wait_seconds: int, time to wait (poll time between retries)
        :param msg_invisible_seconds: int, how long the message is invisible to other consumers
        :return: list of sqs message dicts
        """
        if msg_invisible_seconds is None:
            msg_invisible_seconds = self.MSG_INVISIBLE_SECONDS
        if msg_wait_seconds is None:
            msg_wait_seconds = self.MSG_WAIT_SECONDS
        try:
            # Long polling for messages from SQS
            response = self.client.receive_message(MaxNumberOfMessages=num_messages,
                                                   QueueUrl=self.queue_url,
                                                   WaitTimeSeconds=msg_wait_seconds,
                                                   VisibilityTimeout=msg_invisible_seconds) or {}
            sqs_messages = response.get('Messages') or []
        except Exception as e:
            # E.g. gaierror: [Errno -2] Name or service not known
            logging.exception("SQSClient.get_messages error, retrying. {}".format(e))
            time.sleep(self.RECONNECT_SLEEP_SECS)
            self.connect()
            if self.connection_attempt >= self.CONN_RETRIES:
                raise
            sqs_messages = self.get_messages(num_messages, msg_invisible_seconds, msg_wait_seconds)
        return sqs_messages

    def delete_message(self, sqs_message):
        """
        Delete an SQS message
        :param sqs_message: dict, received message containing the
            'ReceiptHandle' needed for deletion
        :return: bool, success
        """
        try:
            receipt_handle = sqs_message['ReceiptHandle']
        except KeyError:
            logging.error("SQSClient.delete_message missing 'ReceiptHandle' key in message, required for delete")
            raise
        try:
            self.client.delete_message(QueueUrl=self.queue_url,
                                       ReceiptHandle=receipt_handle)
        except self.client.exceptions.ReceiptHandleIsInvalid:
            # Message was already deleted, handle no longer valid, old msg
            pass
        except Exception as e:
            logging.exception("SQSClient.delete_message error, retrying. {}".format(e))
            time.sleep(self.RECONNECT_SLEEP_SECS)
            self.connect()
            if self.connection_attempt >= self.CONN_RETRIES:
                raise
            self.delete_message(sqs_message)
        return True

    def send_message(self, body, delay_seconds=0):
        """
        For testing only, send a message
        :param body: str or dict, message content
        :param delay_seconds: int, seconds to wait before the message becomes visible
        :return: dict, SendMessage response
        """
        if isinstance(body, dict):
            body = json.dumps(body)
        return self.client.send_message(
            QueueUrl=self.queue_url,
            MessageBody=body,
            DelaySeconds=delay_seconds,
        )


if __name__ == "__main__":
    # TODO: Turn into tests
    conf = {'access_key_id': '<test key id>',
            'secret_access_key': '<test access key>',
            'aws_region': 'ca-central-1',
            'queue_url': 'https://sqs.ca-central-1.amazonaws.com/584374059506/django-content-services-applenews'}
    client = SQSClient(conf, conn_retries=1, msg_wait_seconds=10)
    # Create some messages
    client.send_message(body={'test': 'body'})
    client.send_message(body={'test': 'body'})
    msgs = client.get_messages(num_messages=2)
    assert isinstance(msgs, list)
    assert msgs
    assert len(msgs) >= 1, len(msgs)  # Could be 1 or 2
    assert isinstance(msgs[0], dict), type(msgs[0])
    is_deleted = [client.delete_message(msg) for msg in msgs]
    assert all(is_deleted)
    # Clean up all messages
    while msgs:
        [client.delete_message(msg) for msg in msgs]
        msgs = client.get_messages(msg_wait_seconds=1)
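
To tie the two clients together, here is a minimal consumer sketch (not part of the original gist) that drains the queue and reads each S3 object named in ObjectCreated notifications like the sample message at the top of this file. The module names s3_client / sqs_client and the config values are placeholders; note also that S3 URL-encodes object keys in event notifications, which this sketch does not decode.

import json

from s3_client import S3Client    # hypothetical module name for the S3 class above
from sqs_client import SQSClient  # hypothetical module name for the SQS class above

s3_conf = {'access_key_id': '<key id>', 'secret_access_key': '<access key>',
           'aws_region': 'ca-central-1', 'bucket_name': '<bucket name>'}
sqs_conf = {'access_key_id': '<key id>', 'secret_access_key': '<access key>',
            'aws_region': 'ca-central-1', 'queue_url': '<queue url>'}

s3_client = S3Client(config=s3_conf)
sqs_client = SQSClient(config=sqs_conf)

for msg in sqs_client.get_messages(num_messages=10):
    body = json.loads(msg['Body'])
    for record in body.get('Records', []):
        # Each record names the bucket and key of the object that fired the event
        key = record['s3']['object']['key']
        print(s3_client.read(key))
    sqs_client.delete_message(msg)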