Last active
October 15, 2018 12:11
-
-
Save iamahuman/360885452838b5866e23c91eb9e457b1 to your computer and use it in GitHub Desktop.
Create and extract ZIP archives with KS X 1001 (EUC-KR / MS949) encoded filenames
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import zipfile | |
import os | |
import stat | |
import sys | |
import codecs | |
import time | |
import datetime | |
import unicodedata | |
import sys | |
import pytz | |
import shutil | |
# Python 2 / Python 3 compatibility
#
# text_type / binary_type : the interpreter's text and byte-string types
# sep_u / slash_u         : os.sep and '/' as text
# slash_b / dot_b         : '/' and '.' as byte strings
# to_str                  : coerce a value to the native ``str`` type
if sys.version_info >= (3,):
    text_type = str
    binary_type = bytes
    sep_u = os.sep
    slash_u = '/'
    slash_b = b'/'
    dot_b = b'.'

    def to_str(x):
        """Decode bytes with the default codec; return anything else as is."""
        if isinstance(x, bytes):
            return x.decode()
        return x
else:
    text_type = unicode
    binary_type = str
    sep_u = os.sep.decode()
    slash_u = '/'.decode()
    slash_b = '/'
    dot_b = '.'

    def to_str(x):
        """Native ``str`` is already a byte string here; return x unchanged."""
        return x
# ZIP timestamp utility functions | |
def timestamp_to_zip(tsval, tz=pytz.UTC):
    """Convert a POSIX timestamp to a ZIP ``date_time`` tuple.

    tsval -- seconds since the Unix epoch.
    tz    -- timezone whose wall-clock time is stored in the archive.

    Returns ``(year, month, day, hour, minute, second)`` with a one-based
    day of month, which is the layout ``zipfile.ZipInfo.date_time`` uses.
    Earlier revisions stored ``day + 1``; that produced dates other tools
    read one day late and overflowed at the end of a month.
    """
    dt = datetime.datetime.fromtimestamp(tsval, tz=pytz.UTC).astimezone(tz)
    return (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)


def zip_to_timestamp(tup, tz=pytz.UTC):
    """Convert a ZIP ``date_time`` tuple back to a POSIX timestamp.

    tup -- ``(year, month, day, hour, minute, second)`` as found in
           ``zipfile.ZipInfo.date_time``.
    tz  -- timezone the wall-clock time in ``tup`` is expressed in.

    Returns the number of seconds since the Unix epoch as a float.
    Raises ValueError if ``tup`` is not a valid calendar date/time.
    """
    year, month, day, hour, minute, second = tup
    naive = datetime.datetime(year, month, day, hour, minute, second)
    # pytz zones must be attached with localize(); passing them as tzinfo=
    # selects the zone's first historical (LMT) offset instead of the one
    # in effect at that date.
    localize = getattr(tz, 'localize', None)
    if localize is not None:
        aware = localize(naive)
    else:
        aware = naive.replace(tzinfo=tz)
    epoch = datetime.datetime(1970, 1, 1, tzinfo=pytz.UTC)
    return (aware - epoch).total_seconds()


# Import-time sanity check: the two conversions must round-trip.
assert zip_to_timestamp(timestamp_to_zip(112462320, pytz.UTC), pytz.UTC) == 112462320
class Kalzip(object):
    """Create and extract ZIP archives that use a legacy codec for names.

    Entry names are converted between the native filesystem encoding and
    ``encnam_zip``.  Small regular files whose extension is in ``text_exts``
    are additionally transcoded between ``enctxt_native`` and ``enctxt_zip``;
    whenever that transcoding fails the file is stored/extracted verbatim.

    Instance attributes (set in ``__init__``, may be reassigned before use):
        encnam_native     -- codec for native byte paths.
        encnam_zip        -- codec for entry names inside the archive.
        name_use_nfd      -- normalise extracted names to NFD (True on macOS).
        text_exts         -- extensions (with leading dot) treated as text.
        text_max_filesize -- size limit in bytes for transcoding when adding.
        enctxt_native     -- codec of text files on disk.
        enctxt_zip        -- codec of text files inside the archive.
        timezone          -- timezone of the wall-clock times in the archive.

    NOTE(review): archive creation passes encoded *bytes* as the entry name to
    ``zipfile``; Python 3's ``zipfile`` expects text names, so creation looks
    Python 2 oriented -- confirm before relying on it under Python 3.
    """

    TEXT_EXTS = frozenset((
        '.txt', '.log', '.ini', '.c', '.h',
        '.cpp', '.hpp', '.cc', '.hh', '.py', '.java', '.htm', '.html',
        '.css', '.js', '.vbs', '.vb',
    ))

    # General-purpose flag bit 11: the entry name is stored as UTF-8.
    _FLAG_UTF8_NAME = 0x800

    def __init__(self):
        object.__init__(self)
        self.encnam_native = sys.getfilesystemencoding()
        self.encnam_zip = 'cp949'
        self.name_use_nfd = sys.platform == 'darwin'
        self.text_exts = self.TEXT_EXTS
        self.text_max_filesize = 131072
        self.enctxt_native = 'utf-8'
        self.enctxt_zip = 'cp949'
        self.timezone = pytz.timezone('Asia/Seoul')

    # ------------------------------------------------------------------
    # Timestamp helpers
    # ------------------------------------------------------------------

    def timestamp_to_zip(self, tsval):
        """Convert a POSIX timestamp to a ZIP date tuple in ``self.timezone``."""
        return timestamp_to_zip(tsval, self.timezone)

    def zip_to_timestamp(self, tup):
        """Convert a ZIP date tuple in ``self.timezone`` to a POSIX timestamp."""
        return zip_to_timestamp(tup, self.timezone)

    def _apply_mtime(self, path, zinfo):
        """Set the mtime of ``path`` from ``zinfo``; atime becomes 'now'."""
        mtime = self.zip_to_timestamp(zinfo.date_time)
        os.utime(path, (time.time(), mtime))

    # ------------------------------------------------------------------
    # Name conversion
    # ------------------------------------------------------------------

    def native_path_to_zip(self, path, append_slash=False):
        """
        Converts native filesystem path (text or bytes) to ZIP filename.

        The path is normalised, its separators are turned into '/', it is
        NFC-normalised and finally encoded with ``encnam_zip`` (characters
        that cannot be encoded are replaced).  ``append_slash`` adds the
        trailing '/' that marks a directory entry.

        Raises TypeError if ``path`` is neither text nor bytes.
        """
        if isinstance(path, text_type):
            path_u = path
        elif isinstance(path, binary_type):
            path_u = codecs.decode(path, self.encnam_native, 'replace')
        else:
            raise TypeError("path must be %s or %s" % (text_type.__name__, binary_type.__name__))
        path_n = os.path.normpath(path_u).replace(sep_u, slash_u)
        path_n = unicodedata.normalize('NFC', path_n)
        if append_slash:
            path_n += slash_u
        return codecs.encode(path_n, self.encnam_zip, 'replace')

    def _zip_text_to_native(self, name_u):
        """Turn a decoded, '/'-separated entry name into a native text path."""
        if self.name_use_nfd:
            name_u = unicodedata.normalize('NFD', name_u)
        return name_u.replace(slash_u, sep_u)

    def zip_path_to_native(self, path, do_encode=False):
        """
        Converts ZIP filename (bytes) to native filesystem path.

        Returns text, or bytes in ``encnam_native`` when ``do_encode`` is true.
        Raises TypeError if ``path`` is not a byte string.
        """
        if not isinstance(path, binary_type):
            raise TypeError("path must be %s" % (binary_type.__name__,))
        path_n = self._zip_text_to_native(
            codecs.decode(path, self.encnam_zip, 'replace'))
        return (codecs.encode(path_n, self.encnam_native)
                if do_encode else path_n)

    def _entry_name_text(self, zinfo):
        """Return the entry name of ``zinfo`` as text with '/' separators.

        ``zipfile`` hands out byte strings on Python 2 and text on Python 3.
        Text names without the UTF-8 flag were decoded by ``zipfile`` as
        cp437, so they are re-encoded to recover the stored bytes before
        decoding them with ``encnam_zip``.
        """
        name = zinfo.filename
        if isinstance(name, text_type):
            if zinfo.flag_bits & self._FLAG_UTF8_NAME:
                return name
            name = name.encode('cp437')
        return codecs.decode(name, self.encnam_zip, 'replace')

    @staticmethod
    def _is_safe_member_path(rel):
        """Tell whether relative path ``rel`` stays below the output directory.

        Absolute paths, drive-qualified paths and paths that climb out with
        '..' are rejected.
        """
        norm = os.path.normpath(rel)
        if os.path.isabs(norm) or os.path.splitdrive(norm)[0]:
            return False
        return norm != os.pardir and not norm.startswith(os.pardir + os.sep)

    @staticmethod
    def _ensure_dir(path):
        """Create ``path`` (and parents) if needed.

        Returns True when the directory was created by this call and False
        when it already existed or ``path`` is empty.  Any other failure is
        re-raised instead of being silently ignored.
        """
        if not path:
            return False
        try:
            os.makedirs(path)
        except OSError:
            if not os.path.isdir(path):
                raise
            return False
        return True

    # ------------------------------------------------------------------
    # Archive creation
    # ------------------------------------------------------------------

    def _encode_text_file(self, src_name):
        """Read ``src_name`` as text and return it encoded for the archive.

        Returns None when the file cannot be transcoded (wrong source
        encoding, characters missing from ``enctxt_zip``, read error) or when
        the encoded result exceeds ``text_max_filesize``.
        """
        encoded = bytearray()
        try:
            encoder = codecs.getincrementalencoder(self.enctxt_zip)('strict')
            with codecs.open(src_name, "r", encoding=self.enctxt_native) as cf:
                while True:
                    chunk = cf.read(1024)
                    at_eof = len(chunk) == 0
                    encoded.extend(encoder.encode(chunk, at_eof))
                    if len(encoded) > self.text_max_filesize:
                        return None
                    if at_eof:
                        break
        except Exception:
            # Deliberate best effort: any failure means "store the file
            # verbatim" rather than aborting the whole archive.
            # TODO report errors
            return None
        return bytes(encoded)

    def add_entry(self, zf, indir, fname):
        """Add the single file or directory ``fname`` to the open archive.

        zf    -- ``zipfile.ZipFile`` opened for writing.
        indir -- directory ``fname`` is relative to, or None.
        fname -- path of the item; it also determines the entry name.

        Raises OSError if the item cannot be stat'ed or read.
        """
        src_name = os.path.join(indir, fname) if indir is not None else fname
        stat_obj = os.stat(src_name)
        is_dir = stat.S_ISDIR(stat_obj.st_mode)
        zpath = self.native_path_to_zip(fname, append_slash=is_dir)

        _, ext = os.path.splitext(fname)
        text_candidate = (to_str(ext) in self.text_exts and
                          stat.S_ISREG(stat_obj.st_mode) and
                          stat_obj.st_size <= self.text_max_filesize)
        if text_candidate:
            payload = self._encode_text_file(src_name)
            if payload is not None:
                zi = zipfile.ZipInfo(zpath, date_time=self.timestamp_to_zip(stat_obj.st_mtime))
                zi.external_attr = (6 << 22) | (4 << 19) | (4 << 16)  # -rw-r--r--(Unix)
                zi.compress_type = zipfile.ZIP_DEFLATED
                zf.writestr(zi, payload)
                return

        if is_dir:
            zi = zipfile.ZipInfo(zpath, date_time=self.timestamp_to_zip(stat_obj.st_mtime))
            zi.external_attr = (4 << 28) | (7 << 22) | (5 << 19) | (5 << 16) | 0x10  # drwxr-xr-x(Unix) DIR(MSDOS)
            zi.compress_type = zipfile.ZIP_STORED
            zf.writestr(zi, b'')
        else:
            # NOTE(review): zipfile derives this entry's timestamp itself
            # rather than from self.timezone -- confirm that is intended.
            zf.write(src_name, arcname=zpath, compress_type=zipfile.ZIP_DEFLATED)

    def add_entry_rec(self, zf, indir, fname):
        """Add ``fname`` and, for directories, everything below it.

        Symbolic links are skipped.  All filesystem checks are made on the
        same ``indir``-relative location that ``add_entry`` reads from.
        """
        src_name = os.path.join(indir, fname) if indir is not None else fname
        if os.path.islink(src_name):
            return
        self.add_entry(zf, indir, fname)
        if os.path.isdir(src_name):
            for child in os.listdir(src_name):
                self.add_entry_rec(zf, indir, os.path.join(fname, child))

    def zip(self, outname, indir, files):
        """Create archive ``outname`` from ``files`` (paths relative to ``indir``)."""
        with zipfile.ZipFile(outname, 'w') as zf:
            for request in files:
                self.add_entry_rec(zf, indir, request)

    # ------------------------------------------------------------------
    # Archive extraction
    # ------------------------------------------------------------------

    def _transcode_entry(self, zf, zinfo):
        """Return the entry's data converted to ``enctxt_native``, or None.

        None means the data could not be read or transcoded and the entry
        should be extracted verbatim instead.
        """
        try:
            text = codecs.decode(zf.read(zinfo), self.enctxt_zip, 'strict')
            return codecs.encode(text, self.enctxt_native, 'strict')
        except Exception:
            # Deliberate best effort: fall back to a byte-for-byte copy.
            return None

    def extract_entry(self, zf, outdir, zinfo):
        """Extract one archive member below ``outdir``.

        zf     -- ``zipfile.ZipFile`` opened for reading.
        outdir -- destination directory.
        zinfo  -- the ``zipfile.ZipInfo`` of the member to extract.

        Raises ValueError if the member name would escape ``outdir``
        (absolute path or '..' traversal), and OSError/IOError if the
        destination cannot be created or written.
        """
        name_u = self._entry_name_text(zinfo)
        dot_u = text_type('.')
        is_dir = name_u.endswith(slash_u)
        pieces = name_u.rsplit(dot_u, 1)
        is_text = (not is_dir and len(pieces) > 1 and
                   (dot_u + pieces[-1]) in self.text_exts)

        rel = self._zip_text_to_native(name_u)
        # Entry names come from an untrusted archive: refuse to write
        # anywhere outside the requested output directory.
        if not self._is_safe_member_path(rel):
            raise ValueError("unsafe path in archive: %r" % (rel,))
        dst_name = os.path.join(outdir, rel)
        self._ensure_dir(os.path.dirname(dst_name))

        if is_dir:
            # The timestamp is only applied when this call creates the
            # directory; an already existing directory is left untouched.
            if self._ensure_dir(dst_name):
                self._apply_mtime(dst_name, zinfo)
            return

        payload = self._transcode_entry(zf, zinfo) if is_text else None
        if payload is not None:
            with open(dst_name, "wb") as bf:
                bf.write(payload)
        else:
            with zf.open(zinfo, 'r') as af, open(dst_name, "wb") as bf:
                shutil.copyfileobj(af, bf)
        self._apply_mtime(dst_name, zinfo)

    def unzip(self, zipname, outdir):
        """Extract every member of archive ``zipname`` below ``outdir``."""
        with zipfile.ZipFile(zipname, 'r') as zf:
            for zinfo in zf.infolist():
                self.extract_entry(zf, outdir, zinfo)
if __name__ == '__main__':
    # Command line:
    #   <script> a ARCHIVE FILE...   create ARCHIVE from the given paths
    #   <script> x ARCHIVE           extract ARCHIVE into the current directory
    if len(sys.argv) < 3:
        # A missing command or archive name is reported the same way as an
        # unknown command instead of surfacing as a bare IndexError.
        raise ValueError('cmdline')
    command, archive = sys.argv[1], sys.argv[2]
    k = Kalzip()
    if command == 'a':
        k.zip(archive, '.', sys.argv[3:])
    elif command == 'x':
        k.unzip(archive, '.')
    else:
        raise ValueError('cmdline')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment