A sample S3 file uploader that uses DynamoDB to keep track of which files have already been uploaded.
#!/usr/bin/env python
# [coinlocker]
#
# Copyright (c) 2014 Yuta Imai
#
# This software is released under the MIT License.
#
# http://opensource.org/licenses/mit-license.php
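#
# Example invocation (script, bucket, and table names here are placeholders):
#
#   python coinlocker.py -r ap-northeast-1 -b my-bucket -p backup \
#       -t uploaded-files -d /path/to/files
#
# S3 keys are built as prefix + absolute file path, so with the example
# above a file lands at 'backup/path/to/files/example.txt'.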
import math
import os
from optparse import OptionParser

import boto
import boto.dynamodb2
import boto.s3
import boto.s3.bucket
import boto.s3.key
from boto.dynamodb2.exceptions import ConditionalCheckFailedException
from filechunkio import FileChunkIO
# boto.set_stream_logger('boto')  # uncomment for verbose request logging

# S3 multipart uploads require every part except the last to be at least
# 5 MB, so 5 MB serves as both the multipart threshold and the part size.
chunk_size = 1024 * 1024 * 5
parser = OptionParser()
parser.add_option("-r", "--region", dest="region",
                  help="Region for S3 and DynamoDB")
parser.add_option("-b", "--bucket", dest="bucket",
                  help="S3 bucket name")
parser.add_option("-p", "--prefix", dest="prefix",
                  help="Prefix for file names on S3")
parser.add_option("-t", "--table", dest="table",
                  help="DynamoDB table name")
parser.add_option("-d", "--dir", dest="directory",
                  help="Root of target directory")
(options, args) = parser.parse_args()
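# Added guard (not in the original gist): every option above is required,
# so fail fast with a usage message if any is missing.
for _name in ('region', 'bucket', 'prefix', 'table', 'directory'):
    if getattr(options, _name) is None:
        parser.error('missing required option: %s' % _name)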
s3 = boto.s3.connect_to_region(options.region)
s3bucket = boto.s3.bucket.Bucket(connection=s3, name=options.bucket)
dynamo = boto.dynamodb2.connect_to_region(options.region)
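# Assumption: the DynamoDB table already exists with a string hash key named
# 'filename'; the conditional put in lock_key() below relies on that key.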
def lock_key(key):
    # Record the file in DynamoDB; return False if it was already recorded.
    # The conditional put succeeds only when no item with this filename
    # exists yet, so repeated or concurrent runs skip files already uploaded.
    try:
        dynamo.put_item(
            options.table,
            {'filename': {'S': key}, 'status': {'S': 'uploaded'}},
            expected={
                'filename': {'Exists': False}
            }
        )
        return True
    except ConditionalCheckFailedException:
        # The item already exists: this file has been uploaded before.
        return False
def scan_dir(target):
    # Return the absolute paths of the immediate subdirectories of target.
    os.chdir(target)  # so os.path.isdir() below can test relative names
    keys = os.listdir(target)
    result = []
    for key in keys:
        if os.path.isdir(key):
            absolute_path = target + '/' + key
            result.append(absolute_path)
    return result
def find_dirs(root):
    # Breadth-first walk collecting root and every directory below it.
    to_go = [root]
    result = [root]
    while len(to_go):
        target = to_go.pop(0)
        scan_result = scan_dir(target)
        to_go.extend(scan_result)
        result.extend(scan_result)
    return result
def upload_files(target):
    # Upload every regular file directly under target, skipping files that
    # a previous (or concurrent) run has already recorded in DynamoDB.
    os.chdir(target)
    keys = os.listdir(target)
    for key in keys:
        absolute_file_path = target + '/' + key
        if os.path.isfile(absolute_file_path):
            if lock_key(absolute_file_path):
                s3key_name = options.prefix + absolute_file_path
                file_size = os.path.getsize(absolute_file_path)
                if file_size > chunk_size:
                    multi_part_upload(absolute_file_path, s3key_name, file_size)
                else:
                    s3key = boto.s3.key.Key(bucket=s3bucket, name=s3key_name)
                    s3key.set_contents_from_filename(absolute_file_path)
                print 'Uploaded: %s' % s3key_name
            else:
                print '%s seems to have been uploaded already.' % absolute_file_path
def multi_part_upload(file_path, s3key_name, file_size):
    # Stream the file to S3 in chunk_size pieces via the multipart API,
    # under the same prefixed key name that single-part uploads use.
    mp = s3bucket.initiate_multipart_upload(s3key_name)
    chunk_count = int(math.ceil(file_size / float(chunk_size)))
    for i in range(chunk_count):
        offset = chunk_size * i
        num_bytes = min(chunk_size, file_size - offset)
        with FileChunkIO(file_path, 'r', offset=offset, bytes=num_bytes) as fp:
            mp.upload_part_from_file(fp, part_num=i + 1)
    mp.complete_upload()
os.chdir(options.directory)
root = os.getcwd()  # normalizes options.directory to an absolute path
targets = find_dirs(root)
for target in targets:
    upload_files(target)