Four approaches to multi-threaded extract-and-load code (including a new one)
"""
The asyncio solution.
"""
import aiohttp
import asyncio
import sqlite3

URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]


async def extract_and_load(url, timeout=30):
    try:
        async with aiohttp.ClientSession() as session:
            # aiohttp expects a ClientTimeout object rather than a bare
            # number of seconds.
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
                web_result = await response.text()
        print(f"{url} is {len(web_result)} bytes")
        # Note: sqlite3 is a blocking API, so this write holds up the
        # event loop until it completes.
        with sqlite3.connect('example.db') as conn:
            conn.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);')
            conn.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result)))
    except Exception as e:
        print(f"{url} generated an exception: {e}")
        return False
    else:
        return True


async def main():
    succeeded = await asyncio.gather(*[
        extract_and_load(url)
        for url in URLS
    ])
    print(f"Successfully completed {sum(1 for result in succeeded if result)}")


if __name__ == '__main__':
    asyncio.run(main())
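One caveat with the asyncio version, noted in the comment above: the sqlite3 calls are synchronous, so each write blocks the event loop. If that matters, the blocking work can be handed to a thread via `loop.run_in_executor`. A minimal sketch under that assumption; the helper names `store_result` and `store_result_async` are mine, not part of the gist:

import asyncio
import functools
import sqlite3


def store_result(url, length):
    # Plain blocking sqlite3 code, unchanged from the version above.
    with sqlite3.connect('example.db') as conn:
        conn.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);')
        conn.execute('INSERT INTO web_results VALUES (?, ?)', (url, length))


async def store_result_async(url, length):
    # Hand the blocking call to the default thread pool so the event
    # loop stays free to service other coroutines while sqlite3 works.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, functools.partial(store_result, url, length))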
""" | |
The concurrent.futures solution, adapted from the Web Crawl example in | |
[PEP-3148](https://www.python.org/dev/peps/pep-3148/#id13). | |
""" | |
import concurrent.futures | |
import requests | |
import sqlite3 | |
URLS = [ | |
'http://www.foxnews.com/', | |
'http://www.cnn.com/', | |
'http://europe.wsj.com/', | |
'http://www.bbc.co.uk/', | |
'http://some-made-up-domain.com/', | |
] | |
def extract_and_load(url, timeout=30): | |
try: | |
web_result = requests.get(url, timeout=timeout).text | |
print(f"{url} is {len(web_result)} bytes") | |
with sqlite3.connect('example.db') as conn, conn as cursor: | |
cursor.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);') | |
cursor.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result))) | |
except Exception as e: | |
print(f"{url} generated an exception: {e}") | |
return False | |
else: | |
return True | |
def main(): | |
succeeded = [] | |
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: | |
future_to_url = dict( | |
(executor.submit(extract_and_load, url), url) | |
for url in URLS | |
) | |
for future in concurrent.futures.as_completed(future_to_url): | |
succeeded.append(future.result()) | |
print(f"Successfully completed {sum(1 for result in succeeded if result)}") | |
if __name__ == '__main__': | |
main() |
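As an aside: since the URL values in `future_to_url` are never actually read here, the same `main()` could be written more compactly with `executor.map`, which returns results in submission order rather than completion order. A sketch:

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Consuming the map() iterator blocks until every worker is done;
        # extract_and_load already catches its own exceptions, so no
        # exception propagates out of the iterator.
        succeeded = list(executor.map(extract_and_load, URLS))
    print(f"Successfully completed {sum(1 for result in succeeded if result)}")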
""" | |
My proposed solution. | |
The new interfaces I have defined elsewhere includes: | |
- a context manager `pylateral.task_pool` to provide a multi-threaded | |
worker pool (backed by `concurrent.futures.ThreadPoolContext`) | |
- a decorator `@pylateral.task` to indicate a function must be run on | |
the worker pool (if exists) | |
""" | |
import requests | |
import sqlite3 | |
import pylateral | |
URLS = [ | |
'http://www.foxnews.com/', | |
'http://www.cnn.com/', | |
'http://europe.wsj.com/', | |
'http://www.bbc.co.uk/', | |
'http://some-made-up-domain.com/', | |
] | |
@pylateral.task | |
def extract_and_load(url, timeout=30): | |
try: | |
web_result = requests.get(url, timeout=timeout).text | |
print(f"{url} is {len(web_result)} bytes") | |
with sqlite3.connect('example.db') as conn, conn as cursor: | |
cursor.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);') | |
cursor.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result))) | |
except Exception as e: | |
print(f"{url} generated an exception: {e}") | |
return False | |
else: | |
return True | |
def main(): | |
# Provide a multithreaded worker pool to all code executing in this | |
# context. The __exit__ method waits for all workers to complete | |
# all tasks. | |
with pylateral.task_pool(max_workers=5) as manager: | |
for url in URLS: | |
# The return value is gobbled up by the @parallel.task | |
# decorator and redirected to `manager.results`. Since this | |
# runs asynchronously, we can't really get the result in | |
# this main execution thread. (If we were using `asyncio`, | |
# we could use `await` here.) | |
extract_and_load(url) | |
# An unordered list of results from the @parallel.task function | |
# calls. | |
succeeded = manager.results | |
print(f"Successfully completed {sum(1 for result in succeeded if result)}") | |
if __name__ == '__main__': | |
main() |
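Since `pylateral` itself is defined elsewhere, here is a minimal sketch of how this interface could be implemented on top of `concurrent.futures.ThreadPoolExecutor`. The internals below (`_PoolManager`, the thread-local pool lookup) are my own guess at an implementation, not pylateral's actual code:

import concurrent.futures
import contextlib
import functools
import threading

_local = threading.local()  # holds the active pool manager, if any


class _PoolManager:
    def __init__(self, executor):
        self._executor = executor
        self._futures = []
        self.results = []

    def _submit(self, fn, *args, **kwargs):
        self._futures.append(self._executor.submit(fn, *args, **kwargs))

    def _drain(self):
        # Wait for every submitted task and collect return values in
        # completion order (hence "an unordered list of results").
        for future in concurrent.futures.as_completed(self._futures):
            self.results.append(future.result())


@contextlib.contextmanager
def task_pool(max_workers):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        manager = _PoolManager(executor)
        _local.manager = manager
        try:
            yield manager
        finally:
            _local.manager = None
            manager._drain()  # block until all tasks have finished


def task(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        manager = getattr(_local, 'manager', None)
        if manager is None:
            return fn(*args, **kwargs)  # no pool active: run synchronously
        manager._submit(fn, *args, **kwargs)  # run on the pool; no return value
    return wrapper

With definitions along these lines, the `main()` above behaves as described: the decorator routes calls onto the pool whenever a `task_pool` is active, and `manager.results` is fully populated by the time the `with` block exits.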
""" | |
Style of glue code I inherited. | |
""" | |
import requests | |
import sqlite3 | |
URLS = [ | |
'http://www.foxnews.com/', | |
'http://www.cnn.com/', | |
'http://europe.wsj.com/', | |
'http://www.bbc.co.uk/', | |
'http://some-made-up-domain.com/', | |
] | |
def extract_and_load(url, timeout=30): | |
try: | |
web_result = requests.get(url, timeout=timeout).text | |
print(f"{url} is {len(web_result)} bytes") | |
with sqlite3.connect('example.db') as conn, conn as cursor: | |
cursor.execute('CREATE TABLE IF NOT EXISTS web_results (url text, length int);') | |
cursor.execute('INSERT INTO web_results VALUES (?, ?)', (url, len(web_result))) | |
except Exception as e: | |
print(f"{url} generated an exception: {e}") | |
return False | |
else: | |
return True | |
def main(): | |
succeeded = [] | |
for url in URLS: | |
succeeded.append(extract_and_load(url)) | |
print(f"Successfully completed {sum(1 for result in succeeded if result)}") | |
if __name__ == '__main__': | |
main() |