Last active
July 12, 2024 00:41
-
-
Save jondurbin/11c2e5a7496ab8e38d44a0800e83e437 to your computer and use it in GitHub Desktop.
Copyright check
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import csv
import os
import re
from itertools import islice

import aiohttp
from loguru import logger
from playwright.async_api import async_playwright
# Case-insensitive marker for a copyright notice in serialized page HTML.
# Compiled once at import time instead of on every site check.
# NOTE(review): the pattern previously listed the "©" alternative twice, which
# looks like an HTML entity that got decoded somewhere along the way; "&copy;"
# is restored here on that assumption -- confirm against the intended pattern.
_COPYRIGHT_PATTERN = re.compile(r"©|&copy;|copyright", re.I)


async def check_site(browser, domain, status):
    """Visit ``https://{domain}`` and record whether the page mentions copyright.

    Args:
        browser: Object exposing an awaitable ``new_page()`` whose pages
            provide awaitable ``goto()``, ``content()`` and ``close()``.
        domain: Host name to load; it is interpolated into an ``https://`` URL.
        status: Dict updated in place. ``status[domain]`` is set to ``"true"``
            or ``"false"`` depending on whether the copyright pattern matched,
            or to ``"unknown"`` when anything went wrong while checking.

    Returns:
        None. The outcome is reported through ``status`` and the log.
    """
    page = None
    try:
        # Creating the page is inside the ``try`` so that a failure here is
        # recorded as "unknown" instead of escaping as a task exception and
        # leaving the domain out of ``status`` altogether.
        page = await browser.new_page()
        await page.goto(f"https://{domain}")
        html = await page.content()
        has_copyright = _COPYRIGHT_PATTERN.search(html) is not None
        if has_copyright:
            logger.error(f"[y] {domain}")
        else:
            logger.success(f"[n] {domain}")
        status[domain] = str(has_copyright).lower()
    except Exception:
        # Best-effort crawl: any failure (DNS, TLS, timeout, ...) is reported
        # as "unknown" rather than aborting the whole run.
        logger.warning(f"[?] {domain}")
        status[domain] = "unknown"
    finally:
        if page is not None:
            try:
                await page.close()
            except Exception:
                # The verdict is already recorded; a failed close must not
                # turn a finished check into a task error.
                logger.debug(f"could not close page for {domain}")
# Local cache of the domain list and the URL it is downloaded from.
_MAJESTIC_CSV_PATH = "majestic_million.csv"
_MAJESTIC_CSV_URL = "https://downloads.majestic.com/majestic_million.csv"
# Number of data rows (after the header line) taken from the domain list.
_DOMAIN_LIMIT = 5000
# Maximum number of site checks allowed to run at the same time.
_MAX_IN_FLIGHT = 17


async def _ensure_domain_list():
    """Download the domain CSV unless a local copy already exists."""
    if os.path.exists(_MAJESTIC_CSV_PATH):
        return
    async with aiohttp.ClientSession() as client:
        async with client.get(_MAJESTIC_CSV_URL) as response:
            # Fail loudly on HTTP errors so an error page is never cached
            # as if it were the domain list.
            response.raise_for_status()
            payload = await response.read()
    # Write to a temporary name first: an interrupted write must not leave a
    # truncated file that later runs would treat as a complete download.
    partial_path = _MAJESTIC_CSV_PATH + ".part"
    with open(partial_path, "wb") as outfile:
        outfile.write(payload)
    os.replace(partial_path, _MAJESTIC_CSV_PATH)


def _load_domains(limit):
    """Return the third column of up to ``limit`` rows following the header."""
    with open(_MAJESTIC_CSV_PATH, newline="", encoding="utf-8") as infile:
        rows = csv.reader(infile)
        next(rows, None)  # skip the header line
        # Blank or short rows are skipped instead of raising IndexError.
        return [
            row[2] for row in islice(rows, limit) if len(row) > 2 and row[2]
        ]


def _write_results(status):
    """Write ``result.csv`` and return how many domains were marked "false"."""
    allowed_count = 0
    with open("result.csv", "w", encoding="utf-8") as outfile:
        outfile.write("domain,has_copyright\n")
        for domain, has_copyright in status.items():
            outfile.write(f"{domain},{has_copyright}\n")
            if has_copyright == "false":
                allowed_count += 1
    return allowed_count


async def main():
    """Check the listed domains for a copyright notice and summarize.

    Steps:
        1. Make sure ``majestic_million.csv`` exists, downloading it if not.
        2. Read the domains from its third column.
        3. Run ``check_site`` for every domain in a single headless Chromium
           instance, with a bounded number of checks in flight.
        4. Write ``result.csv`` (``domain,has_copyright``) and log how many
           domains had no copyright marker, plus their share of the total.

    Raises:
        aiohttp.ClientResponseError: if the domain list download returns an
            HTTP error status.
        OSError: if the CSV files cannot be read or written.
    """
    await _ensure_domain_list()
    domains = _load_domains(_DOMAIN_LIMIT)
    status = {}
    outcomes = []

    async with async_playwright() as p:
        browser = await p.chromium.launch()
        try:
            gate = asyncio.Semaphore(_MAX_IN_FLIGHT)

            async def _bounded_check(domain):
                # The semaphore caps concurrency without polling the task list.
                async with gate:
                    await check_site(browser, domain, status)

            # ``return_exceptions=True`` keeps one failing check from
            # cancelling the rest and retrieves every task's exception; an
            # empty domain list simply yields an empty result.
            outcomes = await asyncio.gather(
                *(_bounded_check(domain) for domain in domains),
                return_exceptions=True,
            )
        finally:
            await browser.close()

    # A check that raised without recording a verdict still gets a row.
    for domain, outcome in zip(domains, outcomes):
        if isinstance(outcome, BaseException) and domain not in status:
            logger.warning(f"[?] {domain}")
            status[domain] = "unknown"

    allowed_count = _write_results(status)
    # Guard the ratio: with no checked domains there is nothing to divide by.
    share = allowed_count / len(status) if status else 0.0
    logger.info(f"Total without copyright = {allowed_count}, {share}")
# Start the crawl only when this file is executed as a script; importing the
# module must not launch a browser or touch the network.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment