Skip to content

Instantly share code, notes, and snippets.

@Ladsgroup
Created September 24, 2021 13:46
Show Gist options
  • Save Ladsgroup/b7b8d484c531a32ff6ed33baa1166e6f to your computer and use it in GitHub Desktop.
Save Ladsgroup/b7b8d484c531a32ff6ed33baa1166e6f to your computer and use it in GitHub Desktop.
from collections import defaultdict
import re
MARKER = '@@@@@@@@@@@@@@@'
# License: MIT


def load_categories(lines):
    """Group category links by the parent category they point to.

    Each item of *lines* is one tab-separated row with three fields: the
    title of a category page, a ``Namespace:Parent`` link target found on
    that page, and the sort-key part of the link (``|key``, or an empty
    string when the link carries no sort key).  Everything up to the first
    colon is dropped from both titles.

    Returns a mapping ``parent -> groups`` where ``groups`` may hold
    ``'with_sortkey'`` (dict of subcategory -> sort key) and
    ``'without_sortkey'`` (set of subcategories).  Rows with fewer than
    three fields are skipped instead of raising ``IndexError``.
    """
    categories = defaultdict(dict)
    for line in lines:
        fields = line.strip('\n').split('\t')
        if len(fields) < 3:
            continue
        main_cat = fields[1].partition(':')[2]
        subcat = fields[0].partition(':')[2]
        groups = categories[main_cat]
        if fields[2]:
            groups.setdefault('with_sortkey', {})[subcat] = fields[2].strip('|')
        else:
            groups.setdefault('without_sortkey', set()).add(subcat)
    return categories


def derive_pattern(with_sortkey, min_examples=5):
    """Return the naming pattern shared by all sorted subcategories.

    For every ``subcat -> key`` pair the key must occur exactly once in the
    subcategory name as a whole space-delimited token; replacing it (and its
    surrounding spaces) with ``MARKER`` must give the same string for every
    pair.  Returns that string stripped of outer whitespace, or ``None``
    when any pair disagrees or fewer than *min_examples* pairs exist.
    """
    shared = None
    for subcat, key in with_sortkey.items():
        padded = ' ' + subcat + ' '
        token = ' ' + key + ' '
        if token not in padded:
            return None
        pattern = padded.replace(token, MARKER)
        if pattern.count(MARKER) != 1:
            return None
        if shared and shared != pattern:
            return None
        shared = pattern
    if not shared or len(with_sortkey) < min_examples:
        return None
    return shared.strip()


def build_matcher(cat_pattern):
    """Compile the regex that recognises names following *cat_pattern*.

    The text before ``MARKER`` must be followed by a space and a non-empty
    captured part, then the text after ``MARKER``.
    """
    prefix, suffix = cat_pattern.split(MARKER)[:2]
    return re.compile(
        r'^' + re.escape(prefix) + r' (.+)' + re.escape(suffix) + r'$'
    )


def find_candidates(categories, min_examples=5):
    """Yield ``(subcat, cat, cat_pattern)`` for links lacking a sort key.

    Only parents that have both sorted and unsorted subcategories and whose
    sorted subcategories agree on one pattern are considered; an unsorted
    subcategory is yielded when its name matches that pattern.
    """
    for cat, groups in categories.items():
        with_sortkey = groups.get('with_sortkey')
        without_sortkey = groups.get('without_sortkey')
        if not with_sortkey or not without_sortkey:
            continue
        cat_pattern = derive_pattern(with_sortkey, min_examples)
        if cat_pattern is None:
            continue
        matcher = build_matcher(cat_pattern)
        for subcat in without_sortkey:
            if matcher.search(subcat):
                yield subcat, cat, cat_pattern


def write_candidates(source='res.tsv', target='final_res.tsv'):
    """Read *source* and write one tab-separated candidate row per line."""
    with open(source, 'r') as f:
        categories = load_categories(f)
    # Rebind to the mixed parents only so the much larger full mapping can
    # be garbage-collected before the output pass.
    categories = {
        cat: groups
        for cat, groups in categories.items()
        if groups.get('with_sortkey') and groups.get('without_sortkey')
    }
    # Opening in 'w' mode truncates any previous result; the handle is kept
    # open for the whole pass instead of being reopened for every row.
    with open(target, 'w') as out:
        for row in find_candidates(categories):
            out.write('\t'.join(row) + '\n')


if __name__ == '__main__':
    write_candidates()
from collections import defaultdict
import re
import sys
import pywikibot
import requests
MARKER = '@@@@@@@@@@@@@@@'
# License: MIT
API_URL = 'https://en.wikipedia.org/w/api.php'


def build_matcher(cat_pattern):
    """Compile the regex that extracts the sort key from a category name.

    The text before ``MARKER`` must be followed by a space and a non-empty
    captured part, then the text after ``MARKER``.
    """
    prefix, suffix = cat_pattern.split(MARKER)[:2]
    return re.compile(
        r'^' + re.escape(prefix) + r' (.+)' + re.escape(suffix) + r'$'
    )


def add_sortkey(text, cat, sortkey):
    """Return *text* with ``|sortkey`` added to bare links to *cat*.

    Only links of the exact form ``[[Category:<cat>]]`` (or with the
    ``category`` / ``رده`` prefix) are rewritten; links that already carry a
    sort key are left alone.  A replacement function is used so that
    backslashes in *sortkey* are inserted literally rather than being
    interpreted as escapes or group references by ``re.sub``.
    """
    link = re.compile(
        r'\[\[((?:[cC]ategory|رده)\:' + re.escape(cat) + r')\]\]'
    )
    return link.sub(
        lambda match: '[[' + match.group(1) + '|' + sortkey + ']]', text
    )


