Created
June 15, 2020 10:24
-
-
Save olliencc/2d028581c3452a466c58e26defa0b305 to your computer and use it in GitHub Desktop.
Parse CobaltStrike beacon metadata
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import binascii
import hashlib
import io
import struct

import M2Crypto
import requests
# PEM envelopes: the "{}" placeholder is filled with the key body via
# str.format() to produce a complete PEM document.
PRIVATE_KEY_TEMPLATE = "-----BEGIN PRIVATE KEY-----\n{}\n-----END PRIVATE KEY-----"
PUBLIC_KEY_TEMPLATE = "-----BEGIN PUBLIC KEY-----\n{}\n-----END PUBLIC KEY-----"
class Metadata(object):
    """
    Class to represent a beacon Metadata object.

    When both ``data`` and ``private_key`` are supplied, the constructor
    RSA-decrypts ``data`` and parses the result into the attributes below.
    Fields split out of the tab-separated tail (``ver``, ``intz``, ``comp``,
    ``user``, ``proc`` and, for cs_version < 4, ``bid``/``pid``/``is64``) are
    kept as the byte strings read from the decrypted buffer.
    """

    # Leading bytes a correctly decrypted blob must start with.
    _MAGIC = b'\x00\x00\xBE\xEF'

    def __init__(self, data="", private_key="", public_key="", cs_version=4):
        """
        Initialise the fields and, if possible, decrypt and parse ``data``.

        :param data: RSA-encrypted metadata blob; must be 128 bytes if given.
        :param private_key: key body inserted into PRIVATE_KEY_TEMPLATE.
        :param public_key: stored on the instance; not used by this class.
        :param cs_version: selects the layout parsed by unpack()
            (< 4: tab-separated text only; otherwise binary header + text).
        :raises AttributeError: if ``data`` is non-empty and not 128 bytes.
        """
        self.cs_version = cs_version
        self.data = data
        self.public_key = public_key
        self.private_key = private_key
        self.port = 0
        self.ciphertext = ""
        self.charset = ""
        self.charset_oem = ""
        self.ver = ""
        self.intz = ""
        self.comp = ""
        self.user = ""
        # Previously only created inside unpack(); initialised here so the
        # attribute always exists.
        self.proc = ""
        self.pid = ""
        self.bid = ""
        self.barch = ""
        # Byte strings so they can always be hashed / hex-dumped.
        self.raw_aes_keys = b""
        self.aes_key = b""
        self.hmac_key = b""
        self.is64 = False
        self.high_integrity = False

        if data and len(data) != 128:
            raise AttributeError('Metadata should be 128 bytes')
        if data and private_key:
            self.rsa_decrypt()
            self.unpack()

    @staticmethod
    def _hex(value):
        """Return ``value`` (a byte string) as a hexadecimal text string."""
        return binascii.hexlify(value).decode('ascii')

    @staticmethod
    def _text(value):
        """Decode byte strings for display; return anything else unchanged."""
        # On Python 2 ``bytes is str``, so values pass through untouched.
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return value

    def calculate_aes(self):
        """Derive aes_key / hmac_key from the SHA-256 of raw_aes_keys."""
        digest = hashlib.sha256(self.raw_aes_keys).digest()
        self.aes_key = digest[0:16]
        self.hmac_key = digest[16:]

    def rsa_decrypt(self):
        """
        Decrypt ``self.data`` with the private key and replace it with a
        seekable byte stream positioned after the 8-byte header.

        :raises AssertionError: if the plaintext does not start with the
            expected magic bytes.
        """
        pem = PRIVATE_KEY_TEMPLATE.format(self.private_key)
        if not isinstance(pem, bytes):
            # NOTE(review): M2Crypto on Python 3 is expected to want bytes
            # here - confirm against the installed version.
            pem = pem.encode('ascii')
        pkey = M2Crypto.RSA.load_key_string(pem)
        plaintext = pkey.private_decrypt(self.data, M2Crypto.RSA.pkcs1_padding)
        # Explicit raise instead of ``assert`` so the check survives ``-O``;
        # the exception type is unchanged for existing callers.
        if plaintext[0:4] != self._MAGIC:
            raise AssertionError('decrypted metadata has an unexpected magic header')
        # Skip the 4 magic bytes and the following 4 bytes (presumably a
        # length field; it is not interpreted here).
        self.data = io.BytesIO(plaintext[8:])

    def readInt(self, byteorder='>'):
        """Read an unsigned 32-bit integer from the stream."""
        fmt = byteorder + 'L'
        return struct.unpack(fmt, self.data.read(struct.calcsize(fmt)))[0]

    def readShort(self, byteorder='>'):
        """Read an unsigned 16-bit integer from the stream."""
        fmt = byteorder + 'H'
        return struct.unpack(fmt, self.data.read(struct.calcsize(fmt)))[0]

    def readByte(self):
        """Read one signed byte from the stream."""
        fmt = 'b'
        return struct.unpack(fmt, self.data.read(struct.calcsize(fmt)))[0]

    def flag(self, b, s):
        """Return True when every bit of mask ``s`` is set in ``b``."""
        return (b & s) == s

    def print_config(self):
        """Print the parsed fields to stdout, one ``label: value`` per line."""
        print("raw AES key: %s" % self._hex(self.raw_aes_keys[0:8]))
        print("raw HMAC key: %s" % self._hex(self.raw_aes_keys[8:]))
        print("AES key: %s" % self._hex(self.aes_key))
        print("HMAC key: %s" % self._hex(self.hmac_key))
        print("ver: %s" % self._text(self.ver))
        print("host: %s" % self._text(self.intz))
        print("computer: %s" % self._text(self.comp))
        print("user: %s" % self._text(self.user))
        print("pid: %s" % self._text(self.pid))
        print("id: %s" % self._text(self.bid))
        print("barch: %s" % self.barch)
        print("is64: %s" % self._text(self.is64))
        if self.cs_version > 3:
            # These two fields are only populated by the binary layout.
            print("charset: %s" % self.charset)
            print("port: %s" % self.port)

    def unpack(self):
        """
        Parse the decrypted stream in ``self.data`` into the instance fields.

        The first 16 bytes are always the raw key material. For
        cs_version < 4 the remainder is tab-separated text; otherwise a
        binary header (charsets, ids, port, flag byte) precedes five
        tab-separated fields.

        :raises IndexError: cs_version < 4 and fewer than 8 text fields.
        :raises ValueError: cs_version >= 4 and not exactly 5 text fields.
        """
        self.data.seek(0)
        self.raw_aes_keys = self.data.read(16)
        self.calculate_aes()

        if self.cs_version < 4:
            config = self.data.read().split(b'\t')
            self.bid = config[0]
            self.pid = config[1]
            self.ver = config[2]
            self.intz = config[3]
            self.comp = config[4]
            self.user = config[5]
            self.is64 = config[6]
            self.barch = 'x64' if config[7] == b'1' else 'x86'
            return

        # Charsets are little-endian; the remaining integers are big-endian.
        self.charset = self.readShort('<')
        self.charset_oem = self.readShort('<')
        self.bid = self.readInt()
        self.pid = self.readInt()
        self.port = self.readShort()

        b = self.readByte()
        if self.flag(b, 1):
            self.barch = ""
            self.pid = ""
            self.is64 = ""
        elif self.flag(b, 2):
            self.barch = "x64"
        else:
            self.barch = "x86"

        # is64 is always recomputed from bit 4, overriding the "" set above.
        self.is64 = int(self.flag(b, 4))
        self.high_integrity = self.flag(b, 8)
        self.ver, self.intz, self.comp, self.user, self.proc = \
            self.data.read().split(b'\t')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment