Last active: November 17, 2022 03:30
Save truekonrads/25fd9d732efb8897a95f6e5c2905c1ab to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try: | |
import ujson as json | |
except ImportError: | |
import json | |
import aiohttp, asyncio | |
import logging | |
# Each log record is rendered as one dict-like line. The format is built by
# implicit string concatenation rather than a backslash continuation inside
# the literal, so source indentation cannot leak into the emitted text.
LOG_FORMAT = (
    "{'time':'%(asctime)s', 'name': '%(name)s', "
    "'level': '%(levelname)s', 'message': '%(message)s'}"
)
logging.basicConfig(format=LOG_FORMAT)
# Logger named after this script's file path.
LOGGER = logging.getLogger(__file__)
# Bounded (maxsize 1000) queue of hashes: filled by pump(), drained by the
# query() workers started in main().
QUEUE = asyncio.Queue(1000)
# Current rate-limit backoff in seconds shared by all workers; None means
# "no backoff in effect".
GLOBAL_BACKOFF = None
# Serialises updates to GLOBAL_BACKOFF across workers.
BACKOFF_LOCK = asyncio.Lock()
async def query(queue: asyncio.Queue, apikey, *, max_backoff=300):
    """Look up hashes taken from *queue* on VirusTotal, forever.

    Each hash is sent to the VirusTotal v3 ``/search`` endpoint.

    * HTTP 200: the ``last_analysis_stats`` of the first search result is
      printed to stdout as one JSON object with an added ``"hash"`` key.
      If the search has no usable result, ``{}`` is used as the stats.
      The shared backoff is reset.
    * HTTP 429 (rate limited): the shared backoff ``GLOBAL_BACKOFF`` grows,
      the worker sleeps, and then the same hash is re-submitted.
    * HTTP 503 (transient error): the same hash is re-submitted.
    * Anything else, including network errors and invalid JSON, is logged
      and the hash is skipped.

    The coroutine never returns on its own; it runs until cancelled.

    :param queue: queue of hash strings to look up.
    :param apikey: VirusTotal API key, sent as the ``x-apikey`` header.
    :param max_backoff: upper bound in seconds for the shared backoff. It
        doubles on every 429 response, so with many throttled workers it
        would otherwise grow without limit.
    """
    # Without this declaration the assignments below would create a local
    # variable. Reading it before assignment would then raise
    # UnboundLocalError, and resets would never reach the other workers.
    global GLOBAL_BACKOFF
    url = "https://www.virustotal.com/api/v3/search"
    retry_hash = None
    # One session (and connection pool) per worker, reused for every request,
    # rather than a fresh session per hash.
    async with aiohttp.ClientSession(headers={"x-apikey": apikey}) as session:
        while True:
            if retry_hash is not None:
                hash_, retry_hash = retry_hash, None
            else:
                # Block until work arrives instead of polling once a second.
                hash_ = await queue.get()
            params = {"query": hash_}
            LOGGER.debug((url, params))
            try:
                # ``async with`` releases the connection on every path.
                async with session.get(url, params=params) as response:
                    status = response.status
                    body = await response.text()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                LOGGER.error(f"Error processing hash {hash_}: {e!r}")
                continue

            # Dispatch on the HTTP status. ``response.reason`` is the HTTP
            # reason phrase, never VirusTotal's error code.
            if status == 200:
                try:
                    payload = json.loads(body)
                except ValueError:
                    LOGGER.error(
                        f"Error processing hash {hash_}: invalid JSON {body[:200]!r}"
                    )
                    continue
                try:
                    stats = payload["data"][0]["attributes"]["last_analysis_stats"]
                except (KeyError, IndexError, TypeError):
                    # Empty "data" (no match) or a result without analysis
                    # stats: still report the hash, with empty stats.
                    stats = {}
                stats["hash"] = hash_
                print(json.dumps(stats))
                async with BACKOFF_LOCK:
                    GLOBAL_BACKOFF = None
            elif status == 429:
                # VirusTotal signals TooManyRequestsError / QuotaExceededError
                # with HTTP 429. Retry this hash after the shared backoff.
                retry_hash = hash_
                async with BACKOFF_LOCK:
                    if GLOBAL_BACKOFF is None:
                        GLOBAL_BACKOFF = min(1, max_backoff)
                    else:
                        GLOBAL_BACKOFF = min(GLOBAL_BACKOFF * 2, max_backoff)
                    backoff = GLOBAL_BACKOFF
                LOGGER.error(f"TooManyRequests, backoff={backoff} {response}")
                await asyncio.sleep(backoff)
            elif status == 503:
                # VirusTotal's TransientError: retry the same hash.
                retry_hash = hash_
                LOGGER.warning(f"retrying hash {hash_} due to TransientError")
            else:
                LOGGER.error(
                    f"Error processing hash {hash_}: {response} {body[:200]!r}"
                )
async def pump(fn, queue: asyncio.Queue):
    """Feed the hashes listed in file *fn* into *queue*, one per line.

    Surrounding whitespace is stripped and blank lines are skipped, so no
    empty query is ever issued. When *queue* is full this waits for a free
    slot instead of discarding the line, so every hash in the file is
    enqueued exactly once.

    :param fn: path of a text file with one hash per line.
    :param queue: destination queue, drained by the lookup workers.
    """
    with open(fn, "r", encoding="utf-8") as f:
        for line in f:
            hash_ = line.strip()
            if not hash_:
                continue
            # put() suspends while the queue is full; put_nowait() plus a
            # sleep would have dropped the current line.
            await queue.put(hash_)
async def main():
    """Command-line entry point: parse arguments and run the lookup pipeline.

    Starts one ``pump`` task, which reads hashes from the given file into
    ``QUEUE``, and ``--tasks`` concurrent ``query`` workers, which look them
    up. The workers loop forever, so in normal use this runs until it is
    interrupted (Ctrl-C).

    Exits with status 2 (via ``argparse``) on invalid arguments, including a
    ``--tasks`` value below 1.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Bulk VT lookup")
    parser.add_argument(
        "--debug", action="store_true", default=False, help="Enable debugging mode"
    )
    parser.add_argument("apikey", metavar="VT_API", help="VT API Key")
    parser.add_argument(
        "hashes", metavar="HASHES", help="file path for hashes to read and query"
    )
    parser.add_argument(
        "--tasks",
        metavar="TASKS",
        help="Number of simultaneous tasks",
        default=100,
        type=int,
    )
    args = parser.parse_args()
    # With no workers nothing drains QUEUE, so the run could never make
    # progress. Reject this up front.
    if args.tasks < 1:
        parser.error("--tasks must be at least 1")
    if args.debug:
        LOGGER.setLevel(logging.DEBUG)
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(pump(args.hashes, QUEUE))
            for _ in range(args.tasks):
                tg.create_task(query(QUEUE, args.apikey))
    except asyncio.CancelledError:
        # Hitting Ctrl-C cancels this task; swallow the cancellation so the
        # script exits quietly instead of printing a traceback.
        pass
if __name__ == "__main__":
    # Script entry point: build the main() coroutine and drive it to
    # completion on a fresh event loop.
    entry_coro = main()
    asyncio.run(entry_coro)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.