Last active
December 27, 2015 08:39
-
-
Save ikbear/7297694 to your computer and use it in GitHub Desktop.
上传测试 upload token
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import os | |
import sys | |
import StringIO | |
sys.path.append("/Users/ikbear/Code/Qiniu/SDK/python-sdk/") | |
# @gist import_io | |
import qiniu.io | |
# @endgist | |
import qiniu.conf | |
# @gist import_rs | |
import qiniu.rs | |
# @endgist | |
# @gist import_fop | |
import qiniu.fop | |
# @endgist | |
# @gist import_resumable_io | |
import qiniu.resumable_io as rio | |
# @endgist | |
# @gist import_rsf | |
import qiniu.rsf | |
# @endgist | |
bucket_name = None | |
uptoken = None | |
key = None | |
key2 = None | |
key3 = None | |
domain = None | |
pic_key = None | |
# ---------------------------------------------------------- | |
def setup(access_key, secret_key, bucketname, bucket_domain, pickey): | |
global bucket_name, uptoken, key, key2, domain, key3, pic_key | |
qiniu.conf.ACCESS_KEY = access_key | |
qiniu.conf.SECRET_KEY = secret_key | |
bucket_name = bucketname | |
domain = bucket_domain | |
pic_key = pickey | |
# @gist uptoken | |
policy = qiniu.rs.PutPolicy(bucket_name) | |
policy.returnBody = u'{"mimeType": $(mimeType), "fsize": $(fsize), "hash": $(hash), "key": $(key)}' | |
uptoken = policy.token() | |
# @endgist | |
key = "python-demo-put-file" | |
key2 = "python-demo-put-file-2" | |
key3 = "python-demo-put-file-3" | |
def _setup(): | |
''' 根据环境变量配置信息 ''' | |
access_key = getenv("QINIU_ACCESS_KEY") | |
if access_key is None: | |
exit("请配置环境变量 QINIU_ACCESS_KEY") | |
secret_key = getenv("QINIU_SECRET_KEY") | |
bucket_name = getenv("QINIU_TEST_BUCKET") | |
domain = getenv("QINIU_TEST_DOMAIN") | |
pickey = 'QINIU_UNIT_TEST_PIC' | |
setup(access_key, secret_key, bucket_name, domain, pickey) | |
def getenv(name): | |
env = os.getenv(name) | |
if env is None: | |
sys.stderr.write("请配置环境变量 %s\n" % name) | |
exit(1) | |
return env | |
def put_file(): | |
''' 演示上传文件的过程 ''' | |
# 尝试删除 | |
qiniu.rs.Client().delete(bucket_name, key) | |
# @gist put_file | |
localfile = "%s" % __file__ | |
ret, err = qiniu.io.put_file(uptoken, key, localfile) | |
print ret | |
if err is not None: | |
sys.stderr.write('error: %s ' % err) | |
return | |
# @endgist | |
def resumable_put(): | |
''' 断点续上传 ''' | |
# 尝试删除 | |
qiniu.rs.Client().delete(bucket_name, key) | |
# @gist resumable_put | |
a = "resumable upload string" | |
extra = rio.PutExtra(bucket_name) | |
extra.mime_type = "text/plain" | |
ret, err = rio.put(uptoken, key, StringIO.StringIO(a), len(a), extra) | |
if err is not None: | |
sys.stderr.write('error: %s ' % err) | |
return | |
print ret, | |
# @endgist | |
if __name__ == "__main__": | |
_setup() | |
# put_file() | |
resumable_put() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment