Skip to content

Instantly share code, notes, and snippets.

@korenyushkin
Created May 13, 2011 19:19
Show Gist options
  • Save korenyushkin/971133 to your computer and use it in GitHub Desktop.
Save korenyushkin/971133 to your computer and use it in GitHub Desktop.
Akshell Tool
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import hashlib
import os
import sys
import imp
import distutils.dir_util
import threading
import optparse
import time
import cookielib
import urllib
import urllib2
import simplejson
import fnmatch
try:
import readline
except ImportError:
print "No readline support available"
CONFIG_FILE = ".akshellrc"
SERVER = "www.akshell.com"
DEFAULT_CONFIG_DIR = (os.path.join(os.environ['APPDATA'], 'Akshell')
if sys.platform == 'win32' else
os.path.join(os.path.expanduser('~'), '.akshell'))
DEFAULT_SCAN_INTERVAL = 1
def fntest(filename, patterns):
filename = os.path.basename(filename)
for pattern in patterns:
if fnmatch.fnmatch(filename, pattern):
return True
return False
def find_config(path):
"""
Find config file in any of upper folders
"""
if path in ('', '/'):
module = None
else:
try:
parent_dir = os.path.join(path, CONFIG_FILE)
# dirtiest trick ever:
module = imp.load_source('config', parent_dir)
except IOError:
module = find_config(os.path.split(path)[0])
return module
def load_config(filename):
try:
module = imp.load_source('config', filename)
except IOError:
module = None
return module
def yn_choice(message, default='y'):
choices = 'Y/n' if default.lower() in ('y', 'yes') else 'y/N'
choice = raw_input("%s (%s) " % (message, choices))
values = ('y', 'yes', '') if default == 'y' else ('y', 'yes')
return True if choice.strip().lower() in values else False
class Error(Exception):
pass
class RequestError(Error):
def __init__(self, message, code):
Error.__init__(self, message)
self.code = code
class Browser(object):
def __init__(self, cookie_file):
self._cookie_file = cookie_file
self.cookie = cookielib.MozillaCookieJar(self._cookie_file)
try:
self.cookie.load()
except IOError:
pass
self.opener = urllib2.OpenerDirector()
self.opener.add_handler(urllib2.HTTPHandler())
self.opener.add_handler(urllib2.HTTPCookieProcessor(self.cookie))
def get(self, url, headers=None):
headers = dict(headers) if headers else {}
headers.setdefault('X-Requested-With', 'XMLHttpRequest')
request = urllib2.Request(url, headers=headers)
response = self.opener.open(request)
if response.code != 200:
raise RequestError(response.read(), response.code)
self.cookie.save()
return response.read()
def get_json(self, url):
response = self.get(url, {'Accept': "application/json"})
return simplejson.loads(response)
def post_json(self, url, data):
headers = {
'X-Requested-With': 'XMLHttpRequest',
'Content-Type': 'application/json; charset=UTF-8',
'Accept': "application/json",
}
encoded_data = simplejson.dumps(data)
request = urllib2.Request(url, encoded_data, headers=headers)
response = self.opener.open(request)
if response.code != 200:
raise RequestError(response.read(), response.code)
self.cookie.save()
return response.read()
def put(self, url, data):
headers = {}
headers.setdefault('X-Requested-With', 'XMLHttpRequest')
request = urllib2.Request(url, data, headers=headers)
request.get_method = lambda: "PUT"
response = self.opener.open(request)
if response.code != 200:
raise RequestError(response.read(), response.code)
self.cookie.save()
return response.read()
class AkshellError(Error):
pass
class FileCache(object):
def __init__(self, config, username, application, application_dir):
self._config = config
self._login = username
self._app = application
self._app_dir = application_dir
self._config_dir = getattr(config, "CONFIG_DIR", DEFAULT_CONFIG_DIR)
self.cache = {}
@property
def cache_filename(self):
return os.path.join(self._config_dir, "%s-%s-cache.txt" % (self._login, self._app))
def load(self):
try:
f = open(self.cache_filename, "rb")
except IOError:
return
content = f.read()
f.close()
try:
cache = simplejson.loads(content)
except simplejson.decoder.JSONDecodeError:
return
self.cache = cache
def save(self):
f = open(self.cache_filename, "wb")
f.write(simplejson.dumps(self.cache))
f.close()
def update(self, filename):
abs_filename = os.path.join(self._app_dir, filename)
local_time = os.stat(abs_filename).st_mtime
self.cache[filename] = local_time
self.save()
def is_updated(self, filename):
"""
Если локального файла нету - False, качаем по любому, удаляем только с веб морды
Если файла нет в кеше (но он есть локально), то файл был создан, True
"""
abs_filename = os.path.join(self._app_dir, filename)
if not os.path.exists(abs_filename):
return False
cached_time = self.cache.get(filename, None)
if cached_time is None:
return True
local_time = os.stat(abs_filename).st_mtime
if local_time > cached_time:
# Обновлен локально
return True
# Перекачиваем
return False
def get_updated_files(self, filename):
raise NotImplementedError()
class AkshellClient(object):
"""
Akshell webinterface client
"""
def __init__(self, working_dir, config, error_handler, force=False):
self._login = config.AKSHELL_LOGIN
self._password = config.AKSHELL_PASSWD
self._application = config.AKSHELL_APP
self._config = config
self._config_dir = getattr(config, "CONFIG_DIR", DEFAULT_CONFIG_DIR)
self._ignores = getattr(config, "IGNORE", ["~*", ".*"])
self._force = force
self.application_dir = working_dir
self._error_handler = error_handler
# Force create config dir
try:
os.mkdir(self._config_dir)
except OSError:
pass
self.browser = Browser(self.cookie_file)
self.cache = FileCache(config, self._login, self._application, self.application_dir)
def _get_info(self):
basis_url = self.url("basis.js")
user_info = self.browser.get(basis_url, {'Accept': 'application/json'})
user_info = user_info[user_info.index("=") + 1:]
return simplejson.loads(user_info)
def log(self, message, *args):
print message % args
def login(self):
self.log("Logging in as %s", self._login)
info = self._get_info()
if info['username'] == self._login:
return
login_url = self.url("login")
self.browser.post_json(login_url, {'name': self._login, 'password': self._password})
info = self._get_info()
if info['username'] == self._login:
raise AkshellError("Unable to log in")
def url(self, relative_part):
relative_part = relative_part.lstrip("/")
return "http://%s/%s" % (SERVER, urllib.quote(relative_part))
def load(self, filename):
abs_filename = os.path.join(self.application_dir, filename)
remote_url = self.url("apps/%s/code/%s" % (self._application, filename.replace("\\", "/").lstrip("/")))
contents = self.browser.get(remote_url)
remote_md5 = hashlib.md5()
remote_md5.update(contents)
remote_md5 = remote_md5.hexdigest()
distutils.dir_util.create_tree(self.application_dir, [filename])
local_md5 = ""
if os.path.exists(abs_filename):
f = open(abs_filename, "rb")
local_md5 = hashlib.md5()
local_md5.update(f.read())
local_md5 = local_md5.hexdigest()
f.close()
if local_md5 == remote_md5:
self.log("Download %s [not changed]", remote_url)
return
self.log("Download %s", remote_url)
f = open(abs_filename, "wb")
f.write(contents)
f.close()
self.cache.update(filename)
def save(self, filename):
abs_filename = os.path.join(self.application_dir, filename)
remote_url = self.url("apps/%s/code/%s" % (self._application, filename.replace("\\", "/").lstrip("/")))
self.log("Upload %s", remote_url)
f = open(abs_filename, "rb")
contents = f.read()
f.close()
self.browser.put(remote_url, contents)
self.cache.update(filename)
def rescan(self):
filenames = []
for root, dirs, files in os.walk(self.application_dir):
rel_dir = os.path.relpath(root, self.application_dir)
for f in files:
filename = os.path.normpath(os.path.join(rel_dir, f))
if not fntest(filename, self._ignores):
filenames.append(filename)
for filename in filenames:
if self.cache.is_updated(filename):
self.save(filename)
def load_directory(self, base_directory, subdir=""):
for filename, data in base_directory.iteritems():
if data is None:
rel_filename = os.path.join(subdir, filename)
local_file_updated = self.cache.is_updated(rel_filename)
if local_file_updated:
if self._force:
self.save(rel_filename)
else:
print rel_filename
self._error_handler("Unsaved local files exist, please run with --force key to upload changes, or remove local files to update them from server")
else:
if not self._skip:
self.load(rel_filename)
else:
if subdir:
prefix = subdir + "/"
else:
prefix = ""
self.load_directory(data, prefix + filename)
def load_application(self, skip=False, force=False):
self.cache.load()
self.log("Loading application %s", self._application)
url = self.url("apps/%s/code/" % self._application)
data = self.browser.get_json(url)
self._force = force
self._skip = skip
self.load_directory(data)
def eval(self, string):
print 'Eval: "%s"...' % string
result = simplejson.loads(self.browser.post_json(self.url("apps/%s/envs/debug" % self._application), {'action': 'eval', 'expr': string}))
if not result['ok']:
print "Err: ",
print result['result']
@property
def cookie_file(self):
return os.path.join(self._config_dir, "%s-cookie.txt" % self._login)
class FileSystemWatcher(threading.Thread):
def __init__(self, client, timeout):
self.timeout = timeout
self.client = client
super(FileSystemWatcher, self).__init__()
def run(self):
while True:
time.sleep(self.timeout)
self.client.rescan()
def main(args):
parser = optparse.OptionParser()
parser.add_option("-f", "--force",
dest="force",
action="store_true",
help="Save modified local files on startup")
parser.add_option("-s", "--skip",
dest="skip",
action="store_true",
help="Do not download remote files on startup. Use with care")
parser.add_option("-c", "--config",
dest="config",
action="store",
help="config file")
opts, cmd_args = parser.parse_args(args)
start_dir = os.path.realpath(os.curdir)
if opts.config is None:
CONFIG = find_config(start_dir)
else:
CONFIG = load_config(opts.config)
if CONFIG is None:
parser.error("No .akshellrc in this or parent directory and config file not specified in command line")
# clean run
working_dir = getattr(CONFIG, "WORK_DIR", os.path.join(start_dir, CONFIG.AKSHELL_APP))
if os.path.exists(working_dir):
try:
os.rmdir(working_dir)
os.mkdir(working_dir)
except OSError, e:
pass
client = AkshellClient(working_dir, CONFIG, parser.error)
client.login()
# todo
client.load_application(opts.skip, opts.force)
processor = FileSystemWatcher(client, getattr(CONFIG, "INTERVAL", DEFAULT_SCAN_INTERVAL))
processor.daemon = True
processor.start()
while 1:
s = raw_input(">>")
s = s.strip()
if s:
client.eval(s)
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment