Last active
May 17, 2020 10:57
-
-
Save marirs/e3450c1f5700d02452943a82ec2fcf5a to your computer and use it in GitHub Desktop.
Python Script to convert CEF to CSV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try: | |
import re2 as re | |
except: | |
import re | |
import csv | |
from typing import List, Dict | |
__all__ = ["cef_to_csv"] | |
def __convert_cef(data: str, header: Dict[str, bool]) -> list or None: | |
"""Convert a given cef line to csv list of lists | |
:param data: Correctly formatted CEF Line | |
:return: list of lists [ [headers...], [vals...]] | |
""" | |
if not data: | |
return None | |
csv_data = {} | |
if isinstance(data, str): | |
cef_header, cef_extension = data.rsplit('|', 1) | |
if cef_header: | |
try: | |
_, deviceVendor, deviceProduct, deviceVersion, signatureId, name, severity = cef_header.split( | |
'|') | |
except: | |
# bad CEF header | |
return None | |
else: | |
# CEF header not found | |
return None | |
for hd in [ | |
'deviceVendor', 'deviceProduct', 'deviceVersion', | |
'signatureId', 'severity', 'name' | |
]: | |
if hd not in header: | |
header[hd] = True | |
csv_data['deviceVendor'] = deviceVendor | |
csv_data['deviceProduct'] = deviceProduct | |
csv_data['deviceVersion'] = deviceVersion | |
csv_data['signatureId'] = signatureId | |
csv_data['severity'] = severity | |
csv_data['name'] = name | |
cef_extension = f' {cef_extension}' | |
keys = re.findall(r'\s\w*=', cef_extension) | |
values = [ | |
x.strip() | |
for x in re.sub('|'.join(keys), '|', cef_extension).split('|')[1:] | |
] | |
for i in range(len(keys)): | |
k = keys[i].replace('=', '').strip() | |
v = values[i] | |
if k not in header: | |
header[k] = True | |
csv_data[k] = v | |
return csv_data | |
else: | |
return None | |
def cef_to_csv(lines: List[str]) -> List[List]: | |
csv_header = {} | |
csv_data = [] | |
for line in lines: | |
row_dict = __convert_cef(line, csv_header) | |
if row_dict: | |
csv_data.append(row_dict) | |
# @ this point csv_data can be written to file using DictWriter | |
# Convert a list of dict to list of list | |
header = list(csv_header) | |
csv_result = [header] | |
for row in csv_data: | |
row_data = [] | |
for hd in header: | |
if hd in row: | |
row_data.append(row[hd]) | |
else: | |
# Empty value for this column | |
row_data.append('') | |
csv_result.append(row_data) | |
return csv_result | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment