Created
February 13, 2023 05:21
-
-
Save av-gantimurov/5bdeff74c9005acbddd56a399cb96ecf to your computer and use it in GitHub Desktop.
MalwareBazaar Downloader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Небольшой скрипт для проверки есть ли файл на MalwareBazaar и загрузки в | |
указанную директорию. Для работы нужен `apikey` MalwareBazaar. | |
Если директория не указана, то только проверяет наличие файла и сохраняет | |
полученный ответ в текущую директорию. | |
Файлы загружает в виде архивов zip с паролем infected. Так отдает MalwareBazaar. | |
Можно указать список хешей для проверки - опция `-F`. | |
Поддерживает хеши `md5`, `sha1`, `sha256`. | |
Python 3.8+ | |
Примеры | |
``` | |
# help | |
python bazaar.py --help | |
# hashes from args | |
python bazaar.py --apikey 'APIKEY' 035c005e34504bd0228feaf4b54eea743894b7aa | |
# hashes from file | |
python bazaar.py --apikey 'APIKEY' -F hashlist.txt | |
# download files | |
python bazaar.py --apikey 'APIKEY' -F hashlist.txt -O output_directory | |
#MalwareBazaar #Python #utils | |
""" | |
import json | |
import logging | |
from pathlib import Path | |
from typing import List, Union | |
from urllib import parse, request | |
from urllib.error import HTTPError | |
class Bazaar: | |
def __init__(self, API_KEY: str, output_dir: str = "./download"): | |
if not API_KEY: | |
raise ValueError("API KEY isn't defined") | |
self._base_url = "https://mb-api.abuse.ch/api/v1/" | |
self._headers = { | |
"user-agent": "Falcon Sandbox", | |
"accept": "application/json", | |
"API-KEY": API_KEY, | |
} | |
self.output_dir = output_dir | |
self.cache = {} | |
self.logger = logging.getLogger("Bazaar") | |
def __contains__(self, hash: str): | |
result = self.details(hash) | |
return result is not None | |
def make_req(self, params: dict, is_raw: bool = False) -> List[dict]: | |
query = parse.urlencode(params).encode() | |
self.logger.debug("make request %s", query) | |
r = request.Request(self._base_url, data=query, headers=self._headers) | |
result = None | |
try: | |
with request.urlopen(r) as response: | |
status = response.status | |
if status != 200: | |
self.logger.warning(f"request {params} status {status}") | |
return None | |
raw = response.read() | |
self.logger.debug(raw) | |
if is_raw: | |
return raw | |
data = json.loads(raw.decode()) | |
query_status = data.get("query_status", "") | |
if query_status != "ok": | |
self.logger.warning(f"request {params} query_status {query_status}") | |
return None | |
result = data["data"] | |
except HTTPError as e: | |
self.logger.error(e) | |
return result | |
def get_file(self, hash: str) -> bytes: | |
sha256 = self.details(hash)["sha256_hash"] | |
params = {"query": "get_file", "sha256_hash": sha256} | |
return self.make_req(params, is_raw=True) | |
def download(self, hash: str, outdir: str = None) -> str: | |
raw = self.get_file(hash) | |
if not raw: | |
return None | |
if not outdir: | |
outdir = self.output_dir | |
outdir = Path(outdir) | |
outdir.mkdir(exist_ok=True, parents=True) | |
target_path = outdir / hash | |
with open(target_path, "wb") as handle: | |
handle.write(raw) | |
return target_path | |
def details(self, hash: str) -> dict: | |
if hash in self.cache: | |
return self.cache[hash] | |
params = {"query": "get_info", "hash": hash} | |
result = self.make_req(params) | |
if result: | |
result = result[0] | |
for h in ("sha256_hash", "sha1_hash", "md5_hash"): | |
if h in result: | |
self.cache[result[h]] = result | |
return result | |
def get_tag(self, tag: str, limit: int = 50) -> List[dict]: | |
params = {"query": "get_taginfo", "tag": tag, "limit": limit} | |
return self.make_req(params) | |
if __name__ == "__main__": | |
import argparse | |
import pathlib | |
import re | |
def prepare_argparse() -> argparse.ArgumentParser: | |
parser = argparse.ArgumentParser( | |
description="Модуль позволяет запрашивать MalwareBazaar" | |
) | |
parser.add_argument("--apikey", required=True, help="MalwareBazaar apikey") | |
parser.add_argument( | |
"-O", "--outdir", default="", help="Директория для скачиваемых файлов" | |
) | |
parser.add_argument( | |
"hashes", help="File or directories to parse", metavar="HASH", nargs="*" | |
) | |
parser.add_argument( | |
"-F", | |
"--hashlist", | |
help="Файл со списком хешей", | |
metavar="FILE", | |
type=argparse.FileType("r"), | |
) | |
return parser | |
parser = prepare_argparse() | |
args = parser.parse_args() | |
bazaar = Bazaar(args.apikey, args.outdir) | |
outdir = Path(args.outdir) | |
outdir.mkdir(parents=True, exist_ok=True) | |
if args.hashlist: | |
hashes = args.hashlist.read().splitlines() | |
print(f"Загружено {len(hashes)} хешей из {args.hashlist.name}") | |
else: | |
hashes = args.hashes | |
if not hashes: | |
print("Пустой список хешей") | |
parser.print_help() | |
total = len(hashes) | |
for index, h in enumerate(hashes, start=1): | |
res = None | |
prefix = f"{index}/{total} {h}" | |
if ( | |
re.match(r"^[0-9a-fA-F]{32}$", h) | |
or re.match(r"^[0-9a-fA-F]{40}$", h) | |
or re.match(r"^[0-9a-fA-F]{64}$", h) | |
): | |
# print(f"Check hash {h}") | |
pass | |
else: | |
print(f"{prefix} не является MD5|SHA1|SHA256") | |
continue | |
res = bazaar.details(h) | |
if not res: | |
print(f"{prefix} - не найден") | |
continue | |
with open(outdir / f"{h}.json", "w", encoding="utf-8") as r: | |
print(f"{prefix} - найден") | |
# print(f"{prefix} - найден, информация в {r.name}") | |
json.dump(res, r, ensure_ascii=False, indent=4) | |
if args.outdir: | |
outfile = bazaar.download(h) | |
# print(f"{prefix} - загружен в {outfile}") | |
print(f"{prefix} - загружен") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment