Created
September 6, 2019 14:07
-
-
Save iwatakeshi/f4f4d8d358bf80e5c8b1795745452dd7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
import asyncio
import functools
import os
from functools import wraps
from urllib.parse import urlparse

import click
import tldextract
from pyppeteer import launch
from usp.tree import sitemap_tree_for_homepage
def coro(f):
    """Make the coroutine function *f* callable from synchronous code.

    The returned wrapper forwards its positional and keyword arguments to
    *f* and drives the resulting coroutine with ``asyncio.run``, returning
    whatever the coroutine returns. It is applied below ``click.command`` so
    that click receives an ordinary synchronous callable.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        # asyncio.run() creates a fresh event loop for each call.
        return asyncio.run(f(*args, **kwargs))

    return wrapper
def run_in_executor(f):
    """Turn the blocking callable *f* into one that runs in the default executor.

    Calling the decorated function schedules ``f(*args, **kwargs)`` on the
    running event loop's default executor and returns the resulting
    awaitable future. It must be called while an event loop is running,
    because it relies on ``asyncio.get_running_loop()``.
    """
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # loop.run_in_executor() forwards positional arguments only, so bind
        # both positional and keyword arguments up front.
        call = functools.partial(f, *args, **kwargs)
        return loop.run_in_executor(None, call)

    return inner
@click.command()
@click.argument('url', default='http://localhost:3000/')
@click.option('--domain', required=False, default=None, help="Set the domain name of the website")
@coro
async def main(url, domain):
    """Screenshot every page found in the sitemap of *url*.

    Args:
        url: Homepage whose sitemap is fetched via ``fetch``.
        domain: Value of the ``--domain`` option. It is accepted but not
            used by this function.

    One headless browser is launched per page and all pages are processed
    concurrently. Any failure while capturing a page propagates to the
    caller.
    """
    async def test(page):
        browser = await launch()
        try:
            # Keep the browser tab under its own name: rebinding `page` here
            # would make goto() navigate to the tab's own URL instead of the
            # sitemap page's URL.
            tab = await browser.newPage()
            await tab.goto(page.url)
            # NOTE: every page writes to the same 'example.png', so later
            # captures overwrite earlier ones.
            await tab.screenshot({'path': 'example.png'})
        finally:
            # Close the browser even when navigation or the screenshot fails.
            await browser.close()

    pages = await fetch(url)
    # gather() accepts bare coroutines, tolerates an empty sitemap, and
    # re-raises the first failure instead of leaving it unretrieved.
    await asyncio.gather(*(test(page) for page in pages))
# async def main(url, domain): | |
# tasks = await snap(url, domain) | |
# print([await t for t in asyncio.as_completed(tasks)]) | |
@run_in_executor
def fetch(url):
    """Return the pages listed in the sitemap tree discovered from *url*.

    Because of the ``run_in_executor`` decorator, calling this function
    returns an awaitable; awaiting it yields a list of the page objects
    produced by ``sitemap_tree_for_homepage(url).all_pages()``.
    """
    # Materialize the pages here so the whole traversal happens inside the
    # executor call and the caller receives a concrete list.
    return list(sitemap_tree_for_homepage(url).all_pages())
# async def snap(url, domain): | |
# tree = await fetch(url) | |
# return [asyncio.create_task(navigate(page, domain)) for page in tree.all_pages()] | |
# async def navigate(page, domain): | |
# print(os.path.join(os.getcwd(), filename(page, domain))) | |
# browser = await launch() | |
# page = await browser.newPage() | |
# await page.goto(page.url) | |
# await page.screenshot({'path': os.path.join(os.getcwd(), filename(page, domain))} ) | |
# await browser.close() | |
def filename(page, domain):
    """Build the PNG file name used for a screenshot of *page*.

    Args:
        page: Object exposing a ``url`` attribute.
        domain: Name to use as the file-name prefix. When falsy, the domain
            part extracted from ``page.url`` by ``tldextract`` is used.

    Returns:
        ``'<domain>.png'`` when the URL path is empty or only slashes,
        otherwise ``'<domain>-<path without slashes>.png'``.
    """
    domain = domain or tldextract.extract(page.url).domain
    # The URL path with its slashes removed, e.g. '/about/team' -> 'aboutteam'.
    ending = urlparse(page.url).path.replace("/", "")
    ext = 'png'
    # str.replace() always returns a string, so test for emptiness rather
    # than None; a root URL must not produce a dangling '-' in the name.
    if not ending:
        return f'{domain}.{ext}'
    return f'{domain}-{ending}.{ext}'
# def path(page): | |
# return urlparse(page.url).path.replace("/", "") | |
if __name__ == "__main__":
    # `main` is a synchronous click command: its `coro` decorator already
    # runs the coroutine with asyncio.run(), so it is called directly rather
    # than being handed to an event loop.
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment