Last active
October 21, 2017 07:25
-
-
Save yxlao/17a781fa6aa1a5c410c88b1cd4f3ddb2 to your computer and use it in GitHub Desktop.
Dropbox Automatic Sync
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import socket | |
import time | |
import argparse | |
import updown | |
import logging | |
# Usage: | |
# 1. | |
# pip install dropbox | |
# 2. | |
# Put token in 'token.cfg' | |
# 3. | |
# Change directory setting in __main__ | |
# 4. | |
# python start_daemon.py | |
class DropboxUploadDaemon(object): | |
def __init__(self, | |
monitored_root_dir, | |
monitored_dirs, | |
token_path, | |
refresh_minute_interval): | |
""" | |
Args: | |
monitored_root_dir: root of monitored_dirs | |
monitored_dirs: list of monitored directories | |
token_path: path for token | |
refresh_minute_interval: interval time between uploads | |
""" | |
assert refresh_minute_interval > 0 | |
# Parse args | |
self.monitored_root_dir = monitored_root_dir | |
self.monitored_dirs = monitored_dirs | |
self.refresh_minute_interval = refresh_minute_interval | |
self.hostname = socket.gethostname() | |
# Mock argparse, just fill out the default values here | |
self.args = argparse.Namespace() | |
self.args.yes = True | |
self.args.no = False | |
self.args.default = False | |
try: | |
self.args.token = DropboxUploadDaemon._get_token_config(token_path) | |
except: | |
raise FileNotFoundError( | |
"Could not load token from {}".format(token_path)) | |
# Logging support | |
self.logger = logging.getLogger('DropboxUploadDaemon') | |
self.logger.setLevel(logging.INFO) # TODO: somehow need to use warning | |
self.logger.warning("DropboxUploadDaemon started") | |
self.logger.warning("monitored_root_dir: %s", self.monitored_root_dir) | |
self.logger.warning("monitored_dirs: %s", self.monitored_dirs) | |
self.logger.warning("refresh_minute_interval: %s", | |
self.refresh_minute_interval) | |
self.logger.warning("hostname: %s", self.hostname) | |
# self.logger.warning(self.args) | |
def start_daemon(self): | |
""" | |
Loop through folders and sync them periodically | |
""" | |
while True: | |
# Loop through all specified dirs and sync | |
self._run_sync_once() | |
# Sleep | |
sleep_seconds = int(self.refresh_minute_interval * 60) | |
self.logger.warning("Sleeping for %s seconds", sleep_seconds) | |
time.sleep(sleep_seconds) | |
def _run_sync_once(self): | |
""" | |
Loop through all specified dirs and sync | |
""" | |
for monitored_dir in self.monitored_dirs: | |
# Local folder | |
self.args.rootdir = os.path.join(self.monitored_root_dir, | |
monitored_dir) | |
if not os.path.isdir(self.args.rootdir): | |
self.logger.warning("Local directory %s does not exist", | |
self.args.rootdir) | |
continue | |
# Remote folder, with hostname prefix | |
self.args.folder = os.path.join(self.hostname, monitored_dir) | |
# Upload | |
self.logger.warning("%s Uploading from %s to %s %s", | |
'\x1b[6;30;42m', self.args.rootdir, | |
self.args.folder, '\x1b[0m') | |
updown.main(self.args) | |
@staticmethod | |
def _get_token_config(config_path='token.cfg'): | |
""" | |
Helper function to read token configs | |
""" | |
with open(config_path) as f: | |
token = f.readlines()[0].strip() | |
return token | |
if __name__ == '__main__': | |
# Setup monitored directories | |
monitored_root_dir = os.path.expanduser("~/path/to/dir") | |
monitored_dirs = [ | |
"foo/bar", | |
"hello", | |
] | |
# Call updown.py and upload files periodically | |
dud = DropboxUploadDaemon(monitored_root_dir=monitored_root_dir, | |
monitored_dirs=monitored_dirs, | |
token_path='token.cfg', | |
refresh_minute_interval=5) | |
dud.start_daemon() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Upload the contents of your Downloads folder to Dropbox. | |
Modified from: | |
https://github.com/dropbox/dropbox-sdk-python/blob/master/example/updown.py | |
MIT License | |
This is an example app for API v2. | |
""" | |
from __future__ import print_function | |
import argparse | |
import contextlib | |
import datetime | |
import os | |
import six | |
import sys | |
import time | |
import unicodedata | |
if sys.version.startswith('2'): | |
input = raw_input # noqa: E501,F821; pylint: disable=redefined-builtin,undefined-variable,useless-suppression | |
import dropbox | |
# OAuth2 access token. TODO: login etc. | |
TOKEN = '' | |
def main(args): | |
"""Main program. | |
Parse command line, then iterate over files and directories under | |
rootdir and upload all files. Skips some temporary files and | |
directories, and avoids duplicate uploads by comparing size and | |
mtime with the server. | |
""" | |
if sum([bool(b) for b in (args.yes, args.no, args.default)]) > 1: | |
print('At most one of --yes, --no, --default is allowed') | |
sys.exit(2) | |
if not args.token: | |
print('--token is mandatory') | |
sys.exit(2) | |
folder = args.folder | |
rootdir = os.path.expanduser(args.rootdir) | |
print('Dropbox folder name:', folder) | |
print('Local directory:', rootdir) | |
if not os.path.exists(rootdir): | |
print(rootdir, 'does not exist on your filesystem') | |
sys.exit(1) | |
elif not os.path.isdir(rootdir): | |
print(rootdir, 'is not a folder on your filesystem') | |
sys.exit(1) | |
dbx = dropbox.Dropbox(args.token) | |
for dn, dirs, files in os.walk(rootdir): | |
subfolder = dn[len(rootdir):].strip(os.path.sep) | |
listing = list_folder(dbx, folder, subfolder) | |
print('Descending into', subfolder, '...') | |
# First do all the files. | |
for name in files: | |
fullname = os.path.join(dn, name) | |
if not isinstance(name, six.text_type): | |
name = name.decode('utf-8') | |
nname = unicodedata.normalize('NFC', name) | |
if name.startswith('.'): | |
print('Skipping dot file:', name) | |
elif name.startswith('@') or name.endswith('~'): | |
print('Skipping temporary file:', name) | |
elif name.endswith('.pyc') or name.endswith('.pyo'): | |
print('Skipping generated file:', name) | |
elif nname in listing: | |
md = listing[nname] | |
mtime = os.path.getmtime(fullname) | |
mtime_dt = datetime.datetime(*time.gmtime(mtime)[:6]) | |
size = os.path.getsize(fullname) | |
if (isinstance(md, dropbox.files.FileMetadata) and | |
mtime_dt == md.client_modified and size == md.size): | |
print(name, 'is already synced [stats match]') | |
else: | |
print(name, 'exists with different stats, downloading') | |
res = download(dbx, folder, subfolder, name) | |
with open(fullname) as f: | |
data = f.read() | |
if res == data: | |
print(name, 'is already synced [content match]') | |
else: | |
print(name, 'has changed since last sync') | |
if yesno('Refresh %s' % name, False, args): | |
upload(dbx, fullname, folder, subfolder, name, | |
overwrite=True) | |
elif yesno('Upload %s' % name, True, args): | |
upload(dbx, fullname, folder, subfolder, name) | |
# Then choose which subdirectories to traverse. | |
keep = [] | |
for name in dirs: | |
if name.startswith('.'): | |
print('Skipping dot directory:', name) | |
elif name.startswith('@') or name.endswith('~'): | |
print('Skipping temporary directory:', name) | |
elif name == '__pycache__': | |
print('Skipping generated directory:', name) | |
elif yesno('Descend into %s' % name, True, args): | |
print('Keeping directory:', name) | |
keep.append(name) | |
else: | |
print('OK, skipping directory:', name) | |
dirs[:] = keep | |
def list_folder(dbx, folder, subfolder): | |
"""List a folder. | |
Return a dict mapping unicode filenames to | |
FileMetadata|FolderMetadata entries. | |
""" | |
path = '/%s/%s' % (folder, subfolder.replace(os.path.sep, '/')) | |
while '//' in path: | |
path = path.replace('//', '/') | |
path = path.rstrip('/') | |
try: | |
with stopwatch('list_folder'): | |
res = dbx.files_list_folder(path) | |
except dropbox.exceptions.ApiError as err: | |
print('Folder listing failed for', path, '-- assumed empty:', err) | |
return {} | |
else: | |
rv = {} | |
for entry in res.entries: | |
rv[entry.name] = entry | |
return rv | |
def download(dbx, folder, subfolder, name): | |
"""Download a file. | |
Return the bytes of the file, or None if it doesn't exist. | |
""" | |
path = '/%s/%s/%s' % (folder, subfolder.replace(os.path.sep, '/'), name) | |
while '//' in path: | |
path = path.replace('//', '/') | |
with stopwatch('download'): | |
try: | |
md, res = dbx.files_download(path) | |
except dropbox.exceptions.HttpError as err: | |
print('*** HTTP error', err) | |
return None | |
data = res.content | |
print(len(data), 'bytes; md:', md) | |
return data | |
def upload(dbx, fullname, folder, subfolder, name, overwrite=False): | |
"""Upload a file. | |
Return the request response, or None in case of error. | |
""" | |
path = '/%s/%s/%s' % (folder, subfolder.replace(os.path.sep, '/'), name) | |
while '//' in path: | |
path = path.replace('//', '/') | |
mode = (dropbox.files.WriteMode.overwrite | |
if overwrite | |
else dropbox.files.WriteMode.add) | |
mtime = os.path.getmtime(fullname) | |
with open(fullname, 'rb') as f: | |
data = f.read() | |
with stopwatch('upload %d bytes' % len(data)): | |
try: | |
res = dbx.files_upload( | |
data, path, mode, | |
client_modified=datetime.datetime(*time.gmtime(mtime)[:6]), | |
mute=True) | |
except dropbox.exceptions.ApiError as err: | |
print('*** API error', err) | |
return None | |
print('uploaded as', res.name.encode('utf8')) | |
return res | |
def yesno(message, default, args): | |
"""Handy helper function to ask a yes/no question. | |
Command line arguments --yes or --no force the answer; | |
--default to force the default answer. | |
Otherwise a blank line returns the default, and answering | |
y/yes or n/no returns True or False. | |
Retry on unrecognized answer. | |
Special answers: | |
- q or quit exits the program | |
- p or pdb invokes the debugger | |
""" | |
if args.default: | |
print(message + '? [auto]', 'Y' if default else 'N') | |
return default | |
if args.yes: | |
print(message + '? [auto] YES') | |
return True | |
if args.no: | |
print(message + '? [auto] NO') | |
return False | |
if default: | |
message += '? [Y/n] ' | |
else: | |
message += '? [N/y] ' | |
while True: | |
answer = input(message).strip().lower() | |
if not answer: | |
return default | |
if answer in ('y', 'yes'): | |
return True | |
if answer in ('n', 'no'): | |
return False | |
if answer in ('q', 'quit'): | |
print('Exit') | |
raise SystemExit(0) | |
if answer in ('p', 'pdb'): | |
import pdb | |
pdb.set_trace() | |
print('Please answer YES or NO.') | |
@contextlib.contextmanager | |
def stopwatch(message): | |
"""Context manager to print how long a block of code took.""" | |
t0 = time.time() | |
try: | |
yield | |
finally: | |
t1 = time.time() | |
print('Total elapsed time for %s: %.3f' % (message, t1 - t0)) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description='Sync ~/Downloads to Dropbox') | |
parser.add_argument('folder', nargs='?', default='Downloads', | |
help='Folder name in your Dropbox') | |
parser.add_argument('rootdir', nargs='?', default='~/Downloads', | |
help='Local directory to upload') | |
parser.add_argument('--token', default=TOKEN, | |
help='Access token ' | |
'(see https://www.dropbox.com/developers/apps)') | |
parser.add_argument('--yes', '-y', action='store_true', | |
help='Answer yes to all questions') | |
parser.add_argument('--no', '-n', action='store_true', | |
help='Answer no to all questions') | |
parser.add_argument('--default', '-d', action='store_true', | |
help='Take default answer on all questions') | |
args = parser.parse_args() | |
main(args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment