Skip to content

Instantly share code, notes, and snippets.

@tuckner
Last active January 29, 2019 16:32
Show Gist options
  • Save tuckner/9e1bd5f5d5d628bde7f427d1e596a4c2 to your computer and use it in GitHub Desktop.
Save tuckner/9e1bd5f5d5d628bde7f427d1e596a4c2 to your computer and use it in GitHub Desktop.
import re
from .base import SingleTextQueryBackend
from .exceptions import NotSupportedError
class AzureLogAnalyticsBackend(SingleTextQueryBackend):
"""Converts Sigma rule into Azure Log Analytics Queries."""
identifier = "ala"
active = True
reEscape = re.compile('("|\\\\(?![*?]))')
reClear = None
andToken = " and "
orToken = " or "
notToken = "not "
subExpression = "(%s)"
listExpression = "(%s)"
listSeparator = ", "
valueExpression = "\"%s\""
nullExpression = "isnull(%s)"
notNullExpression = "isnotnull(%s)"
mapExpression = "%s == %s"
mapListsSpecialHandling = True
mapListValueExpression = "%s in %s"
def __init__(self, *args, **kwargs):
"""Initialize field mappings"""
super().__init__(*args, **kwargs)
def id_mapping(self, src):
"""Identity mapping, source == target field name"""
return src
def default_value_mapping(self, val):
op = "=="
if type(val) == str and "*" in val[1:-1]: # value contains * inside string - use regex match
op = "matches regex"
val = re.sub('(\\\\(?![*?]))', '\\\\\\\\\\\\\g<1>', val)
val = re.sub('([".^$])', '\\\\\\\\\g<1>', val)
val = re.sub('\\*', '.*', val)
val = re.sub('\\?', '.', val)
elif type(val) == str: # value possibly only starts and/or ends with *, use prefix/postfix match
if val.endswith("*") and val.startswith("*"):
op = "contains"
val = self.cleanValue(val[1:-1])
elif val.endswith("*"):
op = "startswith"
val = self.cleanValue(val[:-1])
elif val.startswith("*"):
op = "endswith"
val = self.cleanValue(val[1:])
return "%s \"%s\"" % (op, val)
def generate(self, sigmaparser):
self.table = None
try:
self.product = sigmaparser.parsedyaml['logsource']['product']
self.service = sigmaparser.parsedyaml['logsource']['service']
except KeyError:
self.product = None
self.service = None
return super().generate(sigmaparser)
def generateBefore(self, parsed):
if self.table is None:
raise NotSupportedError("No table could be determined from Sigma rule")
return "%s | where " % self.table
def generateMapItemNode(self, node):
"""
Azure Log Analytics queries refer to event tables instead of Windows logging event identifiers. This method catches conditions that refer to this field
and creates an appropriate table reference.
"""
key, value = node
if type(value) == list: # handle map items with values list like multiple OR-chained conditions
return self.generateORNode(
[(key, v) for v in value]
)
elif key == "EventID": # EventIDs are not reflected in condition but in table selection
if self.product == "windows":
self.table = "SecurityEvent"
elif type(value) in (str, int): # default value processing
mapping = (key, self.default_value_mapping)
if len(mapping) == 1:
mapping = mapping[0]
if type(mapping) == str:
return mapping
elif callable(mapping):
conds = mapping(key, value)
return self.generateSubexpressionNode(
self.generateANDNode(
[cond for cond in mapping(key, value)]
)
)
elif len(mapping) == 2:
result = list()
for mapitem, val in zip(mapping, node): # iterate mapping and mapping source value synchronously over key and value
if type(mapitem) == str:
result.append(mapitem)
elif callable(mapitem):
result.append(mapitem(val))
return "{} {}".format(*result)
else:
raise TypeError("Backend does not support map values of type " + str(type(value)))
return super().generateMapItemNode(node)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment