Skip to content

Instantly share code, notes, and snippets.

@sanyamsmulay
Created August 2, 2023 17:17
Show Gist options
  • Save sanyamsmulay/d25e43c580da6cb6705fa02843d87606 to your computer and use it in GitHub Desktop.
Save sanyamsmulay/d25e43c580da6cb6705fa02843d87606 to your computer and use it in GitHub Desktop.
Google Workspace --> Transfer files and delete user
asttokens==2.2.1
backcall==0.2.0
cachetools==5.3.1
certifi==2023.7.22
charset-normalizer==3.2.0
decorator==5.1.1
executing==1.2.0
google-api-python-client==1.7.9
google-auth==2.22.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.0
httplib2==0.22.0
idna==3.4
ipython==8.14.0
jedi==0.19.0
matplotlib-inline==0.1.6
oauthlib==3.2.2
parso==0.8.3
pexpect==4.8.0
pickleshare==0.7.5
prompt-toolkit==3.0.39
ptyprocess==0.7.0
pure-eval==0.2.2
pyasn1==0.5.0
pyasn1-modules==0.3.0
Pygments==2.15.1
pyparsing==3.1.1
requests==2.31.0
requests-oauthlib==1.3.1
rsa==4.9
six==1.16.0
stack-data==0.6.2
traitlets==5.9.0
uritemplate==3.0.1
urllib3==1.26.16
wcwidth==0.2.6
import os.path
import time
import csv
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
# OAuth scopes requested by this script: Directory API user management and
# the Data Transfer API. If you change this list, delete the file token.json.
SCOPES = [
    'https://www.googleapis.com/auth/admin.directory.user',
    'https://www.googleapis.com/auth/admin.datatransfer',
]
# Adapted from the Google Workspace Admin SDK quickstart:
# https://github.com/googleworkspace/python-samples/blob/main/admin_sdk/directory/quickstart.py
def get_latest_creds():
    """Return credentials for SCOPES, refreshing or requesting them if needed.

    Credentials are loaded from token.json when that file exists. If they
    are missing or not valid they are refreshed (when expired and a refresh
    token is available) or obtained through the local-server OAuth flow
    configured by credentials.json. Newly obtained or refreshed credentials
    are written back to token.json for the next run.
    """
    token_path = 'token.json'

    creds = None
    if os.path.exists(token_path):
        print("credentials exist")
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)

    # Nothing more to do when the cached credentials are still usable.
    if creds and creds.valid:
        return creds

    print("vaid credentials not present, requesting")
    can_refresh = creds and creds.expired and creds.refresh_token
    if can_refresh:
        creds.refresh(Request())
    else:
        # No usable cached credentials: let the user log in.
        flow = InstalledAppFlow.from_client_secrets_file(
            'credentials.json', SCOPES)
        creds = flow.run_local_server(port=0)

    # Save the credentials for the next run.
    with open(token_path, 'w') as token_file:
        token_file.write(creds.to_json())
    return creds
def transfer_drive_data(transfer_from_email, transfer_to_email, services):
    """Queue a transfer of 'Drive and Docs' data from one user to another.

    Args:
        transfer_from_email: Email (user key) of the current owner.
        transfer_to_email: Email (user key) of the new owner.
        services: Mapping holding the API clients under the keys
            'directoryService' and 'transferService'.

    Returns:
        The response of ``transfers().insert(...)`` for the queued transfer,
        or None when no application named 'Drive and Docs' is listed.

    Raises:
        Whatever the underlying API calls raise, e.g. when a user cannot be
        found (TODO: friendlier error handling for missing users).
    """
    directory_service = services['directoryService']
    transfer_service = services['transferService']

    # get users by email
    print("getting user ids")
    trf_from_user = directory_service.users().get(userKey=transfer_from_email).execute()
    trf_to_user = directory_service.users().get(userKey=transfer_to_email).execute()

    print("\n Will transfer files:")
    print("from user: ", f'Name: {trf_from_user["name"]["fullName"]}, id: {trf_from_user["id"]}')
    print("to user: ", f'Name: {trf_to_user["name"]["fullName"]}, id: {trf_to_user["id"]}')

    # `customerId` identifies the Workspace account, not a user, so take it
    # from the user resource instead of passing the user's own id. The
    # parameter is optional; leave it out when the resource does not carry it.
    list_kwargs = {}
    customer_id = trf_from_user.get('customerId')
    if customer_id:
        list_kwargs['customerId'] = customer_id
    applications_available = transfer_service.applications().list(**list_kwargs).execute()
    print("applications available for transfer")

    # A response without an 'applications' key is treated as "nothing
    # available" rather than crashing with a KeyError.
    drive_application = next(
        (appli for appli in applications_available.get('applications', [])
         if appli['name'] == 'Drive and Docs'),
        None
    )
    if not drive_application:
        return None

    print(f"Found google drive application id: {drive_application['id']}, name: {drive_application['name']}")
    data_transfer_conf = {
        "oldOwnerUserId": trf_from_user['id'],
        "newOwnerUserId": trf_to_user['id'],
        "kind": "admin#datatransfer#DataTransfer",
        "applicationDataTransfers": [
            {
                "applicationId": drive_application['id'],
                # Pass the application's advertised params through unchanged.
                "applicationTransferParams": drive_application['transferParams'],
            }
        ],
    }
    transfer = transfer_service.transfers().insert(body=data_transfer_conf).execute()
    print("transfer queued: ")
    return transfer
def _wait_for_transfer(transfer_service, transfer_id, poll_interval_seconds):
    """Poll a data transfer until it completes or fails; return True on success."""
    while True:
        print("waiting for transfer to complete")
        # no event to subscribe to, polling is the only option
        time.sleep(poll_interval_seconds)
        transfer_upd = transfer_service.transfers().get(dataTransferId=transfer_id).execute()
        overall_status = transfer_upd.get('overallTransferStatusCode')
        application_statuses = [
            app_transfer.get('applicationTransferStatus')
            for app_transfer in transfer_upd.get('applicationDataTransfers', [])
        ]
        # NOTE(review): 'failed' is assumed to be the terminal failure status
        # reported by the Data Transfer API -- confirm against the API docs.
        # Without this check a failed transfer would be polled forever.
        if overall_status == 'failed' or 'failed' in application_statuses:
            return False
        if overall_status == 'completed' and all(
                status == 'completed' for status in application_statuses):
            return True


def transfer_files_and_delete_user(transfer_from_email, transfer_to_email,
                                   poll_interval_seconds=10):
    """Transfer a user's Drive data to another user, then delete the user.

    The user is deleted only after the transfer has been queued and has
    reported completion; if the transfer cannot be queued or fails, the
    user is left untouched.

    Args:
        transfer_from_email: Email of the user whose data is transferred and
            who is deleted afterwards.
        transfer_to_email: Email of the user receiving the data.
        poll_interval_seconds: Seconds to sleep between transfer status
            checks (defaults to the original 10 seconds).

    Returns:
        A ``(transfer_done, user_deleted)`` tuple of booleans.

    Raises:
        Whatever the credential flow or the API calls raise.
    """
    # get latest credentials
    creds = get_latest_creds()
    services = {
        "directoryService": build('admin', 'directory_v1', credentials=creds),
        "transferService": build('admin', 'datatransfer_v1', credentials=creds)
    }

    # transfer ownership
    transfer = transfer_drive_data(transfer_from_email, transfer_to_email, services)
    if not transfer:
        # No transfer was queued (e.g. Drive application not available);
        # never delete a user whose data has not been moved.
        print("transfer could not be queued, user will not be deleted: ", transfer_from_email)
        return (False, False)

    # wait till the transfer is complete
    transfer_done = _wait_for_transfer(
        services['transferService'], transfer['id'], poll_interval_seconds)
    if not transfer_done:
        print("transfer failed, user will not be deleted: ", transfer_from_email)
        return (False, False)
    print("transfer completed")

    # delete user - the response will be falsy if all goes well
    user_deletion_error = services['directoryService'].users().delete(userKey=transfer_from_email).execute()
    if not user_deletion_error:
        print("deleted user with email: ", transfer_from_email)
    else:
        print("error deleting user with email: ", transfer_from_email)
    return (transfer_done, not user_deletion_error)
def main(input_path='transfers_n_deletions_input.csv',
         output_path='transfer_n_deletions_output.csv',
         process_pair=None):
    """Run the transfer-and-delete job for every row of the input CSV.

    Each input row must provide 'transfer_from_email' and
    'transfer_to_email'. One result row per input row is appended to the
    output CSV and flushed immediately, so someone running a tail on the
    output file sees a running output.

    Args:
        input_path: CSV file with the pairs to process.
        output_path: CSV file the results are appended to (no header is
            written, because the file is opened in append mode).
        process_pair: Callable ``(from_email, to_email)`` returning
            ``(transfer_done, user_deleted)``. Defaults to
            ``transfer_files_and_delete_user``.

    Returns:
        A dict with the counters 'total', 'transfers_completed',
        'transfers_errored', 'users_deleted' and 'users_deletion_errored'.
    """
    if process_pair is None:
        process_pair = transfer_files_and_delete_user

    stats = {
        'total': 0,
        'transfers_completed': 0,
        'transfers_errored': 0,
        'users_deleted': 0,
        'users_deletion_errored': 0,
    }
    fieldnames = ["transfer_from_email", "transfer_to_email", "transfer_done", "user_deleted"]

    with open(input_path, 'r', newline='') as tnf_del_data_file, \
            open(output_path, 'a', newline='\n') as tnf_del_out_file:
        input_reader = csv.DictReader(tnf_del_data_file, delimiter=',', quotechar='"')
        output_writer = csv.DictWriter(tnf_del_out_file,
                                       fieldnames=fieldnames, delimiter=',', quotechar='"')
        # output_writer.writeheader()  # can use it first time

        for transfer_pair in input_reader:
            stats['total'] += 1
            transfer_from_email = transfer_pair['transfer_from_email']
            transfer_to_email = transfer_pair['transfer_to_email']
            print("transfer_from_email: ", transfer_from_email)
            print("transfer_to_email: ", transfer_to_email)

            transfer_done, user_deleted = False, False
            try:
                # TODO: can be made parallel, takes a lot of time to run if
                # there are users that have lot of data to transfer
                transfer_done, user_deleted = process_pair(transfer_from_email, transfer_to_email)
            except Exception as e:
                # Top-level boundary: keep going with the next pair, but
                # report what went wrong instead of hiding it.
                print("error occurred in transfer and / or deletion: ", repr(e))

            stats['transfers_completed' if transfer_done else 'transfers_errored'] += 1
            # Deletion errors are counted from user_deleted, not transfer_done.
            stats['users_deleted' if user_deleted else 'users_deletion_errored'] += 1

            # Write only the known columns so that extra input columns
            # cannot make the writer raise after a user was processed.
            output_writer.writerow({
                "transfer_from_email": transfer_from_email,
                "transfer_to_email": transfer_to_email,
                "transfer_done": transfer_done,
                "user_deleted": user_deleted,
            })
            tnf_del_out_file.flush()

            # print a nice separator
            print("-c-c-c-\n\n\n-s-s-s-")

    print(f'\n\ntotal jobs: {stats["total"]}')
    print(f'transfers_completed: {stats["transfers_completed"]}, transfer_errored: {stats["transfers_errored"]}')
    print(f'users_deleted: {stats["users_deleted"]}, users_deletion_errored: {stats["users_deletion_errored"]}')
    return stats


if __name__ == '__main__':
    main()
transfer_from_email transfer_to_email transfer_done user_deleted
[email protected] [email protected] True True
person2@example [email protected] True True
@sanyamsmulay
Copy link
Author

sanyamsmulay commented Aug 2, 2023

How to use:

  • create an organisation in the Google Cloud console
  • register a project under this organisation
  • enable the Admin SDK API for this project
  • create an OAuth 2.0 desktop client for this project
  • download the credentials.json for this client
  • create and activate a Python virtual environment
  • install the dependencies from requirements.txt (this also installs IPython; I like to work with IPython :D)

BE VERY CAREFUL AFTER THIS - many actions are not reversible - you may seriously mess things up

Improvements / enhancements welcome.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment