Skip to content

Instantly share code, notes, and snippets.

Last active August 3, 2021 14:35
Show Gist options
  • Save dnk8n/ee22a58c24aa4a4d62554334946a7841 to your computer and use it in GitHub Desktop.
Save dnk8n/ee22a58c24aa4a4d62554334946a7841 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
"cells": [
"cell_type": "code",
"execution_count": null,
"id": "7cb8c367",
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
# Notebook configuration.

# Template expansion showed no evidence of helping in most cases, so it is
# skipped (--no-templates) in favour of speed.
# wikiextractor_cmd_base = 'python -m wikiextractor.WikiExtractor --json -o -'
wikiextractor_cmd_base = 'python -m wikiextractor.WikiExtractor --no-templates --json -o -'

# Keywords used to select pages by category name.
filter_categories_by_keywords = [
    'Flight',
    'Travel',
    'Tourism',
    'Aerospace',
    'Airlines',
    'Airports',
    'Airfields',
    'Aviation',
    'Transport',
]

# Paths are resolved against the current user's home directory instead of a
# hard-coded "/home/<user>" so the notebook runs unchanged on other machines.
# Kept as ``str`` because it is handed straight to ``open``.
csv_file = str(Path.home() / 'Downloads' / 'travel-wiki-extract.csv')

# exclude recombined gz and index files
#pattern = 'enwiki-20210720-pages-articles-multistream[0-9]*.xml*.bz2' ## For some reason selecting multiple files bugs out
#pattern = 'enwiki-20210720-pages-articles-multistream.xml.bz2' ## Full wikipedia dump
pattern = 'enwiki-20210720-pages-articles-multistream16.xml-p20460153p20570392.bz2' ## Smallest file to dev on
wiki_dir = Path.home() / 'wikipedia'
"cell_type": "code",
"execution_count": null,
"id": "9addbb9f",
"metadata": {},
"outputs": [],
"source": [
"import subprocess\n",
"from typing import Pattern, Union\n",
"import xml.sax\n",
"from xml.sax import SAXParseException\n",
"from xml.sax.expatreader import ExpatParser"
"cell_type": "code",
"execution_count": null,
"id": "b76248cc",
"metadata": {},
"outputs": [],
"source": [
"import re\n",
# Matches a wikitext category link such as ``[[Category:Airports in France]]``
# and captures the category name.  Written as a raw string so the regex
# escapes are not (invalid) string escapes; the pattern itself is unchanged.
RE_CATEGORY = re.compile(r'\[\[Category\:([\w\-\s]+)\]\]')


class WikiXmlHandler(xml.sax.handler.ContentHandler):
    """Content handler for Wiki XML data using SAX.

    For every ``<page>`` element the handler collects the page id, the
    revision id (``revid``), the ``timestamp`` and every category link found
    in the ``<text>`` element.  When the page closes, the collected dict is
    stored in ``matching_docs`` under the page id, provided it passes the
    category filter.

    Args:
        filter_categories_by: Optional iterable of keywords.  When non-empty,
            a page is kept only if at least one keyword occurs
            (case-insensitively) as a substring of one of its categories.
            When empty or ``None``, every page is kept.

    Attributes are documented inline in ``__init__``.
    """

    # Leaf elements whose character data is collected.
    _CAPTURED_TAGS = frozenset(("id", "timestamp", "text"))

    def __init__(self, filter_categories_by=None):
        super().__init__()
        # Stack of currently open element names (innermost last).
        self._current_tag = []
        # Id of the page being read (text of <page>/<id>).
        self._current_id = ""
        # Fields gathered so far for the page being read.
        self._temp_holding_dict = {}
        self._filter_categories_by = filter_categories_by or []
        # Character chunks of the element currently being read.
        self._text_chunks = []
        # Result: page id -> {'revid', 'timestamp', 'categories'} where
        # 'categories' is a newline-joined string (absent if none were found).
        self.matching_docs = {}

    def startElement(self, name, attrs):
        """Opening tag of element"""
        self._current_tag.append(name)
        self._text_chunks = []

    def characters(self, content):
        """Characters between opening and closing tags.

        SAX may deliver the text of one element in several calls, so chunks
        are only buffered here and interpreted in ``endElement``.
        """
        if self._current_tag and self._current_tag[-1] in self._CAPTURED_TAGS:
            self._text_chunks.append(content)

    def endElement(self, name):
        """Closing tag of element"""
        if not self._current_tag or name != self._current_tag[-1]:
            return
        ended_tag = self._current_tag.pop()
        content = ''.join(self._text_chunks)
        self._text_chunks = []
        if ended_tag == "page":
            self._finish_page()
        elif content:
            self._record_field(ended_tag, content)

    def _record_field(self, tag, content):
        """Store the complete text of a just-closed leaf element."""
        if tag == "id":
            # After the pop, the top of the stack is the parent element; ids
            # of other parents (e.g. <contributor>) are ignored.
            outer_tag = self._current_tag[-1] if self._current_tag else None
            if outer_tag == "page":
                self._current_id = content
            elif outer_tag == "revision":
                self._temp_holding_dict['revid'] = content
        elif tag == "timestamp":
            self._temp_holding_dict['timestamp'] = content
        elif tag == "text":
            # Search the whole article text so links anywhere in it are found.
            categories = RE_CATEGORY.findall(content)
            if categories:
                self._temp_holding_dict.setdefault('categories', []).extend(categories)

    def _matches_filter(self, categories):
        """Return True if any filter keyword occurs in any category name."""
        if not categories:
            return False
        keywords = [keyword.lower() for keyword in self._filter_categories_by]
        return any(
            keyword in category.lower()
            for category in categories
            for keyword in keywords
        )

    def _finish_page(self):
        """Keep or drop the page just read, then start a fresh holding dict."""
        doc, self._temp_holding_dict = self._temp_holding_dict, {}
        categories = doc.get('categories')
        if self._filter_categories_by and not self._matches_filter(categories):
            return
        if categories:
            doc['categories'] = '\n'.join(categories)
        self.matching_docs[self._current_id] = doc
"cell_type": "code",
"execution_count": null,
"id": "3e2829bb",
"metadata": {},
"outputs": [],
"source": [
def manualextract_bz_dir_serial(bz_dir: Union[str, Path], pattern: Union[str, Pattern], parser: ExpatParser):
    """Run ``manualextract_bz_file`` on each matching entry of *bz_dir*, one at a time.

    Args:
        bz_dir: Directory to search (not recursive unless the glob says so).
        pattern: Either a glob string handed to ``Path.glob`` or a compiled
            regular expression that is searched against each entry's name.
        parser: Accepted as part of the signature.
            NOTE(review): it is not forwarded; ``manualextract_bz_file`` is
            called with the file only, so that function decides which parser
            is used. Confirm whether it should be passed through.

    Raises:
        AssertionError: If *bz_dir* is not an existing directory.
    """
    bz_dir_path = Path(bz_dir)
    assert bz_dir_path.is_dir(), f'{bz_dir_path} is not a directory'
    if isinstance(pattern, str):
        matches = bz_dir_path.glob(pattern)
    else:
        # Path.glob only understands glob strings; emulate it for compiled
        # regexes so the annotated ``Pattern`` type actually works.
        matches = (
            entry for entry in bz_dir_path.iterdir() if pattern.search(entry.name)
        )
    # Sorted so files are processed in a reproducible order.
    for bz_file in sorted(matches):
        manualextract_bz_file(bz_file)
"cell_type": "code",
"execution_count": null,
"id": "854471c7",
"metadata": {},
"outputs": [],
"source": [
def manualextract_bz_file(f: Union[str, Path], xml_parser: Union[ExpatParser, None] = None):
    """Decompress one bz2 file with ``bzcat`` and feed it to a SAX parser line by line.

    Args:
        f: Path of the bz2-compressed XML file.
        xml_parser: Incremental SAX parser to feed.  When omitted, the
            module-level ``parser`` is used, which is what this function
            always did before the parameter existed.

    Raises:
        AssertionError: If *f* is not an existing regular file.

    A ``SAXParseException`` is reported on stdout and ends processing of this
    file; it is not re-raised.
    """
    f_path = Path(f)
    print('Processing file: ', f)
    assert f_path.is_file(), f'{f_path} is not a file'
    active_parser = parser if xml_parser is None else xml_parser
    # Both context managers guarantee the input file is closed and the bzcat
    # process is reaped, even when parsing stops early.
    with f_path.open('rb') as compressed, subprocess.Popen(
        ["bzcat"],
        stdin=compressed,
        stdout=subprocess.PIPE,
    ) as bzcat:
        try:
            for line in bzcat.stdout:
                active_parser.feed(line)
            # Finish this document and reset, so the same parser object can
            # be fed the next file as a new document.
            active_parser.close()
            active_parser.reset()
        except SAXParseException:
            print('error with file: ', f)
            bzcat.kill()  # do not keep decompressing data nobody will read
        except StopIteration:
            bzcat.kill()
"cell_type": "code",
"execution_count": null,
"id": "baba5f16",
"metadata": {},
"outputs": [],
"source": [
"import json\n",
def wikiextract_bz(bz_dir: Union[str, Path], pattern: Union[str, Pattern], cmd_base=None):
    """Yield one parsed JSON document per output line of the extractor command.

    For every matching entry of *bz_dir* the command ``cmd_base + [path]`` is
    started and each line of its stdout is decoded with ``json.loads``.
    This is a generator: nothing runs until it is iterated.

    Args:
        bz_dir: Directory to search.
        pattern: Either a glob string handed to ``Path.glob`` or a compiled
            regular expression that is searched against each entry's name.
        cmd_base: Command to run, without the file argument.  A string is
            split on whitespace; any other sequence is used as the argument
            list as-is.  Defaults to the module-level
            ``wikiextractor_cmd_base``.

    Raises:
        AssertionError: If *bz_dir* is not an existing directory (raised on
            first iteration).
    """
    bz_dir_path = Path(bz_dir)
    assert bz_dir_path.is_dir(), f'{bz_dir_path} is not a directory'
    base = wikiextractor_cmd_base if cmd_base is None else cmd_base
    base_args = base.split() if isinstance(base, str) else list(base)
    if isinstance(pattern, str):
        matches = bz_dir_path.glob(pattern)
    else:
        # Path.glob only understands glob strings; emulate it for compiled
        # regexes so the annotated ``Pattern`` type actually works.
        matches = (
            entry for entry in bz_dir_path.iterdir() if pattern.search(entry.name)
        )
    for f in sorted(matches):
        wikiextractor_cmd = base_args + [f.as_posix()]
        print('wikiextractor_cmd: ', wikiextractor_cmd)
        # The context manager closes the pipe and reaps the child process.
        with subprocess.Popen(wikiextractor_cmd, stdout=subprocess.PIPE) as proc:
            try:
                for line in proc.stdout:
                    yield json.loads(line)
            except BaseException:
                # Consumer stopped early (GeneratorExit) or a line failed to
                # decode: stop the child instead of waiting for it to finish.
                proc.terminate()
                raise
"cell_type": "code",
"execution_count": null,
"id": "a828be0f",
"metadata": {},
"outputs": [],
"source": [
def merge_docs(main_docs, meta_docs):
    """Yield extractor documents enriched with their metadata.

    Args:
        main_docs: Iterable of dicts with at least the keys ``id``, ``url``,
            ``title`` and ``text``.
        meta_docs: Mapping of document id to a metadata dict that contains at
            least ``revid``.  It is only read, never modified.

    Yields:
        One dict per document whose id has non-empty metadata: ``id``,
        ``url`` (with ``&oldid=<revid>`` appended), ``title``, ``text`` and
        every key of the metadata dict.

    Iteration over *main_docs* stops as soon as every id in *meta_docs* has
    been seen, and does not start at all when *meta_docs* is empty.
    """
    # Track outstanding ids separately instead of popping from the caller's
    # mapping, so the mapping can be reused (e.g. when re-running the export).
    pending = set(meta_docs)
    if not pending:
        return
    for doc in main_docs:
        id_ = doc['id']
        if id_ not in pending:
            continue
        pending.discard(id_)
        meta = meta_docs[id_]
        if meta:
            yield {
                'id': id_,
                'url': doc['url'] + '&oldid=' + meta['revid'],
                'title': doc['title'],
                'text': doc['text'],
                **meta,
            }
        if not pending:
            break
"cell_type": "code",
"execution_count": null,
"id": "8125ad93",
"metadata": {
"scrolled": true
"outputs": [],
"source": [
# Build the category-filtering handler and the incremental SAX parser.
handler = WikiXmlHandler(filter_categories_by_keywords)
parser = xml.sax.make_parser()
# Without this the parser never calls the handler, so
# ``handler.matching_docs`` would stay empty however much data is fed.
parser.setContentHandler(handler)
"cell_type": "code",
"execution_count": null,
"id": "3b67dcbf",
"metadata": {},
"outputs": [],
"source": [
"# %%time\n",
"# # Parallelised\n",
"# from multiprocessing import Pool\n",
"# partitions = [f for f in wiki_dir.glob(pattern)]\n",
"# pool = Pool(processes = 3)\n",
"# pool.map(manualextract_bz_file, partitions)\n",
"# pool.close()\n",
"# pool.join()"
"cell_type": "code",
"execution_count": null,
"id": "3b8bc526",
"metadata": {},
"outputs": [],
"source": [
# Serial: process the matching dump files one after another.
manualextract_bz_dir_serial(
    bz_dir=wiki_dir,
    pattern=pattern,
    parser=parser,
)
"cell_type": "code",
"execution_count": null,
"id": "5bcd05ff",
"metadata": {},
"outputs": [],
"source": [
# Naturally parallelised
# (presumably refers to WikiExtractor's own worker processes - TODO confirm).
# wikiextract_bz is a generator, so nothing is run until this is iterated.
wikiextract_docs = wikiextract_bz(bz_dir=wiki_dir, pattern=pattern)
"cell_type": "code",
"execution_count": null,
"id": "06c3dea0",
"metadata": {},
"outputs": [],
"source": [
"import csv\n",
# Column order of the exported CSV; must cover every key merge_docs yields.
csv_columns = ['id', 'url', 'title', 'text', 'revid', 'timestamp', 'categories']
try:
    # newline='' is what the csv module requires for files it writes to
    # (otherwise embedded newlines / line endings can be mangled); utf-8 is
    # set explicitly because article text is not limited to the locale's
    # default encoding.
    with open(csv_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
        writer.writeheader()
        # Rows are streamed: merge_docs pulls documents from the extractor
        # lazily and each one is written as soon as it is produced.
        writer.writerows(merge_docs(wikiextract_docs, handler.matching_docs))
except IOError:
    print("I/O error")
"cell_type": "code",
"execution_count": null,
"id": "63e8cb03",
"metadata": {},
"outputs": [],
"source": [
"# TODO: Potentially locate category pages like below and traverse subcategories, thus expanding search of categories to filter by\n",
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"nbformat": 4,
"nbformat_minor": 5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment