Created
January 16, 2010 17:47
-
-
Save dbr/278916 to your computer and use it in GitHub Desktop.
fxphd.com class download processor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.6 | |
"""Processes fxphd.com class ZIP files, by copying the archive to a specified | |
location, then extracting this into a specified path. | |
Directory structure is: | |
/example/fxphd/10jan_term/ | |
archives/ | |
preview_classes/ | |
my_classes/ | |
Courses listed (by the "abc123" format course name) are put in my_classes, | |
others are put in "preview_classes" | |
New classes are labelled green using Finder's labels, to mark them as
unwatched.
The script was written on OS X. It is by no means perfect, and you should | |
verify files are copied/extracted correctly. "Works on my machine". If it | |
works on yours, great. Completely unsupported script. Consider it public domain | |
""" | |
import re | |
import os | |
import sys | |
import shutil | |
import subprocess | |
from zipfile import ZipFile | |
from AppKit import NSURL | |
from ScriptingBridge import SBApplication | |
def config():
    """Build and return the script's settings dictionary.

    Keys of the returned dict:
      base_dir            -- per-term root directory; carries a
                             ``{term_name}`` placeholder for ``str.format``
      my_classes_dir      -- ``base_dir`` with ``/my_classes`` appended
      preview_classes_dir -- ``base_dir`` with ``/preview_classes`` appended
      archives_dir        -- ``base_dir`` with ``/archives`` appended
      terms               -- term name -> list of course codes
      current             -- name of the current term

    A new dictionary is created on every call.
    """
    term_root = '/Volumes/eggDrive/downloads/fxphd/{term_name}'
    return {
        'base_dir': term_root,
        'my_classes_dir': term_root + '/my_classes',
        'preview_classes_dir': term_root + '/preview_classes',
        'archives_dir': term_root + '/archives',
        'terms': {
            '11_07_july_term': ['bkd221', 'mya214', 'nuk305', 'hou203'],
            '11_10_oct_term': ['bkd222', 'pyt201', 'rms101', 'mar301'],
            '12_01_jan_term': ['bkd223', 'dct301', 'hou204', 'pft303'],
        },
        'current': '12_01_jan_term',
    }
# (compiled pattern, type label) pairs, tried in order by getInfoFromZipName.
# The extension dot is escaped and every pattern is anchored at the end, so
# names such as "abc123-class01xzip" or "abc123-class01.zip.part" are rejected.
_FXPHD_NAME_PATTERNS = (
    (re.compile(r"(?P<course>\w{3,4}\d{3})-class(?P<classno>\d\d)(?:-updated|r)?\.zip$"),
     'regularclass'),
    (re.compile(r"(?P<course>\w{3}\d{3})-files(?P<classno>\d\d)r?\.zip$"),
     'files'),
    (re.compile(r"(?P<course>\w{3}\d{3})-class(?P<classno>\d\d)r?\.(?:mov|mp4)$"),
     'uncompressedqt'),
)


def getInfoFromZipName(fname):
    """Parse an fxphd download file name into its components.

    Arguments:
      fname -- bare file name (no directory part), e.g. "bkd223-class01.zip"

    Returns a dict with three string entries:
      course  -- course code, e.g. "bkd223"
      classno -- two-digit class number, e.g. "01"
      type    -- 'regularclass' (class ZIP), 'files' (files ZIP) or
                 'uncompressedqt' (.mov/.mp4 movie)

    Raises NotImplementedError(fname) when the name matches no known pattern.
    """
    for pattern, kind in _FXPHD_NAME_PATTERNS:
        found = pattern.match(fname)
        if found:
            info = found.groupdict()
            info['type'] = kind
            return info
    raise NotImplementedError(fname)
def _relative_target_for(info):
    """Return the target path of *info*, relative to a classes directory.

    Raises NotImplementedError(info['type']) for an unknown type.
    """
    kind = info['type']
    if kind == 'regularclass':
        return "{0[course]}/{0[course]}-class{0[classno]}".format(info)
    if kind == 'uncompressedqt':
        # QT's go into root directory
        return "{0[course]}/".format(info)
    if kind == 'files':
        return "{0[course]}/{0[course]}-files{0[classno]}".format(info)
    raise NotImplementedError(kind)


def pathFromInfo(info):
    """Return the destination path for a parsed download.

    Arguments:
      info -- dict with 'course', 'classno' and 'type' entries, as produced
              by getInfoFromZipName

    If the course is listed under any term in config()['terms'], the path
    lies below that term's my_classes directory. Otherwise the download is
    treated as a preview class of the current term, a notice is printed, and
    the path lies below the preview_classes directory.

    Raises NotImplementedError(info['type']) for an unknown type.
    """
    conf = config()

    # Check if course is being taken
    for term_name, courses in conf['terms'].items():
        if info['course'] in courses:
            classes_dir = conf['my_classes_dir'].format(term_name=term_name)
            return os.path.join(classes_dir, _relative_target_for(info))

    # Preview classes
    term_name = conf['current']
    print("Preview class for {0}".format(term_name))
    classes_dir = conf['preview_classes_dir'].format(term_name=term_name)
    return os.path.join(classes_dir, _relative_target_for(info))
def copy_file(source, destdir):
    """Copy the file *source* into the directory *destdir*.

    Arguments:
      source  -- path of the file to copy
      destdir -- directory to copy into; created (with parents) if missing

    Returns the path of the new copy, or None when a file with the same name
    already exists in *destdir* (nothing is copied in that case).

    Raises OSError if *destdir* cannot be created.
    """
    print("Copy file {0} to folder {1}".format(source, destdir))

    try:
        os.makedirs(destdir)
    except OSError:
        # An already existing directory is fine; anything else (permissions,
        # a regular file in the way, ...) must not be silently ignored.
        if not os.path.isdir(destdir):
            raise

    target = os.path.join(destdir, os.path.basename(source))
    if os.path.isfile(target):
        print("Archive exists at {0}, not copying".format(target))
        return None

    shutil.copyfile(source, target)
    return target
def make_dir_green(path):
    """Give *path* the green Finder label by running an AppleScript.

    Arguments:
      path -- POSIX path of the file or folder to label

    The script is fed to ``osascript`` on stdin. Its output and exit status
    are ignored, so a labelling failure does not raise. Returns None.
    """
    # Backslash and double quote are the escape characters inside an
    # AppleScript string literal; escape them so such paths stay intact.
    quoted_path = path.replace("\\", "\\\\").replace('"', '\\"')
    applescript = """
tell application "Finder"
    set x to POSIX file "{0}" as alias
    set the label index of x to 6 -- 6 = green
end tell
""".format(quoted_path)

    proc = subprocess.Popen(
        ["osascript"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True)
    # communicate() sends the script, closes stdin so osascript sees EOF,
    # drains both output pipes and waits for the process to exit.
    proc.communicate(applescript)
def zip_has_single_subdir(sourcezipfile):
    """Tell whether every member of a ZIP lives below one top-level directory.

    Arguments:
      sourcezipfile -- path of the ZIP archive to inspect

    Returns a ``(flag, name)`` tuple: ``(True, dirname)`` when all member
    names share the same first path component and that component is a
    directory, otherwise ``(False, '')``.

    Raises ValueError when a member name contains "../" or starts with "/".
    """
    zf = ZipFile(sourcezipfile)
    try:
        member_names = zf.namelist()
    finally:
        # Close the archive even if reading the member list fails.
        zf.close()

    if any("../" in name for name in member_names):
        raise ValueError("ZIP file contains relative .. dirs in filename")

    if any(name.startswith("/") for name in member_names):
        raise ValueError("Files contain absolute dirs")

    top_level = set(name.split("/")[0] for name in member_names)
    if len(top_level) != 1:
        return (False, '')

    # A lone top-level *file* (no "/" in any member name) is not a directory.
    if not any("/" in name for name in member_names):
        return (False, '')

    return (True, top_level.pop())
def extract_zip_to_dir(sourcezipfile, destdir):
    """Unpack *sourcezipfile* with ``unzip`` and label the result green.

    Arguments:
      sourcezipfile -- path of the ZIP archive
      destdir       -- directory the class should end up in

    When the archive holds a single top-level directory it is extracted into
    the parent of *destdir*; otherwise it is extracted into *destdir* itself.
    Files that already exist are never overwritten. Afterwards the extracted
    directory and its parent are given the green Finder label.

    Raises ValueError for unsafe member names (from zip_has_single_subdir)
    and OSError if the extraction directory cannot be created.
    """
    import errno  # local import: only needed for the EEXIST check below

    hasonesubdir, subdirname = zip_has_single_subdir(sourcezipfile)
    parentdir = os.path.split(destdir)[0]

    if hasonesubdir:
        wheretosir = parentdir
        print("Extract to one level up from {0}".format(wheretosir))
    else:
        wheretosir = destdir
        print("Extract to {0}".format(wheretosir))

    try:
        os.makedirs(wheretosir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
        print("Directory already exists")
    else:
        print("Making full path to {0}".format(wheretosir))

    # -n: never overwrite existing files, and never prompt. This replaces
    # answering unzip's interactive "replace?" question with "None" on stdin.
    cmd = ["unzip", "-n", "-d", wheretosir, sourcezipfile]
    print("Running cmd:")
    print(" ".join(cmd))
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    proc.communicate()  # unzip's listing is read and discarded
    if proc.returncode != 0:
        print("Warning: unzip exited with status {0}".format(proc.returncode))

    # Label what was actually extracted: the archive's own directory when it
    # has one, otherwise destdir (where the loose files were placed).
    if hasonesubdir:
        extracted = os.path.join(parentdir, subdirname)
    else:
        extracted = destdir
    make_dir_green(extracted)
    make_dir_green(parentdir)
def trashPath(path):
    """Move *path* to the Trash by asking the Finder, through OS X's
    Scripting Bridge, to delete it.
    """
    finder_app = SBApplication.applicationWithBundleIdentifier_("com.apple.Finder")
    file_url = NSURL.fileURLWithPath_(path)
    finder_app.items().objectAtLocation_(file_url).delete()
def _store_movie(srcpath, fname, newpath):
    """Copy an uncompressed movie into *newpath*, label it, trash the source."""
    import errno  # local import: only needed for the EEXIST check below

    print("#" * 20)
    print("Creating directory...")
    try:
        os.makedirs(newpath)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
        print("...directory already exists")

    print("Copying file: %s to %s" % (fname, newpath))
    if os.path.isfile(os.path.join(newpath, fname)):
        print("File already exists, not copying")
    else:
        print(copy_file(srcpath, newpath))
        print("...copied")

    print("Colouring files green {0}".format(newpath))
    make_dir_green(newpath)
    make_dir_green(os.path.join(newpath, fname))

    print("Trashing source file %s" % srcpath)
    trashPath(srcpath)


def _store_archive(srcpath, fname, curinfo, newpath):
    """Archive a ZIP, extract it towards *newpath*, trash the source."""
    conf = config()
    archive_dir = os.path.join(
        conf['archives_dir'].format(term_name=conf['current']),
        curinfo['course'])

    print("#" * 20)
    print("Copying archive")
    new_archive = copy_file(srcpath, archive_dir)
    if new_archive is None:
        # copy_file returns None when the archive is already in place.
        # Extract from that existing copy instead of passing None on.
        new_archive = os.path.join(archive_dir, fname)

    print("#" * 20)
    print("Extracting archive")
    extract_zip_to_dir(new_archive, newpath)

    print("#" * 20)
    print("Trashing source archive")
    trashPath(srcpath)


def main():
    """Process every path given on the command line.

    Each file name is parsed with getInfoFromZipName; names that cannot be
    parsed are reported and skipped. Movies are copied into the course
    directory; ZIPs are copied to the archives directory and extracted.
    Processed items are labelled green and the source is moved to the Trash.
    """
    for srcpath in sys.argv[1:]:
        srcpath = os.path.abspath(srcpath)
        fname = os.path.basename(srcpath)

        try:
            curinfo = getInfoFromZipName(fname)
        except NotImplementedError as e:
            print("Unable to parse %s (Why: %s)" % (fname, e))
            continue

        newpath = pathFromInfo(curinfo)
        if curinfo['type'] == 'uncompressedqt':
            _store_movie(srcpath, fname, newpath)
        else:
            _store_archive(srcpath, fname, curinfo, newpath)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment