Skip to content

Instantly share code, notes, and snippets.

@iwatakeshi
Created September 6, 2019 14:07
Show Gist options
  • Save iwatakeshi/f4f4d8d358bf80e5c8b1795745452dd7 to your computer and use it in GitHub Desktop.
Save iwatakeshi/f4f4d8d358bf80e5c8b1795745452dd7 to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
import asyncio
import tldextract
import functools
import os
import click
from urllib.parse import urlparse
from usp.tree import sitemap_tree_for_homepage
from pyppeteer import launch
from functools import wraps
def coro(f):
    """Adapt the coroutine function *f* into a plain synchronous callable.

    The returned wrapper builds the coroutine from the arguments it receives
    and drives it to completion with ``asyncio.run``, handing back the
    coroutine's result. This lets an ``async def`` be used where a regular
    function is expected, such as a click command callback.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        # asyncio.run starts its own event loop, so the wrapper is meant to be
        # called from synchronous code, not from inside a running loop.
        return asyncio.run(f(*args, **kwargs))

    return wrapper
def run_in_executor(f):
    """Make the synchronous function *f* awaitable from asyncio code.

    Calling the decorated function schedules ``f(*args, **kwargs)`` on the
    running loop's default executor and returns the resulting awaitable
    future immediately; ``await`` it to obtain the return value of *f*.

    The decorated function must be called while an event loop is running,
    otherwise ``asyncio.get_running_loop`` raises ``RuntimeError``.
    """
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # loop.run_in_executor forwards positional arguments only, so the
        # whole call (including keyword arguments) is frozen with partial.
        call = functools.partial(f, *args, **kwargs)
        return loop.run_in_executor(None, call)

    return inner
@click.command()
@click.argument('url', default='http://localhost:3000/')
@click.option('--domain', required=False, default=None, help="Set the domain name of the website")
@coro
async def main(url, domain):
    """Screenshot every page listed in the sitemap of URL.

    Each page is opened in a browser launched through pyppeteer and saved as
    a PNG in the current working directory. Files are named after the domain
    (the --domain option, or the one extracted from the page URL) and the
    page's path.
    """
    async def snap(page):
        # One browser is launched per sitemap page; try/finally makes sure it
        # is shut down even when navigation or the screenshot fails.
        browser = await launch()
        try:
            # The tab gets its own name so that ``page`` keeps referring to
            # the sitemap entry whose URL has to be visited.
            tab = await browser.newPage()
            await tab.goto(page.url)
            target = os.path.join(os.getcwd(), filename(page, domain))
            await tab.screenshot({'path': target})
        finally:
            await browser.close()

    pages = await fetch(url)
    # gather() accepts bare coroutines and copes with an empty sitemap;
    # asyncio.wait() rejects coroutines on Python 3.11+ and raises ValueError
    # when given nothing to wait for.
    await asyncio.gather(*(snap(page) for page in pages))


@run_in_executor
def fetch(url):
    """Return the pages found in the sitemap tree of *url*.

    ``sitemap_tree_for_homepage`` is a synchronous call, so the
    ``run_in_executor`` decorator runs it on the loop's default executor;
    callers ``await`` the result. The value is whatever ``all_pages()``
    returns — an iterable of objects exposing a ``url`` attribute.
    """
    return sitemap_tree_for_homepage(url).all_pages()


def filename(page, domain):
    """Build the screenshot file name for a sitemap *page*.

    Args:
        page: Object with a ``url`` attribute.
        domain: Prefix for the file name. When falsy, the domain part of
            ``page.url`` as reported by tldextract is used instead.

    Returns:
        ``'<domain>.png'`` when the URL has no path beyond the root,
        otherwise ``'<domain>-<path>.png'``.
    """
    domain = domain or tldextract.extract(page.url).domain
    ending = path(page)
    ext = 'png'
    # path() gives '' (never None) for the site root, so test for emptiness.
    if not ending:
        return f'{domain}.{ext}'
    return f'{domain}-{ending}.{ext}'


def path(page):
    """Return the URL path of *page* flattened so that it contains no '/'.

    Leading and trailing slashes are dropped and inner ones become '-':
    ``/docs/intro/`` gives ``'docs-intro'`` and the root gives ``''``.
    """
    return urlparse(page.url).path.strip('/').replace('/', '-')
if __name__ == "__main__":
    # main is a click command whose callback is wrapped by coro: calling it
    # parses the command line and runs the coroutine through asyncio.run on
    # its own. It returns nothing awaitable, so there is no event loop to
    # create, drive or close here.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment