Last active
September 12, 2019 06:07
-
-
Save jg75/af86bcc4caec03098d52 to your computer and use it in GitHub Desktop.
boto3 examples
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import botocore | |
from boto3.s3.transfer import S3Transfer | |
def lookup(s3, bucket_name): | |
try: | |
s3.meta.client.head_bucket(Bucket=bucket_name) | |
except botocore.exceptions.ClientError as e: | |
error_code = int(e.response['Error']['Code']) | |
if error_code == 404: | |
return False | |
return True | |
if __name__ == '__main__': | |
s3 = boto3.resource('s3') | |
bucket_name = 'test-kick-the-bucket' | |
bucket = s3.Bucket(bucket_name) | |
cors = bucket.Cors() | |
# Getting the bucket doesn't guarantee that it exists. | |
# Check if it's a bucket, if not, then create it | |
# and add a CORS rule. | |
if not lookup(s3, bucket_name): | |
s3.create_bucket(Bucket=bucket_name) | |
# Cross Origin Resource Sharing | |
# Create a new CORS rule | |
cors.put(CORSConfiguration={ | |
'CORSRules': [{ | |
'AllowedMethods': ['GET'], | |
'AllowedOrigins': ['*'] | |
}] | |
}) | |
else: | |
# Cross Origin Resource Sharing | |
# Delete any existing CORS rules | |
cors.delete() | |
# Create a new CORS rule | |
cors.put(CORSConfiguration={ | |
'CORSRules': [{ | |
'AllowedMethods': ['GET'], | |
'AllowedOrigins': ['*'] | |
}] | |
}) | |
# Creating a bucket that already exists | |
# doesn't actually do anything anyway | |
s3.create_bucket(Bucket=bucket_name) | |
# Add a key | |
key = s3.Object(bucket_name, 'stuff.txt') | |
key.put(Body='stuff!') | |
# Add a key from a file | |
key = s3.Object(bucket_name, 's3_stuff.py') | |
key.put(Body=open('s3_stuff.py', 'rb')) | |
# Add a key and set access controls to publically available | |
key = s3.Object(bucket_name, 'stuff') | |
key.put(Body='stuff!', ACL='public-read') | |
# Add a key with some metadata added | |
key = s3.Object(bucket_name, 'funny_stuff') | |
key.put(Body='stuff!', Metadata={'foo': 'bar'}) | |
# Iterate all the buckets | |
# for bucket in s3.buckets.all(): | |
# Iterate all the keys | |
for key in bucket.objects.all(): | |
metadata = key.get()['Metadata'] | |
foo = metadata.get('foo') | |
if metadata and foo: | |
print "%s: foo=%s" % (key.key, foo) | |
else: | |
print key.key | |
# S3Transfer client example | |
s3_client = boto3.client('s3') | |
s3_transfer = S3Transfer(s3_client) | |
s3_transfer.upload_file('./example.txt', bucket.name, 'example.txt') | |
s3_transfer.download_file(bucket.name, 'example.txt', './example.txt') | |
# Delete all the things! | |
for key in bucket.objects.all(): | |
key.delete() | |
bucket.delete() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
# Override the region name configured in the default session. | |
boto3.setup_default_session(region_name='us-east-1') | |
sqs = boto3.resource('sqs') | |
queue = sqs.get_queue_by_name(QueueName='test') | |
# Check if the queue exists with a 5 second delay, | |
# if not, then create it with a 5 second delay. | |
if not queue: | |
attributes = {'DelaySeconds': '5'} | |
queue = sqs.create_queue( | |
QueueName=queue_name, | |
Attributes=attributes | |
) | |
# Throw an error if another queue with that name already exists. | |
elif queue.attributes.get('DelaySeconds') != '5': | |
# List all queues in this region | |
for queue in sqs.queues.all(): | |
print queue.attributes['QueueArn'].split(':')[-1], queue.url | |
raise ValueError(': A queue named test is already in use') | |
for message in queue.receive_messages(MessageAttributeNames=['Author']): | |
author = '' | |
if message.message_attributes is not None: | |
author = message.message_attributes.get('Author').get('StringValue') | |
print '%' * 30 | |
print "%s: %s" % (author, message.body) | |
print message | |
print '%' * 30 | |
# Delete the message once it's been processed | |
message.delete() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
# Override the region name configured in the default session. | |
boto3.setup_default_session(region_name='us-east-1') | |
sqs = boto3.resource('sqs') | |
queue_name = 'test' | |
queue = None | |
# Check if the queue exists with a 5 second delay, | |
# if not, then create it with a 5 second delay. | |
try: | |
queue = sqs.get_queue_by_name(QueueName=queue_name) | |
# Throw an error if another queue with that name already exists. | |
if queue.attributes.get('DelaySeconds') != '5': | |
raise ValueError(': A queue named test is already in use') | |
except: | |
attributes = {'DelaySeconds': '5'} | |
queue = sqs.create_queue( | |
QueueName=queue_name, | |
Attributes=attributes | |
) | |
# List all queues in this region | |
for queue in sqs.queues.all(): | |
print queue.attributes['QueueArn'].split(':')[-1], queue.url | |
# Send a message onto the queue | |
response = queue.send_message( | |
MessageBody='hello world', | |
MessageAttributes={ | |
'Author': { | |
'StringValue': 'Viking Pirate Ninja Rock God', | |
'DataType': 'String' | |
} | |
} | |
) | |
# Print the response data if the request failed | |
if response.get('ResponseMetadata').get('HTTPStatusCode') != 200: | |
print response | |
# Create a bunch of message entries | |
entries = [] | |
for i in range(1, 10): | |
entries.append({ | |
'Id': "Blah-%s" % str(i), | |
'MessageBody': 'all kinds of stuff', | |
'MessageAttributes': { | |
'Author': { | |
'StringValue': 'Viking Pirate Ninja Rock God', | |
'DataType': 'String' | |
} | |
} | |
}) | |
# Batch the messages onto the queue | |
response = queue.send_messages(Entries=entries) | |
# Print the response data for failed messages | |
failed = response.get('Failed') | |
if failed: | |
print failed |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
I am stuff. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment