Created
January 13, 2009 19:21
-
-
Save Arachnid/46573 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
"""Converts Python packages into app-engine-friendly zips or directories.""" | |
import logging | |
import optparse | |
import os | |
import re | |
import sys | |
import zipfile | |
class PackageReader(object):
    """Abstract base class for package-reading functionality.

    A reader wraps a single location, stored as ``self.path``, and exposes
    the files found beneath it. Every method except ``__init__`` is abstract
    and raises ``NotImplementedError`` until a subclass overrides it.
    """

    def __init__(self, path):
        """Remember the location this reader serves files from."""
        self.path = path

    def getFiles(self, prefix):
        """Returns a list of files from the package."""
        # NotImplementedError is the exception type; the NotImplemented
        # constant is not callable and would raise a confusing TypeError.
        raise NotImplementedError()

    def isFile(self, prefix):
        """Checks if the specified file exists in the package."""
        raise NotImplementedError()

    def isDir(self, prefix):
        """Checks if the specified directory exists in the package."""
        raise NotImplementedError()

    def getFile(self, path):
        """Returns the bytes of the file."""
        raise NotImplementedError()

    def copyTo(self, src, dest):
        """Copies the specified package component."""
        raise NotImplementedError()

    def addToZip(self, src, zip):
        """Adds the specified package component to a zipfile."""
        raise NotImplementedError()
class DirPackageReader(PackageReader):
    """Reads package files from a directory on disk."""

    def __init__(self, path):
        """Store *path* with a trailing slash appended if it lacks one."""
        # getFiles() strips len(self.path) characters from every directory it
        # walks, so the stored root has to include the trailing separator.
        if not path.endswith('/'):
            path += '/'
        super(DirPackageReader, self).__init__(path)

    def getFiles(self, prefix):
        """Yield every file below *prefix*, recursively.

        The yielded paths are relative to ``self.path`` (so they still start
        with *prefix*).
        """
        walkdir = os.path.join(self.path, prefix)
        root_length = len(self.path)
        for dirpath, _, filenames in os.walk(walkdir):
            relative_dir = dirpath[root_length:]
            for filename in filenames:
                yield os.path.join(relative_dir, filename)

    def isFile(self, path):
        """Return True if *path*, relative to the root, is a regular file."""
        return os.path.isfile(os.path.join(self.path, path))

    def isDir(self, path):
        """Return True if *path*, relative to the root, is a directory."""
        return os.path.isdir(os.path.join(self.path, path))

    def getFile(self, src):
        """Return the raw bytes of the file *src*, relative to the root."""
        # The context manager closes the handle even when read() raises.
        with open(os.path.join(self.path, src), 'rb') as srcfile:
            return srcfile.read()
class ZipPackageReader(PackageReader):
    """Reads package files out of a zip archive.

    ``prefix`` is a directory inside the archive that acts as the root: every
    path passed to or returned from the public methods is relative to it.
    """

    def __init__(self, path, prefix=''):
        """Open the archive at *path*; *prefix* selects a sub-directory."""
        super(ZipPackageReader, self).__init__(path)
        self.zip = zipfile.ZipFile(path, 'r')
        self.prefix = prefix
        # Maps archive directory -> set of file names directly inside it.
        self.manifest = {}
        # Every directory that occurs anywhere in the archive.
        self.dirs = set()
        # Explicit flag so that an empty archive is not rescanned on each call.
        self._manifest_built = False

    def _buildManifest(self):
        """Index the archive's members once, on first use."""
        if self._manifest_built:
            return
        for info in self.zip.infolist():
            dirname, filename = os.path.split(info.filename)
            # Explicit directory entries ("pkg/") split into an empty file
            # name; they must register the directory but not a file.
            if filename:
                self.manifest.setdefault(dirname, set()).add(filename)
            # Register the directory and its ancestors. Stopping at an already
            # known directory is safe because its ancestors were added with
            # it, and it also ends the walk for names rooted at "/".
            while dirname and dirname not in self.dirs:
                self.dirs.add(dirname)
                dirname = os.path.dirname(dirname)
        self._manifest_built = True

    def getFiles(self, dir):
        """Return a sorted list of all files below *dir*, recursively.

        Paths in the result are relative to ``self.prefix`` (so they start
        with *dir*), which is the form getFile() accepts. Sub-directories are
        included so that the result matches what DirPackageReader.getFiles()
        produces for the same package stored on disk.
        """
        self._buildManifest()
        base = dir.rstrip('/')
        root = os.path.join(self.prefix, base).rstrip('/')
        found = []
        for dirname, filenames in self.manifest.items():
            if dirname == root:
                relative = base
            elif not root or dirname.startswith(root + '/'):
                below_root = dirname[len(root):].lstrip('/')
                relative = os.path.join(base, below_root)
            else:
                continue
            found.extend(os.path.join(relative, name) for name in filenames)
        return sorted(found)

    def isFile(self, path):
        """Return True if *path* (relative to the prefix) is a file member."""
        self._buildManifest()
        dirname, filename = os.path.split(os.path.join(self.prefix, path))
        return filename in self.manifest.get(dirname, ())

    def isDir(self, path):
        """Return True if *path* (relative to the prefix) is a directory."""
        self._buildManifest()
        # Directories are stored without a trailing slash.
        return os.path.join(self.prefix, path).rstrip('/') in self.dirs

    def getFile(self, src):
        """Return the bytes of the member *src* (relative to the prefix)."""
        return self.zip.read(os.path.join(self.prefix, src))
def getPackages(path=None):
    """Returns an iterator of PackageReader objects for the components of path.

    *path* is an iterable of locations; when it is None or empty, sys.path is
    used. Directories yield a DirPackageReader. Zip archives yield a
    ZipPackageReader, including entries that point inside an archive
    (``/some/archive.zip/sub/dir``), for which the part after the archive
    becomes the reader's prefix. Entries that resolve to neither are skipped.
    """
    if not path:
        path = sys.path
    for loc in path:
        if os.path.isdir(loc):
            yield DirPackageReader(loc)
            continue
        # Strip trailing components until something that exists is reached,
        # collecting the stripped part as the prefix inside the archive.
        prefix = ''
        while not os.path.exists(loc):
            loc, tail = os.path.split(loc)
            if not tail:
                break
            prefix = os.path.join(tail, prefix)
        # A regular file that is not a zip archive would make the ZipFile
        # constructor raise and abort the whole iteration, so skip it.
        if os.path.isfile(loc) and zipfile.is_zipfile(loc):
            yield ZipPackageReader(loc, prefix)
def findPackage(name, path=None):
    """Search the path for a package directory or single-file module.

    Each reader produced by getPackages(path) is tried in order. A directory
    called *name* takes precedence over a file called ``<name>.py`` within
    the same reader.

    Returns an (is_dir, basename, PackageReader) tuple for the first match,
    where basename is *name* for a directory and ``<name>.py`` for a module,
    or (None, None, None) when nothing matches.
    """
    for reader in getPackages(path):
        if reader.isDir(name):
            return (True, name, reader)
        module_file = "%s.py" % name
        if reader.isFile(module_file):
            return (False, module_file, reader)
    return (None, None, None)
class PackageWriter(object):
    """Abstract interface for the destinations packages are copied to."""

    def writeFile(self, reader, src):
        """Store the file *src*, fetched from *reader*, in the destination."""
        # NotImplementedError is the exception type; the NotImplemented
        # constant is not callable and would raise a confusing TypeError.
        raise NotImplementedError()

    def close(self):
        """Finish writing and release any resources held by the writer."""
        raise NotImplementedError()
class DirPackageWriter(object):
    """Writes package files into a directory tree rooted at ``path``."""

    def __init__(self, path):
        """Remember the output directory; nothing is created until needed."""
        self.path = path

    def writeFile(self, reader, src):
        """Copy *src* from *reader* to the same relative path under the root.

        Missing parent directories are created. *reader* only needs a
        ``getFile(src)`` method returning the file's bytes.
        """
        outname = os.path.join(self.path, src)
        outdir = os.path.dirname(outname)
        # outdir is empty when both the root and src have no directory part;
        # os.makedirs('') would raise, and there is nothing to create anyway.
        if outdir and not os.path.isdir(outdir):
            os.makedirs(outdir)
        # Read first so that a failing reader does not leave a truncated file.
        data = reader.getFile(src)
        with open(outname, "wb") as outfile:
            outfile.write(data)

    def close(self):
        """Nothing to finalize: every file is closed as soon as it is written."""
        pass
class ZipPackageWriter(object):
    """Writes package files into a new zip archive."""

    def __init__(self, path, name):
        """Create (or overwrite) the archive *name* inside directory *path*."""
        archive_path = os.path.join(path, name)
        self.zip = zipfile.ZipFile(archive_path, "w")

    def writeFile(self, reader, src):
        """Add the bytes of *src*, fetched from *reader*, as member *src*."""
        payload = reader.getFile(src)
        self.zip.writestr(src, payload)

    def close(self):
        """Close the underlying archive."""
        self.zip.close()
class PackagerApp(object):
    """Command-line application that copies packages into a zip or directory."""

    # Regular expressions, searched against each file path returned by a
    # reader; matching files are not copied. Raw strings keep the backslashes
    # from being read as (invalid) string escapes.
    # NOTE(review): the '^'-anchored patterns only match at the very start of
    # the path; if readers return paths that begin with the package name,
    # 'pkg/tests/...' is not excluded - confirm the intent.
    ignore_paths = [
        r'\.py[oc]$',
        r'^EGG-INFO/',
        r'^tests/',
        r'\.so$',
        r'\.dylib$',
    ]
    ignore_re = re.compile('|'.join(ignore_paths))

    def getOptionParser(self):
        """Build the optparse parser describing the command-line options."""
        parser = optparse.OptionParser(usage='%prog [options] package ... dest',
                                       description=__doc__)
        # -q and -v share one destination; the default level is set on -q.
        parser.add_option("-q", "--quiet", action="store_const",
                          const=logging.ERROR, dest="verbosity",
                          default=logging.WARN, help="Print errors only")
        parser.add_option("-v", "--verbose", action="store_const",
                          const=logging.DEBUG, dest="verbosity",
                          help="Print everything")
        parser.add_option("-u", "--nozip", action="store_true", dest="nozip",
                          help="Don't zip: Output a directory instead of a zipfile")
        parser.add_option("-n", "--zipname", action="store", dest="zipname",
                          help="Select the name of the zip to create.")
        return parser

    def copyPackage(self, reader, writer, pkg):
        """Copy every non-ignored file of package *pkg* from reader to writer.

        Native modules (.so / .dylib) trigger a warning; they are also
        excluded by ignore_re and therefore never copied.
        """
        for fn in reader.getFiles(pkg):
            if fn.endswith(('.so', '.dylib')):
                logging.warning(
                    " Warning: Found native module '%s' in package '%s'. "
                    "Native modules are not supported in App Engine; this "
                    "package may not work.", fn, pkg)
            if self.ignore_re.search(fn):
                logging.info(" Skipping file '%s'", fn)
            else:
                logging.info(" Copying file '%s'", fn)
                writer.writeFile(reader, fn)

    def package(self, sources, dest):
        """Copy all of *sources* into *dest*; return a process exit status.

        Output is a directory tree when --nozip was given, otherwise a zip
        inside *dest* named by --zipname or after the first source. Returns 0
        on success and 1 as soon as a source cannot be found.
        """
        if self.options.nozip:
            writer = DirPackageWriter(dest)
        else:
            zipname = self.options.zipname or '%s.zip' % sources[0]
            writer = ZipPackageWriter(dest, zipname)
        try:
            for source in sources:
                isdir, basename, reader = findPackage(source)
                if not reader:
                    logging.error("Could not find package '%s'.", source)
                    return 1
                logging.info("Found package '%s' at '%s'", source, reader.path)
                if isdir:
                    logging.info("Copying package '%s'", source)
                    self.copyPackage(reader, writer, source)
                else:
                    # Keep looping afterwards: returning here would skip the
                    # remaining sources and leave the writer unclosed.
                    logging.info("Writing single-file package '%s'", basename)
                    writer.writeFile(reader, basename)
        finally:
            # Always finalize the output, including on the error path.
            writer.close()
        return 0

    def run(self, args):
        """Parse argv-style *args* (args[0] is the program name) and package.

        Returns the exit status: 1 when fewer than two positional arguments
        are given (after printing the help), otherwise package()'s result.
        Conflicting options are reported through the parser's error().
        """
        self.parser = self.getOptionParser()
        self.options, self.args = self.parser.parse_args(args[1:])
        if len(self.args) < 2:
            self.parser.print_help()
            return 1
        if self.options.nozip and self.options.zipname:
            self.parser.error("Cannot specify both --nozip and --zipname")
        logging.basicConfig(format="%(message)s", level=self.options.verbosity)
        # Everything but the last positional argument is a package name.
        sources = self.args[:-1]
        dest = self.args[-1]
        return self.package(sources, dest)
def main(args):
    """Run the packager with argv-style *args* and return its exit status."""
    # The status must be returned: the script entry point passes it to
    # sys.exit(), which would otherwise always report success.
    return PackagerApp().run(args)
# Script entry point: hand the command line to main() and exit with its result.
if __name__ == '__main__':
    raise SystemExit(main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment