#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
"""S3DistCp
Run S3DistCp via boto. Currently only a limited set of S3DistCp
options useful for preparing AWS/EMR input data are exposed.
For details of S3DistCp options, see Distributed Copy Using S3DistCP:
http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html
Examples
--------
For preparing input data for AWS/EMR, a typical run concatenates a
number of small files into a compressed file, following the given
group-by instruction. For example:
  $ python s3distcp.py \
      your_ec2_key \
      s3://inputbucket/ \
      s3://outputbucket/ \
      --group-by ".*([0-9a-f])[0-9a-f]*\.json" \
      --target-size 2048 \
      --output-codec lzo
will group the JSON files under s3://inputbucket/ by the hex character
captured by the group-by pattern (the greedy ".*" makes this the last
character of the hex run preceding ".json") and create LZO-compressed
output files. Each output file will be roughly 2 GiB before
compression, with its file name prefixed by the captured hex character;
sequential numbers are appended when multiple output files are created
because the target size is exceeded.
"""
from argparse import ArgumentParser
import logging
import os
import re
import sys
import time
import boto
from boto import emr
from boto.emr.step import JarStep
from boto.emr.instance_group import InstanceGroup
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter(
    fmt='%(asctime)s %(message)s',
    datefmt='%m/%d/%Y %H:%M:%S'))
log.addHandler(ch)
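

# Python 2 helper: decode byte strings so S3 bucket and key names are
# handled as unicode throughout.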
def ensure_unicode(s, encoding='utf-8'):
    return s if isinstance(s, unicode) else s.decode(encoding)
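

# A unicode subclass representing an S3 URI, with the bucket name and
# key name split out as attributes.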
class S3URI(unicode):

    validate = re.compile(ur'^s3://(.+)$', re.U)

    def __init__(self, uri):
        b, k = self.split_s3_path(uri)
        self.bucket_name = b
        self.key_name = k

    @classmethod
    def split_s3_path(cls, uri):
        m = cls.validate.search(uri)
        if not m:
            raise ValueError(u'Invalid S3 URI ({})'.format(uri))
        ts = m.group(1).split('/', 1)
        bucket = ts[0]
        key = ts[1] if len(ts) > 1 else ''
        return ensure_unicode(bucket), ensure_unicode(key)

    def __unicode__(self):
        return os.path.join(u's3://', self.bucket_name, self.key_name)

    def __str__(self):
        return self.__unicode__().encode('utf-8')
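

# Build the EMR cluster spec, submit S3DistCp as a jar step, and poll
# the jobflow until it finishes.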
def main(args):
    srcs = [S3URI(src) for src in args.src]
    dest = S3URI(args.dest)
    region = args.region
    process_name = os.path.splitext(os.path.basename(__file__))[0]

    s3conn = boto.connect_s3()
    if s3conn.lookup(dest.bucket_name) is None:
        log.error('S3 bucket {} does not exist, quitting.'.format(
            dest.bucket_name))
        sys.exit(1)
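
    # A single on-demand master and a single on-demand core instance
    # form the minimal cluster; task instances, if requested, are added
    # as spot instances at the given bid price.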
    log.info('Creating EMR job...')
    instance_type = args.ec2_instance_type
    igs = [
        InstanceGroup(1, 'MASTER', 'm1.large', 'ON_DEMAND', 'master'),
        InstanceGroup(1, 'CORE', instance_type, 'ON_DEMAND', 'core')]
    if args.ec2_num_task_instances:
        igs.append(InstanceGroup(
            args.ec2_num_task_instances, 'TASK', instance_type,
            'SPOT', 'task', args.ec2_task_instance_bid_price))
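
    # Translate the command-line options into the corresponding
    # S3DistCp arguments, passing on only the options actually given.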
    step_args = [
        '--s3Endpoint', ''.join(['s3-', region, '.amazonaws.com']),
        '--dest', dest]
    for src in srcs:
        step_args.extend(['--src', src])
    if args.src_pattern:
        step_args.extend(['--srcPattern', args.src_pattern])
    if args.group_by:
        step_args.extend(['--groupBy', args.group_by])
    if args.target_size:
        step_args.extend(['--targetSize', str(args.target_size)])
    if args.append_to_last_file:
        step_args.append('--appendToLastFile')
    if args.output_codec:
        step_args.extend(['--outputCodec', args.output_codec])
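
    # The S3DistCp jar ships preinstalled at this path on the master
    # node of 3.x-era EMR AMIs.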
    steps = [JarStep(
        name='emr-s3distcp',
        jar='/home/hadoop/lib/emr-s3distcp-1.0.jar',
        step_args=step_args)]

    if args.log_uri:
        log_uri = os.path.join(args.log_uri, process_name)
    else:
        log_uri = None
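
    # Launch the jobflow with the stock EMR IAM roles; these roles must
    # already exist in the account.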
    conn = emr.connect_to_region(region)
    jobid = conn.run_jobflow(
        name=process_name,
        ec2_keyname=args.key_name,
        instance_groups=igs,
        log_uri=log_uri,
        steps=steps,
        enable_debugging=args.ec2_enable_debugging,
        ami_version=args.ec2_ami_version,
        job_flow_role='EMR_EC2_DefaultRole',
        service_role='EMR_DefaultRole')
    log.info(u'EMR job ({}): {} => {}'.format(
        jobid, u', '.join(srcs), dest))
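
    # Poll the jobflow every 15 seconds, logging each state transition,
    # until it reaches a terminal state.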
    prev_state = None
    while True:
        time.sleep(15)
        status = conn.describe_jobflow(jobid)
        if status.state in ['COMPLETED', 'FAILED', 'TERMINATED']:
            break
        if status.state != prev_state:
            log.info(u'{} ({})'.format(status.state, jobid))
            prev_state = status.state
    log.info(u'{} ({})'.format(status.state, jobid))


if __name__ == '__main__':
    p = ArgumentParser(description=__doc__.strip())
    p.add_argument(
        'key_name', type=str,
        help='AWS/EC2 key name')
    p.add_argument(
        'src', type=str, nargs='+', metavar='src',
        help='source S3 URI(s)')
    p.add_argument(
        'dest', type=str,
        help='destination S3 URI')
    p.add_argument(
        '--src-pattern', type=str, default=None,
        help='--srcPattern option of S3DistCp')
    p.add_argument(
        '--group-by', type=str, default=None,
        help='--groupBy option of S3DistCp')
    p.add_argument(
        '--target-size', type=int, default=None,
        help='target size in mebibytes (MiB) based on the group-by option')
    p.add_argument(
        '--append-to-last-file', action='store_true', default=False,
        help='--appendToLastFile option of S3DistCp')
    p.add_argument(
        '--output-codec', type=str, default=None,
        help='compression codec for the copied files')
    default = 'us-west-2'
    p.add_argument(
        '--region', type=str, default=default,
        help='AWS/EC2 region (default: {})'.format(default))
    default = None
    p.add_argument(
        '--log-uri', type=str, default=default,
        help='S3 URI for storing logs (default: {})'.format(default))
    p.add_argument(
        '--ec2-ami-version', type=str, default='3.9.0',
        help='EMR AMI version')
    p.add_argument(
        '--ec2-enable-debugging', action='store_true', default=False,
        help='enable jobflow debugging')
    p.add_argument(
        '--ec2-instance-type', type=str, default='c3.8xlarge',
        help='EMR instance type')
    p.add_argument(
        '--ec2-num-task-instances', type=int, default=0,
        help='number of task instances')
    p.add_argument(
        '--ec2-task-instance-bid-price', type=float, default=1.95,
        help='bid price for spot task instances')
    main(p.parse_args())