Created June 26, 2024 07:08
wikipedia perennial sources changes RSS feed generator
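A small self-hosted service: once a day it rebuilds a report that diffs the sources table of Wikipedia:Reliable sources/Perennial sources between the first of the previous month and the first of the current month, and serves the result as a single-entry Atom feed on port 9187.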
#!/usr/bin/env python3
import requests
import wikitextparser as wtp
from difflib import SequenceMatcher
from datetime import datetime, timezone
import dateutil.relativedelta
import schedule
import http.server
import socketserver
import threading
import time
from feedgen.feed import FeedGenerator
from urllib.parse import urlparse
# Icon URL and human-readable label for each status code in the RSP table.
STATUSES = {
    "gr": ("https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/Yes_Check_Circle.svg/20px-Yes_Check_Circle.svg.png", "Generally reliable"),
    "nc": ("https://upload.wikimedia.org/wikipedia/commons/thumb/1/12/Achtung-orange.svg/20px-Achtung-orange.svg.png", "No consensus, unclear, or additional considerations apply"),
    "gu": ("https://upload.wikimedia.org/wikipedia/commons/thumb/4/41/Argentina_-_NO_symbol.svg/20px-Argentina_-_NO_symbol.svg.png", "Generally unreliable"),
    "d": ("https://upload.wikimedia.org/wikipedia/en/thumb/8/8b/Stop_hand.svg/20px-Stop_hand.svg.png", "Deprecated"),
    "b=y": ("https://upload.wikimedia.org/wikipedia/commons/thumb/7/7c/X-circle.svg/20px-X-circle.svg.png", "Blacklisted"),
}
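# A status cell can carry several codes at once (they are comma-joined when
# the table is parsed below); codes missing from STATUSES are silently
# skipped by format_status().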
def editdistance(a, b):
    # Dissimilarity in [0, 1]: 0 means identical strings, 1 means nothing in common.
    s = SequenceMatcher(None, a, b)
    return 1 - s.ratio()

def distance_below_threshold(a, b):
    return editdistance(str(a), str(b)) < .25  # arbitrary choice
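# Sanity check of the metric: editdistance("abcd", "abcde") == 1 - 8/9 ≈ 0.11,
# comfortably under the 0.25 cutoff, so near-identical rows get paired.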
def get_table_for_date(ymd):
    # Ask the MediaWiki API for the newest revision of the page as of
    # midnight UTC on the given date (rvstart plus rvlimit=1 walks backwards
    # from that timestamp and stops at the first revision it finds).
    WIKI_URL = f"https://en.wikipedia.org/w/api.php?action=query&prop=revisions&titles=Wikipedia:Reliable_sources/Perennial_sources&rvslots=*&rvprop=timestamp|ids|content&format=json&formatversion=2&rvstart={ymd}T00:00:00.000Z&rvlimit=1"
    res = requests.get(WIKI_URL)
    data = res.json()
    rev = data['query']['pages'][0]['revisions'][0]['slots']['main']
    content = rev['content']
    parsed = wtp.parse(content)
    sources = [s for s in parsed.sections if (s.title and "Sources" in s.title)][0]
    table = sources.tables[0].data()
    rows = []
    for row in table:
        if row[0] == "Source":  # skip the header row
            continue
        title = wtp.parse(row[0]).plain_text().strip()
        status = wtp.parse(row[1]).templates[0].arguments
        status = ",".join([arg.value for arg in status])
        desc = wtp.parse(row[4]).plain_text().strip()  # fifth column is the summary
        rows.append((title, status, desc))
    return set(rows)
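# Usage sketch (the rows depend on whichever revision was live on that date;
# the tuple shown is illustrative, not real data):
#   rows = get_table_for_date("2024-06-01")
#   # -> {("Example Times", "gr", "There is consensus that ..."), ...}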
def rich_diff_for_span(start_date, end_date):
    start = get_table_for_date(start_date)
    end = get_table_for_date(end_date)
    start_d = start - end  # rows only present at the start of the span
    end_d = end - start    # rows only present at the end of the span
    pairs = []
    found = set()
    # First pass: pair up rows whose title or description matches exactly.
    for s in start_d:
        for e in end_d:
            if (e[0] == s[0] or e[2] == s[2]) and e not in found:
                pairs.append([s, e])
                found.add(s)
                found.add(e)
                break
    start_d = start_d - found
    end_d = end_d - found
    found = set()
    # Second pass: pair the leftovers by fuzzy similarity.
    for s in start_d:
        for e in end_d:
            if distance_below_threshold(e, s) and e not in found:
                pairs.append([s, e])
                found.add(s)
                found.add(e)
                break  # without this, one start row could pair with several end rows
    start_d = start_d - found
    end_d = end_d - found
    pairs = sorted(pairs, key=lambda x: x[0])
    removed = sorted(list(start_d))
    added = sorted(list(end_d))
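    # Whatever survives both passes was genuinely removed (start side) or
    # newly added (end side). The nested helpers below render pairs and
    # lone entries as HTML fragments.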
    def body_to_html(body):
        return body.replace("\n\n", "</p><p>").replace("\n", "</p><p>")

    def get_title_diff(a, b):
        title = b[0]
        if b[0] != a[0]:
            title = f"<s>{a[0]}</s> {b[0]}"
        return title

    def format_status(a):
        statuses = a[1].split(",")
        statuses = [s for s in statuses if s in STATUSES]
        imgs = " ".join([f"<img src='{STATUSES[s][0]}'/>" for s in statuses])
        msgs = ", ".join([STATUSES[s][1] for s in statuses])
        return f"<p>{imgs} {msgs}</p>"

    def get_description_diff(a, b):
        # Render a character-level diff: strike deletions, bold insertions,
        # dim unchanged text.
        sq = SequenceMatcher(None, a[2], b[2])
        out = ""
        for tag, i1, i2, j1, j2 in sq.get_opcodes():
            a_ = a[2][i1:i2]
            b_ = b[2][j1:j2]
            if tag == 'replace':
                out += f'<s>{a_}</s><b>{b_}</b>'
            if tag == 'delete':
                out += f'<s>{a_}</s>'
            if tag == 'insert':
                out += f'<b>{b_}</b>'
            if tag == 'equal':
                out += f'<span style="opacity:0.7">{b_}</span>'
        return f"<p>{out}</p>"

    def get_status_diff(a, b):
        if a[1] == b[1]:
            return format_status(b)
        return f"<s>{format_status(a)}</s>{format_status(b)}"

    def format_pair(a, b):
        title = get_title_diff(a, b)
        return f"<h3>{title}</h3>\n{get_status_diff(a, b)}\n{get_description_diff(a, b)}\n<hr/>\n"

    def format_entry(a):
        return f"<h3>{a[0]}</h3>\n{format_status(a)}\n<p>{body_to_html(a[2])}</p>\n<hr/>\n"
title = f"Changes from {start_date} to {end_date}" | |
out = "" | |
if pairs: | |
out += "<h2>Changed</h2>\n" | |
for a, b in pairs: | |
out += format_pair(a, b) | |
if removed: | |
out += "<h2>Removed</h2>\n" | |
for a in removed: | |
out += f"<s>{format_entry(a)}</s>" | |
if added: | |
out += "<h2>Added</h2>\n" | |
for a in added: | |
out += format_entry(a) | |
return (title, out) | |
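# Usage sketch (dates are illustrative):
#   title, body = rich_diff_for_span("2024-05-01", "2024-06-01")
#   # title == "Changes from 2024-05-01 to 2024-06-01"; body is an HTML fragment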
# (date, title, HTML body) of the latest monthly report; a placeholder until
# update_report() first runs.
report = (datetime.today().replace(day=1), "", "")

def update_report():
    global report
    # Diff the first of the previous month against the first of the current one.
    end_date = datetime.today().replace(day=1)
    start_date = end_date - dateutil.relativedelta.relativedelta(months=1)
    report_title, report_body = rich_diff_for_span(start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'))
    date = start_date.replace(tzinfo=timezone.utc)
    report = (date, report_title, report_body)
def genfeed():
    # Build a single-entry Atom feed out of the current report.
    date, report_title, report_body = report
    fg = FeedGenerator()
    fg.id('https://lethargic.talkative.fish:9187/')
    fg.title('Wikipedia perennial sources updates')
    fg.author({'name': 'Wikipedia Editors'})
    fg.language('en')
    fe = fg.add_entry()
    fe.id(date.strftime('%Y-%m-%d'))
    fe.title(report_title)
    fe.content(report_body, type="xhtml")
    fe.published(date)
    return fg
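# For debugging, the feed can also be rendered to a string:
#   print(genfeed().atom_str(pretty=True).decode())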
class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler):
    def do_GET(self):
        parsedpath = urlparse(self.path)
        if parsedpath.path == '/':
            fg = genfeed()
            self.send_response(200)
            self.send_header("Content-type", "application/atom+xml; charset=utf-8")
            self.end_headers()
            fg.atom_file(self.wfile, pretty=True)
            return
        self.send_error(404)  # only the feed root is served

class ThreadingSimpleServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    pass
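# Once the server is up, the feed is available at the root path, e.g.:
#   requests.get("http://localhost:9187/").text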
if __name__ == "__main__":
    update_report()
    PORT = 9187
    print("port:", PORT)
    ThreadingSimpleServer.allow_reuse_address = True
    server = ThreadingSimpleServer(("", PORT), MyHttpRequestHandler)
    # Regenerate the report once a day so the feed rolls over to the new month.
    schedule.every().day.do(update_report)

    def schedule_thread():
        while True:
            schedule.run_pending()
            time.sleep(1)

    # Daemon thread so Ctrl-C actually stops the process.
    thread = threading.Thread(target=schedule_thread, daemon=True)
    thread.start()
    server.serve_forever()