Created
August 23, 2020 04:05
-
-
Save lamchau/8a173a65f372bb3cff04c952b45a69aa to your computer and use it in GitHub Desktop.
https://www.golinks.io client for learning python3 and argparse
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from collections import OrderedDict | |
from urllib.parse import urlencode | |
import json | |
import re | |
import subprocess | |
import sys | |
import argparse | |
# Root command-line parser; the program name shown in usage/help is this
# script's own path.
parser = argparse.ArgumentParser(
    prog=__file__,
)
# Container the subcommands register themselves on. The name of the chosen
# subcommand is stored on the parsed namespace as `subcommand`.
subparsers = parser.add_subparsers(
    dest='subcommand',
)
class QueryFilterChoice(argparse.Action):
    '''
    Collect repeated ``--filter <property> <pattern>...`` flags into a mapping.

    The first value after the flag names the property to filter on (one of
    ``_choices``); every remaining value is a pattern for that property. The
    destination attribute ends up as ``{property: OrderedDict}`` whose keys
    are the patterns, so patterns are de-duplicated and keep first-seen order.

    For example:
        --filter bogus x                   => error: invalid property: bogus
        --filter slug                      => error: slug requires a non-empty pattern
        --filter slug a b --filter url c   => {slug: (a, b), url: (c)}
    '''
    _choices = ['slug', 'url', 'owner']

    def __call__(self, parser, namespace, values, option_string=None):
        '''
        Record one occurrence of the flag on ``namespace``.

        Raises:
            argparse.ArgumentError: unknown property, or no pattern given.
        '''
        if not values:
            return

        choice, *patterns = values
        if choice not in self._choices:
            message = f'invalid property: {choice}, (choose from {", ".join(self._choices)})'
            raise argparse.ArgumentError(self, message)

        if not patterns:
            raise argparse.ArgumentError(self, f'{choice} requires a non-empty pattern')

        # Accumulate on the namespace being populated, not on a class-level
        # dict: class state is shared by every action instance and every
        # parse, so filters would leak from one parser/namespace to the next.
        filters = getattr(namespace, self.dest, None)
        if filters is None:
            filters = {}
            setattr(namespace, self.dest, filters)

        filters.setdefault(choice, OrderedDict()).update(dict.fromkeys(patterns))
def argument(*names_or_flags, **kwargs):
    '''
    Bundle the positional and keyword arguments of one ``add_argument`` call.

    Returns a ``(flags, options)`` pair: the names/flags as a list and the
    keyword options as a dict, ready to be unpacked later.
    '''
    flags = [*names_or_flags]
    return flags, kwargs
# inline subcommands for maintenance
# https://gist.github.com/mivade/384c2c41c3a29c637cb6c603d4197f9f
def subcommand(args=None, help=None, parent=subparsers):
    '''
    Decorator factory that registers a function as an argparse subcommand.

    The subcommand is named after the function, uses the function's docstring
    as its description, and stores the function as the ``func`` default so the
    caller can dispatch with ``parsed.func(parsed)``.

    Args:
        args: iterable of ``(flags, options)`` pairs as built by ``argument``;
            ``None`` (the default) registers no arguments.
        help: one-line help shown in the parent parser's command list.
        parent: the subparsers object to register on.

    Returns:
        A decorator that registers the function and returns it unchanged.
    '''
    def decorator(func):
        sub_parser = parent.add_parser(func.__name__, help=help, description=func.__doc__)
        for flags, options in args or ():
            sub_parser.add_argument(*flags, **options)
        sub_parser.set_defaults(func=func)
        # Hand the function back; without this the decorated name would be
        # rebound to None.
        return func
    return decorator
class GoLinkCLI:
    '''
    Command-line front end for GoLink.

    Each decorated function below is registered as an argparse subcommand by
    ``subcommand`` and receives the parsed argument namespace (not ``self``).
    Every handler finishes by calling ``render``, which prints the response
    and exits the process.
    '''

    @subcommand([argument('from_slug', help='go/<old_slug>'),
                 argument('to_slug', help='go/<new_slug>'),
                 argument('-f', '--force', action='store_true')],
                help='copy go/link you own')
    def copy(args):
        '''Create a copy of a go/link'''
        # NOTE: --force is accepted but not forwarded; GoLink.copy is called
        # with the two slugs only.
        GoLinkCLI.render(GoLink.copy(args.from_slug, args.to_slug))

    @subcommand([argument('slug', help='go/<slug>'),
                 argument('url', help='the url to shorten')],
                help='create a go/link')
    def create(args):
        '''Shortens a URL to a specified go/link'''
        response = GoLink.create(args.slug, args.url)
        GoLinkCLI.render(response)

    @subcommand([argument('slug', help='go/<slug>')],
                help='delete your go/link you own')
    def delete(args):
        '''Delete a go/link you own'''
        response = GoLink.delete(args.slug)
        GoLinkCLI.render(response)

    # this endpoint doesn't provide sorting, so we'll mimic the flags of #query
    @subcommand([argument('-p', '--page', default=1, type=int, help='page index (default: %(default)s)'),
                 argument('-f', '--filter', action=QueryFilterChoice, nargs='+', help='filter results by field name'),
                 argument('-b', '--sort-by', default='slug', choices=['slug', 'url'], help='property to sort by (default: %(default)s)'),
                 argument('-s', '--sort', default='asc', choices=['asc', 'desc'], help='sort direction (default: %(default)s)'),
                 argument('-r', '--results-per-page', type=int, default=250, help='number of results (default: %(default)s)'),
                 argument('--table', action='store_true', help='render results as a table')],
                help='only my go/links')
    def mine(args):
        '''Retrieve all go/links owned by you'''
        response = GoLink.mine(args.page, args.results_per_page)
        if response.get('error'):
            GoLinkCLI.render(response)  # prints the error and exits

        response = GoLinkCLI.filter_response(response, args.filter)
        # Sorting happens client-side; a missing/None value sorts as ''
        # instead of raising a TypeError when compared with strings.
        response['results'] = sorted(
            response.get('results'),
            key=lambda link: link.get(args.sort_by) or '',
            reverse=args.sort != 'asc')
        GoLinkCLI.render(response, args.table)

    # sorting is limited server-side, don't provide options not handled by the server
    @subcommand([argument('query', nargs='?', default=None, help='search slugs/urls for text'),
                 argument('-p', '--page', default=1, type=int, help='page index (default: %(default)s)'),
                 argument('-f', '--filter', action=QueryFilterChoice, nargs='+', help='filter results by field name'),
                 argument('-b', '--sort-by', default='slug', choices=['created_at', 'slug'], help='property to sort by (default: %(default)s)'),
                 argument('-s', '--sort', default='asc', choices=['asc', 'desc'], help='sort direction (default: %(default)s)'),
                 argument('-r', '--results-per-page', type=int, default=100, help='number of results (default: %(default)s)'),
                 argument('--table', action='store_true', help='render results as a table')],
                help='search for all go/links with a slug/url containing <query>')
    def query(args):
        '''Search all go/links for a given text (substring only)'''
        response = GoLink.query(
            query=args.query,
            page=args.page,
            sort_by=args.sort_by,
            direction=args.sort,
            per_page=args.results_per_page
        )
        if response.get('error'):
            GoLinkCLI.render(response)  # prints the error and exits

        response = GoLinkCLI.filter_response(response, args.filter)
        GoLinkCLI.render(response, args.table)

    @subcommand([argument('from_slug', help='go/<old_slug>'),
                 argument('to_slug', help='go/<new_slug>'),
                 argument('-f', '--force', action='store_true')],
                help='rename go link you own')
    def rename(args):
        '''Renames your go/links from one to another'''
        response = GoLink.rename(args.from_slug, args.to_slug, args.force)
        GoLinkCLI.render(response)

    @subcommand([argument('slug', help='go/<slug>')], help='resolve a given go/link')
    def resolve(args):
        '''Resolve a given slug to its expanded URL'''
        response = GoLink.get(args.slug)
        GoLinkCLI.render({
            'slug': response.get('slug'),
            'url': response.get('url')
        })

    @subcommand([argument('slug', help='go/<slug>'),
                 argument('url', help='the url to shorten')],
                help='update a go/link with a new url')
    def update(args):
        '''Update the URL of a go/link'''
        GoLinkCLI.render(GoLink.update(args.slug, args.url))

    @staticmethod
    def filter_response(response, filter_patterns):
        '''
        Keep only the results matching at least one of the given filters.

        Args:
            response: dict with a 'results' list of link dicts; it is updated
                in place and also returned.
            filter_patterns: mapping of property name to a mapping whose keys
                are regular expressions (as produced by QueryFilterChoice),
                or None/empty for "no filtering".

        A link is kept when, for any filtered property it has, any of that
        property's patterns is found anywhere in the value (case-insensitive).
        A missing/empty value is matched as the text 'null'.
        '''
        if not filter_patterns:
            return response

        # Each pattern gets its own group so that alternation inside one
        # pattern, or between patterns, cannot change what is matched.
        compiled = {
            prop: re.compile('|'.join(f'(?:{pattern})' for pattern in patterns), re.IGNORECASE)
            for prop, patterns in filter_patterns.items()
        }

        def matches(link):
            return any(
                prop in link and regex.search(str(link[prop] or 'null'))
                for prop, regex in compiled.items())

        response['results'] = [link for link in response.get('results') if matches(link)]
        return response

    @staticmethod
    def render(response, render_table=False):
        '''
        Print the response and exit the process.

        The exit status is 1 when the response has a truthy 'error' entry and
        0 otherwise. With render_table set (and no error) the 'results' are
        printed with prettytable, which is imported on demand; otherwise the
        whole response is printed as indented JSON.
        '''
        return_code = 1 if response.get('error') else 0

        if render_table and return_code == 0:
            try:
                import prettytable
            except ImportError as e:
                print('in order to use table, `prettytable` must be installed: pip3 install prettytable', e)
                sys.exit(1)

            results = response.get('results')
            # The owner column is only shown when at least one row has one.
            has_owner = any(item.get('owner') for item in results)

            table = prettytable.PrettyTable()
            table.field_names = ['owner', 'slug', 'url'] if has_owner else ['slug', 'url']
            for column in table.field_names:
                if column in table.align:
                    table.align[column] = 'l'

            for item in results:
                row = [item.get('owner') or ''] if has_owner else []
                row.extend([item.get('slug'), item.get('url')])
                table.add_row(row)
            print(table)
        else:
            print(json.dumps(response, indent=2))

        sys.exit(return_code)
class GoLink:
    '''
    Client for the go/link service, driven through the `beyond-curl`
    command-line tool.

    Every operation returns a plain dict; failures are reported through an
    'error' entry. Commands are passed to `beyond-curl` as argument lists
    (no shell), so URLs and JSON payloads need no quoting or escaping.
    '''

    @staticmethod
    def copy(from_slug, to_slug) -> dict:
        '''Create to_slug pointing at the URL from_slug currently resolves to.'''
        old_link = GoLink.get(from_slug)
        if not GoLink.exists(old_link):
            return {
                'error': f'{from_slug} does not exist'
            }

        response = GoLink.create(to_slug, old_link.get('url'))
        if 'error' in response:
            # Wrap the underlying reason. setdefault() would be a no-op here
            # because the key is already present.
            error = response.get('error')
            response['error'] = f'Failed to create {to_slug}: (reason: {error})'
        return response

    @staticmethod
    def create(slug, url) -> dict:
        '''
        Create a go/link for url.

        Returns the decoded JSON response with a 'go' entry added, or an
        'already exists' error dict when the slug does not resolve to a 404.
        '''
        link = GoLink.get(slug)
        formatted_slug = link.get('slug')
        go_link = f'https://go/{formatted_slug}'

        if GoLink.exists(link):
            return {
                'go': go_link,
                'url': link.get('url'),
                'error': 'already exists'
            }

        payload = json.dumps({
            'link': {
                'slug': formatted_slug,
                'url': url
            }
        })
        cmd = ['beyond-curl',
               '--silent',
               '--request', 'POST',
               '--header', 'Content-Type: application/json',
               '--data', payload,
               'https://my.server.com/account/links.json'
               ]
        response = json.loads(GoLink._execute(cmd))
        response.setdefault('go', go_link)
        return response

    @staticmethod
    def delete(slug) -> dict:
        '''
        Delete a go/link, then confirm by looking the slug up again.

        Returns {'slug', 'deleted'} plus an 'error' entry when the slug still
        exists after the request.
        '''
        plain_slug = GoLink._plain(slug)
        cmd = ['beyond-curl',
               '--silent',
               '--request', 'DELETE',
               f'https://my.server.com/account/{plain_slug}.json'
               ]
        # The response body is not used; success is decided by the lookup.
        GoLink._execute(cmd)

        delete_succeeded = not GoLink.exists(GoLink.get(plain_slug))
        response = {
            'slug': plain_slug,
            'deleted': delete_succeeded
        }
        if not delete_succeeded:
            response['error'] = 'Permission denied'
        return response

    @staticmethod
    def get(slug) -> dict:
        '''
        Look a slug up with a HEAD request.

        Returns {'status_code': int, 'url': redirect target or None,
        'slug': normalized slug}.
        '''
        plain_slug = GoLink._plain(slug)
        cmd = ['beyond-curl',
               '--silent',
               '--head',
               f'https://go.server.com/{plain_slug}'
               ]
        status_code, location = GoLink._parse_head(GoLink._execute(cmd))
        return {
            'status_code': status_code,
            # a Location of 'noopen' is reported as "no URL"
            'url': location if location != 'noopen' else None,
            'slug': plain_slug
        }

    @staticmethod
    def mine(page=1, max_results=250) -> dict:
        '''Fetch one page of the caller's own go/links as {'results': [...]}.'''
        query = urlencode({
            'page': page,
            'per_page': max_results
        })
        cmd = ['beyond-curl',
               '--silent',
               f'https://my.server.com/account/links.json?{query}'
               ]
        results = json.loads(GoLink._execute(cmd))

        # wrap in a dictionary for consistency with other functions
        response = {
            'results': results
        }
        if not results:
            response['error'] = f'You do not have any go/links {query}'
        return response

    @staticmethod
    def query(query, page=1, sort_by='slug', direction='asc', per_page=100) -> dict:
        '''Search all go/links; returns {'results': [...]} (plus 'error' when empty).'''
        params = {
            'query': query,
            'page': page,
            'sort': sort_by,
            'dir': direction,
            'per_page': per_page,
            # NOTE(review): meaning of this flag is defined by the server - confirm
            'table': 'true'
        }
        # falsy values (e.g. no query text) are left out of the query string
        params = urlencode({k: v for k, v in params.items() if v})
        cmd = ['beyond-curl',
               '--silent',
               f'https://my.server.com/api/all.json?{params}'
               ]
        results = json.loads(GoLink._execute(cmd))

        # wrap in a dictionary for consistency with other functions
        response = {
            'results': results
        }
        if not results:
            response['error'] = f'No go/links matching {query}'
        return response

    @staticmethod
    def rename(old_slug, new_slug, force=True) -> dict:
        '''
        Re-create old_slug's URL under new_slug, then delete old_slug.

        When new_slug already exists it is deleted first if force is set;
        otherwise an error dict is returned. Exits the process when old_slug
        does not exist.
        '''
        old_link = GoLink.get(old_slug)
        if not GoLink.exists(old_link):
            print(f'{old_slug} does not exist')
            sys.exit(1)

        new_link = GoLink.get(new_slug)
        if GoLink.exists(new_link):
            if not force:
                return {
                    'error': f'Failed to rename {old_slug} => {new_slug}. Try again with --force'
                }
            # gives a reasonable way to recover an overwritten URL
            print(f'WARNING: Deleting existing slug {new_slug} => {new_link.get("url")}')
            GoLink.delete(new_slug)

        response = GoLink.create(new_slug, old_link.get('url'))
        if 'error' in response:
            # The old link is left untouched when the new one was not created.
            reason = response.get('error')
            response['error'] = f'Failed to rename {old_slug} => {new_slug} (reason: {reason})'
        else:
            GoLink.delete(old_slug)
        return response

    @staticmethod
    def update(slug, url) -> dict:
        '''
        Point an existing slug at a new URL by deleting and re-creating it.

        Not atomic: if the create fails after the delete succeeded, the link
        is gone.

        Raises:
            ValueError: the normalized slug is empty or otherwise invalid.
        '''
        plain_slug = GoLink._plain(slug)
        GoLink.validate(plain_slug)

        # delete() always returns a (truthy) dict, so test its 'deleted' flag
        if GoLink.delete(plain_slug).get('deleted'):
            return GoLink.create(plain_slug, url)

        return {
            'error': f'Failed to update {url}, permission denied'
        }

    '''
    Helper methods
    '''
    @staticmethod
    def escape(slug) -> str:
        '''
        Extract the slug from user input and backslash-escape any '$'.

        Accepts a bare slug or a pasted link (the last run of slug characters
        is used). Dashes are removed. Returns '' when nothing slug-like is
        found.

        Raises:
            ValueError: the extracted slug fails validate().
        '''
        # pasting in https://go/blah should be valid
        matches = re.findall(r'(?:go/)?([a-z0-9$-]+)', slug, re.IGNORECASE)
        if not matches:
            return ''

        extracted_slug = matches[-1].replace('$', '\\$')

        # dashes must be ignored during creation and can be added
        # arbitrarily post creation (e.g. mylink can be represented as
        # m-y-l-i-n-k or my-link)
        extracted_slug = re.sub(r'[^A-Za-z0-9\\$]', '', extracted_slug)

        # not the best place to do this but convenient since all methods must escape their slugs
        GoLink.validate(GoLink.unescape(extracted_slug))
        return extracted_slug

    @staticmethod
    def exists(link) -> bool:
        '''A link exists unless its lookup returned HTTP status 404.'''
        return link.get('status_code') != 404

    @staticmethod
    def unescape(slug) -> str:
        '''Unescape slug to match what would be used/distributed'''
        return slug.replace('\\', '')

    @staticmethod
    def validate(slug):
        '''
        Check that slug is 1-40 characters from [A-Za-z0-9$].

        Raises:
            ValueError: the slug does not match.
        '''
        # fullmatch: a plain '$' anchor would accept a trailing newline
        if not re.fullmatch(r'[A-Za-z0-9$]{1,40}', slug):
            raise ValueError(
                f'Invalid {slug}, it must be at most 40 characters '
                'long consisting of [A-Za-z0-9$]')

    @staticmethod
    def _plain(slug) -> str:
        '''Normalize user input to the bare slug (validated, no escaping).'''
        return GoLink.unescape(GoLink.escape(slug))

    @staticmethod
    def _parse_head(raw) -> tuple:
        '''
        Parse the text of a HEAD response into (status_code, location).

        Works for both 'HTTP/1.1 302 Found' and 'HTTP/2 302' status lines;
        the header name is matched case-insensitively. location is None when
        there is no Location header.

        Raises:
            ValueError: the text is empty or has no numeric status code.
        '''
        lines = raw.splitlines()
        status_parts = lines[0].split() if lines else []
        if len(status_parts) < 2 or not status_parts[1].isdigit():
            raise ValueError(f'Unexpected HEAD response: {raw!r}')

        location = None
        for line in lines[1:]:
            name, separator, value = line.partition(':')
            if separator and name.strip().lower() == 'location':
                location = value.strip()
                break
        return int(status_parts[1]), location

    @staticmethod
    def _execute(cmd) -> str:
        '''
        Run a command and return its standard output as text.

        A list is executed directly (no shell); a string is handed to the
        shell. Anything written to standard error is printed and the process
        exits with status 1.
        '''
        try:
            proc = subprocess.run(cmd, shell=isinstance(cmd, str),
                                  stderr=subprocess.PIPE, stdout=subprocess.PIPE,
                                  universal_newlines=True)
        except FileNotFoundError as e:
            # without a shell a missing executable raises instead of
            # producing a "command not found" message on stderr
            print(e)
            sys.exit(1)

        if proc.stderr:
            print(proc.stderr)
            sys.exit(1)
        return proc.stdout
if __name__ == '__main__':
    # A bare `help` word behaves like --help. Check that an argument exists
    # first: sys.argv always holds the script name, so indexing [1] without
    # the length check raises IndexError when no arguments are given.
    if len(sys.argv) > 1 and sys.argv[1] == 'help':
        parser.print_help()
        sys.exit(0)

    args = parser.parse_args()
    if args.subcommand is None:
        # no subcommand given: show usage instead of failing
        parser.print_help()
    else:
        # each subcommand stored its handler as the `func` default
        args.func(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment