TikTok OSINT
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CROT DALAM — TikTok OSINT (No-API Web Scraper) · Python CLI
Approach
• Drive a real browser with Playwright (Chromium) to load the public search page.
• Scroll and collect unique video URLs (no login required).
• Open each video page and extract metadata from meta tags / structured data / DOM.
• Heuristic "risk tags" scoring based on keywords (multi-language friendly).
Legal & ethics
Use responsibly. Respect TikTok's terms and local laws. This is for OSINT on public data only.
Quickstart
python -m pip install playwright typer rich
python -m playwright install chromium
# Basic search (headless)
python crot_dalam.py search "phishing" "scam" --limit 80 --out out/crot_dalam
# Visible browser + screenshots + Indonesian locale
python crot_dalam.py search "promo gratis" --locale id-ID --headless false --screenshot --limit 40
Outputs
out/<basename>.jsonl — one JSON object per line
out/<basename>.csv — flattened table
out/screenshots/ — optional PNGs (one per video)
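Example JSONL record (illustrative values; list fields are flattened to comma-joined strings):
  {"video_id": "7251234567890123456",
   "url": "https://www.tiktok.com/@someuser/video/7251234567890123456",
   "username": "someuser", "author_name": "Some User",
   "description": "promo gratis, klik link di bio", "upload_date": "2025-01-01T00:00:00",
   "like_count": 1200, "comment_count": 34, "share_count": 5, "view_count": 56000,
   "hashtags": "giveaway,promo", "keyword_searched": "promo gratis",
   "risk_score": 2, "risk_matches": "promo gratis,klik link"}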
Tested on: Python 3.10+ · Playwright ≥1.44
Refactored by: Marcos Tolosa
GitHub: https://github.com/marcostolosa
"""
from __future__ import annotations
import csv
import dataclasses as dc
import json
import pathlib
import re
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
from statistics import mean
from urllib.parse import urlparse, quote_plus, urljoin
import typer
from rich import print as rprint
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout, Page, BrowserContext
# —————————————————————————
# Banner & Metadata
# —————————————————————————
BANNER = r"""
█████████ █████ ██████████ ████
███░░░░░███ ░░███ ░░███░░░░███ ░░███
███ ░░░ ████████ ██████ ███████ ░███ ░░███ ██████ ░███ ██████ █████████████
░███ ░░███░░███ ███░░███░░░███░ ░███ ░███ ░░░░░███ ░███ ░░░░░███ ░░███░░███░░███
░███ ░███ ░░░ ░███ ░███ ░███ ░███ ░███ ███████ ░███ ███████ ░███ ░███ ░███
░░███ ███ ░███ ░███ ░███ ░███ ███ ░███ ███ ███░░███ ░███ ███░░███ ░███ ░███ ░███
░░█████████ █████ ░░██████ ░░█████ ██████████ ░░████████ █████░░████████ █████░███ █████
░░░░░░░░░ ░░░░░ ░░░░░░ ░░░░░ ░░░░░░░░░░ ░░░░░░░░ ░░░░░ ░░░░░░░░ ░░░░░ ░░░ ░░░░░
Refactored by Marcos Tolosa
GitHub: github.com/marcostolosa
Original Code By sudo3rs
"""
SUBTITLE = "Collection & Reconnaissance Of TikTok — Discovery, Analysis, Logging, And Monitoring"
def print_banner() -> None:
rprint(Panel.fit(BANNER, title="[bold cyan]CROT DALAM[/]", subtitle=SUBTITLE, border_style="cyan"))
app = typer.Typer(add_completion=False, help="CROT-DALAM — TikTok OSINT by keyword (no API)")
# —————————
# Data models & keyword heuristics
# —————————
@dc.dataclass
class VideoRecord:
"""Represents a TikTok video with its metadata."""
video_id: str
url: str
username: Optional[str] = None
author_name: Optional[str] = None
description: Optional[str] = None
upload_date: Optional[str] = None # ISO8601 if found
like_count: Optional[int] = None
comment_count: Optional[int] = None
share_count: Optional[int] = None
view_count: Optional[int] = None
hashtags: List[str] = dc.field(default_factory=list)
keyword_searched: Optional[str] = None
risk_score: int = 0
risk_matches: List[str] = dc.field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Converts the dataclass instance to a dictionary, flattening lists."""
d = dc.asdict(self)
d["hashtags"] = ",".join(self.hashtags)
d["risk_matches"] = ",".join(self.risk_matches)
return d
# Common scam/phish terms (EN + ID + generic). Extend as needed.
RISK_TERMS = [
# English
r"\b(scams?|phishing|smishing|spoof|giveaway|free\s*iphone|airdrop|crypto\s*giveaway|binary\s*options?|forex\s*signals?)\b",
r"\b(win\s*(?:cash|money|prize|reward)s?\b)",
r"\bclick\s*(?:the|this)?\s*link\b",
r"\bOTP|one[-\s]?time\s*password\b",
r"\bverification\s*code\b",
r"\bKYC\b",
# Indonesian / Bahasa
r"\bpenipuan|modus|phising|rek\.?\s*penipu|hadiah\s*gratis|promo\s*gratis|bagi[-\s]?bagi|giveaway\b",
r"\bklik\s*link|tautan\s*di\s*bio|link\s*di\s*bio\b",
r"\btransfer\s*dulu|deposit\s*dulu|saldo\s*bonus|langsung\s*cair\b",
r"\bkode\s*OTP|jangan\s*kasih\s*OTP\b",
]
RISK_RE = [re.compile(pat, re.I) for pat in RISK_TERMS]
# —————————
# Helpers
# —————————
def ensure_out(base: str) -> pathlib.Path:
"""Ensures the output directory exists and returns the base path."""
p = pathlib.Path(base)
if p.suffix:
p = p.with_suffix("")
p.parent.mkdir(parents=True, exist_ok=True)
return p
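# e.g. ensure_out("out/crot_dalam") creates the "out/" directory if needed and returns
# Path("out/crot_dalam"); a trailing extension such as ".csv" would be stripped first.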
def parse_username_and_id_from_url(url: str) -> Tuple[Optional[str], Optional[str]]:
"""
Parses the username and video ID from a TikTok video URL.
e.g., https://www.tiktok.com/@someuser/video/7251234567890123456
"""
try:
parsed_url = urlparse(url)
path_parts = parsed_url.path.strip("/").split("/")
user = None
vid = None
for i, part in enumerate(path_parts):
if part.startswith("@"):
user = part[1:]
if part == "video" and i + 1 < len(path_parts):
vid = path_parts[i + 1].split("?")[0]
return user, vid
except Exception:
return None, None
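# e.g. parse_username_and_id_from_url("https://www.tiktok.com/@someuser/video/7251234567890123456")
#      returns ("someuser", "7251234567890123456")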
def to_int_safe(s: Optional[str]) -> Optional[int]:
"""Converts TikTok shorthand counts like 1.2M, 3.4K to integers."""
if not s:
return None
try:
s_clean = s.strip().replace(",", "") # Handle commas in numbers like 1,234
m = re.match(r"([0-9]+(?:\.[0-9]+)?)([KkMmBb]?)", s_clean)
if not m:
return int(s_clean)
num = float(m.group(1))
suf = m.group(2).lower()
mult = {"k": 1_000, "m": 1_000_000, "b": 1_000_000_000}.get(suf, 1)
return int(num * mult)
except (ValueError, TypeError):
return None
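# Illustrative conversions: to_int_safe("1.2M") -> 1200000, to_int_safe("3.4K") -> 3400,
# to_int_safe("1,234") -> 1234, to_int_safe(None) -> None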
def risk_score(text: str) -> Tuple[int, List[str]]:
"""Calculates risk score and matched terms based on RISK_TERMS."""
text_l = text.lower() if text else ""
matches = []
score = 0
for rx in RISK_RE:
# findall can return strings or tuples (for groups)
found = rx.findall(text_l)
for m in found:
score += 1
# If it's a tuple (from groups), get the first non-empty match
if isinstance(m, tuple):
match_str = next((x for x in m if x), str(m))
else:
match_str = m if isinstance(m, str) else str(m)
matches.append(match_str)
# Deduplicate while keeping order
return score, list(dict.fromkeys(matches))
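# Illustrative example with the default RISK_TERMS:
#   risk_score("klik link untuk hadiah gratis") -> (2, ["hadiah gratis", "klik link"])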
# —————————
# Browser routines
# —————————
def new_context(
pw,
headless: bool,
locale: str,
user_agent: Optional[str],
proxy: Optional[str]
) -> Tuple[Any, BrowserContext]:
"""Creates a new Playwright browser context with settings."""
launch_args = {
"headless": headless,
"args": ["--disable-blink-features=AutomationControlled"]
}
if proxy:
launch_args["proxy"] = {"server": proxy}
browser = pw.chromium.launch(**launch_args)
context = browser.new_context(
locale=locale,
user_agent=user_agent or (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36" # Updated UA
),
viewport={"width": 1280, "height": 900},
)
# Reduce obvious automation fingerprints
context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.chrome = { runtime: {} };
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
""")
return browser, context
def accept_cookies_if_any(page: Page) -> None:
"""Attempts to click common 'Accept' cookie buttons."""
try:
# Use a more flexible selector
page.click("button:has-text('Accept'), button:has-text('I agree'), button:has-text('AGREE'), button:has-text('Accept all')", timeout=5000)
except Exception:  # PWTimeout is already a subclass of Exception
# It's okay if we can't click it, just continue
pass
def search_collect_video_urls(
page: Page,
query: str,
limit: int,
per_scroll_wait: float = 1.5
) -> List[str]:
"""Navigates to search results and collects video URLs by scrolling."""
url = f"https://www.tiktok.com/search?q={quote_plus(query)}"
page.goto(url, wait_until="domcontentloaded")
accept_cookies_if_any(page)
seen = set()
last_count = 0
stagnant_rounds = 0
max_stagnant_rounds = 8
# Use Rich progress bar
with Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}"), transient=True) as progress:
collecting_task = progress.add_task(f"[cyan]Collecting URLs for '{query}'...", total=None)
while len(seen) < limit and stagnant_rounds < max_stagnant_rounds:
# Gather anchors pointing to video pages
anchors = page.query_selector_all("a[href*='/video/']")
initial_seen_count = len(seen)
for a in anchors:
href = a.get_attribute("href")
if href:
# Ensure absolute URL
if href.startswith("/"):
href = urljoin("https://www.tiktok.com", href)
# Standardize URL (remove query params for deduplication)
clean_href = href.split("?")[0]
if "/video/" in clean_href:
seen.add(clean_href)
if len(seen) >= limit:
break
# Update progress
progress.update(collecting_task, description=f"[cyan]Collected {len(seen)} URLs for '{query}'...")
# Scroll to load more
page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
time.sleep(per_scroll_wait)
# Check for stagnation
if len(seen) == initial_seen_count:
stagnant_rounds += 1
# Optional: Add a small wait if stagnating
if stagnant_rounds > 3:
time.sleep(2)
else:
stagnant_rounds = 0
last_count = len(seen)
final_urls = list(seen)[:limit]
rprint(f"[green]✓[/green] Found {len(final_urls)} unique video URLs for '{query}'")
return final_urls
def extract_video_metadata(page: Page, url: str) -> VideoRecord:
"""Extracts metadata from a TikTok video page."""
# Default values
desc = None
upload_date = None
author_name = None
view_count = like_count = comment_count = share_count = None
hashtags: List[str] = []
username = None
vid = None
try:
page.goto(url, wait_until="domcontentloaded", timeout=30000)
accept_cookies_if_any(page)
except PWTimeout:
rprint(f"[yellow]⚠[/yellow] Timeout loading {url}")
except Exception as e:
rprint(f"[red]✗[/red] Error loading {url}: {e}")
# 1) Try JSON-LD structured data
try:
for script in page.query_selector_all('script[type="application/ld+json"]'):
try:
script_content = script.inner_text()
if script_content:
data = json.loads(script_content)
else:
continue
except json.JSONDecodeError:
continue
except Exception:
continue
nodes = data if isinstance(data, list) else [data]
for n in nodes:
if not isinstance(n, dict):
continue
# Match VideoObject or SocialMediaPosting types
if n.get("@type") in ("VideoObject", "SocialMediaPosting"):
desc = n.get("description") or desc
# Prefer uploadDate, fallback to datePublished
upload_date = n.get("uploadDate") or n.get("datePublished") or upload_date
author = n.get("author")
if isinstance(author, dict):
author_name = author.get("name") or author_name
# Extract interaction counts
interaction_stats = n.get("interactionStatistic")
if isinstance(interaction_stats, list):
for stat in interaction_stats:
if not isinstance(stat, dict):
continue
itype = stat.get("interactionType", "")
# Convert count to int, handling potential strings like "1.2K"
count_str = str(stat.get("userInteractionCount", ""))
count = to_int_safe(count_str)
if not count:
continue
# Match interaction type case-insensitively
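# json.dumps() turns itype into a searchable string whether it is a plain string
# (e.g. "http://schema.org/WatchAction") or a nested dict, so the checks below cover both shapes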
itype_lower = json.dumps(itype).lower()
if any(keyword in itype_lower for keyword in ["view", "watch"]):
view_count = view_count or count
elif "like" in itype_lower:
like_count = like_count or count
elif "comment" in itype_lower:
comment_count = comment_count or count
elif "share" in itype_lower:
share_count = share_count or count
except Exception as e:
rprint(f"[yellow]⚠[/yellow] Error parsing JSON-LD for {url}: {e}")
# 2) Meta tags fallback for description
try:
og_desc_elem = page.locator('meta[property="og:description"]').first
if og_desc_elem.count() > 0: # Check if element exists
og_desc = og_desc_elem.get_attribute("content")
if og_desc:
desc = desc or og_desc
except Exception as e:
rprint(f"[yellow]⚠[/yellow] Error parsing meta tags for {url}: {e}")
# 3) DOM selectors for hashtags
try:
# Find hashtag links
hashtag_links = page.query_selector_all("a[href*='/tag/']")
for a in hashtag_links:
text = (a.inner_text() or "").strip()
# Hashtags usually start with #, but let's be a bit flexible
if text and (text.startswith('#') or re.match(r'^[\w]+$', text)):
# Add without the # if present
tag = text[1:] if text.startswith('#') else text
hashtags.append(tag)
except Exception as e:
rprint(f"[yellow]⚠[/yellow] Error parsing hashtags for {url}: {e}")
# Extract username and video ID from URL (fallback if not in JSON-LD)
username, vid = parse_username_and_id_from_url(url)
# Calculate risk score based on description
score, matches = risk_score(desc or "")
return VideoRecord(
video_id=vid or "",
url=url,
username=username,
author_name=author_name,
description=desc,
upload_date=upload_date,
like_count=like_count,
comment_count=comment_count,
share_count=share_count,
view_count=view_count,
hashtags=sorted(set(hashtags)), # Deduplicate hashtags
risk_score=score,
risk_matches=matches,
)
# —————————
# I/O
# —————————
def write_outputs(
records: List[VideoRecord],
base: pathlib.Path,
do_screenshots: bool,
shots_dir: pathlib.Path
) -> None:
"""Writes collected records to JSONL and CSV files."""
jsonl_path = base.with_suffix('.jsonl')
csv_path = base.with_suffix('.csv')
# Write JSONL
try:
with jsonl_path.open('w', encoding='utf-8') as jf:
for record in records:
jf.write(json.dumps(record.to_dict(), ensure_ascii=False) + '\n')
rprint(f"[bold green]✓ Saved JSONL:[/] {jsonl_path}")
except Exception as e:
rprint(f"[bold red]✗ Failed to write JSONL:[/] {e}")
# Write CSV
if records:
fields = list(VideoRecord.__annotations__.keys())
try:
with csv_path.open('w', encoding='utf-8', newline='') as cf:
writer = csv.DictWriter(cf, fieldnames=fields)
writer.writeheader()
for record in records:
writer.writerow(record.to_dict())
rprint(f"[bold green]✓ Saved CSV:[/] {csv_path}")
except Exception as e:
rprint(f"[bold red]✗ Failed to write CSV:[/] {e}")
else:
rprint("[yellow]⚠ No data to write to CSV.[/]")
# —————————
# CLI
# —————————
@app.command()
def search(
keywords: List[str] = typer.Argument(..., help="One or more keywords to search on TikTok"),
limit: int = typer.Option(60, min=1, max=2000, help="Max total videos to collect (approx)"),
out: str = typer.Option("out/crot_dalam", help="Output basename (no extension)"),
headless: bool = typer.Option(True, help="Run headless browser"),
locale: str = typer.Option("en-US", help="Browser locale like en-US or id-ID"),
user_agent: Optional[str] = typer.Option(None, help="Custom User-Agent"),
proxy: Optional[str] = typer.Option(None, help="Proxy, e.g. http://user:pass@host:port"),
screenshot: bool = typer.Option(False, help="Save per-video page screenshot"),
per_keyword_limit: Optional[int] = typer.Option(None, help="Max videos per keyword; default shares --limit across all"),
):
"""
Search TikTok public UI for each KEYWORD and export JSONL/CSV (no API keys).
Refactored by Marcos Tolosa (github.com/marcostolosa).
"""
print_banner()
base_path = ensure_out(out)
screenshots_dir = base_path.parent / "screenshots"
if screenshot:
screenshots_dir.mkdir(parents=True, exist_ok=True)
rprint(f"[bold]CROT-DALAM[/] starting...")
rprint(f"Keywords: {keywords}")
rprint(f"Total Limit: ~{limit}")
rprint(f"Headless: {headless}")
rprint(f"Locale: {locale}")
if proxy:
rprint(f"Proxy: {proxy}")
collected_records: List[VideoRecord] = []
seen_video_urls: set[str] = set()
with sync_playwright() as pw:
browser, context = new_context(pw, headless=headless, locale=locale, user_agent=user_agent, proxy=proxy)
search_page = context.new_page()
try:
total_keywords = len(keywords)
for i, keyword in enumerate(keywords, 1):
rprint(f"\n[blue]--- Processing keyword ({i}/{total_keywords}): '{keyword}' ---[/]")
if len(collected_records) >= limit:
rprint("[yellow]⚠ Reached total limit, stopping search.[/]")
break
# Determine how many videos to fetch for this keyword
if per_keyword_limit:
keyword_cap = min(per_keyword_limit, limit - len(collected_records))
else:
# Distribute remaining limit across remaining keywords, but at least 1
remaining_keywords = total_keywords - i + 1
remaining_limit = limit - len(collected_records)
keyword_cap = max(1, remaining_limit // remaining_keywords)
# Ensure it doesn't exceed the absolute remaining limit
keyword_cap = min(keyword_cap, remaining_limit)
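# e.g. with --limit 60 and 3 keywords this targets roughly 20 videos per keyword,
# recomputed each round from whatever budget the earlier keywords left over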
rprint(f"[blue]Targeting up to {keyword_cap} videos for '{keyword}'...[/]")
# Collect video URLs for this keyword
video_urls = search_collect_video_urls(search_page, keyword, keyword_cap)
# Process each video URL
for j, url in enumerate(video_urls, 1):
if url in seen_video_urls:
continue # Skip if already processed
if len(collected_records) >= limit:
rprint("[yellow]⚠ Reached total limit during processing, stopping.[/]")
break
rprint(f"[dim]Processing ({j}/{len(video_urls)} for '{keyword}'): {url}[/]")
seen_video_urls.add(url)
# Create a new page for each video to avoid state issues
video_page = context.new_page()
try:
# Extract metadata
record = extract_video_metadata(video_page, url)
record.keyword_searched = keyword # Attach the search keyword
# Take screenshot if requested
if screenshot and record.video_id:
try:
screenshot_path = screenshots_dir / f"{record.video_id}.png"
# full_page=True captures the whole page; set it to False for faster viewport-only shots
video_page.screenshot(path=str(screenshot_path), full_page=True)
# rprint(f"[dim] Screenshot saved: {screenshot_path.name}[/]") # Optional verbose log
except Exception as e:
rprint(f"[red]✗[/red] Failed to take screenshot for {record.video_id}: {e}")
collected_records.append(record)
finally:
video_page.close()
# Brief pause between requests to be respectful
time.sleep(0.5)
except KeyboardInterrupt:
rprint("\n[yellow]⚠ Interrupted by user. Saving collected data...[/]")
except Exception as e:
rprint(f"\n[bold red]✗ An unexpected error occurred:[/] {e}")
finally:
# Cleanup browser resources
try:
search_page.close()
context.close()
browser.close()
except Exception as e:
rprint(f"[yellow]⚠ Error during cleanup:[/] {e}")
# Write final outputs
if collected_records:
rprint(f"\n[green]✅ Collection finished. Writing {len(collected_records)} records to files...[/]")
write_outputs(collected_records, base_path, screenshot, screenshots_dir)
# Console summary using Rich
summary_table = Table(title="CROT-DALAM Summary", show_header=True, header_style="bold magenta")
summary_table.add_column("Metric", style="dim")
summary_table.add_column("Value")
summary_table.add_row("Videos Collected", str(len(collected_records)))
summary_table.add_row("Keywords Searched", ", ".join(keywords))
if collected_records:
avg_risk = mean(record.risk_score for record in collected_records)
summary_table.add_row("Average Risk Score", f"{avg_risk:.2f}")
else:
summary_table.add_row("Average Risk Score", "N/A")
rprint(summary_table)
else:
rprint("\n[red]✗ No video records were collected.[/]")
if __name__ == "__main__":
try:
app()
except KeyboardInterrupt:
rprint("\n[red]✗ Exiting...[/]")
sys.exit(130)
except Exception as e:
rprint(f"\n[bold red]✗ Fatal error:[/] {e}")
sys.exit(1)