Created
May 11, 2012 06:59
-
-
Save gccyugi/2658028 to your computer and use it in GitHub Desktop.
upyun-python-api
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# | |
# :author yugi | |
# :email [email protected] | |
# :update 2012/5/18 15:06 | |
import md5 as mmd5 | |
# When true, UpyunCore.request prints a one-line trace of each request.
DEBUG = False

# Base domain shared by every API endpoint host below.
api_upyun = 'api.upyun.com'

# Selectable endpoint hosts: a 'vN.' prefix in front of the base domain.
ENDPOINT_V0 = ''.join(('v0.', api_upyun))
ENDPOINT_V1 = ''.join(('v1.', api_upyun))
ENDPOINT_V2 = ''.join(('v2.', api_upyun))
ENDPOINT_V3 = ''.join(('v3.', api_upyun))

# Host returned by get_endpoint() until set_endpoint() replaces it.
default_endpoint = ENDPOINT_V0
def md5(e):
    """Return the hexadecimal MD5 digest of ``e``.

    :param e: data to hash. A byte string is hashed as-is; text is
        encoded as UTF-8 first.
    :return: the 32-character lowercase hexadecimal digest.
    """
    # hashlib replaces the stand-alone ``md5`` module, which has been
    # deprecated since Python 2.5 and does not exist on Python 3.
    import hashlib

    if not isinstance(e, bytes):
        # Explicit encoding instead of the implicit ASCII codec, so
        # non-ASCII text hashes instead of raising UnicodeEncodeError.
        e = e.encode('utf-8')
    return hashlib.md5(e).hexdigest()
def md5_fd(fobj):
    """Return the hexadecimal MD5 digest of the data read from ``fobj``.

    The object is read in chunks from its current position until it is
    exhausted, then rewound to offset 0 so it can be read again (for
    example to upload it after computing its checksum).

    :param fobj: seekable file-like object whose ``read`` returns byte
        strings (i.e. opened in binary mode).
    :return: the 32-character lowercase hexadecimal digest.
    """
    # hashlib replaces the stand-alone ``md5`` module, which has been
    # deprecated since Python 2.5 and does not exist on Python 3.
    import hashlib

    digest = hashlib.md5()
    while True:
        chunk = fobj.read(8096)
        if not chunk:
            break
        digest.update(chunk)
    fobj.seek(0)
    return digest.hexdigest()
def set_endpoint(e):
    """Make ``e`` the module-wide endpoint host.

    :param e: host name to store in the module-level ``default_endpoint``.
    """
    global default_endpoint
    default_endpoint = e
def get_endpoint():
    """Return the endpoint host currently stored in ``default_endpoint``."""
    return default_endpoint
class UpyunCore(object):
    """HTTP transport that authenticates and sends one API request.

    :param auth_type: ``'basic'`` selects HTTP Basic authentication; any
        other value (the default is ``'upyun'``) selects the signature
        scheme implemented by ``_upyun_auth``.
    """

    def __init__(self, auth_type='upyun'):
        self._auth_type = auth_type

    def _basic_auth(self, headers, username, password):
        """Add an HTTP Basic ``Authorization`` entry to ``headers``.

        :param headers: dict of request headers, modified in place.
        :param username: operator name.
        :param password: operator password.
        """
        import base64

        credentials = username + ':' + password
        if not isinstance(credentials, bytes):
            credentials = credentials.encode('utf-8')
        # b64encode never inserts line breaks. The deprecated encodestring
        # wrapped its output every 76 characters, which put newlines into
        # the header value for long credentials.
        token = base64.b64encode(credentials)
        if not isinstance(token, str):
            # Python 3 returns bytes; header values are native strings.
            token = token.decode('ascii')
        headers['Authorization'] = 'Basic ' + token

    def _upyun_auth(self, headers, method, path, username, password):
        """Add ``Date`` and a signed ``Authorization`` entry to ``headers``.

        The signature is the MD5 of
        ``METHOD&PATH&DATE&CONTENT-LENGTH&md5(password)``, where the
        content length is taken from ``headers`` (0 when absent).

        :param headers: dict of request headers, modified in place.
        :param method: HTTP method name.
        :param path: request path.
        :param username: operator name.
        :param password: operator password.
        """
        from email.utils import formatdate

        # formatdate always emits English day/month names, whereas
        # strftime's %a/%b/%X follow the process locale.
        headers['Date'] = formatdate(usegmt=True)
        content_length = headers.get('Content-Length') or 0
        signature = '&'.join([method, path, headers['Date'],
                              str(content_length), md5(password)])
        headers['Authorization'] = 'UpYun %s:%s' % (username, md5(signature))

    def request(self, host, method, path, data, username, password, options):
        """Send one authenticated HTTP request and return the response.

        :param host: host name to connect to.
        :param method: HTTP method name.
        :param path: request path.
        :param data: request body, or None.
        :param username: operator name.
        :param password: operator password.
        :param options: dict of extra request headers; it is copied, not
            modified.
        :return: the HTTP response object. The connection is not closed
            here; the caller reads the response.
        """
        try:
            import httplib  # Python 2
        except ImportError:
            import http.client as httplib  # Python 3

        headers = options.copy()
        if self._auth_type == 'basic':
            self._basic_auth(headers, username, password)
        else:
            self._upyun_auth(headers, method, path, username, password)

        conn = httplib.HTTPConnection(host)
        conn.request(method, path, data, headers)
        res = conn.getresponse()
        if DEBUG:
            # Single parenthesised argument: same output on Python 2 and 3.
            print('%s %s => %s %s' % (method, host + path,
                                      res.status, res.reason))
        return res
class UpyunEntry(object):
    """One row of a directory listing.

    :param name: entry name.
    :param type: type code; ``'F'`` means folder, anything else is
        treated as a file.
    :param size: entry size, stored as given.
    :param date: entry date, stored as given.
    """

    def __init__(self, name, type, size, date):
        # Translate the type code into a readable label.
        kind = 'file'
        if type == 'F':
            kind = 'folder'
        self.name = name
        self.type = kind
        self.size = size
        self.date = date

    def __str__(self):
        """Return ``<type: name>`` for display."""
        label = (self.type, self.name)
        return '<%s: %s>' % label
class Upyun(object):
    """Client for the files and folders of one bucket.

    Every operation sends a request to ``/<bucket><path>`` on the host
    returned by ``get_endpoint()`` and treats HTTP status 200 as success.

    :param bucket: bucket name, used as the first path segment.
    :param username: operator name used for authentication.
    :param password: operator password used for authentication.
    :param transport: object with a ``request(host, method, path, data,
        username, password, options)`` method. When omitted, a new
        ``UpyunCore`` is created for this instance.
    """

    def __init__(self, bucket, username, password, transport=None):
        self._bucket = bucket
        self._username = username
        self._password = password
        # Created per instance: a default evaluated in the signature
        # would be built once and shared by every client.
        self._core = UpyunCore() if transport is None else transport

    def __str__(self):
        return '#<bucket: %s, username: %s>' % (self._bucket, self._username)

    def touch(self, path, data, md5_checksum=None, auto_mkdir=False):
        """Upload ``data`` to ``path`` with a PUT request.

        :param path: destination path inside the bucket.
        :param data: request body: a string or a file-like object.
        :param md5_checksum: optional value for the ``Content-MD5`` header.
        :param auto_mkdir: when true, send the ``mkdir: true`` header.
        :return: True when the response status is 200.
        """
        options = self._upload_options(data, md5_checksum, auto_mkdir)
        res = self._request('PUT', self._generate_path(path), data, options)
        return self._is_res_ok(res)

    def touch_image(self, path, data, secret_code=None,
                    md5_checksum=None, auto_mkdir=False):
        """Upload an image and return the metadata reported by the server.

        :param path: destination path inside the bucket.
        :param data: request body: a string or a file-like object.
        :param secret_code: optional value for the ``Content-Secret`` header.
        :param md5_checksum: optional value for the ``Content-MD5`` header.
        :param auto_mkdir: when true, send the ``mkdir: true`` header.
        :return: ``(True, info)`` on success, where ``info`` has the keys
            ``width``, ``height``, ``frames`` (ints, or None when the
            response lacks the header) and ``is_folder``;
            ``(False, message)`` otherwise.
        """
        options = self._upload_options(data, md5_checksum, auto_mkdir,
                                       secret_code)
        res = self._request('PUT', self._generate_path(path), data, options)
        if not self._is_res_ok(res):
            return False, self._get_res_errmsg(res)
        info = {
            'width': self._int_header(res, 'x-upyun-width'),
            'height': self._int_header(res, 'x-upyun-height'),
            'frames': self._int_header(res, 'x-upyun-frames'),
            'is_folder': res.getheader('x-upyun-file-type') != 'file',
        }
        return True, info

    def cat(self, path):
        """Download ``path``.

        :return: ``(True, body)`` on success, ``(False, message)`` otherwise.
        """
        res = self._request('GET', self._generate_path(path))
        if self._is_res_ok(res):
            return True, res.read()
        return False, self._get_res_errmsg(res)

    def stat(self, path):
        """Fetch metadata for ``path`` with a HEAD request.

        :return: ``(True, info)`` on success, where ``info`` has the keys
            ``is_folder``, ``size`` and ``timestamp`` (ints, or None when
            the response lacks the header); ``(False, message)`` otherwise.
        """
        res = self._request('HEAD', self._generate_path(path))
        if not self._is_res_ok(res):
            return False, self._get_res_errmsg(res)
        info = {
            'is_folder': res.getheader('x-upyun-file-type') != 'file',
            'size': self._int_header(res, 'x-upyun-file-size'),
            'timestamp': self._int_header(res, 'x-upyun-file-date'),
        }
        return True, info

    def usage(self, path='/'):
        """Query the ``?usage`` resource of ``path``.

        :return: ``(True, value)`` with the response body parsed as an
            int on success, ``(False, message)`` otherwise.
        """
        res = self._request('GET', self._generate_path(path) + '?usage')
        if self._is_res_ok(res):
            return True, int(res.read())
        return False, self._get_res_errmsg(res)

    def ls(self, path='/'):
        """Yield an ``UpyunEntry`` for each row of the listing of ``path``.

        The body is parsed as newline-separated rows of tab-separated
        fields; rows with fewer than four fields are skipped. Nothing is
        yielded when the request does not succeed.
        """
        res = self._request('GET', self._generate_path(path))
        if not self._is_res_ok(res):
            return
        data = res.read()
        if not isinstance(data, str):
            # Python 3 responses are bytes; the separators below are text.
            data = data.decode('utf-8')
        for row in data.split('\n'):
            fields = row.split('\t')
            if len(fields) >= 4:
                # Only the first four fields map onto UpyunEntry; passing
                # extras would raise TypeError.
                yield UpyunEntry(*fields[:4])

    def mkdir(self, path, auto_mkdir=False):
        """Create folder ``path`` with a POST carrying ``folder: create``.

        :param auto_mkdir: when true, also send the ``mkdir: true`` header.
        :return: True when the response status is 200.
        """
        options = {'folder': 'create'}
        if auto_mkdir:
            options['mkdir'] = 'true'
        res = self._request('POST', self._generate_path(path), None, options)
        return self._is_res_ok(res)

    def rmdir(self, path):
        """Delete folder ``path``; identical to ``rm``."""
        return self.rm(path)

    def rm(self, path):
        """Send DELETE for ``path``; return True when the status is 200."""
        res = self._request('DELETE', self._generate_path(path))
        return self._is_res_ok(res)

    def _upload_options(self, data, md5_checksum=None, auto_mkdir=False,
                        secret_code=None):
        """Build the header dict shared by ``touch`` and ``touch_image``."""
        options = {}
        if secret_code:
            options['Content-Secret'] = secret_code
        if md5_checksum:
            options['Content-MD5'] = md5_checksum
        if auto_mkdir:
            options['mkdir'] = 'true'
        length = self._body_length(data)
        if length is not None:
            options['Content-Length'] = length
        return options

    def _body_length(self, data):
        """Return the number of bytes ``data`` will send, or None if unknown.

        Sized objects report ``len(data)``. Seekable file-like objects
        report the bytes remaining from their current position, and the
        position is restored. Supplying the length for files lets the
        transport sign the real size rather than 0.
        NOTE(review): assumes the server checks the signed length against
        the body -- confirm against the API documentation.
        """
        try:
            return len(data)
        except TypeError:
            pass
        try:
            start = data.tell()
            data.seek(0, 2)
            end = data.tell()
            data.seek(start)
        except (AttributeError, IOError, OSError):
            # Not seekable (or not file-like at all): leave it to the
            # HTTP library to decide how to send the body.
            return None
        return end - start

    def _int_header(self, res, name):
        """Return response header ``name`` as an int, or None if absent."""
        value = res.getheader(name)
        if value is None:
            return None
        return int(value)

    def _is_res_ok(self, res):
        return res.status == 200

    def _get_res_errmsg(self, res):
        return '%s %s: %s' % (res.status, res.reason, res.read())

    def _request(self, method, path, data=None, options=None):
        """Send one request through the transport to the current endpoint."""
        if options is None:
            options = {}
        return self._core.request(get_endpoint(), method, path, data,
                                  self._username, self._password,
                                  options)

    def _generate_path(self, filename=None):
        """Return ``/<bucket>`` followed by ``filename`` when given."""
        path = '/' + self._bucket
        return path + filename if filename else path
if __name__ == '__main__':
    # Demo run against a real bucket; replace the placeholders first.
    client = Upyun('<bucket-name>', '<operator-name>', '<operator-pwd>')

    # endpoint: show the default, switch to v1, show it again
    print('endpoint: %s' % get_endpoint())
    set_endpoint(ENDPOINT_V1)
    print('endpoint: %s' % get_endpoint())

    # usage
    ok, used = client.usage()
    if ok:
        print('usage: %d' % used)

    # mkdir: the second call asks for auto_mkdir
    client.mkdir('/tmp')
    client.mkdir('/tmp/1/2', True)

    # rmdir: innermost folder first
    for folder in ('/tmp/1/2', '/tmp/1', '/tmp'):
        client.rmdir(folder)

    # touch
    client.touch('/a.txt', 'hijcak')

    # cat
    ok, body = client.cat('/a.txt')
    if ok:
        print('data of a.txt is: ' + body)

    # stat
    ok, meta = client.stat('/a.txt')
    if ok:
        print(meta)

    # rm
    client.rm('/a.txt')

    # ls
    for entry in client.ls():
        print(entry)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment