Created
April 18, 2012 21:07
-
-
Save cburgmer/2416573 to your computer and use it in GitHub Desktop.
MediaWiki markup to plain-text converter that operates on MediaWiki XML dump files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import logging | |
from lxml import etree | |
# Install from https://github.com/erikrose/pijnu & https://github.com/erikrose/mediawiki-parser | |
from pijnu.library.error import IncompleteParse | |
from preprocessor import make_parser as make_preprocessor_parser | |
from text import make_parser | |
# Public API of this module.
__all__ = ["iter_pages", "convert_page", "PageNotFound"]

logger = logging.getLogger("wiki2text")

# XML namespace of MediaWiki export dumps (export schema version 0.6);
# every element lookup below must be qualified with this namespace.
MW_NAMESPACE = 'http://www.mediawiki.org/xml/export-0.6/'

# Parsers are built once at import time — construction is expensive and
# they are reused for every page in the dump.
preprocessor = make_preprocessor_parser({})
parser = make_parser()
def iter_pages(input_file, selector_func=None):
    """
    Iterates over all pages in the dump and yields each page as text with its title.
    Optionally a selector function can be given to limit the number of pages returned.
    """
    page_tag = '{%s}page' % MW_NAMESPACE
    events = etree.iterparse(input_file, events=('end',), tag=page_tag)
    return fast_iter(events, convert_page_node, selector_func)
def convert_page(input_file, page_title):
    """ Converts a single page in the dump. Returns None if no page with the given title exists. """
    def matches_page_title(page):
        return get_page_title(page) == page_title

    pages = iter_pages(input_file, matches_page_title)
    try:
        _, text = pages.next()
    except StopIteration:
        raise PageNotFound(u"Page '%s' could not be found" % page_title)
    return text
class PageNotFound(Exception):
    """Raised when no page with the requested title exists in the dump."""
# High performance iteration over large lxml parse tree,
# taken from http://www.ibm.com/developerworks/xml/library/x-hiperfparse/
def fast_iter(context, func, selector_func=None):
    """Yield func(elem) for each element produced by *context*, freeing memory as it goes.

    context       -- an lxml iterparse iterator yielding (event, element) pairs
    func          -- called on each element; its result is yielded
    selector_func -- optional predicate; elements it rejects are skipped
                     (but still cleared, so memory stays bounded)
    """
    for event, elem in context:
        if not selector_func or selector_func(elem):
            yield func(elem)
        # Drop the element's own content...
        elem.clear()
        # ...and detach already-processed preceding siblings from the parent,
        # so the tree built so far can be garbage collected. The order of
        # these operations is essential to the recipe — do not rearrange.
        while elem.getprevious() is not None:
            del elem.getparent()[0]
    # Release the iterparse context itself once exhausted.
    del context
def get_page_title(page_node):
    """Return the text content of the <title> element of a <page> node."""
    return page_node.find('mw:title', namespaces={'mw': MW_NAMESPACE}).text
def convert_page_node(page):
    """Convert one <page> element to a (title, text) pair.

    Returns (title, converted_text). converted_text is None when the page
    carries no text or the markup could not be converted; errors are logged
    rather than raised so a single bad page does not abort the whole dump.
    """
    text_node = page.find('mw:revision/mw:text', namespaces={'mw': MW_NAMESPACE})
    title = get_page_title(page)
    converted_text = None
    # Guard against a missing <text> element as well as an empty one;
    # the original only checked .text and crashed with AttributeError
    # when the node itself was absent.
    if text_node is None or text_node.text is None:
        logger.error("Cannot get text from page %r" % title)
    else:
        try:
            converted_text = wiki2text(text_node.text)
        except IncompleteParse:
            # BUG FIX: the original logged `wiki_text[0:20]`, an undefined
            # name in this scope, which raised NameError instead of logging.
            logger.error("Error parsing entry %r" % text_node.text[0:20])
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # still propagate; any other conversion failure is logged.
            logger.error("Error at %r" % text_node.text[:20])
    return (title, converted_text)
def wiki2text(wiki_text):
    """Run the markup through the preprocessor, then parse it down to plain text."""
    preprocessed = preprocessor.parse(wiki_text)
    tree = parser.parse(preprocessed.value + '\n')
    return tree.leaves()
def main(): | |
logging.basicConfig() | |
input_file = sys.argv[1].decode('utf8') | |
if len(sys.argv) > 2: | |
page_title = sys.argv[2].decode('utf8') | |
try: | |
text_page = convert_page(input_file, page_title) | |
if text_page: | |
print text_page.encode('utf8') | |
except PageNotFound, e: | |
print >> sys.stderr, unicode(e).encode('utf8') | |
sys.exit(1) | |
else: | |
text_pages = iter_pages(input_file) | |
for title, page in text_pages: | |
if page: | |
print page.encode('utf8') | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment