Quick/Dirty YouTube video id crawler
'''
Small script that tries to recursively find youtube video ids starting from given urls.
MIT License - Charles Machalow
'''
import time
import re
from datetime import timedelta, datetime
from typing import List, Set
from requests_html import HTMLSession
import requests
import random
import pathlib
import pickle
import getpass
import gzip
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

WATCH_IDS_CACHE_FILE = pathlib.Path('watch_ids')

class Crawler:
    def __init__(self):
        self.url = random.choice([
            'https://www.youtube.com/',
        ])

        # HTMLSession is nice since it tries to render javascript too
        self.session = HTMLSession()

        # despite the name, the url can also just be the video id
        self.already_done_urls = set()

    def crawl(self, td: timedelta = timedelta(seconds=10)) -> Set[str]:
        '''
        Crawls starting at self.url till the given time delta is up
        '''
        watch_ids = set()
        self.death_time = time.time() + td.total_seconds()

        while time.time() < self.death_time:
            # print(f"Current url: {self.url}")
            new_watch_ids = self.get_watch_ids_from_response(self.get(self.url))
            if not new_watch_ids:
                break

            # this is not the most efficient way to do this:
            # pick a random watch id we haven't visited yet to crawl next
            while self.url in self.already_done_urls and time.time() < self.death_time:
                self.url = random.choice(list(new_watch_ids))

            watch_ids |= new_watch_ids

            # while self.url in self.already_done_urls:
            #     self.url = next(iter(watch_ids))

        print(f"Number of watch ids found: {len(watch_ids)}")
        return watch_ids

    def get(self, url: str) -> requests.Response:
        self.already_done_urls.add(url)

        # if it's a bare video id rather than a full url... turn it into a watch url and follow it!
        if not url.startswith(('https:', 'http:')):
            url = f'https://www.youtube.com/watch?v={url}'

        for i in range(10):
            try:
                return self.session.get(url, allow_redirects=True)
            except requests.exceptions.ConnectionError:
                # back off a bit longer after each failed attempt
                time.sleep(i ** 2)

        raise ConnectionError(f"Unable to GET {url} after 10 attempts")

    def get_watch_ids_from_response(self, response: requests.Response) -> Set[str]:
        if response.ok:
            # grab the id portion of every watch?v=<id> link in the page, dropping any extra
            # query params and skipping escaped/truncated matches
            ret = set([a.split('&')[0] for a in re.findall(r'watch\?v=(.+?)\"', response.text) if '\\' not in a and '...' not in a])
            # print(f"GET({response.url}) found {len(ret)} watch ids")
            return ret
        else:
            print(f"Warning: Error in response for GET({response.url}): {response.reason}")
            return set()

def multi_crawl(num=8, crawl_td: timedelta = timedelta(seconds=60), use_cache=True):
    '''
    Crawls using the given number of thread workers
    '''
    results = []
    with ThreadPoolExecutor(max_workers=num) as pool:
        for i in range(num):
            results.append(pool.submit(Crawler().crawl, td=crawl_td))

    final_out = set()
    for result in results:
        final_out |= result.result()

    print(f"Total Number of watch ids found: {len(final_out)}")

    if use_cache:
        if not WATCH_IDS_CACHE_FILE.is_file():
            WATCH_IDS_CACHE_FILE.write_bytes(pickle.dumps(set()))

        s = pickle.loads(WATCH_IDS_CACHE_FILE.read_bytes())
        s |= final_out
        WATCH_IDS_CACHE_FILE.write_bytes(pickle.dumps(s))
        print(f"Total from cache now: {len(s)}")
    else:
        s = final_out

    return s

def add_to_cache_from_file(f):
    '''
    Takes a file with youtube links on each line and adds them to the cache
    '''
    txt = pathlib.Path(f).read_text()
    ids = set()
    for line in txt.splitlines():
        if 'youtu.be' in line:
            video_id = line.split('/')[-1]
        elif 'youtube' in line:
            video_id = line.split('v=', 1)[-1].split('&')[0]
        else:
            continue

        # valid youtube video ids are 11 characters long
        if len(video_id) == 11:
            ids.add(video_id)

    if not WATCH_IDS_CACHE_FILE.is_file():
        WATCH_IDS_CACHE_FILE.write_bytes(pickle.dumps(set()))

    s = pickle.loads(WATCH_IDS_CACHE_FILE.read_bytes())
    s |= ids
    WATCH_IDS_CACHE_FILE.write_bytes(pickle.dumps(s))

def cache_to_gz():
    '''
    Takes the cache and makes a txt.gz file
    '''
    s = pickle.loads(WATCH_IDS_CACHE_FILE.read_bytes())
    txt = '\n'.join(s)
    compressed_value = gzip.compress(bytes(txt, 'utf-8'))
    now = datetime.now()
    pathlib.Path(f'{getpass.getuser()}_{now.year}_{now.month}_{now.day}.txt.gz').write_bytes(compressed_value)


if __name__ == '__main__':
    c = Crawler()
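
As a rough usage sketch (not part of the original gist), the pieces above could be driven like this, assuming the file is saved as a module named yt_crawler.py; the module name, the file my_links.txt, the worker count, and the 30-second crawl budget are illustrative assumptions, not values from the author:

from datetime import timedelta

from yt_crawler import multi_crawl, add_to_cache_from_file, cache_to_gz

if __name__ == '__main__':
    # crawl with 4 threads for ~30 seconds each, merging results into the pickle cache
    ids = multi_crawl(num=4, crawl_td=timedelta(seconds=30), use_cache=True)
    print(f'Cache now holds {len(ids)} watch ids')

    # optionally fold in ids from a text file of youtube links, one per line
    # add_to_cache_from_file('my_links.txt')  # hypothetical filename

    # export the cache to <username>_<year>_<month>_<day>.txt.gz
    cache_to_gz()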