Last active
September 4, 2020 03:24
-
-
Save IvanaGyro/48fedaf37be964c72500e0958bfadfbb to your computer and use it in GitHub Desktop.
把OONI測試趨勢家長防護網的結果從資料庫中匯出至CSV
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import json
import pathlib
import re
import sqlite3
import sys
import urllib
import urllib.error
import urllib.request
from pprint import pprint

from bs4 import BeautifulSoup
| DB_PATH = r'C:\Users\Username\AppData\Roaming\OONI Probe\ooni_home\db\main.sqlite3' | |
| OUTPUT_CSV = 'blocked_list.csv' | |
| RESULT_ID = 2 | |
| TREND_ZH_CATEGORIES = { | |
| "01": "性愛/成人", | |
| "02": "不合法或禁止的內容", | |
| "03": "女性內衣/泳衣", | |
| "04": "裸體", | |
| "05": "色情", | |
| "06": "性教育", | |
| "07": "部落格/網路通訊", | |
| "08": "聊天/即時通訊", | |
| "09": "電子郵件", | |
| "0A": "Proxy 匿名代理伺服器服務", | |
| "0B": "點對點 (P2P) 網路", | |
| "0C": "相片搜尋", | |
| "0D": "串流媒體", | |
| "0E": "個人/約會", | |
| "0F": "社群網路", | |
| "10": "墮胎", | |
| "11": "非法/可疑", | |
| "12": "異教/神秘儀式", | |
| "13": "違法毒品", | |
| "14": "不雅觀", | |
| "15": "暴力/仇恨/種族主義", | |
| "16": "武器/軍隊", | |
| "17": "酒精/香煙", | |
| "18": "拍賣", | |
| "19": "賭博", | |
| "1A": "遊戲", | |
| "1B": "購物", | |
| "1C": "網頁廣告", | |
| "1D": "尚未測試的網站", | |
| "1E": "勒索軟體", | |
| "1F": "新聞群組/論壇", | |
| "20": "共用服務", | |
| "21": "射擊俱樂部/狩獵", | |
| "22": "軟體下載", | |
| "24": "數位貨幣挖礦", | |
| "25": "間諜程式", | |
| "26": "廣告軟體", | |
| "27": "惡意程式共犯", | |
| "28": "病媒", | |
| "29": "C&C 伺服器", | |
| "2A": "惡意網域", | |
| "2B": "詐騙", | |
| "2C": "不安全的 IoT 連線", | |
| "80": "網路釣魚", | |
| "2D": "搜尋引擎", | |
| } | |
| TREND_EN_CATEGORIES = { | |
| "01": "Erotic / Mature", | |
| "02": "Illegal or Prohibited Content", | |
| "03": "Intimate Apparel / Swimsuit", | |
| "04": "Nudity", | |
| "05": "Pornography", | |
| "06": "Sex Education", | |
| "07": "Blogs / Web Communications", | |
| "08": "Chat / Instant Messaging", | |
| "09": "Email", | |
| "0A": "Proxy Avoidance and Anonymizers", | |
| "0B": "Peer-to-Peer (P2P) Networks", | |
| "0C": "Photo Searches", | |
| "0D": "Streaming Media", | |
| "0E": "Personals / Dating", | |
| "0F": "Social Networking", | |
| "10": "Abortion", | |
| "11": "Illegal / Questionable", | |
| "12": "Cult / Occult", | |
| "13": "Illegal Drugs", | |
| "14": "Tasteless", | |
| "15": "Violence / Hate / Racism", | |
| "16": "Weapons / Military", | |
| "17": "Alcohol / Tobacco", | |
| "18": "Auctions", | |
| "19": "Gambling", | |
| "1A": "Games", | |
| "1B": "Shopping", | |
| "1C": "Web Advertisement", | |
| "1D": "Untested Websites", | |
| "1E": "Ransomware", | |
| "1F": "Newsgroups/ Forums", | |
| "20": "Sharing Services", | |
| "21": "Gun clubs/ Hunting", | |
| "22": "Software Downloads", | |
| "24": "Coin Mining", | |
| "25": "Spyware", | |
| "26": "Adware", | |
| "27": "Malware Accomplice", | |
| "28": "Disease Vector", | |
| "29": "C&C Server", | |
| "2A": "Malicious Domain", | |
| "2B": "Scam", | |
| "2C": "Unsecure IoT Connections", | |
| "80": "Phishing", | |
| "2D": "Search Engine", | |
| } | |
| def localize_path(windows_path): | |
| if sys.platform == 'win32': | |
| return windows_path | |
| windows_path = pathlib.PureWindowsPath(windows_path) | |
| drive = windows_path.drive | |
| parts = list(windows_path.parts) | |
| if drive: | |
| parts[0] = f'/mnt/{drive[:-1].lower()}' | |
| elif windows_path.root: | |
| parts[0] = '/' | |
| return pathlib.Path(*parts) | |
| def find_meta(html_soup): | |
| title = html_soup.find('title') | |
| title = title.string if title is not None else '' | |
| description = html_soup.select_one('meta[name="description"]') | |
| description = description.get('content', '') if description else '' | |
| return title.strip(), description.strip() | |
| def request_meta(url): | |
| try: | |
| with urllib.request.urlopen(url) as res: | |
| html = res.read() | |
| except (urllib.error.HTTPError, urllib.error.URLError): | |
| print(f'Cannot get the content of {url}') | |
| return '', '' | |
| soup = BeautifulSoup(html, 'html.parser') | |
| return find_meta(soup) | |
| conn = sqlite3.connect(localize_path(DB_PATH)) | |
| c = conn.cursor() | |
| result = c.execute(f""" | |
| SELECT measurement_file_path FROM measurements | |
| WHERE is_anomaly = 1 | |
| AND result_id = {RESULT_ID} | |
| """) | |
| csv_table = [] | |
| for json_path, in result: | |
| with open(localize_path(json_path)) as fp: | |
| data = json.load(fp) | |
| url = data['input'] | |
| blocking = data['test_keys']['blocking'] | |
| html = data['test_keys']['requests'][0].get('response', {}).get('body') | |
| # maybe there was no response | |
| if html is None: | |
| title, description = request_meta(url) | |
| csv_table.append({ | |
| 'URL': url, | |
| 'Title': title, | |
| 'Description': description, | |
| 'Blocking': blocking, | |
| }) | |
| continue | |
| soup = BeautifulSoup(html, 'html.parser') | |
| # blocked by Trend Micro Titanium | |
| if ( | |
| blocking == 'http-diff' and | |
| soup.select_one('meta[id="__wrs_alert__"]') is not None | |
| ): | |
| match = re.search(r'CategoryGroup:"(.+?)",', html) | |
| if match is not None: | |
| category = match.group(1) | |
| if category == 'Server Lookup Error': | |
| zh_category = '未分級的網頁' | |
| en_category = 'Unrated Page' | |
| else: | |
| zh_category = TREND_ZH_CATEGORIES.get(category, '') | |
| en_category = TREND_EN_CATEGORIES.get(category, '') | |
| else: | |
| # error | |
| print(soup.prettify()) | |
| raise Exception('category not found') | |
| title, description = request_meta(url) | |
| csv_table.append({ | |
| 'URL': url, | |
| 'Title': title, | |
| 'Description': description, | |
| 'Blocking': 'Trend Micro Titanium', | |
| 'EN Category': en_category, | |
| 'ZH Category': zh_category, | |
| }) | |
| continue | |
| title, description = find_meta(soup) | |
| csv_table.append({ | |
| 'URL': url, | |
| 'Title': title, | |
| 'Description': description, | |
| 'Blocking': blocking, | |
| }) | |
| with open(OUTPUT_CSV, 'w') as csvfile: | |
| fieldnames = ['URL', 'Title', 'Description', 'Blocking', 'EN Category', 'ZH Category'] | |
| writer = csv.DictWriter(csvfile, fieldnames=fieldnames) | |
| writer.writeheader() | |
| writer.writerows(sorted(csv_table, key=lambda x: x['URL'])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment