Last active
September 30, 2023 06:08
-
-
Save nolanlum/dd160e6ae752093aa5d98998bd0728a6 to your computer and use it in GitHub Desktop.
草まみれ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Callable, Iterable, List, NamedTuple, Optional | |
import json | |
from bs4 import BeautifulSoup | |
import requests | |
# One module-wide session object; every request in this file goes through it.
session = requests.Session()


def session_get(url: str, timeout: float = 30.0) -> requests.Response:
    """Issue a GET for *url* on the shared session with a browser user agent.

    Args:
        url: Address to fetch.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout a stalled connection would block the caller forever.

    Returns:
        The response object as returned by ``session.get``. The HTTP status
        is not checked here.

    Raises:
        requests.exceptions.Timeout: The server did not answer within
            *timeout* seconds.
    """
    return session.get(
        url,
        headers={
            # NOTE(review): presumably a desktop Firefox UA is sent so YouTube
            # serves the page layout the parser expects -- confirm.
            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:60.0) Gecko/20100101 Firefox/60.0',
        },
        timeout=timeout,
    )
def fetch_ytInitialData(url: str) -> dict:
    """Download *url* and return its embedded ``ytInitialData`` as a dict.

    The first ``<script>`` element whose text mentions ``ytInitialData`` is
    selected, and within it the first line mentioning ``ytInitialData``.
    That line is stripped, the ``window["ytInitialData"] = `` prefix and the
    final character are cut off, and the remainder is parsed as JSON.

    Args:
        url: Page to download (fetched through ``session_get``).

    Returns:
        The decoded JSON payload.

    Raises:
        ValueError: No script on the page mentions ``ytInitialData``, or the
            extracted text is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass).
    """
    assignment_prefix = 'window["ytInitialData"] = '

    response = session_get(url)
    soup = BeautifulSoup(response.text, "html.parser")

    script_text = next(
        (
            script.string
            for script in soup.find_all('script')
            if script.string and 'ytInitialData' in script.string
        ),
        None,
    )
    if script_text is None:
        # Fail with a descriptive error. A bare StopIteration escaping from
        # here would be turned into an opaque RuntimeError when this function
        # is called from inside a generator.
        raise ValueError(f"No ytInitialData script found in {url}")

    # The script text contains the marker, so at least one of its lines does
    # too; this next() cannot run dry.
    payload = next(
        # Drop the assignment prefix and the last character (presumably the
        # trailing ';' of the JavaScript statement).
        line.strip()[len(assignment_prefix):-1]
        for line in script_text.splitlines()
        if 'ytInitialData' in line
    )
    return json.loads(payload)
def get_all_chat_continuation(ytInitialData: dict) -> str:
    """Return the continuation token of the "Live chat replay" chat view.

    The watch page's live-chat header carries a view selector with one
    sub-menu item per chat view; each item has a title and a reload
    continuation token. This picks the token titled ``Live chat replay``.

    Args:
        ytInitialData: Decoded ``ytInitialData`` of a watch page.

    Returns:
        The continuation token string for the "Live chat replay" view.

    Raises:
        KeyError: The page has no live-chat header in the expected layout,
            a sub-menu item lacks a continuation, or no item is titled
            ``Live chat replay``.
    """
    livechat_header = (
        ytInitialData['contents']['twoColumnWatchNextResults']
        ['conversationBar']['liveChatRenderer']['header']
    )
    viewselector_submenuitems = (
        livechat_header['liveChatHeaderRenderer']['viewSelector']
        ['sortFilterSubMenuRenderer']['subMenuItems']
    )
    continuation_by_title_map = {
        x['title']: x['continuation']['reloadContinuationData']['continuation']
        for x in viewselector_submenuitems
    }
    return continuation_by_title_map['Live chat replay']
class ChatItem(NamedTuple):
    """A single chat message taken from a live chat replay."""

    # Timestamp text as shown by the chat renderer
    # (presumably "M:SS" or "H:MM:SS" -- confirm against real data).
    timestamp: str
    # Display name of the message author.
    author: str
    # Message body flattened to plain text.
    text: str
def get_live_chat_replay(continuation: Optional[str], progress_callback: Optional[Callable] = None) -> Iterable[ChatItem]:
    """Yield every text or paid chat message of a replay, page by page.

    Starting from *continuation*, each page of the replay is fetched with
    ``fetch_ytInitialData``; its messages are yielded and the page's own
    continuation token selects the next page. Iteration ends when there is
    no continuation token or a page carries no ``actions``.

    Args:
        continuation: Token of the first page; a falsy value yields nothing.
        progress_callback: Optional callable invoked once per fetched page
            with the timestamp text of the last message yielded from that
            page (``''`` when the page yielded none).

    Yields:
        One ``ChatItem`` per text or paid message that has a message body.
    """
    while continuation:
        ytInitialData = fetch_ytInitialData(
            f"https://www.youtube.com/live_chat_replay?continuation={continuation}"
        )
        live_chat = ytInitialData['continuationContents']['liveChatContinuation']
        if 'actions' not in live_chat:
            return

        last_timestamp = ''
        for action in live_chat['actions']:
            replay_actions = action.get('replayChatItemAction', {}).get('actions')
            if not replay_actions:
                # Not a replay action, or one with an empty action list.
                continue
            replay_action = replay_actions[0]
            if 'addChatItemAction' not in replay_action:
                continue

            item = replay_action['addChatItemAction']['item']
            renderer = item.get('liveChatTextMessageRenderer') or item.get('liveChatPaidMessageRenderer')
            # Other item kinds, and messages without a body, are skipped.
            if not renderer or 'message' not in renderer:
                continue

            last_timestamp = renderer['timestampText']['simpleText']
            yield ChatItem(
                timestamp=last_timestamp,
                author=renderer['authorName']['simpleText'],
                text=parse_message_runs(renderer['message']['runs']),
            )

        if progress_callback:
            progress_callback(last_timestamp)

        # A page without replay continuation data ends the loop on the next
        # check of ``continuation``.
        continuation = (
            live_chat['continuations'][0]
            .get('liveChatReplayContinuationData', {})
            .get('continuation')
        )
def parse_message_runs(runs: List[dict]) -> str:
    """Flatten the runs of a chat message into one plain-text string.

    Args:
        runs: Message fragments. A fragment with a ``text`` key contributes
            that text; a fragment with an ``emoji`` key contributes the
            emoji's first shortcut.

    Returns:
        The concatenation of all fragments, ``""`` for an empty list.

    Raises:
        ValueError: A fragment has neither a ``text`` nor an ``emoji`` key.
    """
    def run_to_text(run: dict) -> str:
        if 'text' in run:
            return run['text']
        if 'emoji' in run:
            return run['emoji']['shortcuts'][0]
        raise ValueError(f"Unknown run: {run}")

    # join() builds the result in one pass instead of repeated string +=.
    return "".join(run_to_text(run) for run in runs)
def fetch_all_chat_replay_for_video(video_id: str, progress_callback: Optional[Callable] = None) -> Iterable[ChatItem]:
    """Return an iterable over the full chat replay of one video.

    The video's watch page is fetched to obtain the first continuation
    token of its chat replay; the messages are then produced by
    ``get_live_chat_replay``.

    Args:
        video_id: YouTube video ID.
        progress_callback: Forwarded unchanged to ``get_live_chat_replay``.

    Returns:
        An iterable of ``ChatItem``.
    """
    initial_data = fetch_ytInitialData(f"https://www.youtube.com/watch?v={video_id}")
    return get_live_chat_replay(get_all_chat_continuation(initial_data), progress_callback)
if __name__ == "__main__":
    # Command-line entry point: print the chat replay of one video, either as
    # one JSON object per line (--json) or as human-readable lines.
    import argparse

    parser = argparse.ArgumentParser(description="Dump YouTube chat replays.")
    parser.add_argument('--json', dest='as_json', action='store_true', help='Dump as JSON (for kusadet) instead of human-readable')
    parser.add_argument('video_id', type=str, nargs='?', default='Bv8g4n40F5M', help='YouTube Video ID')
    args = parser.parse_args()

    for item in fetch_all_chat_replay_for_video(args.video_id):
        # Test the parsed flag, not the ``json`` module (which is always
        # truthy and would force JSON output regardless of --json).
        if args.as_json:
            print(json.dumps(item._asdict()))
        else:
            print(f"[{item.timestamp}] <{item.author}> {item.text}")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
import argparse | |
import json | |
from chatdump import fetch_all_chat_replay_for_video, ChatItem | |
# Script body: count "grass" (laughter) chat messages per minute of a video
# and print the result as a text histogram.
parser = argparse.ArgumentParser(description="detect grass")
parser.add_argument('--load', dest='load_filename', type=str, help='Load chatlog from JSON file (from chatdump) instead of from the YouTubes')
parser.add_argument('video_id', type=str, nargs='?', default='Bv8g4n40F5M', help='YouTube Video ID')
args = parser.parse_args()

if args.load_filename:
    def generate_chat_items():
        """Yield ChatItems from a file holding one JSON object per line."""
        with open(args.load_filename, mode='r', encoding='utf-8') as f:
            for line in f:
                # Lines read from a file keep their newline, so a blank line
                # is only detectable after stripping.
                if line.strip():
                    yield ChatItem(**json.loads(line))
else:
    def make_progress():
        """Build a progress callback that prints one dot per fetched page."""
        count = 0

        def progress(timestamp: str) -> None:
            nonlocal count
            # Every tenth call also prints the current timestamp.
            if count % 10 == 0:
                print(timestamp, end='')
            print('.', end='', flush=True)
            count += 1

        return progress

    generate_chat_items = lambda: fetch_all_chat_replay_for_video(args.video_id, make_progress())

# Minute of the video -> number of grass messages in that minute.
kusa_buckets = defaultdict(int)
for item in generate_chat_items():
    if '草' in item.text or 'kusa' in item.text or 'grass' in item.text or 'wwww' in item.text:
        # Cut the last three characters (presumably ":SS") so that only
        # "M" or "H:MM" remains.
        timestamp_parts = item.timestamp[:-3].split(':')
        if len(timestamp_parts) == 1:
            minutes = int(timestamp_parts[0])
        elif len(timestamp_parts) == 2:
            minutes = 60 * int(timestamp_parts[0]) + int(timestamp_parts[1])
        else:
            raise ValueError(f"Unexpected timestamp format: {item.timestamp!r}")
        kusa_buckets[minutes] += 1

print()
# With no matching message there is no histogram to print (and max() of an
# empty collection would raise).
if kusa_buckets:
    for i in range(max(kusa_buckets) + 1):
        print(f"{i:3}: {'w' * kusa_buckets.get(i, 0)}")
そんなブラウザ拡張機能を作りたいなーと思っていたら、これを見つけた。同じ考えでw
大歓迎です!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
そんなブラウザ拡張機能を作りたいなーと思っていたら、これを見つけた。同じ考えでw
edit: https://github.com/pavlukivan/utility/blob/master/kusa-counter.user.js