Last active
July 27, 2020 13:53
-
-
Save sirosen/0d6a8afa87130281320cb1df84d0da59 to your computer and use it in GitHub Desktop.
Copy Globus Endpoint ACL Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Copy all of the Access Control Lists from one Globus Endpoint to another, by | |
means of the globus-cli | |
This will not remove any ACLs from either endpoint, and it will not stop to
check for duplicates
""" | |
import argparse | |
import json | |
import subprocess | |
import sys | |
import textwrap | |
def fail(message, code=1):
    """
    Report ``message`` on stderr and terminate the script.

    :param message: text to print to stderr
    :param code: process exit status (defaults to 1)
    :raises SystemExit: always, carrying ``code``
    """
    print(message, file=sys.stderr)
    raise SystemExit(code)
def _run(*args): | |
return subprocess.run( | |
["globus"] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE | |
) | |
def check_logged_in():
    """
    Verify that the globus-cli has a usable login.

    Runs ``globus whoami``; when that command exits non-zero, a message is
    printed to stderr and the script exits with status 2.
    """
    whoami = _run("whoami")
    if whoami.returncode == 0:
        return
    fail("you must be logged in with the globus-cli to run this script", code=2)
def check_endpoint_exists(epid):
    """
    Verify that ``globus endpoint show`` succeeds for ``epid``.

    :param epid: the endpoint ID to look up

    When the lookup exits non-zero, a message naming the ID is printed to
    stderr and the script exits with status 1.
    """
    lookup = _run("endpoint", "show", epid)
    if lookup.returncode == 0:
        return
    fail("no endpoint found with ID {}".format(epid))
def _principal_args(doc):
    """Return the ``permission create`` options selecting the principal of ``doc``."""
    principal_type = doc["principal_type"]
    # these two principal types describe a class of users rather than a
    # single ID, so the CLI takes a bare flag with no value for them
    if principal_type == "anonymous":
        return ("--anonymous",)
    if principal_type == "all_authenticated_users":
        return ("--all-authenticated",)
    flag = "--group" if principal_type == "group" else "--identity"
    return (flag, doc["principal"])


def copy_acls(src, dst, checkpoint_file):
    """
    Recreate every access rule of endpoint ``src`` on endpoint ``dst``.

    The source rules are read with ``globus endpoint permission list`` and
    each one is recreated with ``globus endpoint permission create``. When
    ``checkpoint_file`` is given, every rule that was created successfully is
    appended to it as one JSON document per line.

    :param src: ID of the endpoint whose rules are read
    :param dst: ID of the endpoint on which the rules are created
    :param checkpoint_file: path of a file to append created rules to, or a
        falsy value to disable checkpointing

    Exits the script with status 1 if the source rules cannot be listed, or
    as soon as one rule fails to be created; the CLI's stderr is echoed in
    both cases.
    """
    # fetch and parse source ACLs
    list_result = _run(
        "endpoint", "permission", "list", src, "--jmespath", "DATA[?id!=null]"
    )
    if list_result.returncode != 0:
        # without this check a failed listing would only surface as a JSON
        # decoding traceback on the empty stdout
        print("error listing permissions:", file=sys.stderr)
        print(
            textwrap.indent(list_result.stderr.decode("utf-8"), " "),
            file=sys.stderr,
        )
        sys.exit(1)
    # a JMESPath query with no match prints ``null``; treat that as no rules
    src_acls = json.loads(list_result.stdout) or []

    # TODO: filter on an existing checkpoint_file here

    # get an open handle on the checkpoint file in append mode, if one was
    # passed (otherwise, none)
    fp = open(checkpoint_file, "a") if checkpoint_file else None

    # recreate on destination
    try:
        for doc in src_acls:
            create_result = _run(
                "endpoint",
                "permission",
                "create",
                "{}:{}".format(dst, doc["path"]),
                "--permissions",
                doc["permissions"],
                *_principal_args(doc)
            )
            # the failure check must not depend on whether a checkpoint file
            # is in use: a successful create without one is still a success
            if create_result.returncode != 0:
                print("error creating permission:", file=sys.stderr)
                print(
                    textwrap.indent(create_result.stderr.decode("utf-8"), " "),
                    file=sys.stderr,
                )
                sys.exit(1)
            if fp:
                fp.write(json.dumps(doc) + "\n")
                # keep the checkpoint current even if the script is killed
                fp.flush()
    finally:
        if fp:
            fp.close()
def main():
    """
    Command-line entry point.

    Parses the two endpoint IDs and the optional ``--checkpoint-file``,
    checks the login and both endpoints, then copies the ACLs from the
    source endpoint to the target endpoint.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    for positional in ("SOURCE_ENDPOINT_ID", "TARGET_ENDPOINT_ID"):
        cli.add_argument(positional)

    # this script could be extended to skip rules found in any existing
    # checkpoint file, so you can resume a failed copy
    checkpoint_help = (
        "An optional file where checkpoints will be written so you can "
        "track which rules have already been created."
    )
    cli.add_argument("--checkpoint-file", help=checkpoint_help)

    opts = cli.parse_args()
    source_id = opts.SOURCE_ENDPOINT_ID
    target_id = opts.TARGET_ENDPOINT_ID

    # validate the environment before touching any ACLs
    check_logged_in()
    for endpoint_id in (source_id, target_id):
        check_endpoint_exists(endpoint_id)

    copy_acls(source_id, target_id, checkpoint_file=opts.checkpoint_file)
# run the copy only when executed as a script, not when imported as a module
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment