Last active
September 11, 2018 19:48
-
-
Save jo-makar/8b759d50d23cc4f00de555821eac1480 to your computer and use it in GitHub Desktop.
Suricata rule lexer/parser
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Suricata rule lexer/parser
#
# http://www.dabeaz.com/ply/ply.html
# https://suricata.readthedocs.io/en/latest/rules/intro.html
#
# TODO Perhaps a better approach is to isolate the signature parsing code from the Suricata source
#      and create bindings for higher-level languages around that isolated code.
#      See SigParse() in https://github.com/OISF/suricata/tree/master/src/detect-parse.{c,h}
import ply.lex, ply.yacc | |
import pprint | |
class Lexer:
    """Token definitions for Suricata rules, in the form expected by ply.lex.

    An instance is handed to ``ply.lex.lex(module=...)``.  ``tokens``,
    ``literals``, ``t_ignore`` and every ``t_*`` attribute are part of the
    lexer specification; the underscore-prefixed constants are private helpers
    used by ``t_KEYWD``.
    """

    tokens = ('KEYWD', 'ANY', 'ACTION', 'PROTO',
              'DIR', 'VAR', 'STR', 'REGEX', 'NUM', 'DATE', 'URL',
              'IPV4', 'CIDR4', 'IPV6', 'CIDR6',
              )

    literals = '![],():;'

    t_ignore = ' \t'

    # Words that t_KEYWD re-tags as ACTION / PROTO tokens.
    _ACTIONS = ('pass', 'drop', 'reject', 'alert')
    _PROTOCOLS = ('tcp', 'udp', 'icmp', 'ip', 'http', 'ftp', 'tls', 'smb', 'dns', 'dcerpc',
                  'ssh', 'smtp', 'imap', 'msn', 'modbus', 'dnp3', 'enip', 'nfs', 'ikev2',
                  'krb5', 'ntp', 'dhcp')

    # NB The order of the functions matters but not so for the regexes
    #    (per the PLY documentation, string rules are sorted by decreasing regex length).

    def t_KEYWD(self, t):
        r'[a-zA-Z]([a-zA-Z0-9_-]|\.|/)*'
        # The docstring above is the token regex and must stay the first
        # statement.  A generic word is matched first and then re-tagged
        # according to its text; anything unrecognised stays a KEYWD.
        word = t.value
        if word == 'any':
            t.type = 'ANY'
        elif word in self._ACTIONS:
            t.type = 'ACTION'
        elif word in self._PROTOCOLS:
            t.type = 'PROTO'
        elif '.' in word or '/' in word:
            # Words containing a dot or slash are treated as URLs.
            t.type = 'URL'
        return t

    t_DIR = r'->|<>'
    t_VAR = r'\$[a-zA-Z][a-zA-Z0-9_]*'

    # A backslash always consumes the following character, so an escaped
    # delimiter (\" or \/) no longer terminates the token prematurely.
    t_STR = r'"([^"\\]|\\.)*"'
    t_REGEX = r'/([^/\\]|\\.)*/'

    t_NUM = r'\d+'
    t_DATE = r'\d{4}_\d{2}_\d{2}'

    # To avoid making these regexes too complex, will validate in the parser instead
    t_IPV4 = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
    t_CIDR4 = t_IPV4 + r'/\d{1,2}'
    t_IPV6 = r'([a-fA-F0-9]{0,4}:){2,}[a-fA-F0-9]{0,4}'
    t_CIDR6 = t_IPV6 + r'/\d{1,3}'

    def t_error(self, t):
        # Fail fast on any character sequence no rule matches.
        raise Exception('unexpected token: %r' % t)
# TODO These rules need to be reevaluated
class Parser:
    """Grammar for a single Suricata rule, in the form expected by ply.yacc.

    The docstring of every ``p_*`` method is the grammar production given to
    PLY, so those docstrings must not be used for prose; explanatory notes are
    written as ``#`` comments instead.  ``p_rule`` is kept first because PLY
    takes the first production as the start symbol.
    """

    tokens = Lexer.tokens

    def p_rule(self, p):
        "rule : ACTION PROTO source source_port DIR dest dest_port '(' option_list ')'"
        # p[1]..p[7] are the header fields; p[8] is '(' and p[9] the option list.
        action, proto, src, src_port, direction, dst, dst_port = p[1:8]
        p[0] = Rule(action, proto, src, src_port, direction, dst, dst_port, p[9])

    def p_source(self, p):
        'source : location_expression'
        p[0] = p[1]

    def p_dest(self, p):
        'dest : location_expression'
        p[0] = p[1]

    def p_location_expression(self, p):
        '''
        location_expression : ANY
                            | IPV4
                            | CIDR4
                            | IPV6
                            | CIDR6
                            | VAR
                            | location_grouping
                            | '!' location_expression
        '''
        # Every alternative is rendered back to its textual form.
        p[0] = ''.join(map(str, p[1:]))

    def p_location_grouping(self, p):
        "location_grouping : '[' location_grouping_list ']'"
        # Keep the brackets so that a leading '!' still visibly applies to the
        # whole group rather than only to its first member.
        p[0] = '[' + ','.join(map(str, p[2])) + ']'

    def p_location_grouping_list(self, p):
        '''
        location_grouping_list : location_expression
                               | location_grouping_list ',' location_expression
        '''
        if len(p) == 2:
            p[0] = [p[1]]
        else:
            p[0] = p[1] + [p[3]]

    def p_source_port(self, p):
        'source_port : port_expression'
        p[0] = p[1]

    def p_dest_port(self, p):
        'dest_port : port_expression'
        p[0] = p[1]

    def p_port_expression(self, p):
        '''
        port_expression : ANY
                        | NUM
                        | '[' NUM ':' NUM ']'
                        | '[' NUM ':' ']'
                        | '[' ':' NUM ']'
                        | '[' port_list ']'
                        | '!' port_expression
        '''
        # port_list reduces to a Python list; render it as "a,b,c" instead of
        # leaking the list repr into the resulting string.
        p[0] = ''.join(
            ','.join(map(str, part)) if isinstance(part, list) else str(part)
            for part in p[1:]
        )

    def p_port_list(self, p):
        '''
        port_list : NUM
                  | port_list ',' NUM
        '''
        if len(p) == 2:
            p[0] = [p[1]]
        else:
            # p[2] is the ',' literal; the new port number is p[3].
            p[0] = p[1] + [p[3]]

    def p_option_list(self, p):
        '''
        option_list : option
                    | option_list option
        '''
        if len(p) == 2:
            p[0] = [p[1]]
        else:
            p[0] = p[1] + [p[2]]

    def p_option(self, p):
        '''
        option : KEYWD ':' option_primitive_list ';'
               | KEYWD ':' STR ';'
               | KEYWD ':' REGEX ';'
               | KEYWD ':' NUM ';'
               | KEYWD ';'
        '''
        # A bare "keyword;" yields a 1-tuple, "keyword:value;" a 2-tuple.
        if len(p) == 3:
            p[0] = (p[1],)
        else:
            p[0] = (p[1], p[3])

    def p_option_primitive_list(self, p):
        '''
        option_primitive_list : option_primitive
                              | option_primitive_list ',' option_primitive
        '''
        if len(p) == 2:
            p[0] = [p[1]]
        else:
            p[0] = p[1] + [p[3]]

    def p_option_primitive(self, p):
        '''
        option_primitive : KEYWD
                         | URL
                         | KEYWD KEYWD
                         | KEYWD NUM
                         | KEYWD DATE
                         | KEYWD URL
        '''
        # One or two words, re-joined with a single space.
        p[0] = ' '.join(p[1:])

    def p_error(self, p):
        # Abort on the first syntax error instead of attempting recovery.
        raise Exception('unexpected syntax: %r' % p)
class Rule:
    """Plain container for the parsed pieces of one rule.

    Holds the header fields (action, protocol, source/destination, ports,
    direction) and the list of parsed options, and renders them as a
    multi-line, human-readable string.
    """

    def __init__(self, action, proto, source, srcport, dir, dest, destport, options):
        self.action, self.proto = action, proto
        self.source, self.srcport = source, srcport
        self.dir = dir
        self.dest, self.destport = dest, destport
        self.options = options

    def __str__(self):
        # Continuation lines of the pretty-printed options are indented by
        # 11 columns so they line up under the first one.
        # NOTE(review): the labels may have been meant to be right-aligned to
        # 8 columns; they are reproduced here exactly as found - confirm.
        continuation = ' ' * 11
        option_lines = pprint.pformat(self.options, width=150).splitlines(True)
        rows = [
            ' action = %s\n' % self.action,
            ' proto = %s\n' % self.proto,
            ' source = %s\n' % self.source,
            ' srcport = %s\n' % self.srcport,
            ' dir = %s\n' % self.dir,
            ' dest = %s\n' % self.dest,
            'destport = %s\n' % self.destport,
            ' options = %s\n' % continuation.join(option_lines),
        ]
        return ''.join(rows)
if __name__ == '__main__':
    # Smoke test: lex and parse one sample rule and print the resulting Rule.
    rule_text = 'alert dns any any -> $HOME_NET any (msg:"EmergingThreats:Indicator-2829678"; content:"|00 01 00 01 00 00 00 00|"; offset:4; depth:8; content:"|00 10|"; distance:0; content:"powershell IEX"; distance:0; fast_pattern; metadata: former_category TROJAN; classtype:trojan-activity; sid: 26482; rev:1; metadata:affected_product Windows_XP_Vista_7_8_10_Server_32_64_Bit, attack_target Client_Endpoint, deployment Perimeter, signature_severity Major, created_at 2018_02_15, performance_impact Moderate, updated_at 2018_02_15;)'

    lexer = ply.lex.lex(module=Lexer())
    parser = ply.yacc.yacc(module=Parser())

    # Pass the lexer explicitly instead of relying on PLY's most recently
    # built one.  The parenthesised print works on both Python 2 and 3.
    print(parser.parse(rule_text, lexer=lexer))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment