Simple CLI for querying image.sc for certain tags
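A typical invocation might look like the following (the script name is hypothetical; save it next to the scraper_utils module it imports):

    python query_imagesc.py --days 7 --style tsv --tags omero --tags bio-formats

All of these flags are defined by the argument parser below; --tags may be repeated to OR several tags together.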
#!/usr/bin/env python
# Needed so the Python 2 fallback below can even parse the
# print(..., file=...) calls used throughout this script
from __future__ import print_function

from argparse import (
    ArgumentParser,
    FileType,
)
from collections import (
    defaultdict,
)
from datetime import (
    datetime,
    timedelta,
)
import re
import requests
import sys

# Local helper (not shown here): argparse type used for --output
from scraper_utils import StdoutOrAppend

PY3_TIMEFORMAT = '%Y-%m-%dT%H:%M:%S.%f%z'
PY2_TIMEFORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'

try:
    # Python 3: datetime.timezone exists and %z can be parsed
    from datetime import timezone
    utctzinfo = timezone(timedelta(hours=0))
    timeformat = PY3_TIMEFORMAT
except ImportError:
    print('WARNING: Python 2 ignoring timezone', file=sys.stderr)
    utctzinfo = None
    timeformat = PY2_TIMEFORMAT
# This is a list of tags that we expect to be present
OME_TAGS = set(["bio-formats",
                "idr",
                "ome", "ome-tiff", "ome-xml",
                "omero", "omero-figure", "omero-iviewer", "omero-server",
                "omero-web"])

DISCOURSE = 'https://forum.image.sc'
def discourse(urlpath):
    """GET a path from the forum and return the decoded JSON response."""
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
        "Accept": "application/json",
    }
    url = DISCOURSE + urlpath
    print('Fetching ' + url, file=sys.stderr)
    r = requests.get(url, headers=headers)
    r.raise_for_status()
    return r.json()
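# Command-line options controlling which topics are fetched and how
# the results are printed.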
parser = ArgumentParser()
parser.add_argument("--stale", default=1, type=int,
                    help="Flag topics whose last post is more than this many days old; shown as part of the HTML output")
parser.add_argument("--days", "-d", default=4, type=int,
                    help="Only include topics updated within this many days")
parser.add_argument("--ignore", "-i", default="ignore.txt", type=FileType("r"))
parser.add_argument("--new", "-n", action="store_true", help="Only unreplied posts")
parser.add_argument("--solved", "-S", choices=("true", "false", "ignore"),
                    default="false")
parser.add_argument("--output", "-o", default="-", type=StdoutOrAppend())
parser.add_argument("--preview", "-p", choices=("solved", "all"),
                    default="solved")
parser.add_argument("--style", "-s", choices=("html", "urls", "tsv"), default="html",
                    help="Format to print results in")
parser.add_argument("--missing", "-m", action="store_true",
                    help="Look for topics which are missing the tag")
parser.add_argument("--tags", "-t", action="append", default=[], type=str,
                    help="List of tags to search ('OR')")
parser.add_argument("--skip-group", default="ome",
                    help="Skip topics where the last reply is from a member of this group; set to '' to disable")
ns = parser.parse_args()
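# Build the tag set: use --tags when given, otherwise discover the
# OME-related tags from the forum's /tags endpoint.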
tags = set()
tags.update(ns.tags)

if not tags:
    # u'tags': [{u'count': 1, u'text': u'introduction', u'id': u'introduction'
    data = discourse('/tags')
    ts = data.pop('tags')
    extras = data.pop('extras')
    assert not data
    categories = extras.pop('categories')
    assert not extras
    for t in ts:
        text = t['text']
        for prefix in ("bioformats", "ome", "idr"):
            if text.replace("-", "").startswith(prefix):
                tags.add(text)
    assert OME_TAGS.issubset(tags)

print('Discovered tags: {}\n'.format(", ".join(tags)), file=sys.stderr)
old = datetime.utcnow().replace(tzinfo=utctzinfo) - timedelta(days=ns.days)
stale = ns.stale

ometeam = set()
if ns.skip_group:
    ometeamr = discourse('/groups/{}/members?limit=50'.format(ns.skip_group))
    ometeam = set(u['username'] for u in ometeamr['members'])
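# Page through /latest until a non-pinned topic older than the --days
# cutoff is seen; pinned topics sort first regardless of age.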
topics = []
topics_url = '/latest'
fetch = True
while fetch:
    r = discourse(topics_url)
    for t in r['topic_list']['topics']:
        try:
            t['updated'] = max(
                datetime.strptime(t['last_posted_at'], timeformat),
                datetime.strptime(t['bumped_at'], timeformat))
        except ValueError:
            print('Using fallback time format', file=sys.stderr)
            t['updated'] = max(
                datetime.strptime(t['last_posted_at'], PY2_TIMEFORMAT),
                datetime.strptime(t['bumped_at'], PY2_TIMEFORMAT))
        delta = (datetime.now().date() - t['updated'].date()).days
        t['days'] = ""
        if delta > stale:
            t['days'] = "Last updated: %s days ago" % delta
        alltags = [x.replace("omero-", "") for x in t['tags']]
        alltags = [x.replace("bio-formats", "bf") for x in alltags]
        t['alltags'] = ','.join(alltags)
        t['link'] = DISCOURSE + '/t/{id}/{highest_post_number}'.format(**t)
        try:
            newer = t['updated'] > old
        except TypeError:
            # Naive and aware datetimes cannot be compared directly
            newer = t['updated'] > old.replace(tzinfo=None)
        if newer:
            topics.append(t)
        # Pinned posts will be first but may be older than the cutoff
        elif not t['pinned']:
            fetch = False
            break
    topics_url = r['topic_list']['more_topics_url']

print('Fetched {} topics from {} to {}\n'.format(
    len(topics), topics[-1]['updated'], topics[0]['updated']), file=sys.stderr)
try:
    with ns.ignore as f:
        ignore_links = set(f.read().split())
except IOError:
    ignore_links = set()
def remove_formatting(txt, truncate=0):
    """Strip HTML tags, collapse whitespace, and optionally truncate."""
    s = re.sub(r'<[^>]*>', ' ', txt)
    s = re.sub(r'\s+', ' ', s)
    s = s.strip()
    if truncate and len(s) > truncate:
        s = s[:(truncate - 1)] + '…'
    return s
def get_last_post(t):
    thread = discourse('/t/{}'.format(t['id']))
    lastpost = thread['post_stream']['posts'][-1]
    return lastpost
def keep_or_skip_post(t):
    """
    Check whether the topic should be shown (reason: None) or skipped (reason: str)
    Return: (reason, thread)
    """
    reasons = (
        (t['closed'], 'closed'),
        (t['archived'], 'archived'),
        (not (set(t['tags']) & tags) and not ns.missing, 'no-tag'),
        (ns.solved == "true" and not t['has_accepted_answer'], 'solved'),
        (ns.new and t['highest_post_number'] > 1, 'replied'),
        (t['last_poster_username'] in ometeam, 'responded'),
        (t['link'] in ignore_links, 'ignored'),
    )
    for check, reason in reasons:
        if check:
            return reason, t

    lastpost = None
    t['preview'] = ''
    if ns.preview == 'all':
        lastpost = get_last_post(t)
        t['preview'] = "Preview: " + remove_formatting(lastpost['cooked'], 80)

    # Accepted answers: check whether there's a post after the accepted one
    # since this might be a follow-up problem
    if ns.solved == "false" and t['has_accepted_answer']:
        if not lastpost:
            lastpost = get_last_post(t)
        if lastpost['accepted_answer']:
            return 'accepted', t
        t['preview'] = 'Solved? ' + remove_formatting(lastpost['cooked'], 80)

    return None, t
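# Output template for the selected --style: bare URLs, tab-separated
# columns, or HTML list items.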
if ns.style == "urls":
    format = '{link}'
elif ns.style == "tsv":
    format = "{n}\t{updated}\t{last_poster_username}\t{link}\t{title}"
else:
    format = '<li>[???] <a href="{link}">[sc:{alltags}] {title} ({last_poster_username})</a> {preview}  {days}</li>'
standup_forums = []
count = 0
skipped = defaultdict(int)
for t in topics:
    reason, t = keep_or_skip_post(t)
    if reason:
        skipped[reason] += 1
    else:
        standup_forums.append(format.format(n=count, **t))
        count += 1

skipped = ["{}: {}".format(k, v) for k, v in skipped.items()]
skipped = ", ".join(skipped)
print('Skipped: {}\n'.format(skipped), file=sys.stderr)

with ns.output as f:
    f.write('\n'.join(standup_forums))
    if standup_forums:
        f.write('\n')