TikTok OSINT
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CROT DALAM — TikTok OSINT (No-API Web Scraper) · Python CLI

Approach
  • Drive a real browser with Playwright (Chromium) to load the public search page.
  • Scroll and collect unique video URLs (no login required).
  • Open each video page and extract metadata from meta tags / structured data / DOM.
  • Score heuristic "risk tags" based on keyword matches (multi-language friendly).

Legal & ethics
  Use responsibly. Respect TikTok's terms and local laws. This is for OSINT on public data only.

Quickstart
  python -m pip install playwright typer rich
  python -m playwright install chromium

  # Basic search (headless)
  python crot_dalam.py search "phishing" "scam" --limit 80 --out out/crot_dalam

  # Visible browser + screenshots + Indonesian locale
  python crot_dalam.py search "promo gratis" --locale id-ID --no-headless --screenshot --limit 40

Outputs
  out/<basename>.jsonl — one JSON object per line
  out/<basename>.csv   — flattened table
  out/screenshots/     — optional PNGs (one per video)

Tested on: Python 3.10+ · Playwright ≥1.44
Refactored by: Marcos Tolosa
GitHub: https://github.com/marcostolosa
"""
from __future__ import annotations

import csv
import dataclasses as dc
import json
import pathlib
import re
import sys
import time
from statistics import mean
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote_plus, urljoin, urlparse

import typer
from playwright.sync_api import BrowserContext, Page, TimeoutError as PWTimeout, sync_playwright
from rich import print as rprint
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
# —————————————————————————
# Banner & Metadata
# —————————————————————————
BANNER = r"""
█████████ █████ ██████████ ████
███░░░░░███ ░░███ ░░███░░░░███ ░░███
███ ░░░ ████████ ██████ ███████ ░███ ░░███ ██████ ░███ ██████ █████████████
░███ ░░███░░███ ███░░███░░░███░ ░███ ░███ ░░░░░███ ░███ ░░░░░███ ░░███░░███░░███
░███ ░███ ░░░ ░███ ░███ ░███ ░███ ░███ ███████ ░███ ███████ ░███ ░███ ░███
░░███ ███ ░███ ░███ ░███ ░███ ███ ░███ ███ ███░░███ ░███ ███░░███ ░███ ░███ ░███
░░█████████ █████ ░░██████ ░░█████ ██████████ ░░████████ █████░░████████ █████░███ █████
░░░░░░░░░ ░░░░░ ░░░░░░ ░░░░░ ░░░░░░░░░░ ░░░░░░░░ ░░░░░ ░░░░░░░░ ░░░░░ ░░░ ░░░░░
Refactored by Marcos Tolosa
GitHub: github.com/marcostolosa
Original Code By sudo3rs
"""
SUBTITLE = "Collection & Reconnaissance Of TikTok — Discovery, Analysis, Logging, And Monitoring"


def print_banner() -> None:
    rprint(Panel.fit(BANNER, title="[bold cyan]CROT DALAM[/]", subtitle=SUBTITLE, border_style="cyan"))


app = typer.Typer(add_completion=False, help="CROT-DALAM — TikTok OSINT by keyword (no API)")
# —————————
# Data models & keyword heuristics
# —————————
@dc.dataclass
class VideoRecord:
    """Represents a TikTok video with its metadata."""

    video_id: str
    url: str
    username: Optional[str] = None
    author_name: Optional[str] = None
    description: Optional[str] = None
    upload_date: Optional[str] = None  # ISO8601 if found
    like_count: Optional[int] = None
    comment_count: Optional[int] = None
    share_count: Optional[int] = None
    view_count: Optional[int] = None
    hashtags: List[str] = dc.field(default_factory=list)
    keyword_searched: Optional[str] = None
    risk_score: int = 0
    risk_matches: List[str] = dc.field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Converts the dataclass instance to a dictionary, flattening list fields."""
        d = dc.asdict(self)
        d["hashtags"] = ",".join(self.hashtags)
        d["risk_matches"] = ",".join(self.risk_matches)
        return d
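

# Illustrative example (hypothetical values) of the flattening done by to_dict():
#   VideoRecord(video_id="123", url="https://www.tiktok.com/@u/video/123",
#               hashtags=["promo", "gratis"], risk_matches=["promo gratis"])
#   -> {"video_id": "123", ..., "hashtags": "promo,gratis", "risk_matches": "promo gratis"}
# Lists become comma-joined strings so the same dict feeds both the JSONL and CSV writers.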
# Common scam/phish terms (EN + ID + generic). Extend as needed.
# Note: alternatives are wrapped in (?:...) so the \b word boundaries apply to
# every alternative, not just the first and last.
RISK_TERMS = [
    # English
    r"\b(?:scams?|phishing|smishing|spoof|giveaway|free\s*iphone|airdrop|crypto\s*giveaway|binary\s*options?|forex\s*signals?)\b",
    r"\bwin\s*(?:cash|money|prize|reward)s?\b",
    r"\bclick\s*(?:the|this)?\s*link\b",
    r"\b(?:OTP|one[-\s]?time\s*password)\b",
    r"\bverification\s*code\b",
    r"\bKYC\b",
    # Indonesian / Bahasa ("scam", "modus operandi", "scammer's account", "free prize", "free promo", "giveaway")
    r"\b(?:penipuan|modus|phising|rek\.?\s*penipu|hadiah\s*gratis|promo\s*gratis|bagi[-\s]?bagi|giveaway)\b",
    # "click the link" / "link in bio"
    r"\b(?:klik\s*link|tautan\s*di\s*bio|link\s*di\s*bio)\b",
    # "transfer first" / "deposit first" / "bonus balance" / "instant payout"
    r"\b(?:transfer\s*dulu|deposit\s*dulu|saldo\s*bonus|langsung\s*cair)\b",
    # "OTP code" / "don't give out your OTP"
    r"\b(?:kode\s*OTP|jangan\s*kasih\s*OTP)\b",
]
RISK_RE = [re.compile(pat, re.I) for pat in RISK_TERMS]
# —————————
# Helpers
# —————————
def ensure_out(base: str) -> pathlib.Path:
    """Ensures the output directory exists and returns the extension-less base path."""
    p = pathlib.Path(base)
    if p.suffix:
        p = p.with_suffix("")
    p.parent.mkdir(parents=True, exist_ok=True)
    return p
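

# Example: ensure_out("out/crot_dalam.jsonl") creates out/ if missing and
# returns Path("out/crot_dalam"); per-format extensions are re-added later.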
def parse_username_and_id_from_url(url: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Parses the username and video ID from a TikTok video URL.
    e.g., https://www.tiktok.com/@someuser/video/7251234567890123456
    """
    try:
        parsed_url = urlparse(url)
        path_parts = parsed_url.path.strip("/").split("/")
        user = None
        vid = None
        for i, part in enumerate(path_parts):
            if part.startswith("@"):
                user = part[1:]
            if part == "video" and i + 1 < len(path_parts):
                vid = path_parts[i + 1].split("?")[0]
        return user, vid
    except Exception:
        return None, None
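

# Example, using the URL from the docstring above:
#   parse_username_and_id_from_url("https://www.tiktok.com/@someuser/video/7251234567890123456")
#   -> ("someuser", "7251234567890123456")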
def to_int_safe(s: Optional[str]) -> Optional[int]:
    """Converts TikTok shorthand counts like 1.2M or 3.4K to integers."""
    if not s:
        return None
    try:
        s_clean = s.strip().replace(",", "")  # Handle commas in numbers like 1,234
        m = re.match(r"([0-9]+(?:\.[0-9]+)?)([KkMmBb]?)", s_clean)
        if not m:
            return int(s_clean)
        num = float(m.group(1))
        suf = m.group(2).lower()
        mult = {"k": 1_000, "m": 1_000_000, "b": 1_000_000_000}.get(suf, 1)
        return int(num * mult)
    except (ValueError, TypeError):
        return None
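

# Expected behavior on typical inputs:
#   to_int_safe("1.2M")  -> 1_200_000
#   to_int_safe("3.4K")  -> 3_400
#   to_int_safe("1,234") -> 1234
#   to_int_safe("n/a")   -> None  (int() raises ValueError, caught above)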
def risk_score(text: str) -> Tuple[int, List[str]]:
    """Calculates a risk score (one point per match) and matched terms based on RISK_TERMS."""
    text_l = text.lower() if text else ""
    matches = []
    score = 0
    for rx in RISK_RE:
        # findall can return strings or tuples (when a pattern has capture groups)
        found = rx.findall(text_l)
        for m in found:
            score += 1
            # If it's a tuple (from groups), keep the first non-empty match
            if isinstance(m, tuple):
                match_str = next((x for x in m if x), str(m))
            else:
                match_str = m if isinstance(m, str) else str(m)
            matches.append(match_str)
    # Deduplicate while keeping order
    return score, list(dict.fromkeys(matches))
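

# Illustrative scoring on a hypothetical caption:
#   risk_score("GIVEAWAY!! klik link, transfer dulu ya")
#   -> score 4 ("giveaway" is hit by both the English and the Indonesian pattern),
#      with risk_matches ["giveaway", "klik link", "transfer dulu"]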
# —————————
# Browser routines
# —————————
def new_context(
    pw,
    headless: bool,
    locale: str,
    user_agent: Optional[str],
    proxy: Optional[str],
) -> Tuple[Any, BrowserContext]:
    """Creates a new Playwright browser and context with the given settings."""
    launch_args = {
        "headless": headless,
        "args": ["--disable-blink-features=AutomationControlled"],
    }
    if proxy:
        launch_args["proxy"] = {"server": proxy}
    browser = pw.chromium.launch(**launch_args)
    context = browser.new_context(
        locale=locale,
        user_agent=user_agent or (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
        ),
        viewport={"width": 1280, "height": 900},
    )
    # Reduce obvious automation fingerprints
    context.add_init_script("""
        Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
        window.chrome = { runtime: {} };
        Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
        Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
    """)
    return browser, context
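

# Minimal usage sketch (the CLI command below does this for you):
#   with sync_playwright() as pw:
#       browser, context = new_context(pw, headless=True, locale="en-US",
#                                      user_agent=None, proxy=None)
#       page = context.new_page()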
def accept_cookies_if_any(page: Page) -> None:
    """Attempts to click common 'Accept' cookie buttons."""
    try:
        # Use a flexible selector covering the common consent-button labels
        page.click(
            "button:has-text('Accept'), button:has-text('I agree'), "
            "button:has-text('AGREE'), button:has-text('Accept all')",
            timeout=5000,
        )
    except Exception:
        # No banner (or an unclickable one) is fine; just continue
        pass
def search_collect_video_urls(
    page: Page,
    query: str,
    limit: int,
    per_scroll_wait: float = 1.5,
) -> List[str]:
    """Navigates to search results and collects video URLs by scrolling."""
    url = f"https://www.tiktok.com/search?q={quote_plus(query)}"
    page.goto(url, wait_until="domcontentloaded")
    accept_cookies_if_any(page)
    seen = set()
    stagnant_rounds = 0
    max_stagnant_rounds = 8
    # Show a Rich progress spinner while scrolling
    with Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}"), transient=True) as progress:
        collecting_task = progress.add_task(f"[cyan]Collecting URLs for '{query}'...", total=None)
        while len(seen) < limit and stagnant_rounds < max_stagnant_rounds:
            # Gather anchors pointing to video pages
            anchors = page.query_selector_all("a[href*='/video/']")
            initial_seen_count = len(seen)
            for a in anchors:
                href = a.get_attribute("href")
                if not href:
                    continue
                # Ensure absolute URL
                if href.startswith("/"):
                    href = urljoin("https://www.tiktok.com", href)
                # Standardize URL (drop query params for deduplication)
                clean_href = href.split("?")[0]
                if "/video/" in clean_href:
                    seen.add(clean_href)
                if len(seen) >= limit:
                    break
            # Update progress
            progress.update(collecting_task, description=f"[cyan]Collected {len(seen)} URLs for '{query}'...")
            # Scroll to load more
            page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            time.sleep(per_scroll_wait)
            # Track stagnation so we stop once no new results load
            if len(seen) == initial_seen_count:
                stagnant_rounds += 1
                # Give slow result pages a little extra time once stagnating
                if stagnant_rounds > 3:
                    time.sleep(2)
            else:
                stagnant_rounds = 0
    final_urls = list(seen)[:limit]
    rprint(f"[green]✓[/green] Found {len(final_urls)} unique video URLs for '{query}'")
    return final_urls
def extract_video_metadata(page: Page, url: str) -> VideoRecord:
    """Extracts metadata from a TikTok video page."""
    # Default values
    desc = None
    upload_date = None
    author_name = None
    view_count = like_count = comment_count = share_count = None
    hashtags: List[str] = []
    username = None
    vid = None
    try:
        page.goto(url, wait_until="domcontentloaded", timeout=30000)
        accept_cookies_if_any(page)
    except PWTimeout:
        rprint(f"[yellow]⚠[/yellow] Timeout loading {url}")
    except Exception as e:
        rprint(f"[red]✗[/red] Error loading {url}: {e}")
    # 1) Try JSON-LD structured data
    try:
        for script in page.query_selector_all('script[type="application/ld+json"]'):
            try:
                script_content = script.inner_text()
                if script_content:
                    data = json.loads(script_content)
                else:
                    continue
            except json.JSONDecodeError:
                continue
            except Exception:
                continue
            nodes = data if isinstance(data, list) else [data]
            for n in nodes:
                if not isinstance(n, dict):
                    continue
                # Match VideoObject or SocialMediaPosting types
                if n.get("@type") in ("VideoObject", "SocialMediaPosting"):
                    desc = n.get("description") or desc
                    # Prefer uploadDate, fall back to datePublished
                    upload_date = n.get("uploadDate") or n.get("datePublished") or upload_date
                    author = n.get("author")
                    if isinstance(author, dict):
                        author_name = author.get("name") or author_name
                    # Extract interaction counts
                    interaction_stats = n.get("interactionStatistic")
                    if isinstance(interaction_stats, list):
                        for stat in interaction_stats:
                            if not isinstance(stat, dict):
                                continue
                            itype = stat.get("interactionType", "")
                            # Convert count to int, handling strings like "1.2K"
                            count_str = str(stat.get("userInteractionCount", ""))
                            count = to_int_safe(count_str)
                            if count is None:
                                continue
                            # interactionType may be a plain URL string or a nested
                            # {"@type": ...} dict; serialize it so both match the same way
                            itype_lower = json.dumps(itype).lower()
                            if any(keyword in itype_lower for keyword in ["view", "watch"]):
                                view_count = view_count or count
                            elif "like" in itype_lower:
                                like_count = like_count or count
                            elif "comment" in itype_lower:
                                comment_count = comment_count or count
                            elif "share" in itype_lower:
                                share_count = share_count or count
    except Exception as e:
        rprint(f"[yellow]⚠[/yellow] Error parsing JSON-LD for {url}: {e}")
    # 2) Meta tags fallback for description
    try:
        og_desc_elem = page.locator('meta[property="og:description"]').first
        if og_desc_elem.count() > 0:  # Check if the element exists
            og_desc = og_desc_elem.get_attribute("content")
            if og_desc:
                desc = desc or og_desc
    except Exception as e:
        rprint(f"[yellow]⚠[/yellow] Error parsing meta tags for {url}: {e}")
    # 3) DOM selectors for hashtags
    try:
        # Find hashtag links
        hashtag_links = page.query_selector_all("a[href*='/tag/']")
        for a in hashtag_links:
            text = (a.inner_text() or "").strip()
            # Hashtags usually start with #, but be a bit flexible
            if text and (text.startswith('#') or re.match(r'^[\w]+$', text)):
                # Store without the leading # if present
                tag = text[1:] if text.startswith('#') else text
                hashtags.append(tag)
    except Exception as e:
        rprint(f"[yellow]⚠[/yellow] Error parsing hashtags for {url}: {e}")
    # Extract username and video ID from the URL (always available, unlike JSON-LD)
    username, vid = parse_username_and_id_from_url(url)
    # Calculate risk score based on the description
    score, matches = risk_score(desc or "")
    return VideoRecord(
        video_id=vid or "",
        url=url,
        username=username,
        author_name=author_name,
        description=desc,
        upload_date=upload_date,
        like_count=like_count,
        comment_count=comment_count,
        share_count=share_count,
        view_count=view_count,
        hashtags=sorted(set(hashtags)),  # Deduplicate hashtags
        risk_score=score,
        risk_matches=matches,
    )
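

# For reference, the JSON-LD parsed above typically has roughly this shape
# (illustrative only; TikTok's markup varies and the values here are hypothetical):
#   {"@type": "VideoObject",
#    "description": "...", "uploadDate": "2025-01-01T00:00:00",
#    "author": {"@type": "Person", "name": "someuser"},
#    "interactionStatistic": [
#        {"@type": "InteractionCounter",
#         "interactionType": {"@type": "WatchAction"},
#         "userInteractionCount": 12345}]}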
# —————————
# I/O
# —————————
def write_outputs(records: List[VideoRecord], base: pathlib.Path) -> None:
    """Writes collected records to JSONL and CSV files (screenshots are saved per video in the search loop)."""
    jsonl_path = base.with_suffix('.jsonl')
    csv_path = base.with_suffix('.csv')
    # Write JSONL
    try:
        with jsonl_path.open('w', encoding='utf-8') as jf:
            for record in records:
                jf.write(json.dumps(record.to_dict(), ensure_ascii=False) + '\n')
        rprint(f"[bold green]✓ Saved JSONL:[/] {jsonl_path}")
    except Exception as e:
        rprint(f"[bold red]✗ Failed to write JSONL:[/] {e}")
    # Write CSV
    if records:
        fields = list(VideoRecord.__annotations__.keys())
        try:
            with csv_path.open('w', encoding='utf-8', newline='') as cf:
                writer = csv.DictWriter(cf, fieldnames=fields)
                writer.writeheader()
                for record in records:
                    writer.writerow(record.to_dict())
            rprint(f"[bold green]✓ Saved CSV:[/] {csv_path}")
        except Exception as e:
            rprint(f"[bold red]✗ Failed to write CSV:[/] {e}")
    else:
        rprint("[yellow]⚠ No data to write to CSV.[/]")
# —————————
# CLI
# —————————
@app.command()
def search(
    keywords: List[str] = typer.Argument(..., help="One or more keywords to search on TikTok"),
    limit: int = typer.Option(60, min=1, max=2000, help="Max total videos to collect (approx)"),
    out: str = typer.Option("out/crot_dalam", help="Output basename (no extension)"),
    headless: bool = typer.Option(True, help="Run headless browser"),
    locale: str = typer.Option("en-US", help="Browser locale like en-US or id-ID"),
    user_agent: Optional[str] = typer.Option(None, help="Custom User-Agent"),
    proxy: Optional[str] = typer.Option(None, help="Proxy, e.g. http://user:pass@host:port"),
    screenshot: bool = typer.Option(False, help="Save per-video page screenshot"),
    per_keyword_limit: Optional[int] = typer.Option(None, help="Max videos per keyword; by default --limit is shared across all keywords"),
):
    """
    Search TikTok's public UI for each KEYWORD and export JSONL/CSV (no API keys).
    Refactored by Marcos Tolosa (github.com/marcostolosa).
    """
    print_banner()
    base_path = ensure_out(out)
    screenshots_dir = base_path.parent / "screenshots"
    if screenshot:
        screenshots_dir.mkdir(parents=True, exist_ok=True)
    rprint("[bold]CROT-DALAM[/] starting...")
    rprint(f"Keywords: {keywords}")
    rprint(f"Total Limit: ~{limit}")
    rprint(f"Headless: {headless}")
    rprint(f"Locale: {locale}")
    if proxy:
        rprint(f"Proxy: {proxy}")
    collected_records: List[VideoRecord] = []
    seen_video_urls: set[str] = set()
    with sync_playwright() as pw:
        browser, context = new_context(pw, headless=headless, locale=locale, user_agent=user_agent, proxy=proxy)
        search_page = context.new_page()
        try:
            total_keywords = len(keywords)
            for i, keyword in enumerate(keywords, 1):
                rprint(f"\n[blue]--- Processing keyword ({i}/{total_keywords}): '{keyword}' ---[/]")
                if len(collected_records) >= limit:
                    rprint("[yellow]⚠ Reached total limit, stopping search.[/]")
                    break
                # Determine how many videos to fetch for this keyword
                if per_keyword_limit:
                    keyword_cap = min(per_keyword_limit, limit - len(collected_records))
                else:
                    # Distribute the remaining limit across remaining keywords, but at least 1
                    remaining_keywords = total_keywords - i + 1
                    remaining_limit = limit - len(collected_records)
                    keyword_cap = max(1, remaining_limit // remaining_keywords)
                    # Ensure it doesn't exceed the absolute remaining limit
                    keyword_cap = min(keyword_cap, remaining_limit)
                rprint(f"[blue]Targeting up to {keyword_cap} videos for '{keyword}'...[/]")
                # Collect video URLs for this keyword
                video_urls = search_collect_video_urls(search_page, keyword, keyword_cap)
                # Process each video URL
                for j, url in enumerate(video_urls, 1):
                    if url in seen_video_urls:
                        continue  # Skip if already processed under another keyword
                    if len(collected_records) >= limit:
                        rprint("[yellow]⚠ Reached total limit during processing, stopping.[/]")
                        break
                    rprint(f"[dim]Processing ({j}/{len(video_urls)} for '{keyword}'): {url}[/]")
                    seen_video_urls.add(url)
                    # Create a new page for each video to avoid state issues
                    video_page = context.new_page()
                    try:
                        # Extract metadata
                        record = extract_video_metadata(video_page, url)
                        record.keyword_searched = keyword  # Attach the search keyword
                        # Take a screenshot if requested
                        if screenshot and record.video_id:
                            try:
                                screenshot_path = screenshots_dir / f"{record.video_id}.png"
                                # full_page=True captures the whole page; switch to
                                # full_page=False for faster viewport-only captures
                                video_page.screenshot(path=str(screenshot_path), full_page=True)
                            except Exception as e:
                                rprint(f"[red]✗[/red] Failed to take screenshot for {record.video_id}: {e}")
                        collected_records.append(record)
                    finally:
                        video_page.close()
                    # Brief pause between requests to be respectful
                    time.sleep(0.5)
        except KeyboardInterrupt:
            rprint("\n[yellow]⚠ Interrupted by user. Saving collected data...[/]")
        except Exception as e:
            rprint(f"\n[bold red]✗ An unexpected error occurred:[/] {e}")
        finally:
            # Clean up browser resources
            try:
                search_page.close()
                context.close()
                browser.close()
            except Exception as e:
                rprint(f"[yellow]⚠ Error during cleanup:[/] {e}")
    # Write final outputs
    if collected_records:
        rprint(f"\n[green]✅ Collection finished. Writing {len(collected_records)} records to files...[/]")
        write_outputs(collected_records, base_path)
        # Console summary using Rich
        summary_table = Table(title="CROT-DALAM Summary", show_header=True, header_style="bold magenta")
        summary_table.add_column("Metric", style="dim")
        summary_table.add_column("Value")
        summary_table.add_row("Videos Collected", str(len(collected_records)))
        summary_table.add_row("Keywords Searched", ", ".join(keywords))
        avg_risk = mean(record.risk_score for record in collected_records)
        summary_table.add_row("Average Risk Score", f"{avg_risk:.2f}")
        rprint(summary_table)
    else:
        rprint("\n[red]✗ No video records were collected.[/]")


if __name__ == "__main__":
    try:
        app()
    except KeyboardInterrupt:
        rprint("\n[red]✗ Exiting...[/]")
        sys.exit(130)
    except Exception as e:
        rprint(f"\n[bold red]✗ Fatal error:[/] {e}")
        sys.exit(1)