def fetch_defaultsort(subcat):
    """Return the ``defaultsort`` page property of a category, or ``None``.

    Queries the API at ``API_URL``.  An unexpected response shape yields
    ``None``; network and JSON decoding errors propagate to the caller.
    """
    params = {
        'action': 'query',
        'prop': 'pageprops',
        'ppprop': 'defaultsort',
        'format': 'json',
        'titles': 'Category:' + subcat,
        'formatversion': 2,
    }
    res = requests.get(API_URL, params=params, timeout=30).json()
    try:
        return res['query']['pages'][0].get('pageprops', {}).get('defaultsort')
    except (KeyError, IndexError, TypeError, AttributeError):
        return None


def run_bot():
    """Set the category sort key on every candidate in ``final_res.tsv``."""
    with open('final_res.tsv', 'r') as f:
        cases = f.read().split('\n')
    site = pywikibot.Site('en')
    for case in cases:
        fields = case.split('\t')
        if len(fields) < 3:
            continue
        subcat, cat, cat_pattern = fields[:3]
        if MARKER not in cat_pattern:
            continue
        found = build_matcher(cat_pattern).findall(subcat)
        if not found:
            continue
        try:
            defaultsort = fetch_defaultsort(subcat)
        except (requests.RequestException, ValueError) as err:
            # Without the property check the page must not be edited.
            print('Skipping', subcat, err)
            continue
        if defaultsort:
            # The page defines its own default sort key; leave it alone.
            print('DS', defaultsort)
            continue
        subcat_page = pywikibot.Page(site, 'Category:' + subcat)
        try:
            text = subcat_page.get()
        except Exception as err:
            # Best effort: skip pages that cannot be loaded, but let
            # KeyboardInterrupt / SystemExit stop the run.
            print('Skipping', subcat, err)
            continue
        new_text = add_sortkey(text, cat, found[0].strip())
        if text == new_text:
            print('boooo')
        else:
            subcat_page.put(new_text, summary='Bot: Setting category sortkey')


if __name__ == '__main__':
    run_bot()
import json
import os
import signal
import sys
import time
import re
from collections import OrderedDict, defaultdict
from multiprocessing import Pool
from pywikibot import xmlreader
# License: MIT
# Regex fragment matching the category namespace prefix of a wikitext link,
# keyed by wiki language code.
cat_regexes = {
    'fa': r'(?:[Cc]ategory|رده)',
    'en': r'[Cc]ategory',
}
class TimeoutException(Exception):
    """Raised from the alarm signal handler when a time limit expires."""
def check_text(text, lang):
    """Return all category links found in *text*.

    The namespace prefix regex is looked up in ``cat_regexes`` by *lang*,
    falling back to the ``'en'`` entry.  Each result is a 2-tuple: the link
    target including its namespace prefix, and the ``|sortkey`` part (an
    empty string when the link has none).
    """
    namespace = cat_regexes.get(lang, cat_regexes['en'])
    link_re = re.compile(
        r'\[\[(' + namespace + r' *?\: *?.+?) *?(\| *?.*?)?\]\]'
    )
    return link_re.findall(text)
def handler(signum, frame):
    """Signal handler: report the timeout and raise TimeoutException.

    Both arguments are the standard signal-handler parameters and are
    not used.
    """
    print('Timeout error recieved')
    raise TimeoutException("timeout error")
def read_dump_first(path_to_dump):
    """Extract category links from one XML dump and append them to res.tsv.

    The language code is the part of the dump's file name before ``wiki``.
    Only pages in namespace 14 are scanned.  Each link becomes one
    tab-separated row: page title, link target, sort-key part.
    """
    print(path_to_dump)
    file_name = path_to_dump.split('/')[-1]
    lang = file_name.split('wiki')[0]
    for page in xmlreader.XmlDump(path_to_dump).parse():
        # Progress indicator.
        if int(page.id) % 10000 == 0:
            print(page.id)
        if int(page.ns) != 14:
            continue
        # Give the regex scan at most five seconds; pages that exceed the
        # limit are skipped.
        signal.signal(signal.SIGALRM, handler)
        signal.alarm(5)
        try:
            links = check_text(page.text, lang)
        except TimeoutException:
            continue
        signal.alarm(0)
        for link in links:
            # The file is reopened in append mode for every row.
            with open('res.tsv', 'a') as out:
                out.write('\t'.join([page.title, *link]) + '\n')
def main(*args):
    """Scan every dump given on the command line using a process pool.

    Dump paths are taken from *args* when any are passed, otherwise from
    ``sys.argv[1:]``; arguments starting with ``--`` are ignored.
    ``res.tsv`` is truncated first, then each dump is handed to
    ``read_dump_first`` in a pool of 20 worker processes.
    """
    argv = args or sys.argv[1:]
    dumps = [arg for arg in argv if not arg.startswith('--')]
    workers = 20
    # Start from an empty result file; the workers only ever append to it.
    with open('res.tsv', 'w') as f:
        f.write('')
    with Pool(processes=workers) as pool:
        pool.map(read_dump_first, dumps)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment