Created July 9, 2012 23:30
Save irskep/3079772 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import itertools | |
| import os | |
| from mrjob.parse import is_s3_uri | |
| from mrjob.util import extract_dir_for_tar | |
| def _unique_suffix(): | |
| for i in itertools.count(): | |
| yield '-mrjob-%d' % i | |
| def _split_path(path): | |
| """Split a path like /foo/bar.py#baz.py into (path, name) | |
| (in this case: '/foo/bar.py', 'baz.py'). | |
| It's valid to specify no name with something like '/foo/bar.py#' | |
| In practice this means that we'll pick a name. | |
| """ | |
| if '#' in path: | |
| path, name = path.split('#', 1) | |
| if '/' in name or '#' in name: | |
| raise ValueError('Bad name %r; must not contain # or /' % name) | |
| # empty names are okay | |
| else: | |
| name = os.path.basename(path) | |
| return name, path | |
class BootstrapItem(object):
    """Base class for one piece of bootstrap configuration.

    An item may contribute any mix of: bootstrap actions, files to upload,
    lines for the bootstrap script, and lines for the wrapper script. Every
    hook defined here contributes nothing; subclasses override the hooks
    they need.

    :param s3_tmp_uri: URI prefix that subclasses append file paths to
    :param args: item-specific arguments
    :param opts: option dict (subclasses read keys such as ``python_bin``)
    """

    NAME = None

    def __init__(self, s3_tmp_uri, args, opts):
        (self.s3_tmp_uri,
         self.args,
         self.opts) = (s3_tmp_uri, args, opts)

    def actions(self):
        """Return the bootstrap actions, each a list of arguments.

        A subclass would typically write this as a generator, e.g.::

            yield ['s3://elasticmapreduce/bootstrap-actions/configure-hadoop',
                   '-m', 'mapred.tasktracker.map.tasks.maximum=1']

        The base implementation has no actions.
        """
        return tuple()

    def files_for_upload(self, existing_files):
        """Return the files to upload, each described by a dict like::

            {
                'local_path': '/.../file.txt',
                'remote_name': 'file.txt',
                's3_uri': 's3://.../file.txt',
            }

        :param existing_files: file dicts already produced by other bootstrap
                               items

        The base implementation uploads nothing.
        """
        return tuple()

    def bootstrap_script_lines(self):
        """Return lines (no trailing newlines) for the bootstrap script.

        The base implementation adds no lines.
        """
        return tuple()

    def wrapper_script_lines(self):
        """Return lines (no trailing newlines) for the wrapper script.

        The base implementation adds no lines.
        """
        return tuple()
class BootstrapScriptHeader(BootstrapItem):
    """Emits the opening lines of the generated bootstrap script.

    The header is a shebang line followed by the imports made available to
    the rest of the generated script.
    """

    NAME = 'bootstrap_script_header'

    def bootstrap_script_lines(self):
        header = (
            # interpreter line
            '#!/usr/bin/python',
            '',
            # imports for the generated script
            'from __future__ import with_statement',
            '',
            'import distutils.sysconfig',
            'import os',
            'import stat',
            'from subprocess import call, check_call',
            'from tempfile import mkstemp',
            'from xml.etree.ElementTree import ElementTree',
            '',
        )
        for text in header:
            yield text
class BootstrapAction(BootstrapItem):
    """Contributes exactly one bootstrap action: the item's own ``args``."""

    NAME = 'action'

    def actions(self):
        # a single action whose argument list is whatever we were given
        action_args = self.args
        yield action_args
class UploadFiles(BootstrapItem):
    """Upload local files and copy each one into the bootstrap working dir.

    ``self.args`` is a list of local paths, each optionally followed by
    ``#remote_name`` (see ``_split_path``). After :meth:`files_for_upload`
    runs, ``self.files`` holds one file dict per path, and
    :meth:`bootstrap_script_lines` emits the lines that fetch those files.
    """

    NAME = 'upload_files'

    def __init__(self, *args, **kwargs):
        super(UploadFiles, self).__init__(*args, **kwargs)
        # filled in by files_for_upload()
        self.files = []

    def files_for_upload(self, existing_files):
        """Build and return one file dict per path in ``self.args``.

        Each dict has ``local_path`` (absolute), ``remote_name`` and
        ``s3_uri`` keys. The list is also stored in ``self.files`` so that
        :meth:`bootstrap_script_lines` can use it. Calling this again
        rebuilds the list instead of appending duplicate entries.

        :param existing_files: file dicts generated by other bootstrap items;
                               not used here (the caller confirms that paths
                               are unique).
        :raises ValueError: if a local path does not exist, or a ``#name``
                            fragment is invalid.
        """
        files = []
        for rel_path_with_fragment in self.args:
            remote_name, rel_path = _split_path(rel_path_with_fragment)
            if not remote_name:
                # "path#" (empty name) means "pick a name for me"
                remote_name = '%s%s' % (
                    os.path.basename(rel_path), _unique_suffix())

            abs_path = os.path.abspath(rel_path)
            if not os.path.exists(abs_path):
                raise ValueError("File %r does not exist" % rel_path)

            s3_uri = self.s3_tmp_uri + 'files/' + remote_name
            if not is_s3_uri(s3_uri):
                # is_s3_uri() rejected the URI built from remote_name; fall
                # back to a generic, uniquely suffixed name
                s3_uri = self.s3_tmp_uri + 'files/file' + _unique_suffix()

            files.append({
                'local_path': abs_path,
                'remote_name': remote_name,
                's3_uri': s3_uri,
            })

        self.files = files
        return self.files

    def bootstrap_script_lines(self):
        """Yield bootstrap-script lines for every file in ``self.files``."""
        for file_dict in self.files:
            for line in self.bootstrap_file_lines(file_dict):
                yield line

    def bootstrap_file_lines(self, file_dict):
        """Yield the line that copies one file from its S3 URI to
        ``remote_name`` in the working directory.

        Subclasses extend this to act on the file once it has been copied.
        """
        yield "check_call(['hadoop', 'fs', '-copyToLocal', %r, %r])" % (
            file_dict['s3_uri'], file_dict['remote_name'])
class PythonArchives(UploadFiles):
    """Upload ``.tar.gz`` source archives and ``setup.py install`` each one."""

    NAME = 'python_archives'

    def __init__(self, *args, **kwargs):
        super(PythonArchives, self).__init__(*args, **kwargs)
        # repr() each element of opts['python_bin'] and join with ', ' so the
        # result can be spliced into a list literal in the generated script
        self.python_bin_in_list = ', '.join(
            repr(opt) for opt in self.opts['python_bin'])

    def bootstrap_file_lines(self, file_dict):
        """Yield lines that download, unpack and install one archive.

        :raises ValueError: if the archive's remote name does not end in
                            ``.tar.gz`` (raised when the generator is first
                            advanced, before any line is yielded).
        """
        remote_name = file_dict['remote_name']
        # explicit check rather than ``assert`` so it still runs under -O
        if not remote_name.endswith('.tar.gz'):
            raise ValueError(
                'Python archive %r must have a .tar.gz remote name, got %r'
                % (file_dict['local_path'], remote_name))

        yield '# install Python archive %s' % file_dict['local_path']
        for line in super(PythonArchives, self).bootstrap_file_lines(
                file_dict):
            yield line
        yield "check_call(['tar', 'xfz', %r])" % remote_name

        # Figure out the name of the dir to CD into. This runs on the local
        # machine, where only local_path exists; remote_name is the copy on
        # the cluster node. (Presumably extract_dir_for_tar opens the
        # archive to read its top-level dir — confirm in mrjob.util.)
        cd_into = extract_dir_for_tar(file_dict['local_path'])

        # install the module
        yield "check_call(['sudo', %s, 'setup.py', 'install'], cwd=%r)" % (
            self.python_bin_in_list, cd_into)
        yield ''
class BootstrapMrjob(UploadFiles):
    """Upload an mrjob tarball and unpack it into the remote site-packages.

    :param mrjob_tar_gz_path: local path of the mrjob ``.tar.gz`` to install
    The remaining arguments are passed through to :class:`UploadFiles`.
    """

    NAME = 'bootstrap_mrjob'

    def __init__(self, mrjob_tar_gz_path, *args, **kwargs):
        super(BootstrapMrjob, self).__init__(*args, **kwargs)
        # files_for_upload() reads the paths to upload from self.args, so the
        # tarball goes there (not into self.files, which holds file dicts).
        # The trailing '#' asks for an auto-generated remote name.
        self.args = [mrjob_tar_gz_path + '#']

    def bootstrap_file_lines(self, file_dict):
        """Yield lines that download the tarball, unpack it into the Python
        library dir reported by ``distutils.sysconfig.get_python_lib()``, and
        byte-compile it.
        """
        # copy the tarball into the working directory first
        for line in super(BootstrapMrjob, self).bootstrap_file_lines(
                file_dict):
            yield line

        # same rendering PythonArchives uses: python_bin as list-literal items
        python_bin_in_list = ', '.join(
            repr(opt) for opt in self.opts['python_bin'])

        yield '# bootstrap mrjob'
        yield "site_packages = distutils.sysconfig.get_python_lib()"
        yield "check_call(['sudo', 'tar', 'xfz', %r, '-C', site_packages])" % (
            file_dict['remote_name'])
        # re-compile pyc files now, since mappers/reducers can't
        # write to this directory. Don't fail if there is extra
        # un-compileable crud in the tarball.
        yield "mrjob_dir = os.path.join(site_packages, 'mrjob')"
        yield "call(['sudo', %s, '-m', 'compileall', '-f', mrjob_dir])" % (
            python_bin_in_list)
        yield ''
class RunCommand(BootstrapItem):
    """Run a command from the bootstrap script.

    ``self.args`` is either a string, run through the shell, or a sequence
    of arguments, run directly.
    """

    NAME = 'run_command'

    def bootstrap_script_lines(self):
        try:
            string_types = basestring  # Python 2
        except NameError:
            string_types = str  # Python 3 has no basestring

        # Wrap args in a 1-tuple: if self.args were itself a tuple, ``%``
        # would treat it as the format arguments instead of as one value.
        if isinstance(self.args, string_types):
            yield 'check_call(%r, shell=True)' % (self.args,)
        else:
            yield 'check_call(%r)' % (self.args,)
class RunScript(UploadFiles):
    """Upload scripts and run each one from the bootstrap script.

    ``self.args`` lists the local script paths (optionally ``path#name``).
    """

    # its own name rather than the inherited 'upload_files'
    NAME = 'run_script'

    def bootstrap_file_lines(self, file_dict):
        """Yield lines that download one script, make it runnable, and run it.

        Called once per uploaded file, so this emits exactly one run.
        """
        # download the script first; UploadFiles emits the copyToLocal line
        for line in super(RunScript, self).bootstrap_file_lines(file_dict):
            yield line

        remote_name = file_dict['remote_name']
        yield '# run script %s' % file_dict['local_path']
        # the downloaded copy isn't guaranteed to keep an executable bit
        yield "check_call(['chmod', 'a+rx', %r])" % remote_name
        yield 'check_call(%r)' % (['./' + remote_name],)
        yield ''
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.