#!/usr/bin/env python
# -*- coding: utf-8 -*-
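"""
Search MediaWiki-namespace .js and .css pages across Wikimedia projects,
cache their source locally, and run semgrep against the cache to flag code
that loads third-party (off-wiki) resources.

Optional command-line arguments:
    argv[1]: path to a semgrep rules file (*.yml / *.yaml);
             defaults to semgrep-ext-css-loading.yml
    argv[2]: "table" to print a Phabricator-style summary table instead of
             raw semgrep output
"""
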
import json
import mwclient
import os
import re
import requests
import shutil
import subprocess
import sys
import urllib3
from pathlib import Path
from typing import Union
from urllib.parse import urlparse


""" cli opts """ |
|
CLI_SAST_CONFIG_FILE = "semgrep-ext-css-loading.yml" |
|
if len(sys.argv) > 1: |
|
if re.match(r".+\.y(a)?ml", sys.argv[1]): |
|
CLI_SAST_CONFIG_FILE = sys.argv[1] |
|
|
|
CLI_OUTPUT_MODE = None |
|
if len(sys.argv) > 2: |
|
if sys.argv[2] == "table": |
|
CLI_OUTPUT_MODE = "table" |
|
|
|
""" constants """ |
|
SITEMATRIX_API_SITE = "meta.wikimedia.org" |
|
|
|
SEARCH_NAMESPACE = 8 |
|
SEARCH_PATTERNS = ["intitle:\.js", "intitle:\.css"] |
|
|
|
ALLOWED_FILE_TYPES = ["javascript", "css"] |
|
ALLOWED_FILE_EXTS_PAT = ".+(\.js|\.css)$" |
|
|
|
FILE_CACHE_PATH = "./local_files" |
|
FILE_RECACHE = False |
|
|
|
SEMGREP_CMD = "semgrep" |
|
SEMGREP_CMD_OPTS = ["--metrics=off", "--timeout=30", "--config=" + CLI_SAST_CONFIG_FILE] |
|
|
|
OUTPUT_MODE = ( |
|
CLI_OUTPUT_MODE # 'table' or nothing, 'table' results in stats table report |
|
) |
|
DEBUG = False # limit various api searches, extra debug |
|
DEBUG_MAX = 25 |
|
|
|
|
|
def collect_project_urls() -> list:
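    """Collect project base URLs from the Wikimedia sitematrix API."""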
    project_urls = []

    site = mwclient.Site(SITEMATRIX_API_SITE)
    result = site.api("sitematrix", smlimit="max", smsiteprop="url", format="json")
    for lang in result["sitematrix"].values():
        if isinstance(lang, int):
            continue

        if isinstance(lang, list):
            # specials
            for projects in lang:
                if "url" in projects:
                    project_urls.append(projects["url"])

        if "site" in lang:
            for urls in lang["site"]:
                if "url" in urls:
                    project_urls.append(urls["url"])

    return project_urls


def search_for_files(project_urls: list, namespace: int, patterns: list) -> list:
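    """Search each project for page titles matching the given patterns.

    Returns a list of [project_domain, page_title] pairs.
    """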
    files = []
    count = 0

    for url in project_urls:
        project_domain = urlparse(url).netloc
        site = mwclient.Site(project_domain)

        for pattern in patterns:
            try:
                result = site.api(
                    "query",
                    list="search",
                    srnamespace=namespace,
                    srsearch=pattern,
                    srlimit="max",
                    format="json",
                )
            except mwclient.errors.APIError:
                print("Error: APIError raised for: " + project_domain)
                continue
            except urllib3.exceptions.ReadTimeoutError:
                print("Error: ReadTimeoutError raised for: " + project_domain)
                continue

            if isinstance(result, dict) and "query" in result:
                if DEBUG is True and (
                    "searchinfo" in result["query"]
                    and "totalhits" in result["query"]["searchinfo"]
                ):
                    print(
                        ("-" * 80)
                        + "\n"
                        + project_domain
                        + ": "
                        + str(result["query"]["searchinfo"]["totalhits"])
                        + " files found\n"
                        + ("-" * 80)
                    )
                if "search" in result["query"]:
                    for titles in result["query"]["search"]:
                        # quick filter file ext check
                        if re.match(ALLOWED_FILE_EXTS_PAT, titles["title"]):
                            files.append([project_domain, titles["title"]])
                            count += 1

        if DEBUG is True and count > DEBUG_MAX:
            break

    return files


def fetch_file_sources(file_titles: list) -> None:
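    """Fetch page content for each [domain, title] pair into FILE_CACHE_PATH."""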
    Path(FILE_CACHE_PATH).mkdir(parents=True, exist_ok=True)
    # clear out any previously cached files
    for cached in Path(FILE_CACHE_PATH).glob("*"):
        if cached.is_file():
            cached.unlink()

    for file_data in file_titles:
        if len(file_data) == 2:
            site = mwclient.Site(file_data[0])
            result = site.api(
                "query",
                titles=file_data[1],
                prop="revisions",
                rvprop="content",
                format="json",
            )
            if isinstance(result, dict) and "query" in result:
                if "pages" in result["query"]:
                    for page_info in result["query"]["pages"].values():
                        if DEBUG is True:
                            print(
                                file_data[0]
                                + " : "
                                + page_info["title"]
                                + " : "
                                + page_info["revisions"][0]["contentmodel"]
                            )
                        if page_info["revisions"][0]["contentmodel"] in ALLOWED_FILE_TYPES:
                            local_file_name = (
                                FILE_CACHE_PATH
                                + os.sep
                                + file_data[0]
                                + "-"
                                + page_info["title"].replace("/", "-")
                            )

                            with open(local_file_name, "w") as f:
                                if page_info["revisions"][0]["*"] is not None:
                                    f.write(page_info["revisions"][0]["*"])
                        else:
                            print(
                                file_data[0]
                                + " : "
                                + page_info["title"]
                                + " : not js or css..."
                            )


def scan_for_third_party_resources() -> Union[str, None]:
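    """Run semgrep over the cached files.

    Returns semgrep's JSON output when table mode is on; otherwise prints
    the raw output and returns None.
    """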
    if shutil.which(SEMGREP_CMD) is None:
        raise ValueError(
            "Error: " + SEMGREP_CMD + " does not appear to be installed locally!"
        )

    try:
        sast_cmd = [SEMGREP_CMD] + SEMGREP_CMD_OPTS
        if OUTPUT_MODE == "table":
            sast_cmd += ["--json"]
        sast_cmd += [FILE_CACHE_PATH]
        sast_tool_out = subprocess.run(
            sast_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
        )
    except subprocess.CalledProcessError as err:
        print(str(err))
        return None

    if OUTPUT_MODE == "table":
        return str(sast_tool_out.stdout)

    print(sast_tool_out.stdout)
    return None


def _wiki_link(site: str, file: str) -> str:
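    """Build a remarkup-style [[ url | title ]] link to the on-wiki file."""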
return "[[ https://" + site + "/wiki/" + file + " | " + file + " ]]" |
|
|
|
|
|
def print_results(sast_json_data: str) -> None:
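    """Parse semgrep's JSON results and print a per-project, per-file summary table."""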
    processed_data = {}

    # generate markdown table (for Phab et al)
    output = ""
    output += "| Project | Code File | Ext Domain / Code \n"
    output += "| ------- | --------- | ----------------- \n"

    # process sast json data
    sast_json_data = json.loads(sast_json_data)

    # process stats for sast json results
    if "results" in sast_json_data:
        print(str(len(sast_json_data["results"])) + " results found!\n")
        for item in sast_json_data["results"]:
            if "path" in item and "extra" in item and "lines" in item["extra"]:
                file_path_prefix = FILE_CACHE_PATH[2:] + "/"
                file_separator = "-"

                # strip the local cache prefix, then split the file name back
                # into its "<domain>-<page title>" parts
                path = item["path"]
                if path.startswith(file_path_prefix):
                    path = path[len(file_path_prefix):]
                sub_project, rest = path.split(".", 1)
                project, code_file = rest.split(file_separator, 1)
                project = sub_project + "." + project

                found_ext_url = re.search(
                    r"(?P<url>(https?|ftp|wss|file)://[^\s]+)", item["extra"]["lines"]
                )
                if found_ext_url:
                    found_ext_url = found_ext_url.group("url")
                else:
                    raw_code = item["extra"]["lines"].strip()
                    found_ext_url = (
                        raw_code[:45] + "..." if len(raw_code) > 45 else raw_code
                    )

                processed_data.setdefault(project, {}).setdefault(
                    code_file, []
                ).append(found_ext_url)

    for project, code_files in processed_data.items():
        for code_file, ext_urls in code_files.items():
            output += (
                "| "
                + project
                + " | "
                + _wiki_link(project, code_file)
                + " | `"
                + ", ".join(ext_urls).replace("\n", "")
                + "`\n"
            )

    print(output + "\n")


def init() -> None:
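    """Optionally refresh the local file cache, then run the scan and report results."""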
    if FILE_RECACHE is True:
        site_urls = collect_project_urls()
        files = search_for_files(site_urls, SEARCH_NAMESPACE, SEARCH_PATTERNS)
        fetch_file_sources(files)

    scan_results = scan_for_third_party_resources()

    if OUTPUT_MODE == "table":
        print_results(scan_results)


if __name__ == "__main__":
    init()