A custom fork of gensim's Wikipedia corpus reader, better suited for dumps of the Ukrainian Wikipedia
import bz2
import logging
import multiprocessing
import re
from pickle import PicklingError

# LXML isn't faster, so let's go with the built-in solution
from xml.etree.ElementTree import iterparse

from gensim import utils
from gensim.corpora.textcorpus import TextCorpus

# NOTE: gensim's own extract_pages is not imported because it is re-implemented
# below so that the revision timestamp is yielded as well.
from gensim.corpora.wikicorpus import (
    init_to_ignore_interrupt,
    IGNORED_NAMESPACES,
    RE_P0,
    RE_P1,
    RE_P9,
    RE_P10,
    RE_P11,
    RE_P14,
    RE_P5,
    RE_P6,
    RE_P13,
    RE_P17,
    RE_P2,
    remove_template,
    get_namespace,
)
logger = logging.getLogger(__name__)
RE_P12 = re.compile(
    r"\s(({\|)|(\|-(?!\d))|(\|}))(.*?)(?=\n)", re.UNICODE
)  # table formatting
RE_P15 = re.compile(
    r"\[\[([fF]ile:|[iI]mage:|[фФ]айл:|[кК]атегорія:)[^\]]*(\]\])", re.UNICODE
)  # file, image and category links, including the Ukrainian "Файл:" and "Категорія:"
# Remove galleries
RE_P18 = re.compile(r"<gallery([> ].*?)(</gallery>|/>)", re.DOTALL | re.UNICODE)
# Remove header formatting
RE_P19 = re.compile(r"={2,4}\s+([^=]*)\s+={2,4}", re.UNICODE)
# Remove bold/italics formatting
RE_P20 = re.compile(r"'{2,3}([^']*)'{2,3}", re.UNICODE)
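# Illustrative examples (not from the original gist) of the Ukrainian-specific
# markup these extra patterns are meant to strip or simplify:
#   RE_P15: "[[Файл:Kyiv.jpg|thumb|Краєвид Києва]]", "[[Категорія:Міста України]]"
#   RE_P18: "<gallery>Файл:A.jpg|підпис Файл:B.jpg|ще один</gallery>"
#   RE_P19: "== Історія ==" is reduced to "Історія"
#   RE_P20: "'''Київ'''" is reduced to "Київ"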
def parse_article(args):
    """Parse a Wikipedia article, extract plain text and filter out non-useful elements.

    Parameters
    ----------
    args : (int, str, str, str)
        Page id, article title, article text, date of publish.

    Returns
    -------
    (int, str, str, str)
        Page id, article title, article text, date of publish.

    """
    pageid, title, text, date_of_publish = args
    text = filter_wiki(text)

    return pageid, title, text, date_of_publish
def remove_file(s):
    """Remove the 'File:', 'Image:', 'Файл:' and 'Категорія:' markup, keeping the file caption (if any).

    Parameters
    ----------
    s : str
        String containing the markup.

    Returns
    -------
    str
        Copy of `s` with all such markup replaced by the `corresponding captions
        <http://www.mediawiki.org/wiki/Help:Images>`_.

    """
    # The regex RE_P15 matches a File:, Image:, Файл: or Категорія: link
    for match in re.finditer(RE_P15, s):
        m = match.group(0)
        caption = ""
        if "|" in m:
            caption = m[:-2].split("|")[-1]
        # drop captions that are just an image size, e.g. "250px"
        if re.match(r"\d+.{2,3}$", caption):
            caption = ""
        s = s.replace(m, caption, 1)

    return s
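# A minimal, hypothetical example (not from the original gist) of what remove_file() does:
#
#   remove_file("Текст [[Файл:Kyiv.jpg|thumb|Краєвид Києва]] текст")
#   returns "Текст Краєвид Києва текст"
#
# while a size-only caption such as in "[[Файл:Kyiv.jpg|250px]]" is removed entirely.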
def filter_wiki(raw, promote_remaining=True, simplify_links=True):
    """Filter out wiki markup from `raw`, leaving only text.

    Parameters
    ----------
    raw : str
        Unicode or utf-8 encoded string.
    promote_remaining : bool
        Whether uncaught markup should be promoted to plain text.
    simplify_links : bool
        Whether links should be simplified keeping only their description text.

    Returns
    -------
    str
        `raw` without markup.

    """
    # parsing of the wiki markup is not perfect, but sufficient for our purposes
    # contributions to improving this code are welcome :)
    text = utils.to_unicode(raw, "utf8", errors="ignore")
    text = utils.decode_htmlentities(text)  # '&nbsp;' --> '\xa0'

    return remove_markup(text, promote_remaining, simplify_links)
def remove_markup(text, promote_remaining=True, simplify_links=True):
    """Filter out wiki markup from `text`, leaving only text.

    Parameters
    ----------
    text : str
        String containing markup.
    promote_remaining : bool
        Whether uncaught markup should be promoted to plain text.
    simplify_links : bool
        Whether links should be simplified keeping only their description text.

    Returns
    -------
    str
        `text` without markup.

    """
    text = re.sub(RE_P2, "", text)  # remove the last list (=languages)
    # the wiki markup is recursive (markup inside markup etc)
    # instead of writing a recursive grammar, here we deal with that by removing
    # markup in a loop, starting with inner-most expressions and working outwards,
    # for as long as something changes.
    text = remove_template(text)
    text = remove_file(text)
    iters = 0
    while True:
        old, iters = text, iters + 1
        text = re.sub(RE_P0, "", text)  # remove comments
        text = re.sub(RE_P1, "", text)  # remove footnotes
        text = re.sub(RE_P9, "", text)  # remove outside links
        text = re.sub(RE_P10, "", text)  # remove math content
        text = re.sub(RE_P18, "", text)  # remove gallery content
        text = re.sub(RE_P11, "", text)  # remove all remaining tags
        text = re.sub(RE_P14, "", text)  # remove categories
        text = re.sub(RE_P5, "\\3", text)  # remove urls, keep description
        if simplify_links:
            text = re.sub(RE_P6, "\\2", text)  # simplify links, keep description only
        # remove table markup
        text = text.replace("!!", "\n|")  # each table head cell on a separate line
        text = text.replace("|-||", "\n|")  # for cases where a cell is filled with '-'
        text = re.sub(RE_P12, "\n", text)  # remove formatting lines
        text = text.replace(
            "|||", "|\n|"
        )  # each table cell on a separate line (where |{{a|b}}||cell-content)
        text = text.replace("||", "\n|")  # each table cell on a separate line
        text = re.sub(RE_P13, "\n", text)  # leave only cell content
        text = re.sub(RE_P17, "\n", text)  # remove formatting lines
        # remove empty mark-up
        text = text.replace("[]", "")
        text = re.sub(RE_P19, "\\1", text)  # remove header markup, keep the header text
        text = re.sub(RE_P20, "\\1", text)  # remove bold/italics markup, keep the text
        # stop if nothing changed between two iterations or after a fixed number of iterations
        if old == text or iters > 2:
            break

    if promote_remaining:
        text = text.replace("[", "").replace(
            "]", ""
        )  # promote all remaining markup to plain text

    return text
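# A rough, hypothetical illustration (assumed input, not from the original gist)
# of the combined effect of filter_wiki()/remove_markup(); the exact output
# depends on the gensim regexes:
#
#   filter_wiki("'''Київ''' — столиця [[Україна|України]].")
#   should yield approximately: "Київ — столиця України."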
def extract_pages(f, filter_namespaces=False, filter_articles=None):
    """Extract pages from a MediaWiki database dump.

    Parameters
    ----------
    f : file
        File-like object.
    filter_namespaces : list of str or bool
        Namespaces that will be extracted.
    filter_articles : callable or None, optional
        If set, each page element is passed to this callable; when it returns
        a falsy value, the page's text is replaced with an empty string.

    Yields
    ------
    tuple of (str or None, str, str, str)
        Title, text, page id and revision timestamp.

    """
    elems = (elem for _, elem in iterparse(f, events=("end",)))

    # We can't rely on the namespace for database dumps, since it changes
    # every time a small modification to the format is made. So, determine
    # it from the first element we find, which will be part of the metadata,
    # and construct element paths.
    elem = next(elems)
    namespace = get_namespace(elem.tag)
    ns_mapping = {"ns": namespace}
    page_tag = "{%(ns)s}page" % ns_mapping
    text_path = "./{%(ns)s}revision/{%(ns)s}text" % ns_mapping
    title_path = "./{%(ns)s}title" % ns_mapping
    date_of_publish_path = "./{%(ns)s}revision/{%(ns)s}timestamp" % ns_mapping
    ns_path = "./{%(ns)s}ns" % ns_mapping
    pageid_path = "./{%(ns)s}id" % ns_mapping

    for elem in elems:
        if elem.tag == page_tag:
            title = elem.find(title_path).text
            date_of_publish = elem.find(date_of_publish_path).text
            text = elem.find(text_path).text

            if filter_namespaces:
                ns = elem.find(ns_path).text
                if ns not in filter_namespaces:
                    text = None

            if filter_articles is not None:
                if not filter_articles(
                    elem,
                    namespace=namespace,
                    title=title,
                    text=text,
                    page_tag=page_tag,
                    text_path=text_path,
                    title_path=title_path,
                    ns_path=ns_path,
                    pageid_path=pageid_path,
                ):
                    text = None

            pageid = elem.find(pageid_path).text
            yield title, text or "", pageid, date_of_publish  # a filtered-out page yields an empty string

            # Prune the element tree, as per
            # http://www.ibm.com/developerworks/xml/library/x-hiperfparse/
            # except that we don't need to prune backlinks from the parent
            # because we don't use LXML.
            # We do this only for <page>s, since we need to inspect the
            # ./revision/text element. The pages comprise the bulk of the
            # file, so in practice we prune away enough.
            elem.clear()
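# Hypothetical standalone use of this extract_pages() (the dump file name below
# is an assumption, not part of the gist):
#
#   with bz2.BZ2File("ukwiki-latest-pages-articles.xml.bz2") as dump:
#       for title, text, pageid, timestamp in extract_pages(dump, filter_namespaces=("0",)):
#           ...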
class StreamWikiCorpusReader(TextCorpus):
    """Treat a Wikipedia articles dump retrieved by scrapy as a read-only, streamed, memory-efficient corpus.

    WARNING: NOT compatible with the original TextCorpus contract and DOES NOT handle tokenization.
    TUNED to work properly with the Ukrainian language.

    Supported dump formats:

    * <LANG>wiki-<YYYYMMDD>-pages-articles.xml.bz2
    * <LANG>wiki-latest-pages-articles.xml.bz2

    The documents are extracted on-the-fly, so that the whole (massive) dump can stay compressed on disk.

    Notes
    -----
    Dumps for the English Wikipedia can be found at https://dumps.wikimedia.org/enwiki/.

    Warnings
    --------
    "Multistream" archives are *not* supported in Python 2 due to `limitations in the core bz2 library
    <https://docs.python.org/2/library/bz2.html#de-compression-of-files>`_.

    """
    def __init__(
        self,
        content,
        processes=None,
        dictionary=None,
        filter_namespaces=("0",),
        filter_articles=None,
    ):
        """Initialize the corpus.

        Parameters
        ----------
        content : io.BytesIO
            In-memory file with the dump.
        processes : int, optional
            Number of processes to run, defaults to `max(1, number of cpu - 1)`.
        dictionary : optional
            Kept only for signature compatibility with gensim's WikiCorpus; it is not used by this reader.
        filter_namespaces : tuple of str, optional
            Namespaces to consider.
        filter_articles : callable or None, optional
            If set, each XML article element will be passed to this callable before being processed. Only articles
            for which the callable returns a truthy value are processed; returning None allows filtering out
            articles based on customised rules.

        """
        self.content = content
        self.filter_namespaces = filter_namespaces
        self.filter_articles = filter_articles

        if processes is None:
            processes = max(1, multiprocessing.cpu_count() - 1)
        self.processes = processes
    def get_texts(self):
        """Iterate over the dump, yielding one article text at a time with the markup removed,
        with no excessive filtering (except for namespaces and service articles) and no tokenization.

        Uses multiprocessing internally to parallelize the work and process the dump more quickly.

        Yields
        ------
        (str, str, str, str)
            Page id, article title, article text and revision timestamp.

        """
        articles, articles_all = 0, 0

        texts = (
            (pageid, title, text, date_of_publish)
            for title, text, pageid, date_of_publish in extract_pages(
                bz2.BZ2File(self.content),
                self.filter_namespaces,
                self.filter_articles,
            )
        )
        pool = multiprocessing.Pool(self.processes, init_to_ignore_interrupt)

        try:
            # process the corpus in smaller chunks of docs, because multiprocessing.Pool
            # is dumb and would load the entire input into RAM at once...
            for group in utils.chunkize(
                texts, chunksize=10 * self.processes, maxsize=1
            ):
                for pageid, title, text, date_of_publish in pool.imap(
                    parse_article, group
                ):
                    articles_all += 1

                    if any(
                        title.startswith(ignore + ":") for ignore in IGNORED_NAMESPACES
                    ):
                        continue

                    articles += 1
                    yield pageid, title, text, date_of_publish
        except KeyboardInterrupt:
            logger.warning(
                "user terminated iteration over Wikipedia corpus after %i documents "
                "(total %i articles)",
                articles,
                articles_all,
            )
        except PicklingError as exc:
            raise PicklingError(
                f"Cannot send the filtering function {self.filter_articles} to multiprocessing, "
                "make sure the function can be pickled."
            ) from exc
        else:
            logger.info(
                "finished iterating over Wikipedia corpus of %i documents "
                "(total %i articles)",
                articles,
                articles_all,
            )
            self.length = articles  # cache corpus length
        finally:
            pool.terminate()
Usage example:
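A minimal sketch of how the reader can be driven (the dump file name and the module name are assumptions, not part of the gist):

from stream_wiki_corpus import StreamWikiCorpusReader  # assumed module name for this gist

# The dump is passed as a binary file object (anything bz2.BZ2File accepts);
# get_texts() streams (pageid, title, plain_text, timestamp) tuples.
with open("ukwiki-latest-pages-articles.xml.bz2", "rb") as dump:
    corpus = StreamWikiCorpusReader(dump, processes=4)

    for pageid, title, text, date_of_publish in corpus.get_texts():
        print(pageid, title, date_of_publish, len(text))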