Last active
August 13, 2021 23:28
-
-
Save moohax/0ff0593691d3fab4877a14aae893938b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Generated by counterfit # | |
# code modified from https://github.com/monoxgas/FlyingAFalseFlag/blob/256197b78a8140d15df6e18b3221b637b5c3490a/Addendum/addendum.py | |
import os | |
import re | |
import json | |
import time | |
import requests | |
import numpy as np | |
import tqdm | |
import uuid | |
from zipfile import ZipFile | |
from counterfit.core import config | |
from counterfit.core.targets import ArtTarget | |
# Don't forget to add your headers and login information.
# The X-Recaptcha-Response header is not always required. If your browser's request headers do not include it, comment it out below.
class Virustotal(ArtTarget):
    """Counterfit target that submits PE samples to the VirusTotal web UI.

    On construction the MLSEC malware sample archive is downloaded (if it is
    not already on disk), every archive member is read into ``self.X``, and a
    login against the VirusTotal UI endpoint is attempted.  Calling the
    instance uploads each sample, polls for its analysis, and returns a score
    row per sample.
    """

    model_name = "virustotal"
    model_data_type = "pe"
    model_endpoint = "https://virustotal.com"
    model_input_shape = (1,)
    model_output_classes = [0, 1]
    X = []

    # Seconds to wait between two analysis polls in __call__.
    poll_interval = 15

    def __init__(self):
        """Load the sample archive and sign in to VirusTotal.

        If the login fails a message is printed and construction stops early,
        leaving ``self.headers`` without the ``X-Session-Hash`` entry.
        """
        # Data Information
        self.zip_info = []
        self.encryption_password = b'infected'
        self.sample_endpoint = 'https://mlsec.io/static/MLSEC_2021_malware.zip'
        self.sample_input_path = f"{config.targets_path}/{self.model_name}/mlsec_malware_samples.zip"

        # Download the samples
        if not os.path.exists(self.sample_input_path):
            self.download_samples()

        print(
            f"\n[-] scanning malware sample info from {self.sample_input_path}\n")
        with ZipFile(self.sample_input_path) as thezip:
            thezip.setpassword(self.encryption_password)
            for zipinfo in tqdm.tqdm(thezip.infolist()):
                self.zip_info.append(zipinfo)

        # Load samples into X
        self.X = self.read_input_data()

        # VT Information
        self.session = requests.session()
        self.scantime = 90
        self.headers = {
            "Accept": "application/json",
            # NOTE(review): the key below is spelled with a capital "I"
            # ("Ianguage"), not "Language".  It is kept byte-for-byte because
            # it may be deliberate -- confirm against a real browser request
            # before "correcting" it.
            "Accept-Ianguage": "en-US,en;q=0.9,es;q=0.8",
            "X-Tool": "vt-ui-main",
            "X-App-Version": "v1x37x1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36 Edg/92.0.902.67",
            "X-Recaptcha-Response": "",
            "X-Vt-Anti-Abuse-Header": "",
        }

        # Fill in your own credentials here.
        if not self.login('', ''):
            print('[+] Login failed!')
            return

        self.headers["X-Session-Hash"] = self.session.cookies['VT_SESSION_HASH']

    def login(self, username, password):
        """POST the credentials to the UI sign-in endpoint.

        Returns True when the response status code is 200, False otherwise.
        """
        login_json = {
            "data": {
                "forever": True,
                "password": password,
                "user_id": username
            }
        }
        login_response = self.session.post(
            f"{self.model_endpoint}/ui/signin", headers=self.headers, json=login_json)
        print(login_response)
        return login_response.status_code == 200

    def download_samples(self):
        """Download the sample archive to ``self.sample_input_path``.

        A failed download is reported on stdout and nothing is written.
        """
        response = requests.get(self.sample_endpoint)
        if not response.ok:
            print(
                f"\n[+] Failed to download malware samples: {response.status_code}\n")
            return

        # The per-target directory may not exist yet on a fresh install.
        target_dir = os.path.dirname(self.sample_input_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        with open(self.sample_input_path, "wb") as samples:
            samples.write(response.content)
        # Report success only after the archive is actually on disk.
        print(
            f"\n[+] Successfully downloaded malware samples to {self.sample_input_path}\n")

    def read_input_data(self):
        """Return the raw bytes of every archive member listed in ``self.zip_info``."""
        out = []
        with ZipFile(self.sample_input_path) as thezip:
            for member in self.zip_info:
                with thezip.open(member, pwd=self.encryption_password) as thefile:
                    out.append(thefile.read())
        return out

    def upload_file(self, file_name, file_data):
        """Upload one sample and return its id.

        Requests an upload URL, posts the file to it, and returns the ``id``
        found in the upload response.  Returns False (after printing a
        message) when either response cannot be parsed.
        """
        upload_url_response = requests.get(
            f"{self.model_endpoint}/ui/files/upload_url", headers=self.headers)
        try:
            upload_url = upload_url_response.json()['data']
        except (ValueError, KeyError, TypeError):
            # ValueError covers a body that is not JSON; KeyError/TypeError
            # cover JSON that does not have the expected layout.
            print('[!] Failed to get upload URL')
            return False

        files = {
            'file': (file_name, file_data),
            'filename': (None, file_name)
        }
        upload_response = requests.post(
            upload_url, files=files, headers=self.headers)
        try:
            file_id = upload_response.json()['data']['id']
        except (ValueError, KeyError, TypeError):
            print('[!] Failed to post file')
            return False

        print(f'[+] File id is: {file_id}')
        return file_id

    def get_file_analysis(self, file_id):
        """Fetch the analysis record for ``file_id``.

        Returns a ``(status, stats, results)`` tuple taken from the response's
        ``data.attributes``; all three are None when the response cannot be
        parsed or lacks any of those fields.
        """
        response = requests.get(
            f"{self.model_endpoint}/ui/analyses/{file_id}", headers=self.headers)
        try:
            # Parse the body once instead of once per field.
            attributes = response.json()['data']['attributes']
            return attributes['status'], attributes['stats'], attributes['results']
        except (ValueError, KeyError, TypeError):
            return None, None, None

    def set_attack_samples(self, index=0):
        """Select the sample(s) at ``index`` for the active attack.

        ``index`` may be a single index or an iterable of indices into
        ``self.X``.
        """
        # JIT loading of samples
        if hasattr(index, "__iter__"):
            # list of indices
            to_fetch = index
        else:
            to_fetch = [index]

        out = [self.X[i] for i in to_fetch]

        self.active_attack.sample_index = index
        self.active_attack.samples = out

    def __call__(self, x):
        """Score each sample in ``x``.

        Every sample is uploaded under a random name and its analysis is
        polled every ``poll_interval`` seconds for up to ``self.scantime``
        seconds.  Returns a numpy array with one ``[1 - score, score]`` row
        per sample; a sample whose upload, analysis, or score parsing fails
        gets ``[0.0, 1.0]``.
        """
        scores = []
        for sample in x:
            fname = uuid.uuid4().hex
            # Upload the current sample (the original always sent x[0]).
            sample_id = self.upload_file(fname, sample)
            if not sample_id:
                # upload_file already reported the failure; there is no
                # analysis to poll for.
                scores.append([0.0, 1.0])
                continue

            results = None
            finished = False
            time_left = self.scantime
            # "> 0" rather than truthiness so the loop also terminates when
            # scantime is not an exact multiple of the poll interval.
            while time_left > 0:
                status, stats, results = self.get_file_analysis(sample_id)
                # status is None when the analysis request failed; treat
                # that the same as "not finished yet".
                if status and 'completed' in status and results:
                    print(stats)
                    finished = True
                    break
                time.sleep(self.poll_interval)
                time_left -= self.poll_interval

            if not finished:
                print('[!] Timeout hit waiting for analysis to finish')
                scores.append([0.0, 1.0])
                continue

            try:
                # First integer in the 'MAX' entry's result string, divided
                # by 100.  NOTE(review): presumably a percentage -- confirm
                # against a real analysis response.
                score = float(re.findall(
                    r'(\d+)', results['MAX']['result'])[0]) / 100
            except (KeyError, IndexError, TypeError, ValueError):
                print('[!] Failed to get score!')
                scores.append([0.0, 1.0])
            else:
                print(score)
                scores.append([1 - score, score])

        return np.array(scores)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment