Skip to content

Instantly share code, notes, and snippets.

@ikbear
Last active December 27, 2015 08:39
Show Gist options
  • Save ikbear/7297694 to your computer and use it in GitHub Desktop.
Save ikbear/7297694 to your computer and use it in GitHub Desktop.
上传测试 upload token
# -*- coding: utf-8 -*-
import os
import sys
import StringIO
sys.path.append("/Users/ikbear/Code/Qiniu/SDK/python-sdk/")
# @gist import_io
import qiniu.io
# @endgist
import qiniu.conf
# @gist import_rs
import qiniu.rs
# @endgist
# @gist import_fop
import qiniu.fop
# @endgist
# @gist import_resumable_io
import qiniu.resumable_io as rio
# @endgist
# @gist import_rsf
import qiniu.rsf
# @endgist
# Shared demo state. Every name starts as None and is assigned by setup().

# Target bucket and the domain associated with it.
bucket_name = None
domain = None

# Upload token produced from the bucket's put policy.
uptoken = None

# Object keys used by the demo functions.
key = None
key2 = None
key3 = None
pic_key = None
# ----------------------------------------------------------
def setup(access_key, secret_key, bucketname, bucket_domain, pickey):
    """Initialise the SDK credentials and the module-level demo state.

    Stores the key pair on ``qiniu.conf``, records the bucket name, bucket
    domain and picture key in module globals, builds an upload token from a
    ``PutPolicy`` for the bucket, and assigns the three demo object keys.
    """
    global bucket_name, domain, pic_key, uptoken, key, key2, key3

    # Credentials are stored first, ahead of the token generation below.
    qiniu.conf.ACCESS_KEY, qiniu.conf.SECRET_KEY = access_key, secret_key
    bucket_name, domain, pic_key = bucketname, bucket_domain, pickey

    # @gist uptoken
    put_policy = qiniu.rs.PutPolicy(bucket_name)
    put_policy.returnBody = u'{"mimeType": $(mimeType), "fsize": $(fsize), "hash": $(hash), "key": $(key)}'
    uptoken = put_policy.token()
    # @endgist

    key, key2, key3 = (
        "python-demo-put-file",
        "python-demo-put-file-2",
        "python-demo-put-file-3",
    )
def _setup():
    """Configure the demo from environment variables.

    Reads QINIU_ACCESS_KEY, QINIU_SECRET_KEY, QINIU_TEST_BUCKET and
    QINIU_TEST_DOMAIN through getenv() and passes them to setup() together
    with the fixed picture key 'QINIU_UNIT_TEST_PIC'.
    """
    # getenv() reports a missing variable and exits by itself, so no extra
    # None check is needed for any of these values.
    access_key = getenv("QINIU_ACCESS_KEY")
    secret_key = getenv("QINIU_SECRET_KEY")
    # Local names differ from the module globals so they do not shadow them.
    test_bucket = getenv("QINIU_TEST_BUCKET")
    test_domain = getenv("QINIU_TEST_DOMAIN")
    pickey = 'QINIU_UNIT_TEST_PIC'
    setup(access_key, secret_key, test_bucket, test_domain, pickey)
def getenv(name):
    """Return the value of environment variable *name*.

    If the variable is not set, a message is written to stderr and the
    process exits with status 1 (raises ``SystemExit``).
    """
    env = os.getenv(name)
    if env is None:
        sys.stderr.write("请配置环境变量 %s\n" % name)
        # sys.exit rather than the site-provided exit() helper, which is
        # meant for the interactive shell and is missing under ``python -S``.
        sys.exit(1)
    return env
def put_file():
    """Demonstrate uploading a local file (this script itself)."""
    # Try to delete any existing object stored under the same key; the
    # result of the delete call is deliberately ignored.
    qiniu.rs.Client().delete(bucket_name, key)

    # @gist put_file
    localfile = __file__
    ret, err = qiniu.io.put_file(uptoken, key, localfile)
    # Check the error before printing, as resumable_put() does, so a failed
    # upload does not print a meaningless result first.
    if err is not None:
        sys.stderr.write('error: %s ' % err)
        return
    print(ret)
    # @endgist
def resumable_put():
    """Demonstrate a resumable upload of an in-memory string."""
    # Try to delete any existing object stored under the same key; the
    # result of the delete call is deliberately ignored.
    qiniu.rs.Client().delete(bucket_name, key)

    # @gist resumable_put
    payload = "resumable upload string"
    extra = rio.PutExtra(bucket_name)
    extra.mime_type = "text/plain"
    ret, err = rio.put(uptoken, key, StringIO.StringIO(payload), len(payload), extra)
    if err is not None:
        sys.stderr.write('error: %s ' % err)
        return
    # Newline-terminated output, matching put_file(); the former trailing
    # comma suppressed the newline.
    print(ret)
    # @endgist
def main():
    """Run the demo: configure from the environment, then upload."""
    _setup()
    # put_file()
    resumable_put()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment