Skip to content

Instantly share code, notes, and snippets.

@av-gantimurov
Created February 13, 2023 05:21
Show Gist options
  • Save av-gantimurov/5bdeff74c9005acbddd56a399cb96ecf to your computer and use it in GitHub Desktop.
Save av-gantimurov/5bdeff74c9005acbddd56a399cb96ecf to your computer and use it in GitHub Desktop.
MalwareBazaar Downloader
"""
Небольшой скрипт для проверки есть ли файл на MalwareBazaar и загрузки в
указанную директорию. Для работы нужен `apikey` MalwareBazaar.
Если директория не указана, то только проверяет наличие файла и сохраняет
полученный ответ в текущую директорию.
Файлы загружает в виде архивов zip с паролем infected. Так отдает MalwareBazaar.
Можно указать список хешей для проверки - опция `-F`.
Поддерживает хеши `md5`, `sha1`, `sha256`.
Python 3.8+
Примеры
```
# help
python bazaar.py --help
# hashes from args
python bazaar.py --apikey 'APIKEY' 035c005e34504bd0228feaf4b54eea743894b7aa
# hashes from file
python bazaar.py --apikey 'APIKEY' -F hashlist.txt
# download files
python bazaar.py --apikey 'APIKEY' -F hashlist.txt -O output_directory
#MalwareBazaar #Python #utils
"""
import json
import logging
from pathlib import Path
from typing import List, Union
from urllib import parse, request
from urllib.error import HTTPError
class Bazaar:
def __init__(self, API_KEY: str, output_dir: str = "./download"):
if not API_KEY:
raise ValueError("API KEY isn't defined")
self._base_url = "https://mb-api.abuse.ch/api/v1/"
self._headers = {
"user-agent": "Falcon Sandbox",
"accept": "application/json",
"API-KEY": API_KEY,
}
self.output_dir = output_dir
self.cache = {}
self.logger = logging.getLogger("Bazaar")
def __contains__(self, hash: str):
result = self.details(hash)
return result is not None
def make_req(self, params: dict, is_raw: bool = False) -> List[dict]:
query = parse.urlencode(params).encode()
self.logger.debug("make request %s", query)
r = request.Request(self._base_url, data=query, headers=self._headers)
result = None
try:
with request.urlopen(r) as response:
status = response.status
if status != 200:
self.logger.warning(f"request {params} status {status}")
return None
raw = response.read()
self.logger.debug(raw)
if is_raw:
return raw
data = json.loads(raw.decode())
query_status = data.get("query_status", "")
if query_status != "ok":
self.logger.warning(f"request {params} query_status {query_status}")
return None
result = data["data"]
except HTTPError as e:
self.logger.error(e)
return result
def get_file(self, hash: str) -> bytes:
sha256 = self.details(hash)["sha256_hash"]
params = {"query": "get_file", "sha256_hash": sha256}
return self.make_req(params, is_raw=True)
def download(self, hash: str, outdir: str = None) -> str:
raw = self.get_file(hash)
if not raw:
return None
if not outdir:
outdir = self.output_dir
outdir = Path(outdir)
outdir.mkdir(exist_ok=True, parents=True)
target_path = outdir / hash
with open(target_path, "wb") as handle:
handle.write(raw)
return target_path
def details(self, hash: str) -> dict:
if hash in self.cache:
return self.cache[hash]
params = {"query": "get_info", "hash": hash}
result = self.make_req(params)
if result:
result = result[0]
for h in ("sha256_hash", "sha1_hash", "md5_hash"):
if h in result:
self.cache[result[h]] = result
return result
def get_tag(self, tag: str, limit: int = 50) -> List[dict]:
params = {"query": "get_taginfo", "tag": tag, "limit": limit}
return self.make_req(params)
if __name__ == "__main__":
import argparse
import pathlib
import re
def prepare_argparse() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Модуль позволяет запрашивать MalwareBazaar"
)
parser.add_argument("--apikey", required=True, help="MalwareBazaar apikey")
parser.add_argument(
"-O", "--outdir", default="", help="Директория для скачиваемых файлов"
)
parser.add_argument(
"hashes", help="File or directories to parse", metavar="HASH", nargs="*"
)
parser.add_argument(
"-F",
"--hashlist",
help="Файл со списком хешей",
metavar="FILE",
type=argparse.FileType("r"),
)
return parser
parser = prepare_argparse()
args = parser.parse_args()
bazaar = Bazaar(args.apikey, args.outdir)
outdir = Path(args.outdir)
outdir.mkdir(parents=True, exist_ok=True)
if args.hashlist:
hashes = args.hashlist.read().splitlines()
print(f"Загружено {len(hashes)} хешей из {args.hashlist.name}")
else:
hashes = args.hashes
if not hashes:
print("Пустой список хешей")
parser.print_help()
total = len(hashes)
for index, h in enumerate(hashes, start=1):
res = None
prefix = f"{index}/{total} {h}"
if (
re.match(r"^[0-9a-fA-F]{32}$", h)
or re.match(r"^[0-9a-fA-F]{40}$", h)
or re.match(r"^[0-9a-fA-F]{64}$", h)
):
# print(f"Check hash {h}")
pass
else:
print(f"{prefix} не является MD5|SHA1|SHA256")
continue
res = bazaar.details(h)
if not res:
print(f"{prefix} - не найден")
continue
with open(outdir / f"{h}.json", "w", encoding="utf-8") as r:
print(f"{prefix} - найден")
# print(f"{prefix} - найден, информация в {r.name}")
json.dump(res, r, ensure_ascii=False, indent=4)
if args.outdir:
outfile = bazaar.download(h)
# print(f"{prefix} - загружен в {outfile}")
print(f"{prefix} - загружен")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment