Async parsing of Wikipedia with a process pool
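The script below crawls ru.wikipedia.org: a pool of aiohttp downloaders fetches pages, hands the HTML to a ProcessPoolExecutor where lxml extracts links and titles, and the discovered links are fed back into the download queue. A minimal way to try it (the file name and package versions are assumptions; the gist pins neither):

# Third-party dependencies, inferred from the imports:
#   pip install aiohttp loguru lxml
# Save as e.g. wiki_crawler.py (hypothetical name) and run; it crawls
# until interrupted with Ctrl+C:
#   python wiki_crawler.py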
import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp
from loguru import logger as loguru
from lxml.html import fromstring

# Process pool for CPU-bound lxml parsing; _max_workers is a private
# attribute, but the executor exposes no public worker count.
pool = ProcessPoolExecutor()
parser_sem = asyncio.Semaphore(pool._max_workers)
loguru.info(f"CPU workers: {pool._max_workers}")

host = "https://ru.wikipedia.org"
start_from = f"{host}/wiki/Заглавная_страница"  # the Main Page

q_d = asyncio.Queue(maxsize=1024)  # URLs waiting to be downloaded
q_p = asyncio.Queue()              # HTML documents waiting to be parsed
sem = asyncio.Semaphore(100)       # cap on concurrent HTTP requests
downloaded_urls = set()
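# Pipeline sketch (orientation comment; names as defined above):
#   downloader tasks: q_d --(HTTP GET via aiohttp)--> q_p
#   parser task:      q_p --(lxml in process pool)--> new URLs --> q_d
# downloaded_urls deduplicates, so each page is fetched at most once.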
class O:
    # Shared counters for the progress printer
    downloaded = 0
    parsed = 0
    downloading = 0
    down_pending = 0
    waiting_for_download_q = 0

o = O()
async def log_printer(queue_d, queue_p):
    while True:
        loguru.debug(
            f"[PRINTER] to Download: {queue_d.qsize()}, to Parse: {queue_p.qsize()}"
            f" downloaded: {o.downloaded}, parsed: {o.parsed}"
            f" pending: {o.down_pending}, downloading: {o.downloading}"
            f" waiting Q: {o.waiting_for_download_q}"
            f" tasks: {len(asyncio.all_tasks())}"
        )
        await asyncio.sleep(0.33)
def lxml_parse(html):
    # Runs inside a pool worker process: extract all links and the title
    try:
        tree = fromstring(html)
        urls = tree.xpath("//a/@href")
        try:
            title = tree.find(".//title").text
        except AttributeError:
            title = "<UNKNOWN>"
        new_urls = []
        for url in urls:
            # Make root-relative links absolute; keep absolute http(s) links;
            # drop protocol-relative ("//...") and fragment/other links
            if url.startswith("/") and not url.startswith("//"):
                new_urls.append(f"{host}{url}")
            elif url.startswith("http"):
                new_urls.append(url)
        return new_urls, title
    except Exception as e:
        loguru.error(f"Parse error: {e}")
        return [], "<ERROR>"
async def parse(html):
    loop = asyncio.get_event_loop()
    # Off-load the CPU-bound parse to the process pool
    urls, title = await loop.run_in_executor(pool, lxml_parse, html)
    o.parsed += 1
    return urls, title
async def start_parse_task(content, queue_d):
    async with parser_sem:
        urls, title = await parse(content)
        # loguru.debug(f"[PARSER]: Parse done {title}")
        o.waiting_for_download_q += 1
        for url in urls:
            if url not in downloaded_urls:
                await queue_d.put(url)
        o.waiting_for_download_q -= 1
        # loguru.debug(f"[PARSER]: Add {len(urls)} to download queue")

async def parser(queue_d, queue_p):
    while True:
        content = await queue_p.get()
        # Fan out: each document parses in its own task, throttled by parser_sem
        asyncio.create_task(start_parse_task(content, queue_d))
async def downloader(queue_d, queue_p, session):
    while True:
        url = await queue_d.get()
        if url in downloaded_urls:
            continue
        # Mark the URL as seen right away so other downloader tasks
        # can't start a duplicate request while this one is in flight
        downloaded_urls.add(url)
        o.down_pending += 1
        async with sem:
            o.down_pending -= 1
            o.downloading += 1
            try:
                async with session.get(url) as resp:
                    # loguru.debug(f"[DOWNLOADER]: got response for {url}")
                    try:
                        text = await resp.text()
                        await queue_p.put(text)
                    except UnicodeDecodeError:
                        # Skip binary or mis-encoded responses
                        pass
                    o.downloaded += 1
            except Exception as e:
                loguru.error(f"Download error: {e}")
            finally:
                o.downloading -= 1
async def main():
    await q_d.put(start_from)
    async with aiohttp.ClientSession() as session:
        ds = []
        for i in range(100):
            ds.append(asyncio.create_task(downloader(q_d, q_p, session)))
        p = asyncio.create_task(parser(q_d, q_p))
        printer = asyncio.create_task(log_printer(q_d, q_p))
        await asyncio.gather(*ds, p, printer)

if __name__ == "__main__":
    # The queues and semaphores above are created at import time, so reuse
    # the default event loop rather than asyncio.run(), which creates a
    # fresh loop on Python < 3.10 and would leave them bound to the wrong one.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
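Design note: with 100 downloader tasks each holding at most one sem permit, the Semaphore(100) never actually blocks; the effective request-concurrency cap is the task count itself. Parsing is throttled separately by parser_sem, which allows at most one in-flight parse job per process-pool worker. Backpressure on memory comes only from q_d's maxsize of 1024; q_p is unbounded, so fetched HTML can accumulate if parsing falls behind.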