Created
June 25, 2022 08:43
-
-
Save Humoud/47ba78def91dd66c8eb2bbfd02b0b31b to your computer and use it in GitHub Desktop.
think it works
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from bson import json_util | |
from bson.objectid import ObjectId | |
import pymongo | |
class MongoDBHandler():
    """
    Binlex MongoDB Handler

    Thin wrapper around a ``pymongo.MongoClient`` bound to the ``binlex``
    database. Note that ``self.cursor`` holds the *database* handle, not a
    query cursor; individual collections are reached via
    ``self.cursor[name]``.
    """

    def __init__(self):
        # No arguments: the client connects using pymongo's defaults.
        self.mongodb = pymongo.MongoClient()
        self.cursor = self.mongodb['binlex']

    @staticmethod
    def jsonify(data):
        """Return *data* converted to plain JSON-compatible objects.

        The value is dumped with ``bson.json_util.default`` as the fallback
        serializer and parsed back, so BSON-only types (such as the
        ``ObjectId`` stored under ``_id``) come out JSON-safe.
        """
        return json.loads(json.dumps(data, default=json_util.default))

    def query(self, query, collection='default'):
        """Return the documents of *collection* that match *query*.

        The previous implementation called ``find()`` on the database handle
        (which pymongo does not provide) and ignored *query* entirely.

        Args:
            query: filter passed to ``Collection.find``; ``{}`` or ``None``
                matches every document.
            collection: name of the collection to search. Defaults to
                ``'default'``, one of the collection names used by
                ``MongoQuery`` in this file.

        Returns:
            A list of JSON-compatible documents (see ``jsonify``).
        """
        docs = self.cursor[collection].find(query)
        return [self.jsonify(doc) for doc in docs]

    def query_doc_id(self, collection, id):
        """Return the document of *collection* whose ``_id`` is *id*.

        *id* is wrapped in ``ObjectId`` before the lookup. The result is
        passed through ``jsonify``; ``None`` is returned when nothing
        matches.
        """
        result = self.cursor[collection].find_one({'_id': ObjectId(id)})
        return self.jsonify(result)

    def stats_collection_count(self, collection):
        """Return the number of documents stored in *collection*."""
        return self.cursor[collection].count_documents({})

    def upsert_trait(self, data, collection):
        """Insert or update a trait document and return its ``_id``.

        Traits are keyed on ``data['bytes_sha256']``: an existing document
        with the same hash has the fields of *data* ``$set`` on it,
        otherwise a new document is inserted.

        A single ``find_one_and_update`` replaces the earlier
        ``update_one`` + ``find_one`` pair, so the id is obtained atomically
        and in one round trip.

        Raises:
            KeyError: if *data* has no ``'bytes_sha256'`` key.
        """
        trait = self.cursor[collection].find_one_and_update(
            {'bytes_sha256': data['bytes_sha256']},
            {'$set': data},
            projection={'_id': True},  # only the id is needed
            upsert=True,
            return_document=pymongo.ReturnDocument.AFTER,
        )
        return trait['_id']

    def upsert_file_trait(self, data, trait_id):
        """Insert or update the ``files`` record linking a file to a trait.

        The record is matched on collection, mode, file sha256 and
        *trait_id*; the fields of *data* are then ``$set`` on it.

        Returns:
            The ``_id`` of the newly inserted record, or ``None`` when a
            matching record already existed and was updated in place.

        Raises:
            KeyError: if *data* lacks ``'collection'``, ``'mode'`` or
                ``'sha256'``.
        """
        outcome = self.cursor['files'].update_one(
            filter={
                'collection': data['collection'],
                'mode': data['mode'],
                'sha256': data['sha256'],
                'trait_id': trait_id,
            },
            update={'$set': data},
            upsert=True,
        )
        return outcome.upserted_id
class MongoQuery():
    """Stores binlex trait JSON in the trait collection matching its corpus."""

    # Known trait collections. A trait is routed to the collection whose
    # name its corpus string starts with.
    _COLLECTIONS = ('default', 'malware', 'goodware')

    def __init__(self):
        self.mongodb = MongoDBHandler()

    def process(self, body, sha256):
        """Persist one trait and its link to the file it was found in.

        The ``corpus``, ``offset`` and ``mode`` keys are moved out of the
        trait into a per-file record; the remaining keys are upserted as the
        trait document. The per-file record (extended with ``trait_id`` and
        ``collection``) is then upserted into ``files``.

        Traits whose corpus matches none of the known collections are
        silently skipped.

        Args:
            body: JSON text of a single trait object.
            sha256: hex digest of the file the trait came from.

        Raises:
            KeyError: if the trait lacks ``corpus``, ``offset`` or ``mode``.
        """
        data = json.loads(body)
        corpus = data.pop('corpus')
        file_data = {
            'corpus': corpus,
            'offset': data.pop('offset'),
            'sha256': sha256,
            'mode': data.pop('mode'),
        }

        collection = next(
            (name for name in self._COLLECTIONS if corpus.startswith(name)),
            None,
        )
        if collection is None:
            # Unknown corpus: nothing is written.
            return

        trait_id = self.mongodb.upsert_trait(data, collection)
        file_data['trait_id'] = trait_id
        file_data['collection'] = collection
        self.mongodb.upsert_file_trait(file_data, trait_id)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from MongoDBHandler import MongoQuery | |
import multiprocessing | |
from hashlib import sha256 | |
import pybinlex | |
import json | |
PROCESSES = 3 | |
def send_to_binlex(sample):
    """Extract binlex traits from one PE sample and store them in MongoDB.

    Args:
        sample: path of the file to process.

    Returns:
        ``True`` when ``pybinlex.PE.read_file`` accepted the sample and its
        traits were handed to ``MongoQuery.process``; ``False`` when
        ``read_file`` reported failure (nothing is stored in that case).
    """
    pe = pybinlex.PE()
    print(f'Processing: {sample}')
    if not pe.read_file(sample):
        return False

    # Hash the sample; the context manager closes the handle even if the
    # read fails.
    with open(sample, 'rb') as handle:
        file_hash = sha256(handle.read()).hexdigest()

    # Generate traits.
    # NOTE(review): corpus 'malware' and tag 'family' are hard-coded here;
    # presumably placeholders to adjust per run - confirm.
    disassembler = pybinlex.Disassembler(pe)
    disassembler.set_threads(4)
    disassembler.set_corpus('malware')
    disassembler.set_tags(['family'])
    disassembler.disassemble()

    # Save traits to the DB. MongoQuery.process expects JSON text, so each
    # trait is serialized before being handed over.
    mongodb = MongoQuery()
    for trait in disassembler.get_traits():
        mongodb.process(json.dumps(trait), file_hash)
    return True
def main(samples=None):
    """Process sample files in a pool of worker processes and print results.

    Args:
        samples: iterable of sample file paths. When omitted, the
            placeholder list ``['paths', 'here']`` is used, as before.

    Each sample is passed to ``send_to_binlex`` in a pool of ``PROCESSES``
    workers; the boolean results are printed as they complete, in no
    particular order.
    """
    if samples is None:
        samples = ['paths', 'here']
    with multiprocessing.Pool(PROCESSES) as pool:
        # Process samples. Results must be consumed inside the ``with``
        # block: the pool is torn down on exit.
        results = pool.imap_unordered(send_to_binlex, samples)
        print('Results:')
        for r in results:
            print(r)
        print()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment