Skip to content

Instantly share code, notes, and snippets.

@truekonrads
Last active November 17, 2022 03:30
Show Gist options
  • Save truekonrads/25fd9d732efb8897a95f6e5c2905c1ab to your computer and use it in GitHub Desktop.
# Prefer the faster ujson if installed; fall back to the stdlib json module.
# Either way the rest of the file only uses loads/dumps, which both provide.
try:
    import ujson as json
except ImportError:
    import json

import asyncio
import logging

import aiohttp

# Emit each log record as a JSON-ish dict so output is easy to grep/parse.
# Implicit string concatenation (instead of a backslash continuation) keeps
# stray indentation out of the format string.
logging.basicConfig(
    format="{'time':'%(asctime)s', 'name': '%(name)s', "
    "'level': '%(levelname)s', 'message': '%(message)s'}"
)
LOGGER = logging.getLogger(__name__)  # __name__, not __file__: standard logger naming

# Work queue shared between the file-reading pump and the query workers.
QUEUE = asyncio.Queue(1000)
# Process-wide exponential backoff in seconds; None means "no backoff active".
GLOBAL_BACKOFF = None
# Guards reads/writes of GLOBAL_BACKOFF across worker coroutines.
BACKOFF_LOCK = asyncio.Lock()
async def query(queue: asyncio.Queue, apikey: str) -> None:
    """Worker coroutine: pull hashes off *queue*, look each one up via the
    VirusTotal v3 search API, and print its last_analysis_stats as JSON.

    Runs forever. On HTTP 200 the shared backoff is cleared; on rate-limit
    responses the shared backoff doubles and the hash is retried; on
    "TransientError" the hash is retried immediately.

    Args:
        queue: source of hash strings to look up.
        apikey: VirusTotal API key, sent in the x-apikey header.
    """
    # Required: this function reassigns the module-level backoff. Without it,
    # the assignments below would make GLOBAL_BACKOFF function-local and the
    # first read would raise UnboundLocalError.
    global GLOBAL_BACKOFF
    retry_hash = None
    # One session per worker, reused across requests (the original created a
    # fresh ClientSession on every loop iteration).
    async with aiohttp.ClientSession() as session:
        while True:
            # Prefer a hash queued for retry over pulling a fresh one.
            if retry_hash is not None:
                file_hash, retry_hash = retry_hash, None
            else:
                try:
                    file_hash = queue.get_nowait()
                except asyncio.QueueEmpty:
                    await asyncio.sleep(1)
                    continue
            url = "https://www.virustotal.com/api/v3/search"
            headers = {"x-apikey": apikey}
            params = {"query": file_hash}
            LOGGER.debug((url, params))
            try:
                response = await session.get(url, headers=headers, params=params)
                if response.status == 200:
                    j = json.loads(await response.text())
                    try:
                        stats = j["data"][0]["attributes"]["last_analysis_stats"]
                    except (AttributeError, IndexError, KeyError):
                        # Hash unknown to VT or unexpected payload shape.
                        stats = {}
                    stats["hash"] = file_hash
                    print(json.dumps(stats))
                    # Success: clear any process-wide backoff.
                    async with BACKOFF_LOCK:
                        GLOBAL_BACKOFF = None
                elif response.status == 429 or response.reason == "TooManyRequestsError":
                    # Rate limited: double the shared backoff (1, 2, 4, ...)
                    # and retry this hash instead of dropping it.
                    async with BACKOFF_LOCK:
                        GLOBAL_BACKOFF = 1 if GLOBAL_BACKOFF is None else GLOBAL_BACKOFF * 2
                        backoff = GLOBAL_BACKOFF
                    LOGGER.error(f"TooManyRequests, backoff={backoff} {response}")
                    retry_hash = file_hash
                    await asyncio.sleep(backoff)
                elif response.reason == "TransientError":
                    retry_hash = file_hash
                    LOGGER.warning(f"retrying hash {file_hash} due to TransientError")
                else:
                    LOGGER.error(f"Error processing hash {file_hash}: {response}")
            except Exception:
                # Best-effort: log and keep the worker alive. (The original
                # handler referenced `response`, which is unbound when the
                # request itself fails, raising NameError inside the handler.)
                LOGGER.exception("Request for hash %s failed", file_hash)
async def pump(fn, queue: asyncio.Queue) -> None:
    """Read hashes (one per line) from file *fn* and feed them into *queue*.

    Blank lines are skipped. Uses a blocking put so a full queue makes the
    producer wait — the original put_nowait/except-QueueFull pattern slept on
    a full queue but silently DROPPED the hash that failed to enqueue.

    Args:
        fn: path to a text file containing one hash per line.
        queue: destination queue consumed by the query workers.
    """
    with open(fn, "r") as f:
        for line in f:
            file_hash = line.strip()
            if not file_hash:
                continue  # don't enqueue empty queries
            await queue.put(file_hash)  # waits for free space instead of dropping
async def main() -> None:
    """Parse CLI arguments, then run the hash-file pump alongside a pool of
    VT query workers until interrupted (Ctrl-C).

    NOTE(review): the query workers loop forever, so the TaskGroup never
    finishes on its own even after the pump drains the file — confirm whether
    run-to-completion semantics are wanted before changing this.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Bulk VT lookup")
    parser.add_argument(
        "--debug", action="store_true", default=False, help="Enable debugging mode"
    )
    parser.add_argument("apikey", metavar="VT_API", help="VT API Key")
    parser.add_argument(
        "hashes", metavar="HASHES", help="file path for hashes to read and query"
    )
    parser.add_argument(
        "--tasks",
        metavar="TASKS",
        help="Number of simultaneous tasks",
        default=100,
        type=int,
    )
    args = parser.parse_args()
    if args.debug:
        LOGGER.setLevel(logging.DEBUG)
    try:
        # TaskGroup (Python 3.11+): one producer reading the hash file plus
        # args.tasks consumer workers hitting the VT API. The group keeps
        # references to its tasks, so no separate task list is needed.
        async with asyncio.TaskGroup() as tg:
            tg.create_task(pump(args.hashes, QUEUE))
            for _ in range(args.tasks):
                tg.create_task(query(QUEUE, args.apikey))
    except asyncio.CancelledError:
        # Swallow this exception - this would occur if you hit Ctrl-C.
        pass
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment