Script to do maintenance on Google Drive; an example of gevent, the Google Drive SDK, rate limiting, automatic retries, ...
# -*- coding: utf-8 -*-
# pylint: disable=missing-docstring,wrong-import-position,invalid-name,len-as-condition
# pylint: disable=superfluous-parens,wrong-import-order,ungrouped-imports,unsubscriptable-object
""" gdrive_cleanup.py <[email protected]> [start folder_id]

License: This code is released to the Public Domain under the "CC0 1.0 Universal" license.
Author: Marc Fargas <[email protected]>

This script is one we use to do some maintenance on users' Google Drive folders.

Things it does:
- Find duplicate files owned by the same user and "merge" them
  (put one of them on the same parents as the others and delete the rest).
- Find similarly named sibling folders and merge their contents
  (e.g. after failed syncs caused by non-ASCII characters with the wrong
  encoding).
- Find files in 'root' that are also in other folders and "unroot" them.

This is not a one-size-fits-all tool; it is published to the Public Domain
for reference, as example code for anyone looking at how to work with the
Google Drive SDK.

This code:
- Is written with gevent in order to go as fast as possible (i.e. we
  keep parsing trees while waiting to receive other trees).
- Wraps API calls with the rate_limited decorator, which tries to
  minimize how many 'rate limit exceeded' errors you get.
- Wraps API calls with an automatic retrier to simplify error handling
  (you will get 'rate limit exceeded' no matter how good your rate
  limiting is).
- Uses httplib2shim to avoid httplib2 thread-safety issues (the code was
  originally written with threads before using gevent); it might work
  with plain httplib2...
- Groups greenlets inside GROUP so we can easily wait when needed
  (see the calls to GROUP.join()).
- Tries to be nice with pylint.

This code is dated September 2017, using the Google Drive v3 API and Python 3.5.

Details:
This tool uses a Service Account to impersonate users on Google G Suite;
if you got here, you likely know what that means or have the means to find
out and set it up. If you can't... well, then maybe you should not attempt
to use this code!!
"""
# pip install attrs gevent google-api-python-client httplib2shim retry
from gevent import monkey; monkey.patch_all()  # pylint: disable=multiple-statements

import logging
import os
import re
import sys
from timeit import default_timer

import apiclient.discovery  # pylint: disable=import-error
import apiclient.errors  # pylint: disable=import-error
import apiclient.http  # pylint: disable=import-error
import attr
import gevent
import httplib2shim
from gevent.pool import Group
from oauth2client.service_account import ServiceAccountCredentials
from retry import retry
from functools import wraps
from typing import List, Dict, Any  # pylint: disable=unused-import

BASE = os.path.dirname(__file__)
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger('gdrive')
GROUP = Group()  # gevent group.

### CONFIGS:
NOOP = False  # If True then dry-run (do not change anything on Drive).
KEYFILE = os.path.join(BASE, "google-service-account-key.json")
logger.setLevel(logging.INFO)


def is_same_title(one, two):
    """ Compare two names ignoring case and non-word characters. """
    o = re.sub(r'[\W_]+', '', one)
    t = re.sub(r'[\W_]+', '', two)
    return o.lower() == t.lower()
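# e.g. is_same_title("Fotos_2016", "fotos 2016") is True: both normalise
# to "fotos2016".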
### END CONFIGS.


def gevent_throttle(calls_per_sec=0):
    """Decorates a Greenlet function for throttling.
    https://gist.github.com/p7k/4238388
    """
    interval = 1. / calls_per_sec if calls_per_sec else 0
    def decorate(func):
        blocked = [False]  # has to be a list so it does not get localised inside the closure
        # (otherwise: UnboundLocalError: local variable 'blocked' referenced before assignment)
        last_time = [0]  # ditto
        @wraps(func)  # propagates docstring
        def throttled_func(*args, **kwargs):
            while True:
                # Give other greenlets a chance to run; otherwise we might
                # get stuck while the working greenlet is sleeping and the block is ON.
                gevent.sleep(0)
                if not blocked[0]:
                    blocked[0] = True
                    # Check whether we actually need to pause.
                    if calls_per_sec:
                        last, current = last_time[0], default_timer()
                        elapsed = current - last
                        if elapsed < interval:
                            gevent.sleep(interval - elapsed)
                        last_time[0] = default_timer()
                    blocked[0] = False
                    return func(*args, **kwargs)
        return throttled_func
    return decorate


rate_limited = gevent_throttle(1000/100)
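# A minimal usage sketch (hypothetical function): the decorator caps any
# callable at a given rate, independently of rate_limited above:
#
#   @gevent_throttle(2)           # at most ~2 calls per second
#   def fetch_metadata(file_id):
#       ...
#
# rate_limited is tuned to roughly match Drive's historical default quota
# of 1000 queries per 100 seconds per user, i.e. 10 calls/sec.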
def spawn(func):
    """ Decorator: run func as a greenlet added to GROUP instead of inline. """
    @wraps(func)
    def with_greenlet(*args, **kwargs):
        g = gevent.spawn(func, *args, **kwargs)
        GROUP.add(g)
        return g
    return with_greenlet
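# Pattern sketch: every @spawn-decorated call returns immediately with a
# Greenlet that has been added to GROUP; GROUP.join() then blocks until all
# outstanding greenlets finish. Hypothetical illustration:
#
#   @spawn
#   def work(seconds):
#       gevent.sleep(seconds)
#
#   work(1)
#   work(2)
#   GROUP.join()  # returns after ~2 seconds, once both greenlets are done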
@attr.s(repr=False)
class DriveItem(object):
    """ Simple representation of a Google Drive item. """
    # The original item data as received from Google.
    source = attr.ib()  # type: Dict[str, Any]
    parent = attr.ib()  # The DriveItem instance that is this item's parent.
    owned_by_me = attr.ib()  # Does the current user own this item?
    # List of children items (aka: child.parent == this).
    children = attr.ib(default=attr.Factory(list), repr=False)  # type: List[DriveItem]

    def __repr__(self):
        return u"%s %s" % (self.source['id'], self.path())

    def path(self):
        """ Return the full path as a string. """
        if self.parent:
            return self.parent.path() + '/' + self.source['name']
        return '/' + self.source['name']

    def is_folder(self):
        """ Am I a folder? """
        return self.source['mimeType'] == 'application/vnd.google-apps.folder'

    @retry(exceptions=apiclient.errors.HttpError, delay=5, jitter=(1, 15), max_delay=20)
    def get_parents(self, service):
        return service.files().get(fileId=self.source['id'],
                                   fields='parents').execute()

    @spawn
    @retry(exceptions=apiclient.errors.HttpError, delay=1, jitter=(1, 5), max_delay=5)
    @rate_limited
    def trash(self, service):
        """ Send me to the trash. """
        logger.info("Trash: %s", self)
        if not NOOP:
            service.files().update(fileId=self.source['id'],
                                   body={'trashed': True}).execute()
        self.parent.children.remove(self)
        self.parent = None

    @spawn
    @retry(exceptions=apiclient.errors.HttpError, delay=1, jitter=(1, 5), max_delay=5)
    @rate_limited
    def change_parent(self, service, new_parent, remove_parent=None):
        """ Add new_parent as a parent of self;
        if remove_parent is not None, remove it too.
        """
        remove_parent_id = None
        if not remove_parent:
            logger.info("AddParent: %s \n\tTo: %s", self, new_parent)
        else:
            remove_parent_id = remove_parent.source['id']
            logger.info("ChangeParent: %s \n\tFrom: %s\n\tTo: %s",
                        self, remove_parent, new_parent)
        if new_parent.source['id'] == remove_parent_id:
            return
        if not NOOP:
            service.files().update(fileId=self.source['id'],
                                   addParents=new_parent.source['id'],
                                   removeParents=remove_parent_id,
                                   fields='id, parents').execute()
        self.parent = new_parent
        if remove_parent:
            remove_parent.children.remove(self)
        new_parent.children.append(self)

    def die_for(self, service, stays):
        """ Trash self and give its parent to stays.
        Presumably they're the same file. """
        if stays.source['id'] == self.source['id']:
            return  # They ARE the same.
        if stays.source['md5Checksum'] != self.source['md5Checksum']:
            logger.warning("md5Checksums not equal: \n\t'%s' \n\t'%s'",
                           stays.path(), self.path())
            return
        stays.change_parent(service, self.parent)
        self.trash(service)
        # change_parent() and trash() already maintain the parent/children links.

    def merge_to(self, service, stays):
        """ Reparent all children of self to stays, then trash self. """
        if stays.source['id'] == self.source['id']:
            return  # They ARE the same.
        if not stays.is_folder():
            logger.warning("Got a non-folder in merge_to stays: %s", stays)
            return
        if not self.is_folder():
            logger.warning("Got a non-folder in merge_to source: %s", self)
            return
        # Iterate over a copy: the spawned change_parent greenlets mutate self.children.
        for ch in list(self.children):  # pylint: disable=not-an-iterable
            ch.change_parent(service, stays, remove_parent=self)
        self.trash(service)
@attr.s
class KnownMD5(object):
    """ Registry of every md5Checksum seen and the items carrying it. """
    ALL_KNOWN = []  # type: List[KnownMD5]
    md5 = attr.ib(cmp=True)
    known = attr.ib(default=attr.Factory(list), cmp=False)

    def __str__(self):
        return self.md5

    @staticmethod
    def add(md5, item):
        found = False
        for i in KnownMD5.ALL_KNOWN:
            if i.md5 == md5:
                i.known += [item]
                found = True
                break
        if not found:
            k = KnownMD5(md5=md5)
            k.known = [item, ]
            KnownMD5.ALL_KNOWN += [k]

    def unify(self, service):
        logger.debug("KnownMD5.unify(%s)", self.md5)
        only_mine = filter(lambda x: x.owned_by_me, self.known)
        # Sort so the candidate with the "largest" name (word-wise) comes first and is kept.
        longest = sorted(only_mine, key=lambda x: x.source['name'].split(' '), reverse=True)
        if len(longest) == 0:
            return
        stays = longest[0]
        logger.info("%s Stays", stays)
        for d in longest[1:]:
            if d.source['id'] == stays.source['id']:
                continue
            if d.parent.source['id'] != stays.parent.source['id']:
                stays.change_parent(service, d.parent)
            logger.info("%s is dupe of %s (%s vs %s)", d.path(), stays.path(),
                        d.source['id'], stays.source['id'])
            d.trash(service)
        GROUP.join()
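# Flow sketch: find_known_md5() below registers every file via
# KnownMD5.add() while walking the tree; the main block then calls unify()
# on each entry with more than one known item, so a single owned copy
# survives, reparented to cover all the duplicates' folders.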
def get_service_creds(email, scope):
    """ Get an authorized Http for a given user via ServiceAccountCredentials. """
    main_credentials = ServiceAccountCredentials.from_json_keyfile_name(KEYFILE, scopes=scope)
    credentials = main_credentials.create_delegated(email)
    http = credentials.authorize(httplib2shim.Http())
    credentials.refresh(http)
    return http


def get_drive_service(email):
    """ Get a service instance for Google Drive API v3. """
    scope = 'https://www.googleapis.com/auth/drive'
    http = get_service_creds(email, scope=scope)
    drive_service = apiclient.discovery.build('drive', 'v3', http=http)
    return drive_service
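# Minimal sketch (assumes domain-wide delegation is granted to the service
# account in the G Suite admin console; the address is hypothetical):
#
#   svc = get_drive_service('[email protected]')
#   print(svc.about().get(fields='user').execute())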
def gdrive_am_i_owner(item):
    """ Is the currently authenticated user the (or an) owner of the item? """
    return bool(item['ownedByMe'])


class SlowDown(Exception):
    pass


@retry(exceptions=SlowDown, delay=5, jitter=(1, 15), max_delay=20)
@rate_limited
def get_list(service, q, page_token, fields=None):
    if not fields:
        fields = ('nextPageToken, files(id, name, mimeType, ownedByMe, '
                  'kind, md5Checksum, modifiedTime)')
    try:
        return service.files().list(q=q, pageToken=page_token, fields=fields).execute()
    except apiclient.errors.HttpError as err:
        if err.resp.status in [403, 503]:
            raise SlowDown()
        else:
            raise
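# Note: Drive reports rate limiting ('userRateLimitExceeded') as HTTP 403,
# so converting 403/503 into SlowDown lets @retry's jittered delays double
# as backoff for quota errors.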
@spawn
def gdrive_build_tree(service, this, level=0):
    """ Build a tree of DriveItem instances.
    this is the current DriveItem instance,
    service is a service from get_drive_service(),
    level is the current recursion depth.
    """
    folder_id = this.source['id']
    logger.info('%s: Gathering item listings for %s...',
                this.path(), folder_id)
    q = '\'{}\' in parents and not trashed'.format(folder_id)
    page_token = None
    while True:
        try:
            children = get_list(service, q, page_token)
            for item in children.get('files', []):
                sub = DriveItem(source=item, parent=this, owned_by_me=gdrive_am_i_owner(item))
                this.children.append(sub)
                if sub.is_folder():
                    logger.debug(u'%s Subdir Tree: %s (owned=%s)',
                                 this.path(), item['name'], sub.owned_by_me)
                    if sub.owned_by_me:
                        gdrive_build_tree(service, sub, level+1)
                if item['kind'] == 'drive#file':
                    logger.debug(u'%s File: (%s)', sub.path(), item['id'])
            page_token = children.get('nextPageToken')
            if not page_token:
                break
        except apiclient.errors.HttpError as e:
            logger.exception('An error occurred: %s', e)
            break
def merge_folders(service, tree):
    """ Check the children of a folder for ones with the same name,
    then merge those.
    """
    # Iterate over copies: merging trashes folders, which mutates tree.children.
    for ch in list(tree.children):
        if not ch.is_folder():
            continue
        aname = ch.source['name']
        for ch2 in list(tree.children):
            if ch == ch2:
                continue
            if not ch2.is_folder():
                continue
            if not ch.owned_by_me or not ch2.owned_by_me:
                continue
            bname = ch2.source['name']
            if is_same_title(aname, bname):
                logger.info("%s, Merging: %s and %s", tree.path(), aname, bname)
                ch2.merge_to(service, ch)
                GROUP.join()
        merge_folders(service, ch)


def find_known_md5(tree):
    """ Build the list of known MD5 sums. """
    for ch in tree.children:
        if ch.is_folder():
            find_known_md5(ch)
        md5 = ch.source.get('md5Checksum', None)
        if md5:
            KnownMD5.add(md5, ch)
def clean_up_root(service, root):
    """ Find files in root that have other parents,
    then remove them from root. """
    @spawn
    @retry(exceptions=apiclient.errors.HttpError, delay=1, jitter=(1, 5), max_delay=5)
    @rate_limited
    def remove_parent(service, item_id, parent_id):
        logger.info("Taking %s out of %s", item_id, parent_id)
        if not NOOP:
            service.files().update(fileId=item_id,
                                   removeParents=parent_id,
                                   fields='id').execute()

    folder_id = root.source['id']
    q = '\'{}\' in parents and not trashed'.format(folder_id)
    fields = 'nextPageToken, files(id, parents)'
    page_token = None
    while True:
        try:
            children = get_list(service, q, page_token, fields=fields)
            for item in children.get('files', []):
                parents = item['parents']
                if len(parents) > 1:
                    remove_parent(service, item['id'], folder_id)
            page_token = children.get('nextPageToken')
            if not page_token:
                break
        except apiclient.errors.HttpError as e:
            logger.exception('An error occurred: %s', e)
            break
if __name__ == '__main__':
    usuario = sys.argv[1]
    try:
        begin_at = sys.argv[2]
    except IndexError:
        begin_at = 'root'
    svc = get_drive_service(usuario)
    logger.info("Getting Google Drive listing for %s", usuario)
    root_item = svc.files().get(fileId=begin_at).execute()  # pylint: disable=no-member
    groot = DriveItem(source=root_item, parent=None, owned_by_me=True)
    gdrive_build_tree(svc, groot)
    GROUP.join()
    merge_folders(svc, groot)
    find_known_md5(groot)
    for md5sum in KnownMD5.ALL_KNOWN:
        if len(md5sum.known) > 1:
            uniques = set([x.source['id'] for x in md5sum.known])
            logger.info("%s has %d matches (%d unique)", md5sum,
                        len(md5sum.known), len(uniques))
            md5sum.unify(svc)
    clean_up_root(svc, groot)