Last active
October 14, 2021 13:27
-
-
Save buzzer-re/355f9e7c3d0899206352987c30cf0bd3 to your computer and use it in GitHub Desktop.
Simple VirusTotal API client class for Python projects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
from urllib.parse import quote

import requests
class VirusTotal:
    """Small client for the VirusTotal v3 REST API.

    The API key is taken from the ``key`` argument or, when that is empty,
    from ``~/.vt.toml``. Failures never raise out of the public methods:
    they return an empty result and leave a human-readable reason in
    ``last_err``. ``valid`` is False when no usable key could be loaded.
    """

    VT_API = 'https://www.virustotal.com/api/v3'
    # Expected length of an API key; anything else in .vt.toml is rejected.
    KEY_LENGTH = 64
    # Number of attempts custom_call makes before giving up.
    RETRIES = 5
    # Seconds before a stalled connection/read is abandoned, so that a hung
    # server cannot block the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, key = None, query = None, limit_query=5):
        """Create a client.

        Args:
            key: API key. When falsy, the key is read from ``~/.vt.toml``.
            query: Default search query used by ``run_query``.
            limit_query: Maximum number of results requested by ``run_query``.
        """
        self.valid = True
        self.last_err = ''
        self.query = query
        self.limit_query = limit_query
        # Always define the attribute so later calls can test it instead of
        # failing with AttributeError when no key was found.
        self.key = None
        if key:
            self.key = key
        else:
            self.__read_local()

    @staticmethod
    def _parse_key(content):
        """Return the API key found in ``.vt.toml`` text, or None.

        Accepts ``name = "value"`` lines with optional whitespace and
        quotes, with or without a trailing newline, and returns the first
        value that has the expected key length.
        """
        for line in content.splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            _, sep, value = line.partition('=')
            if not sep:
                continue
            value = value.strip().strip('"\'')
            if len(value) == VirusTotal.KEY_LENGTH:
                return value
        return None

    def __read_local(self) -> None:
        """Load the API key from ``~/.vt.toml`` into ``self.key``.

        On failure ``valid`` is set to False and ``last_err`` explains why;
        ``self.key`` stays None.
        """
        config_path = os.path.join(os.path.expanduser('~'), '.vt.toml')
        try:
            with open(config_path, 'r', encoding='utf-8') as vtfd:
                content = vtfd.read()
        except (OSError, UnicodeDecodeError):
            self.valid = False
            self.last_err = "No usable API keys found for VirusTotal"
            return

        key = self._parse_key(content)
        if key is None:
            self.valid = False
            self.last_err = "Invalid API key saved at .vt.toml!"
        else:
            self.key = key

    def _search_path(self, query):
        """Build the intelligence-search path for ``query``.

        The query is percent-encoded in full: search modifiers routinely
        contain ``+``, ``&``, ``:`` or spaces, which would otherwise be
        interpreted as URL syntax. Pass the raw (not pre-encoded) query.
        """
        encoded = quote(str(query), safe='')
        return f'/intelligence/search?query={encoded}&limit={self.limit_query}'

    def run_query(self, query='') -> list:
        """Run an intelligence search.

        Args:
            query: Search expression; falls back to the query given to the
                constructor when empty.

        Returns:
            The ``data`` member of the API response, or an empty result when
            there is no query or the request failed (see ``last_err``).
        """
        if not query:
            query = self.query
        if not query:
            # Do not send the literal string "None" to the API.
            self.last_err = "No query given"
            return []
        return self.custom_call(self._search_path(query))

    def custom_call(self, path):
        """GET ``VT_API + path`` and return the decoded result.

        Args:
            path: API path starting with ``/``, including any query string.

        Returns:
            For JSON responses, the ``data`` member of the payload (``{}``
            when absent); for any other content type, the raw body as bytes.
            ``{}`` when the request could not be completed; ``last_err``
            then holds the reason.
        """
        # TODO: Add log
        if not self.key:
            self.last_err = "No usable API keys found for VirusTotal"
            return {}

        headers = {
            'x-apikey': self.key,
        }
        url = f'{self.VT_API}{path}'
        for _ in range(self.RETRIES):
            try:
                # The context manager releases the connection on every
                # path, not only on HTTP 200.
                with requests.get(url, headers=headers,
                                  timeout=self.REQUEST_TIMEOUT) as res:
                    status = res.status_code
                    content_type = res.headers.get('Content-Type', '')
                    data = res.content
            except requests.RequestException as e:
                self.last_err = str(e)
                print(e)
                print(f"Error on requesting {url}, retrying...")
                continue

            if status == 200:
                if 'application/json' not in content_type:
                    return data
                try:
                    payload = json.loads(data.decode())
                except ValueError as e:
                    # Covers both invalid JSON and undecodable bytes.
                    self.last_err = f"Malformed JSON from {url}: {e}"
                    continue
                if isinstance(payload, dict):
                    return payload.get('data', {})
                return {}

            self.last_err = f"HTTP {status} on requesting {url}"
            # Client errors (bad key, unknown hash, ...) will not change on
            # retry; only rate limiting (429) is worth another attempt.
            if 400 <= status < 500 and status != 429:
                break
        return {}

    def get_relations(self, hash, relations = None):
        '''
        Return a dictionary mapping each relation name to the result of
        querying ``/files/{hash}/{relation}``.

        ``hash`` keeps its name for keyword-argument compatibility even
        though it shadows the builtin.
        '''
        return {
            relation: self.custom_call(f'/files/{hash}/{relation}')
            for relation in (relations or [])
        }

    def read_sample(self, hash) -> bytes:
        """Download the file identified by ``hash``.

        Returns:
            The file content, or ``b''`` when the download failed (see
            ``last_err``).
        """
        data = self.custom_call(f'/files/{hash}/download')
        # custom_call yields a dict on failure or on a JSON error body;
        # honour the declared bytes return type.
        return data if isinstance(data, bytes) else b''
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment