Last active
April 24, 2020 15:05
-
-
Save okomestudio/699edbb8e095f07bafcc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
# -*- coding: utf-8 -*- | |
"""S3DistCp | |
Run S3DistCp via boto. Currently only a limited set of S3DistCp | |
options useful for preparing AWS/EMR input data are exposed. | |
For details of S3DistCp options, see Distributed Copy Using S3DistCP: | |
http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html | |
Examples | |
-------- | |
For preparing input data for AWS/EMR, a typical run concatenates a | |
number of small files into a compressed file, following the given | |
group-by instruction. For example: | |
$ python s3distcp.py \ | |
your_ec2_key \ | |
s3://inputbucket/ | |
s3://outputbucket/ \ | |
--group-by ".*([0-9a-f])[0-9a-f]*\.json" \ | |
--target-size 2048 \ | |
--output-codec lzo | |
will group JSON files under s3://inputbucket/ by the first HEX | |
character (followed by HEX sequence of arbitrary length) and create | |
LZO-compressed output files. Each output file will be roughly 2 GB | |
(before compression), with it file name prefixed by that HEX char and | |
sequential numbers appended in case multiple output files are created | |
due to exceeding the target file size. | |
""" | |
from argparse import ArgumentParser | |
import logging | |
import os | |
import re | |
import time | |
import boto | |
from boto import emr | |
from boto.emr.step import JarStep | |
from boto.emr.instance_group import InstanceGroup | |
log = logging.getLogger(__name__) | |
log.setLevel(logging.INFO) | |
ch = logging.StreamHandler() | |
ch.setFormatter(logging.Formatter( | |
fmt='%(asctime)s %(message)s', | |
datefmt='%m/%d/%Y %H:%M:%S')) | |
log.addHandler(ch) | |
def ensure_unicode(s, encoding='utf-8'): | |
return s if isinstance(s, unicode) else s.decode(encoding) | |
class S3URI(unicode): | |
validate = re.compile(ur'^s3://(.+)$', re.U) | |
def __init__(self, uri): | |
b, k = self.split_s3_path(uri) | |
self.bucket_name = b | |
self.key_name = k | |
@classmethod | |
def split_s3_path(cls, uri): | |
m = cls.validate.search(uri) | |
if not m: | |
raise ValueError(u'Invalid S3 URI ({})'.format(uri)) | |
ts = m.group(1).split('/', 1) | |
bucket = ts[0] | |
key = ts[1] if len(ts) > 1 else '' | |
return ensure_unicode(bucket), ensure_unicode(key) | |
def __unicode__(self): | |
return os.path.join(u's3://', self.bucket_name, self.key_name) | |
def __str__(self): | |
return self.__unicode__.encode('utf-8') | |
def main(args): | |
srcs = [S3URI(src) for src in args.src] | |
dest = S3URI(args.dest) | |
region = args.region | |
process_name = os.path.splitext(os.path.basename(__file__))[0] | |
s3conn = boto.connect_s3() | |
if s3conn.lookup(dest.bucket_name) is None: | |
log.info('S3 bucket {} does not exist, quitting.'.format( | |
dest.bucket_name)) | |
exit(1) | |
log.info('Creating EMR job...') | |
instance_type = args.ec2_instance_type | |
igs = [ | |
InstanceGroup(1, 'MASTER', 'm1.large', 'ON_DEMAND', 'master'), | |
InstanceGroup(1, 'CORE', instance_type, 'ON_DEMAND', 'core')] | |
if args.ec2_num_task_instances: | |
igs.append(InstanceGroup( | |
args.ec2_num_task_instances, 'TASK', instance_type, | |
'SPOT', 'task', args.ec2_task_instance_bid_price)) | |
step_args = [ | |
'--s3Endpoint', ''.join(['s3-', region, '.amazonaws.com']), | |
'--dest', dest] | |
for src in srcs: | |
step_args.extend(['--src', src]) | |
if args.src_pattern: | |
step_args.extend(['--srcPattern', args.src_pattern]) | |
if args.group_by: | |
step_args.extend(['--groupBy', args.group_by]) | |
if args.target_size: | |
step_args.extend(['--targetSize', str(args.target_size)]) | |
if args.append_to_last_file: | |
step_args.extend(['--appendToLastFile']) | |
if args.output_codec: | |
step_args.extend(['--outputCodec', args.output_codec]) | |
steps = [JarStep( | |
name='emr-s3distcp', | |
jar='/home/hadoop/lib/emr-s3distcp-1.0.jar', | |
step_args=step_args)] | |
if args.log_uri: | |
log_uri = os.path.join(args.log_uri, process_name) | |
else: | |
log_uri = None | |
conn = emr.connect_to_region(region) | |
jobid = conn.run_jobflow( | |
name=process_name, | |
ec2_keyname=args.key_name, | |
instance_groups=igs, | |
log_uri=log_uri, | |
steps=steps, | |
enable_debugging=args.ec2_enable_debugging, | |
ami_version=args.ec2_ami_version, | |
job_flow_role='EMR_EC2_DefaultRole', | |
service_role='EMR_DefaultRole') | |
log.info(u'EMR job ({}): {} => {}'.format(jobid, src, dest)) | |
prev_state = None | |
while True: | |
time.sleep(15) | |
status = conn.describe_jobflow(jobid) | |
if status.state in ['COMPLETED', 'FAILED', 'TERMINATED']: | |
break | |
if status.state != prev_state: | |
log.info(u'{} ({})'.format(status.state, jobid)) | |
prev_state = status.state | |
log.info(u'{} ({})'.format(status.state, jobid)) | |
if __name__ == '__main__': | |
p = ArgumentParser(description=__doc__.strip()) | |
p.add_argument( | |
'key_name', type=str, | |
help='AWS/EC2 key name') | |
p.add_argument( | |
'src', type=str, nargs='+', metavar='src1 [src2 ...]', | |
help='source S3 URI(s)') | |
p.add_argument( | |
'dest', type=str, | |
help='destination S3 URI') | |
p.add_argument( | |
'--src-pattern', type=str, default=None, | |
help='--srcPattern option of S3DistCp') | |
p.add_argument( | |
'--group-by', type=str, default=None, | |
help='--groupBy option of S3DistCp') | |
p.add_argument( | |
'--target-size', type=int, default=None, | |
help='target size in mebibytes (MiB) based on the group-by option') | |
p.add_argument( | |
'--append-to-last-file', action='store_true', default=False, | |
help='--appendToLastFile option of S3DistCp') | |
p.add_argument( | |
'--output-codec', type=str, default=None, | |
help='compression codec for the copied files.') | |
default = 'us-west-2' | |
p.add_argument( | |
'--region', type=str, default=default, | |
help='AWS/EC2 region (default: {})'.format(default)) | |
default = None | |
p.add_argument( | |
'--log-uri', type=str, default=default, | |
help='S3 URI for storing logs (default: {})'.format(default)) | |
p.add_argument( | |
'--ec2-ami-version', type=str, default='3.9.0', | |
help='EC2 AMI version') | |
p.add_argument( | |
'--ec2-enable-debugging', action='store_true', default=False, | |
help='Enable jobflow debugging') | |
p.add_argument( | |
'--ec2-instance-type', type=str, default='c3.8xlarge', | |
help='EMR instance type') | |
p.add_argument( | |
'--ec2-num-task-instances', type=int, default=0, | |
help='Number of task instances') | |
p.add_argument( | |
'--ec2-task-instance-bid-price', type=float, default=1.95, | |
help='Bid price for task instances') | |
main(p.parse_args()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment