Skip to content

Instantly share code, notes, and snippets.

@jo-makar
Last active September 11, 2018 19:48
Show Gist options
  • Save jo-makar/8b759d50d23cc4f00de555821eac1480 to your computer and use it in GitHub Desktop.
Save jo-makar/8b759d50d23cc4f00de555821eac1480 to your computer and use it in GitHub Desktop.
Suricata rule lexer/parser
# Suricata rule lexer/parser
#
# http://www.dabeaz.com/ply/ply.html
# https://suricata.readthedocs.io/en/latest/rules/intro.html
#
# TODO A better approach may be to isolate the signature-parsing code from the Suricata source
# and build bindings for higher-level languages around that isolated code.
# See SigParse() in https://github.com/OISF/suricata/tree/master/src/detect-parse.{c,h}
import ply.lex, ply.yacc
import pprint
class Lexer:
    # PLY lexer specification for Suricata rules.
    #
    # An instance is handed to ply.lex.lex(module=...), which discovers the
    # `tokens`, `literals`, `t_ignore` and `t_*` attributes by name.  For a
    # `t_*` method the docstring is the token's regular expression, so those
    # docstrings are data, not documentation.

    tokens = (
        'KEYWD', 'ANY', 'ACTION', 'PROTO',
        'DIR', 'VAR', 'STR', 'REGEX', 'NUM', 'DATE', 'URL',
        'IPV4', 'CIDR4', 'IPV6', 'CIDR6',
    )

    # Single characters returned to the parser as themselves.
    literals = '![],():;'

    # Spaces and tabs between tokens are skipped.
    t_ignore = ' \t'

    # Words matched by t_KEYWD that are re-tagged as a more specific token.
    _ACTIONS = frozenset(['pass', 'drop', 'reject', 'alert'])
    _PROTOCOLS = frozenset([
        'tcp', 'udp', 'icmp', 'ip', 'http', 'ftp', 'tls', 'smb', 'dns', 'dcerpc',
        'ssh', 'smtp', 'imap', 'msn', 'modbus', 'dnp3', 'enip', 'nfs', 'ikev2',
        'krb5', 'ntp', 'dhcp',
    ])

    # NB The order of the functions matters but not so for the regexes
    def t_KEYWD(self, t):
        r'[a-zA-Z]([a-zA-Z0-9_-]|\.|/)*'
        # One regex catches every bare word; the token type is then refined
        # from the matched text.  Anything that is not a reserved word but
        # contains '.' or '/' is reported as URL, the rest stays KEYWD.
        word = t.value
        if word == 'any':
            t.type = 'ANY'
        elif word in self._ACTIONS:
            t.type = 'ACTION'
        elif word in self._PROTOCOLS:
            t.type = 'PROTO'
        elif any(ch in './' for ch in word):
            # any() replaces the original bare `reduce(...)`, which is only a
            # builtin on Python 2 and raised NameError on Python 3.
            t.type = 'URL'
        return t

    t_DIR = r'->|<>'
    t_VAR = r'\$[a-zA-Z][a-zA-Z0-9_]*'
    t_STR = r'"([^"]|\\")*"'
    t_REGEX = r'/([^/]|\\/)*/'
    t_NUM = r'\d+'
    t_DATE = r'\d{4}_\d{2}_\d{2}'

    # To avoid making these regexes too complex, will validate in the parser instead
    t_IPV4 = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
    # Raw strings so that `\d` is not treated as a (deprecated) string escape.
    t_CIDR4 = t_IPV4 + r'/\d{1,2}'
    t_IPV6 = r'([a-fA-F0-9]{0,4}:){2,}[a-fA-F0-9]{0,4}'
    t_CIDR6 = t_IPV6 + r'/\d{1,3}'

    def t_error(self, t):
        # Called with the offending token when no rule matches; abort instead
        # of skipping input.
        raise Exception('unexpected token: %r' % t)
# TODO These rules need to be reevaluated
class Parser:
    # PLY yacc grammar for a single Suricata rule.
    #
    # An instance is handed to ply.yacc.yacc(module=...).  The docstring of
    # every p_* method is the grammar production it implements, so those
    # docstrings are data and must not be reworded; explanations live in
    # `#` comments instead.  In each method p[1:] holds the values of the
    # right-hand-side symbols and p[0] receives the result.

    tokens = Lexer.tokens

    def p_rule(self, p):
        "rule : ACTION PROTO source source_port DIR dest dest_port '(' option_list ')'"
        # p[8] is the '(' literal; the options are at p[9].
        action, proto, source, srcport, direction, dest, destport = p[1:8]
        p[0] = Rule(action, proto, source, srcport, direction, dest, destport, p[9])

    def p_source(self, p):
        'source : location_expression'
        p[0] = p[1]

    def p_dest(self, p):
        'dest : location_expression'
        p[0] = p[1]

    def p_location_expression(self, p):
        '''
        location_expression : ANY
                            | IPV4
                            | CIDR4
                            | IPV6
                            | CIDR6
                            | VAR
                            | location_grouping
                            | '!' location_expression
        '''
        # Every alternative is rendered back to text by concatenation.
        p[0] = ''.join(map(str, p[1:]))

    def p_location_grouping(self, p):
        "location_grouping : '[' location_grouping_list ']'"
        # NOTE(review): the surrounding brackets are dropped here, so a
        # negated group renders as "!a,b" -- confirm that is intended.
        p[0] = ','.join(map(str, p[2]))

    def p_location_grouping_list(self, p):
        '''
        location_grouping_list : location_expression
                               | location_grouping_list ',' location_expression
        '''
        if len(p) == 2:
            p[0] = [p[1]]
        else:
            p[0] = p[1] + [p[3]]

    def p_source_port(self, p):
        'source_port : port_expression'
        p[0] = p[1]

    def p_dest_port(self, p):
        'dest_port : port_expression'
        p[0] = p[1]

    def p_port_expression(self, p):
        '''
        port_expression : ANY
                        | NUM
                        | '[' NUM ':' NUM ']'
                        | '[' NUM ':' ']'
                        | '[' ':' NUM ']'
                        | '[' port_list ']'
                        | '!' port_expression
        '''
        # port_list yields a Python list; join it with commas rather than
        # letting str() emit the list repr (e.g. "['80', '443']").
        pieces = []
        for item in p[1:]:
            if isinstance(item, list):
                pieces.append(','.join(map(str, item)))
            else:
                pieces.append(str(item))
        p[0] = ''.join(pieces)

    def p_port_list(self, p):
        '''
        port_list : NUM
                  | port_list ',' NUM
        '''
        if len(p) == 2:
            p[0] = [p[1]]
        else:
            # p[2] is the ',' literal; the new port number is p[3].
            p[0] = p[1] + [p[3]]

    def p_option_list(self, p):
        '''
        option_list : option
                    | option_list option
        '''
        # Options are not comma separated, so the new element is p[2].
        if len(p) == 2:
            p[0] = [p[1]]
        else:
            p[0] = p[1] + [p[2]]

    def p_option(self, p):
        '''
        option : KEYWD ':' option_primitive_list ';'
               | KEYWD ':' STR ';'
               | KEYWD ':' REGEX ';'
               | KEYWD ':' NUM ';'
               | KEYWD ';'
        '''
        # A bare keyword becomes a 1-tuple, "keyword: value" a 2-tuple.
        if len(p) == 3:
            p[0] = (p[1],)
        else:
            p[0] = (p[1], p[3])

    def p_option_primitive_list(self, p):
        '''
        option_primitive_list : option_primitive
                              | option_primitive_list ',' option_primitive
        '''
        if len(p) == 2:
            p[0] = [p[1]]
        else:
            p[0] = p[1] + [p[3]]

    def p_option_primitive(self, p):
        '''
        option_primitive : KEYWD
                         | URL
                         | KEYWD KEYWD
                         | KEYWD NUM
                         | KEYWD DATE
                         | KEYWD URL
        '''
        # One or two words, re-joined with a single space.
        p[0] = ' '.join(p[1:])

    def p_error(self, p):
        # Abort on the first syntax error instead of attempting recovery.
        raise Exception('unexpected syntax: %r' % p)
class Rule:
    # Plain record for one parsed rule: the seven header fields plus the
    # list of option tuples.

    def __init__(self, action, proto, source, srcport, dir, dest, destport, options):
        # Store every constructor argument under the attribute of the same
        # name (`srcport`/`destport` included).
        (self.action, self.proto, self.source, self.srcport,
         self.dir, self.dest, self.destport, self.options) = (
            action, proto, source, srcport, dir, dest, destport, options)

    def __str__(self):
        # One "label = value" line per field.  The options are pretty-printed
        # (width 150) and every continuation line is prefixed with 11 spaces.
        pretty = pprint.pformat(self.options, width=150)
        hanging_indent = ' ' * 11
        rows = (
            (' action = %s\n', self.action),
            (' proto = %s\n', self.proto),
            (' source = %s\n', self.source),
            (' srcport = %s\n', self.srcport),
            (' dir = %s\n', self.dir),
            (' dest = %s\n', self.dest),
            ('destport = %s\n', self.destport),
            (' options = %s\n', hanging_indent.join(pretty.splitlines(True))),
        )
        return ''.join(template % value for template, value in rows)
if __name__ == '__main__':
    # Demo: build the lexer and parser, parse one sample rule and print the
    # resulting Rule.
    # The sample is kept in `rule_text` so the builtin input() is not shadowed.
    rule_text = 'alert dns any any -> $HOME_NET any (msg:"EmergingThreats:Indicator-2829678"; content:"|00 01 00 01 00 00 00 00|"; offset:4; depth:8; content:"|00 10|"; distance:0; content:"powershell IEX"; distance:0; fast_pattern; metadata: former_category TROJAN; classtype:trojan-activity; sid: 26482; rev:1; metadata:affected_product Windows_XP_Vista_7_8_10_Server_32_64_Bit, attack_target Client_Endpoint, deployment Perimeter, signature_severity Major, created_at 2018_02_15, performance_impact Moderate, updated_at 2018_02_15;)'
    lexer = ply.lex.lex(module=Lexer())
    parser = ply.yacc.yacc(module=Parser())
    # Pass the lexer explicitly instead of relying on PLY's "most recently
    # built lexer" default; print(...) with one argument works on both
    # Python 2 and Python 3.
    print(parser.parse(rule_text, lexer=lexer))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment