Skip to content

Instantly share code, notes, and snippets.

@irskep
Created July 9, 2012 23:30
Show Gist options
  • Save irskep/3079772 to your computer and use it in GitHub Desktop.
Save irskep/3079772 to your computer and use it in GitHub Desktop.
import itertools
import os
from mrjob.parse import is_s3_uri
from mrjob.util import extract_dir_for_tar
def _unique_suffix():
for i in itertools.count():
yield '-mrjob-%d' % i
def _split_path(path):
"""Split a path like /foo/bar.py#baz.py into (path, name)
(in this case: '/foo/bar.py', 'baz.py').
It's valid to specify no name with something like '/foo/bar.py#'
In practice this means that we'll pick a name.
"""
if '#' in path:
path, name = path.split('#', 1)
if '/' in name or '#' in name:
raise ValueError('Bad name %r; must not contain # or /' % name)
# empty names are okay
else:
name = os.path.basename(path)
return name, path
class BootstrapItem(object):
    """Base class for a single bootstrap item.

    An item can contribute bootstrap actions, files to upload, and lines
    for the bootstrap and wrapper scripts. Every hook here contributes
    nothing (returns an empty tuple); subclasses override the ones they
    need.
    """

    # identifier for the kind of item; None on the base class
    NAME = None

    def __init__(self, s3_tmp_uri, args, opts):
        """Store the constructor arguments unchanged on the instance.

        :param s3_tmp_uri: kept as ``self.s3_tmp_uri``
        :param args: kept as ``self.args``
        :param opts: kept as ``self.opts``
        """
        self.s3_tmp_uri, self.args, self.opts = s3_tmp_uri, args, opts

    def actions(self):
        """Iterable of bootstrap actions as lists of arguments, e.g.::

            yield ['s3://elasticmapreduce/bootstrap-actions/configure-hadoop',
                   '-m', 'mapred.tasktracker.map.tasks.maximum=1']

        The base implementation has no actions.
        """
        return tuple()

    def files_for_upload(self, existing_files):
        """Iterable of files for upload represented as dictionaries, e.g.::

            yield {
                'local_path': '/.../file.txt',
                'remote_name': 'file.txt',
                's3_uri': 's3://.../file.txt',
            }

        The base implementation has no files.

        :param existing_files: file dicts already generated by other bootstrap
                               items
        """
        return tuple()

    def bootstrap_script_lines(self):
        """Iterable of lines to add to the bootstrap script, without trailing
        newlines. The base implementation adds none.
        """
        return tuple()

    def wrapper_script_lines(self):
        """Iterable of lines to add to the wrapper script, without trailing
        newlines. The base implementation adds none.
        """
        return tuple()
class BootstrapScriptHeader(BootstrapItem):
    """Bootstrap item that emits the top of the bootstrap script: the
    shebang line followed by the imports.
    """

    NAME = 'bootstrap_script_header'

    def bootstrap_script_lines(self):
        """Yield the header lines one at a time, without trailing newlines.
        Empty strings are blank separator lines.
        """
        header = (
            # shebang
            '#!/usr/bin/python',
            '',
            # imports
            'from __future__ import with_statement',
            '',
            'import distutils.sysconfig',
            'import os',
            'import stat',
            'from subprocess import call, check_call',
            'from tempfile import mkstemp',
            'from xml.etree.ElementTree import ElementTree',
            '',
        )
        for header_line in header:
            yield header_line
class BootstrapAction(BootstrapItem):
    """Bootstrap item whose ``args`` are themselves one bootstrap action."""

    NAME = 'action'

    def actions(self):
        """Yield exactly one action: this item's ``args``, unmodified."""
        only_action = self.args
        yield only_action
class UploadFiles(BootstrapItem):
    """Bootstrap item that uploads local files and copies them down again
    from the bootstrap script.

    ``self.args`` is an iterable of local paths, each optionally followed
    by ``#name`` to choose the remote file name (see ``_split_path``).
    """

    NAME = 'upload_files'

    # _unique_suffix() returns a generator, not a string. Keep a single
    # generator on the class so every instance (and subclass) draws from
    # the same sequence and auto-generated names don't repeat.
    _suffixes = _unique_suffix()

    def __init__(self, *args, **kwargs):
        super(UploadFiles, self).__init__(*args, **kwargs)
        # file dicts produced by files_for_upload()
        self.files = []

    def files_for_upload(self, existing_files):
        """Build a file dict for each path in ``self.args``.

        Each dict has the keys ``local_path`` (absolute), ``remote_name``
        and ``s3_uri``. The dicts are appended to ``self.files``, which is
        also the return value, so calling this more than once appends the
        same files again.

        :param existing_files: file dicts already generated by other bootstrap
                               items (not used here)
        :raises ValueError: if a path does not exist locally, or has a bad
                            ``#name`` part
        """
        for rel_path_with_fragment in self.args:
            remote_name, rel_path = _split_path(rel_path_with_fragment)

            if not remote_name:
                # no name requested ('/path/to/file#'), so pick one
                remote_name = '%s%s' % (
                    os.path.basename(rel_path), next(self._suffixes))

            abs_path = os.path.abspath(rel_path)
            if not os.path.exists(abs_path):
                raise ValueError("File %r does not exist" % rel_path)

            s3_uri = self.s3_tmp_uri + 'files/' + remote_name
            if not is_s3_uri(s3_uri):
                # fall back to a generated name
                s3_uri = self.s3_tmp_uri + 'files/file' + next(self._suffixes)

            # caller will confirm uniqueness of paths
            self.files.append({
                'local_path': abs_path,
                'remote_name': remote_name,
                's3_uri': s3_uri,
            })

        return self.files

    def bootstrap_script_lines(self):
        """Yield the bootstrap script lines for every file in
        ``self.files``, in order.
        """
        for file_dict in self.files:
            for line in self.bootstrap_file_lines(file_dict):
                yield line

    def bootstrap_file_lines(self, file_dict):
        """Yield the bootstrap script lines for one file: a single
        ``hadoop fs -copyToLocal`` call from its S3 URI to its remote
        name. Subclasses extend this to do more with the file.
        """
        yield "check_call(['hadoop', 'fs', '-copyToLocal', %r, %r])" % (
            file_dict['s3_uri'], file_dict['remote_name'])
class PythonArchives(UploadFiles):
    """Upload ``.tar.gz`` Python packages and install each one from the
    bootstrap script with ``setup.py install``.
    """

    NAME = 'python_archives'

    def __init__(self, *args, **kwargs):
        super(PythonArchives, self).__init__(*args, **kwargs)
        # opts['python_bin'] is iterated as a sequence of command-line
        # arguments; render them as comma-separated reprs so they can be
        # spliced into a list literal in the generated script
        self.python_bin_in_list = ', '.join(
            repr(opt) for opt in self.opts['python_bin'])

    def bootstrap_file_lines(self, file_dict):
        """Yield the script lines that download, unpack and install one
        archive.

        :param file_dict: a dict as built by ``files_for_upload()``
        """
        yield '# install Python archive %s' % file_dict['local_path']
        assert file_dict['remote_name'].endswith('.tar.gz')

        # download the archive
        for line in super(PythonArchives, self).bootstrap_file_lines(
                file_dict):
            yield line

        yield "check_call(['tar', 'xfz', %r])" % file_dict['remote_name']

        # figure out name of dir to CD into. This runs while the script
        # is being generated, so inspect the archive at its local path;
        # the remote name only exists once the script has run.
        cd_into = extract_dir_for_tar(file_dict['local_path'])

        # install the module
        yield "check_call(['sudo', %s, 'setup.py', 'install'], cwd=%r)" % (
            self.python_bin_in_list, cd_into)
        yield ''
class BootstrapMrjob(UploadFiles):
    """Upload an mrjob tarball and unpack it into site-packages from the
    bootstrap script.
    """

    NAME = 'bootstrap_mrjob'

    def __init__(self, mrjob_tar_gz_path, *args, **kwargs):
        """
        :param mrjob_tar_gz_path: local path of the mrjob tarball

        Remaining arguments are passed to ``UploadFiles.__init__``. The
        ``args`` value given there is replaced: the tarball is the only
        file this item uploads.
        """
        super(BootstrapMrjob, self).__init__(*args, **kwargs)
        # files_for_upload() reads paths from self.args and fills
        # self.files with dicts, so the tarball path belongs in args.
        # The trailing '#' means "no remote name given; pick one".
        self.args = [mrjob_tar_gz_path + '#']

    def bootstrap_file_lines(self, file_dict):
        """Yield the script lines that download the tarball, unpack it
        into site-packages and byte-compile it.

        :param file_dict: a dict as built by ``files_for_upload()``
        """
        # download the tarball
        for line in super(BootstrapMrjob, self).bootstrap_file_lines(
                file_dict):
            yield line

        # render opts['python_bin'] as comma-separated reprs so it can be
        # spliced into a list literal in the generated script
        python_bin_in_list = ', '.join(
            repr(opt) for opt in self.opts['python_bin'])

        yield '# bootstrap mrjob'
        yield "site_packages = distutils.sysconfig.get_python_lib()"
        yield "check_call(['sudo', 'tar', 'xfz', %r, '-C', site_packages])" % (
            file_dict['remote_name'])

        # re-compile pyc files now, since mappers/reducers can't
        # write to this directory. Don't fail if there is extra
        # un-compileable crud in the tarball.
        yield "mrjob_dir = os.path.join(site_packages, 'mrjob')"
        yield "call(['sudo', %s, '-m', 'compileall', '-f', mrjob_dir])" % (
            python_bin_in_list)
        yield ''
class RunCommand(BootstrapItem):
    """Bootstrap item that runs a command from the bootstrap script."""

    NAME = 'run_command'

    def bootstrap_script_lines(self):
        """Yield one ``check_call`` line for ``self.args``.

        A string is run through the shell (``shell=True``); anything else
        is passed to ``check_call`` as-is.
        """
        command = self.args
        if not isinstance(command, basestring):
            yield 'check_call(%r)' % command
            return
        yield 'check_call(%r, shell=True)' % command
class RunScript(UploadFiles):
    """Upload scripts and execute each one from the bootstrap script."""

    # NOTE(review): no NAME is set here, so this class inherits
    # 'upload_files' from UploadFiles -- confirm whether it should have
    # its own.

    def bootstrap_file_lines(self, file_dict):
        """Yield the script lines that download one script, make it
        executable and run it.

        This is called once per file, so it emits lines for *file_dict*
        only.

        :param file_dict: a dict as built by ``files_for_upload()``
        """
        remote_name = file_dict['remote_name']

        yield '# run script %s' % file_dict['local_path']

        # download the script
        for line in super(RunScript, self).bootstrap_file_lines(file_dict):
            yield line

        # make the downloaded copy executable before running it
        yield "check_call(['chmod', 'a+rx', %r])" % remote_name
        yield 'check_call(%r)' % (['./' + remote_name],)
        yield ''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment