Created
March 4, 2013 08:21
-
-
Save Mic92/5080814 to your computer and use it in GitHub Desktop.
dropbox upload for pyload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
import sys | |
import inspect | |
import os.path | |
import subprocess | |
import logging | |
from dropbox import client, rest, session | |
# XXX Fill in your consumer key and secret below.
# You can find these at http://www.dropbox.com/developers/apps
APP_KEY = '<YOUR_APP_KEY>'
APP_SECRET = '<YOUR_APP_SECRET>'
# Should be 'dropbox' or 'app_folder', as configured for your app.
ACCESS_TYPE = 'dropbox'
# Leading path component of every upload destination.
DEST_DIRECTORY = 'Server'
# Directory containing this script; the log file and the token file are
# stored next to it.  The filename is made absolute first: when the script is
# started via a bare relative name, dirname() alone would yield '' and every
# path derived from it would silently depend on the working directory.
SCRIPT_PATH = os.path.dirname(
    os.path.abspath(inspect.getfile(inspect.currentframe())))
# Every record goes to a log file next to the script and to the console,
# using the same format for both destinations.
LOG_FORMAT = "%(asctime)s %(levelname)-10s DropboxUpload: %(message)s"

# The file destination has to be set up first: logging.basicConfig() does
# nothing once the root logger already has a handler attached.
logging.basicConfig(
    format=LOG_FORMAT,
    level=logging.INFO,
    filename=os.path.join(SCRIPT_PATH, "upload-dropbox.log"))

# Second destination: a stream handler on the root logger.
formatter = logging.Formatter(LOG_FORMAT)
console = logging.StreamHandler()
console.setFormatter(formatter)
console.setLevel(logging.INFO)
logging.getLogger('').addHandler(console)

# Short aliases for the root-logger helpers used by the rest of the script.
error = logging.error
info = logging.info
class DropboxUploader(object):
    """Upload local files to Dropbox through a chunked uploader.

    The session is a StoredSession, so an access token saved by an earlier
    run is picked up on construction.
    """

    # Upper bound on upload attempts before the upload is abandoned.
    MAX_ATTEMPTS = 10

    def __init__(self, app_key, app_secret):
        """Create the session and API client and load any stored token."""
        self.sess = StoredSession(app_key, app_secret, ACCESS_TYPE)
        self.api_client = client.DropboxClient(self.sess)
        self.sess.load_creds()

    def upload(self, from_path, to_path):
        """Copy a local file to Dropbox.

        If the session is not linked yet, the interactive link flow is run
        first.

        Args:
            from_path: local file to upload; a leading '~' is expanded.
            to_path: destination path handed to the uploader's finish().

        Raises:
            rest.ErrorResponse: if linking the session fails.
            Exception: the error of the last attempt, once MAX_ATTEMPTS
                upload attempts have all failed.
        """
        if not self.sess.is_linked():
            try:
                self.sess.link()
            except rest.ErrorResponse as e:
                # Log and propagate: without a linked session nothing can be
                # uploaded, and the caller must not treat this as success.
                error('Login Error: %s', e)
                raise

        # Expand once so the size and the opened file refer to the same path.
        local_path = os.path.expanduser(from_path)
        length = os.path.getsize(local_path)
        with open(local_path, "rb") as from_file:
            uploader = self.api_client.ChunkedUploader(
                self.api_client, from_file, length)
            # Retry a bounded number of times instead of looping forever.
            # NOTE(review): presumably upload_chunked() resumes where the
            # previous attempt stopped - confirm against the SDK docs.
            for attempt in range(1, self.MAX_ATTEMPTS + 1):
                try:
                    uploader.upload_chunked()
                except Exception as e:
                    error("error while uploading: %s", e)
                    if attempt == self.MAX_ATTEMPTS:
                        raise
                else:
                    break
            uploader.finish(to_path)
class StoredSession(session.DropboxSession):
    """A wrapper around DropboxSession that stores a token to a file on disk.

    The token is kept in TOKEN_FILE as a single "key|secret" string.
    """

    TOKEN_FILE = os.path.join(SCRIPT_PATH, "dropbox_token")

    def load_creds(self):
        """Load the stored access token into the session, if one exists.

        A missing or unreadable token file is silently ignored; a file that
        does not hold exactly "key|secret" is reported and ignored.
        """
        try:
            with open(self.TOKEN_FILE) as token_file:
                stored_creds = token_file.read()
        except IOError:
            return  # don't worry if it's not there
        parts = stored_creds.strip().split('|')
        if len(parts) != 2:
            # set_token(*parts) would fail with an opaque TypeError here.
            error("ignoring malformed token file '%s'", self.TOKEN_FILE)
            return
        self.set_token(*parts)
        info("loaded access token")

    def write_creds(self, token):
        """Write token.key and token.secret to TOKEN_FILE as "key|secret"."""
        with open(self.TOKEN_FILE, 'w') as token_file:
            token_file.write("|".join([token.key, token.secret]))

    def delete_creds(self):
        """Remove TOKEN_FILE; a file that is already gone is not an error."""
        if os.path.exists(self.TOKEN_FILE):
            os.unlink(self.TOKEN_FILE)

    def link(self):
        """Run the interactive authorization flow and store the new token.

        Prints the authorization URL, waits for the user to press enter, then
        obtains the access token and writes it to TOKEN_FILE.
        """
        request_token = self.obtain_request_token()
        url = self.build_authorize_url(request_token)
        print("url: %s" % url)
        print("Please authorize in the browser. After you're done, press enter.")
        raw_input()  # Python 2 builtin; this script targets python2
        self.obtain_access_token(request_token)
        self.write_creds(self.token)

    def unlink(self):
        """Delete the stored token, then unlink the underlying session."""
        self.delete_creds()
        session.DropboxSession.unlink(self)
def remove(value, deletechars):
    """Return value with every occurrence of each item of deletechars removed.

    The items are removed one after another, in iteration order, using
    str.replace with an empty replacement.
    """
    cleaned = value
    for unwanted in deletechars:
        cleaned = cleaned.replace(unwanted, '')
    return cleaned
def main():
    """Upload the file named on the command line, then remove the local copy.

    Expected arguments (see the usage message):
        pluginname url filename file_location id
    Only filename and file_location are used.  The destination is
    DEST_DIRECTORY/<name of the file's parent directory>/<filename>.

    A .rar or .zip file is additionally checked with "7z t" after the
    upload; if the check cannot be run or fails, the local file is kept.
    Exits with status 1 on missing credentials or wrong usage.
    """
    # Reject both empty values and the untouched placeholders.
    if (APP_KEY in ('', '<YOUR_APP_KEY>')
            or APP_SECRET in ('', '<YOUR_APP_SECRET>')):
        error("You need to set your APP_KEY and APP_SECRET!")
        sys.exit(1)
    info("Called with the following arguments {0}".format(sys.argv))
    if len(sys.argv) != 6:
        error("Wrong number of arguments.")
        error("usage: {0} pluginname url filename file_location id".format(sys.argv[0]))
        sys.exit(1)
    filename, location = sys.argv[3], sys.argv[4]

    uploader = DropboxUploader(APP_KEY, APP_SECRET)
    dirname = os.path.basename(os.path.dirname(location))
    # splitext() keeps the leading dot, so compare against '.rar' / '.zip'.
    extension = os.path.splitext(filename)[1]
    is_archive = extension.lower() in ('.rar', '.zip')
    to_location = DEST_DIRECTORY + '/' + dirname + '/' + filename
    uploader.upload(location, to_location)

    if is_archive:
        info("Test Archive: {0}".format(filename))
        try:
            # The command must be passed as one argv list.
            res = subprocess.call(["7z", "t", location])
        except OSError as e:
            # e.g. 7z is not installed: keep the local file.
            error("could not run 7z: %s", e)
            return
        if res != 0:
            error("CRC sum failed")
            return
    info("Remove File: '{0}'".format(filename))
    os.remove(location)
    info("Finished")
# Run the upload only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment