from collections import defaultdict
from datetime import datetime
import json
import mwclient
import mysql.connector
import netrc
import parser  # local helper module providing DiscussionsFromHellParser (not the stdlib parser)
import pypandoc
import random
import re
import requests
import sys
from urllib.parse import unquote, urlparse, parse_qs
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
# This script converts FANDOM comments, retrieved from the services.fandom.com API, to records in the tables used by Extension:Comments.
# Flow:
# Get all comments
# Insert all top-level comments and track all IDs using cursor.lastrowid
# Insert all second-level comments, using said IDs
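# Later stages (as implemented below): map Discussions forumIds to wiki page IDs,
# match existing "@comment" talk subpages to their Discussions posts, convert any
# unmatched posts to wikitext (via a cached HTML->wikitext map or pandoc) and save
# them as CommentStreams: pages, then replay Message Wall threads onto User talk pages.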
connection = mysql.connector.connect(unix_socket='/run/mysqld/mysqld.sock',
                                     user = 'wikiuser',
                                     password = ';rEnXT>eMX6<[=;78zuG[>h_*wm^#uQhh+!$p)Z2',
                                     database = 'fallenlondonwiki')
cursor = connection.cursor(buffered=True)
login = netrc.netrc().authenticators('ebzwiki')
site = mwclient.Site('backend.fallenlondon.wiki')
site.login(login[0], login[2])
api = 'https://fallenlondon.fandom.com/wikia.php'
discussions_url = 'https://services.fandom.com/discussion'
path = '/189869/posts?viewableOnly=true&limit=100&page=0'
pages = []
users = []
posts = {}
talks = {}
forum = {}
print('Retrieving data from Discussions...')
r = requests.get(discussions_url + path)
last_page = r.json()['_links']['last'][0]['href'].rsplit('=', 1)[1]
while path:
    print(f'page {path.rsplit("=", 1)[1]}/{last_page}')
    r = requests.get(discussions_url + path)
    if not r.status_code == 200:
        print(f'error: received non-ok status code {r.status_code} for url {discussions_url + path}')
        sys.exit(1)
    page = r.json()
    pages.append(page)
    users += page['_embedded']['contributors'][0]['userInfo']
    posts.update({post['id']: post for post in page['_embedded']['doc:posts'] if post['_embedded']['thread'][0]['containerType'] == 'ARTICLE_COMMENT'})
    talks.update({post['id']: post for post in page['_embedded']['doc:posts'] if post['_embedded']['thread'][0]['containerType'] == 'WALL'})
    forum.update({post['id']: post for post in page['_embedded']['doc:posts'] if post['_embedded']['thread'][0]['containerType'] == 'FORUM'})
    try:
        path = page['_links']['next'][0]['href']
    except KeyError:
        path = None
#with open('posts.json', 'w') as f:
#    json.dump(posts, f)
#with open('talks.json', 'w') as f:
#    json.dump(talks, f)
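# Buckets by container type: posts holds article comments, talks holds Message Wall posts,
# and forum holds Discussions forum posts (collected above but not migrated by this script).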
print('Building comment trees...')
# Top level comment IDs -> reply IDs
comment_trees = {post['id']: [] for post in posts.values() if not post['isReply']}
for post in posts.values():
    if not post['isReply']:
        continue
    parent_id = post['_embedded']['thread'][0]['firstPost']['id']
    try:
        comment_trees[parent_id].append(post['id'])
    except KeyError:
        print(f'Error: parent post for post with ID {post["id"]} and parent ID {parent_id} not found!')
print('Comment trees built. Sorting...')
# Sort replies by comment date ascending
comment_trees = {k: sorted(v, key=lambda x: posts[x]['creationDate']['epochSecond']) for k,v in comment_trees.items()}
print('Retrieving page titles from API...')
# forumIds; need to be translated into page IDs
forum_ids = list({post['forumId'] for post in posts.values()})
chunks = [forum_ids[i:i+1000] for i in range(0, len(forum_ids), 1000)]
# maps forums to wiki page objects returned by the API
forum_title_map = {}
for chunk in chunks:
    r = requests.get(api, params={'controller': 'FeedsAndPosts', 'method': 'getArticleNamesAndUsernames', 'stablePageIds': ','.join(chunk), 'format': 'json'})
    chunk = {k: {l:w.replace('User blog:', '') for l,w in v.items()} for k,v in r.json()['articleNames'].items()}
    forum_title_map.update(chunk)
# Now that we have a map of forum IDs to page names/URLs, we need to look up page IDs
print('Querying database for page IDs...')
cursor.execute("select page_id, ns_name, page_title from page join fl_namespaces on page_namespace = ns_id where page_title in ({})".format(", ".join(['%s']*len(forum_title_map))), tuple(value['title'].replace(' ', '_') for value in forum_title_map.values()))
collation = {}
for page_id, ns_name, page_title in cursor:
    try:
        collation[page_title.replace('_', ' ')].append((page_id, ns_name, page_title))
    except KeyError:
        collation[page_title.replace('_', ' ')] = [(page_id, ns_name, page_title)]
forum_page_map = {}
for forum_id, page_info in forum_title_map.items():
    if (title := page_info['title']) not in collation:
        print(f'{title} not in list of pages - was it deleted?')
        continue
    for page in collation[title]:
        if not page[1] and f'/wiki/{page[2].replace(" ", "_")}' == unquote(page_info['relativeUrl']):
            forum_page_map[forum_id] = page[0]
        elif page[1] and f'/wiki/{page[1]}:{page[2].replace(" ", "_")}' == unquote(page_info['relativeUrl']):
            forum_page_map[forum_id] = page[0]
        elif page[1] and page[1] == 'Blog' and f'/wiki/User_blog:{page[2].replace(" ", "_")}' == unquote(page_info['relativeUrl']):
            forum_page_map[forum_id] = page[0]
    if forum_id not in forum_page_map:
        print(f'No match was found for page {page_info} in database results: {collation[title]}')
print('The following pages were not found in the database: ', [page_info for forum_id, page_info in forum_title_map.items() if forum_id not in forum_page_map])
# Now we have a map of forum IDs to page IDs. We can now begin populating data.
print('Map of forum IDs to page IDs constructed.')
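# authors: (user ID, username-or-IP) pairs for every article comment; a creatorId of 0
# marks an anonymous (IP) author, which is split out into anonymous_ips below.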
authors = {(int(post['creatorId']), name if (name := post['createdBy']['name']) else post['creatorIp']) for post in posts.values()}
user_ids = set()
anonymous_ips = set()
for author in authors:
    if author[0]:
        user_ids.add(author)
    else:
        anonymous_ips.add(author[1])
users = {}
anon_actors = {}
users[48516131] = (27162, 'Uncool75')
cursor.execute("select * from actor")
for actor_id, actor_user, actor_name in cursor:
    if actor_user:
        users[actor_user] = (actor_id, actor_name)
    else:
        anon_actors[actor_name] = actor_id
# Sanity check - ensure that all names match for all matched user IDs
if any(mismatches := [user_id for user_id in user_ids if user_id[0] in users and user_id[1] != users[user_id[0]][1]]):
    print('Some mismatches between db users and user_id')
    print(mismatches)
# user_id, user_name, user_real_name, user_password, user_newpassword, user_email, user_touched, user_token
missing_users = [(*user_id, '', '', '', '', int(datetime.utcnow().strftime('%Y%m%d%H%M%S')), '') for user_id in user_ids if user_id[0] not in users]
missing_user_actors = [user_id for user_id in user_ids if user_id[0] not in users]
# actor_name only
missing_actors = [(modified_ip,) for ip in anonymous_ips if (modified_ip := ip.strip('/').upper()) not in anon_actors]
cursor.executemany('insert ignore into user (user_id, user_name, user_real_name, user_password, user_newpassword, user_email, user_touched, user_token) values (%s,%s,%s,%s,%s,%s,%s,%s)', missing_users)
cursor.executemany('insert ignore into actor (actor_name) values (%s)', missing_actors)
cursor.executemany('insert ignore into actor (actor_user, actor_name) values (%s, %s)', missing_user_actors)
# Need to re-query to map properly
cursor.execute("select * from actor")
for actor_id, actor_user, actor_name in cursor:
    if actor_user:
        users[actor_user] = (actor_id, actor_name)
    else:
        anon_actors[actor_name] = actor_id
users_not_inserted = [user_id for user_id in user_ids if user_id[0] not in users]
ips_not_inserted = [ip for ip in anonymous_ips if ip.strip('/').upper() not in anon_actors]
if any(users_not_inserted) or any(ips_not_inserted):
    print('Error: some users or IPs were not inserted into the actor table:')
    print(users_not_inserted)
    print(ips_not_inserted)
# Maps forumId -> creator -> epoch timestamp -> post ID (built here but not referenced again later in this script)
thread_user_timestamp_map = defaultdict(lambda:defaultdict(defaultdict))
for post_id, post_data in posts.items():
    creator = post_data['creatorIp'] or post_data['createdBy']['id']
    thread_user_timestamp_map[post_data['forumId']][creator][post_data['creationDate']['epochSecond']] = post_id
# Need to map existing comment pages to Discussions posts; match on page title, timestamp, and UID/IP
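# Existing FANDOM comment subpages are assumed to be titled like
# "Page_title/@comment-<user ID or IP>-<YYYYMMDDHHMMSS>" (with a second "/@comment-..."
# segment for replies); the title parsing below relies on that layout.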
forum_user_timestamp__post_id_map = defaultdict(lambda: defaultdict(defaultdict))
for post_id, post in posts.items():
    try:
        page_title = forum_title_map[post['forumId']]
    except KeyError: # Comment on deleted page
        continue
    author = post['creatorIp'].lstrip('/').upper() or post['creatorId']
    timestamp = datetime.fromtimestamp(post['creationDate']['epochSecond']).strftime('%Y%m%d%H%M%S')
    forum_user_timestamp__post_id_map[page_title['title']][author][timestamp] = post_id
cursor.execute('select page_id, page_title from page join slots on page_latest = slot_revision_id join content on slot_content_id = content_id join text on trim(leading "tt:" from content_address) = old_id where page_namespace = 844 and page_title like "%%@comment%%" and old_text not like "%%#REDIRECT %%"')
talk_id_map = {page_title:page_id for page_id, page_title in cursor}
talk_parents = {}
talk_replies = {}
talk_trees = defaultdict(list)
post_page_map = {}
pages_missing_from_posts = []
problematic_uid_map = {'27182508': '26355824',
                       '28002690': '24828557'}
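# closeness(): given a canonical "@comment" page title, look up the posts made by the same
# author on the same parent page and return the ID of the post whose timestamp is nearest
# to the timestamp embedded in the title; returns None when no candidate exists.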
def closeness(title):
    parts = title.split('/@')
    parent_page = parts[0].replace('_', ' ')
    comment_info = parts[-1]
    _, user_or_ip, timestamp = comment_info.split('-')
    try:
        leaf_map = forum_user_timestamp__post_id_map[parent_page][user_or_ip]
        ts = int(timestamp)
        if len(leaf_map) == 0:
            return None
        min_value = min([abs(int(t) - ts) for t in leaf_map])
        for candidate in leaf_map:
            if abs(int(candidate) - ts) == min_value:
                return leaf_map[candidate]
    except KeyError:
        return None
    return None
for page_title, page_id in talk_id_map.items():
    if page_title.count('@comment') == 1:
        talk_parents[page_title] = page_id
    elif page_title.count('@comment') == 2:
        talk_replies[page_title] = page_id
        parent = page_title.rsplit('/@comment', 1)[0]
        talk_trees[parent].append(page_title)
    else:
        print(f'WARN: page {page_title} had a weird number of "@comment"s')
    parts = page_title.split('/@')
    parent_page = parts[0].replace('_', ' ')
    comment_info = parts[-1]
    _, user_or_ip, timestamp = comment_info.split('-')
    try:
        post_page_map[forum_user_timestamp__post_id_map[parent_page][user_or_ip][timestamp]] = page_id
    except KeyError:
        canonical_title = page_title
        if not any((user_timestamp__post_id_map := forum_user_timestamp__post_id_map[parent_page]).values()):
            r = requests.get(f'https://fallenlondon.fandom.com/wiki/{parts[0]}')
            rd_target = r.url.split('/wiki/')[1]
            canonical_title = canonical_title.replace(parts[0], rd_target)
        else:
            if (replacement := problematic_uid_map.get(user_or_ip)):
                canonical_title = canonical_title.replace(user_or_ip, replacement)
            elif not any(user_timestamp__post_id_map[user_or_ip].values()):
                cursor.execute('select user_id from user where user_name = %s', (user_or_ip,))
                if cursor.rowcount:
                    (user_id,) = cursor.fetchone()
                    canonical_title = canonical_title.replace(user_or_ip, str(user_id))
        if (pid := closeness(canonical_title)):
            post_page_map[pid] = page_id
        else:
            r = requests.get(f'https://fallenlondon.fandom.com/wiki/Talk:{canonical_title}')
            params = parse_qs(urlparse(r.url).query)
            if (reply := params.get('replyId')):
                if reply[0] in posts:
                    post_page_map[reply[0]] = page_id
                else:
                    print(f'post ID {reply[0]} for comment {canonical_title} not found in discussions!')
            elif (comment := params.get('commentId')):
                if (parent := [post for post in posts.values() if post['threadId'] == comment[0] and not post['isReply']]):
                    post_page_map[parent[0]['id']] = page_id
                else:
                    print(f'post ID {comment[0]} for comment {canonical_title} not found in discussions!')
            else:
                pages_missing_from_posts.append((page_title, canonical_title))
if pages_missing_from_posts:
    print('The following comment talk pages were not found in Discussions:')
    for page in pages_missing_from_posts:
        print(page)
posts_need_conversion = {pid:p for pid,p in posts.items() if pid not in post_page_map}
converted_posts = defaultdict(dict)
with open('html_wikitext_map_v3.json') as f:
    html_wikitext_map = json.load(f)
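# html_wikitext_map_v3.json is assumed to be a precomputed cache of Discussions bodies
# (renderedContent HTML or jsonModel) -> wikitext; entries converted via pandoc below are
# added to the in-memory map but are not written back to disk by this script.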
users[34441176] = (27162, 'Uncool75')
for post_id, post in posts_need_conversion.items():
    try:
        converted_posts[post_id]['page'] = forum_page_map[post['forumId']]
    except KeyError:
        r = requests.get(api, params={'controller': 'FeedsAndPosts', 'method': 'getArticleNamesAndUsernames', 'stablePageIds': post['forumId'], 'format': 'json'})
        print(f'forumId {post["forumId"]} corresponding to page {r.json()["articleNames"][post["forumId"]]["title"]} was not found')
    converted_posts[post_id]['actor'] = anon_actors[ip.strip('/').upper()] if (ip := post['creatorIp']) else users[int(post['creatorId'])][0]
    converted_posts[post_id]['timestamp'] = datetime.fromtimestamp(int(post['creationDate']['epochSecond'])).strftime('%Y%m%d%H%M%S')
    if (content := post['renderedContent']):
        # use map for most comments and pandoc the stragglers
        if (converted := html_wikitext_map.get(content)):
            converted_posts[post_id]['text'] = converted.strip()
        else:
            converted_posts[post_id]['text'] = (converted := pypandoc.convert_text(content, 'mediawiki', format='html').strip())
            html_wikitext_map[content] = converted
        # Handle external image links
        converted_posts[post_id]['text'] = re.sub(r'\[\[File:.+?/([^/]+?\.png).+?]]', r'[[File:\1]]', converted_posts[post_id]['text'])
    elif (content := post['jsonModel']):
        if (converted := html_wikitext_map.get(content)):
            converted_posts[post_id]['text'] = converted.strip()
        else:
            dp = parser.DiscussionsFromHellParser(post)
            converted_posts[post_id]['text'] = (converted := pypandoc.convert_text(dp.parse(), 'mediawiki', format='markdown').strip())
            html_wikitext_map[content] = converted
    else:
        print(f'Neither renderedContent nor jsonModel was present for this post: {post}')
        converted_posts[post_id]['text'] = ''  # avoid a KeyError on the line below
    converted_posts[post_id]['text'] = converted_posts[post_id]['text'].replace('[[wiki/', '[[')
# cs_comment_data schema (from DESCRIBE):
#+--------------------+------------------+------+-----+-------------+-------+
#| Field              | Type             | Null | Key | Default     | Extra |
#+--------------------+------------------+------+-----+-------------+-------+
#| cst_page_id        | int(10) unsigned | NO   | PRI | NULL        |       |
#| cst_assoc_page_id  | int(10) unsigned | YES  |     | NULL        |       |
#| cst_parent_page_id | int(10) unsigned | YES  |     | NULL        |       |
#| cst_comment_title  | varbinary(255)   | YES  |     | NULL        |       |
#| cst_id             | varchar(50)      | YES  |     | cs-comments |       |
#+--------------------+------------------+------+-----+-------------+-------+
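# Batches accumulated below: parent_records -> (cst_page_id, cst_assoc_page_id) rows for
# top-level comments, reply_records -> rows that also carry cst_parent_page_id, and
# revision_records / revactor_records -> timestamp and actor fixups for the revisions
# created when a converted comment is saved through the API.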
parent_records = []
reply_records = []
revision_records = []
revactor_records = []
for comment_id, replies in comment_trees.items():
    post = posts[comment_id]
    if (cst_page_id := post_page_map.get(comment_id)): # Page with comment data
        try:
            cst_assoc_page_id = forum_page_map[post['forumId']]
        except KeyError:
            r = requests.get(api, params={'controller': 'FeedsAndPosts', 'method': 'getArticleNamesAndUsernames', 'stablePageIds': post['forumId'], 'format': 'json'})
            print(f'forumId {post["forumId"]} corresponding to page {r.json()["articleNames"][post["forumId"]]["title"]} was not found')
            continue
        parent_records.append((cst_page_id, cst_assoc_page_id))
    else:
        # Need to edit, reassign timestamp, author, etc.
        post_data = converted_posts[comment_id]
        if not all([key in post_data for key in ['page', 'actor', 'timestamp', 'text']]):
            print(f'{forum_title_map[post["forumId"]]} not in list of pages - was it deleted?')
            continue
        cst_assoc_page_id = post_data['page']
        actor = post_data['actor']
        timestamp = post_data['timestamp']
        text = post_data['text']
        random_title = f'{random.randrange(16**32):032x}'
        while (page := site.pages[f'CommentStreams:{random_title}']).exists:
            random_title = f'{random.randrange(16**32):032x}'
        if not site.logged_in:
            site.login(login[0], login[2])
        while True:
            try:
                ret = page.save(text, 'Migrating comments')
                break
            except requests.exceptions.ReadTimeout:
                pass
        cst_page_id = ret['pageid']
        rev_id = ret['newrevid']
        revision_records.append((timestamp, rev_id))
        revactor_records.append((timestamp, actor, rev_id))
        parent_records.append((cst_page_id, cst_assoc_page_id))
    for reply_id in replies:
        reply = posts[reply_id]
        if (reply_cst_page_id := post_page_map.get(reply_id)): # Page with comment data
            try:
                reply_cst_assoc_page_id = forum_page_map[reply['forumId']]
            except KeyError:
                r = requests.get(api, params={'controller': 'FeedsAndPosts', 'method': 'getArticleNamesAndUsernames', 'stablePageIds': reply['forumId'], 'format': 'json'})
                print(f'forumId {reply["forumId"]} corresponding to page {r.json()["articleNames"][reply["forumId"]]["title"]} was not found')
                continue
            cursor.execute('insert into cs_comment_data (cst_page_id, cst_assoc_page_id, cst_parent_page_id, cst_id) values (%s, %s, %s, 0)', (reply_cst_page_id, reply_cst_assoc_page_id, cst_page_id))
        else:
            # Need to edit, reassign timestamp, author, etc.
            post_data = converted_posts[reply_id]
            if not all([key in post_data for key in ['page', 'actor', 'timestamp', 'text']]):
                print(f'{forum_title_map[reply["forumId"]]} not in list of pages - was it deleted?')
                continue
            reply_cst_assoc_page_id = post_data['page']
            actor = post_data['actor']
            timestamp = post_data['timestamp']
            text = post_data['text']
            random_title = f'{random.randrange(16**32):032x}'
            while (page := site.pages[f'CommentStreams:{random_title}']).exists:
                random_title = f'{random.randrange(16**32):032x}'
            if not site.logged_in:
                site.login(login[0], login[2])
            while True:
                try:
                    ret = page.save(text, 'Migrating comments')
                    break
                except requests.exceptions.ReadTimeout:
                    pass
            reply_cst_page_id = ret['pageid']
            rev_id = ret['newrevid']
            revision_records.append((timestamp, rev_id))
            revactor_records.append((timestamp, actor, rev_id))
            reply_records.append((reply_cst_page_id, reply_cst_assoc_page_id, cst_page_id))
cursor.executemany('insert into cs_comment_data (cst_page_id, cst_assoc_page_id) values (%s, %s)', parent_records)
cursor.executemany('update revision set rev_timestamp = %s where rev_id = %s', revision_records)
cursor.executemany('update revision_actor_temp set revactor_timestamp = %s, revactor_actor = %s where revactor_rev = %s', revactor_records)
cursor.executemany('insert into cs_comment_data (cst_page_id, cst_assoc_page_id, cst_parent_page_id, cst_id) values (%s, %s, %s, 0)', reply_records)
connection.commit()
# Now process talk posts
print('building talk post threads...')
# Build reply trees
talk_threads = {k: [] for k,v in talks.items() if not v['isReply']}
for talk_id, talk_post in talks.items():
    if talk_post['isReply']:
        parent_id = talk_post['_embedded']['thread'][0]['firstPost']['id']
        talk_threads[parent_id].append(talk_id)
talk_threads = {k: sorted(v, key=lambda x: talks[x]['creationDate']['epochSecond']) for k,v in talk_threads.items()}
print('building talk page threads...')
talk_page_threads = defaultdict(list)
for talk_id in talk_threads:
    talk_post = talks[talk_id]
    # We already know that all keys are first-level posts
    username = talk_post['forumName'].replace(' Message Wall', '').replace('_', ' ')
    talk_page_threads[username].append(talk_id)
talk_page_threads = {k: sorted(v, key=lambda x: talks[x]['creationDate']['epochSecond']) for k,v in talk_page_threads.items()}
# Need to ensure that all users for which there are talk pages exist
target_usernames = set(talk['forumName'].replace(' Message Wall', '').replace('_', ' ') for talk in talks.values())
# Should ensure that all users (not IPs) who have messaged exist in the db
source_usernames = set(name for talk in talks.values() if (name := talk['createdBy']['name']))
cursor.execute("select user_name from user")
all_db_users = set(name for (name,) in cursor)
missing_targets = target_usernames - all_db_users
missing_sources = source_usernames - all_db_users
if missing_targets:
    print(f'the following users who have message walls are missing - WILL SKIP: {missing_targets}')
if missing_sources:
    print(f'the following users who have sent messages are missing: {missing_sources}')
    # in this case, need to add to db using user ID provided
authors = {(int(post['creatorId']), name if (name := post['createdBy']['name']) else post['creatorIp']) for post in talks.values()}
# user_id, user_name, user_real_name, user_password, user_newpassword, user_email, user_touched, user_token
missing_users = [(*user_id, '', '', '', '', int(datetime.utcnow().strftime('%Y%m%d%H%M%S')), '') for user_id in authors if user_id[1] in missing_sources]
cursor.executemany('insert ignore into user (user_id, user_name, user_real_name, user_password, user_newpassword, user_email, user_touched, user_token) values (%s,%s,%s,%s,%s,%s,%s,%s)', missing_users)
print('converting talk posts to wikitext...')
talk_posts = {}
for talk_id, talk in talks.items():
    if (content := talk['renderedContent']):
        # use map for most comments and pandoc the stragglers
        if (converted := html_wikitext_map.get(content)):
            talk_posts[talk_id] = converted
        else:
            talk_posts[talk_id] = pypandoc.convert_text(content, 'mediawiki', format='html')
        # Handle external image links
        talk_posts[talk_id] = re.sub(r'\[\[File:.+?/([^/]+?\.png).+?]]', r'[[File:\1]]', talk_posts[talk_id])
    elif (content := talk['jsonModel']):
        if (converted := html_wikitext_map.get(content)):
            talk_posts[talk_id] = converted
        else:
            dp = parser.DiscussionsFromHellParser(talk)
            talk_posts[talk_id] = pypandoc.convert_text(dp.parse(), 'mediawiki', format='markdown')
    else:
        print(f'Neither renderedContent nor jsonModel was present for this talk: {talk}')
        talk_posts[talk_id] = ''  # avoid a KeyError on the line below
    talk_posts[talk_id] = talk_posts[talk_id].replace('[[wiki/', '[[')
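# Each Message Wall thread is rendered onto the recipient's User talk page as a
# "==<thread title>==" section containing the opening post and a signature, with
# replies appended as ":"-indented lines underneath.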
print('editing talk pages...')
for user, threads in talk_page_threads.items():
    talk_page = site.pages[f'User talk:{user}']
    page_text = talk_page.text()
    if not talk_page.exists:
        page_text = 'This is the default talk page text.'
    for thread in threads:
        post = talks[thread]
        replies = talk_threads[thread]
        title = (post['title'] or '(no title)').strip()
        post_text = talk_posts[thread].replace('\n', '<br>')
        post_text = re.sub(r'{{(.+?)}}', r'<nowiki>{{\1}}</nowiki>', post_text)
        post_text = re.sub(r'\[\[Category:(.+?)\]\]', r'[[:Category:\1]]', post_text)
        timestamp = datetime.fromtimestamp(post['creationDate']['epochSecond']).strftime('%H:%M, %B %-d, %Y (UTC)')
        if int(post['creatorId']) == 0:
            # Anon
            ip = post['creatorIp'].strip('/').upper()
            signature = f'[[Special:Contributions/{ip}|{ip}]] {timestamp}'
        else:
            # User
            username = post['createdBy']['name']
            signature = f'[[User:{username}|{username}]] ([[User talk:{username}|talk]]) {timestamp}'
        page_text += f'\n\n=={title}==\n{post_text} {signature if signature not in post_text else ""}'
        for reply in replies:
            post = talks[reply]
            title = (post['title'] or '(no title)').strip()
            post_text = talk_posts[reply].replace('\n', '<br>')
            post_text = re.sub(r'{{(.+?)}}', r'<nowiki>{{\1}}</nowiki>', post_text)
            post_text = re.sub(r'\[\[Category:(.+?)\]\]', r'[[:Category:\1]]', post_text)
            timestamp = datetime.fromtimestamp(post['creationDate']['epochSecond']).strftime('%H:%M, %B %-d, %Y (UTC)')
            if int(post['creatorId']) == 0:
                # Anon
                ip = post['creatorIp'].strip('/').upper()
                signature = f'[[Special:Contributions/{ip}|{ip}]] {timestamp}'
            else:
                # User
                username = post['createdBy']['name']
                signature = f'[[User:{username}|{username}]] ([[User talk:{username}|talk]]) {timestamp}'
            page_text += f'\n: {post_text} {signature if signature not in post_text else ""}'
    if not site.logged_in:
        site.login(login[0], login[2])
    talk_page.save(page_text, 'Migrating message wall posts')
connection.commit()