Last active
June 12, 2020 06:59
-
-
Save Cr4sh/5b06371f0e3a3b137684fd79885f0519 to your computer and use it in GitHub Desktop.
AT commands fuzzer based on ATFuzzer code base
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
''' | |
******************************************************************************** | |
AT commands fuzzer based on ATFuzzer code base. | |
* https://github.com/Imtiazkarimik23/ATFuzzer | |
* https://relentless-warrior.github.io/wp-content/uploads/2019/11/atfuzz.pdf | |
USAGE: at_fuzzer.py <serial_port_device> | |
******************************************************************************** | |
''' | |
import sys, os, time, random, string, json, copy | |
import serial | |
from numpy import setdiff1d | |
# number of iterations | |
ATTEMPTS = 1 | |
# number of input generated for each grammar | |
INPUT_NUMBER = 1 | |
# number of commands in the input line | |
CMD_NUMBER = 1 | |
# grammar database | |
GRAM_FILE_PATH = 'at_command_grammar.json' | |
DEVICE_BAUD = 115200 | |
DEVICE_RED_LEN = 0x100 | |
DEVICE_TIMEOUT = 1 | |
C_END = '\33[0m' | |
C_RED = '\33[31m' | |
C_GREEN = '\33[32m' | |
C_YELLOW = '\33[33m' | |
C_BLUE = '\33[34m' | |
MAX_INT = sys.maxsize | |
MIN_INT = -sys.maxsize | |
# list of standard versions of the grammars | |
m_standard_set = [] | |
# serial port device | |
m_device = None | |
m_count = 0 | |
semicolon = ';' | |
def color(col, text): | |
# print text with given color | |
return col + text + C_END | |
def read_conf(): | |
conf = json.load(open(GRAM_FILE_PATH)) | |
return conf['AT_CMD_GRAMMARS'] | |
def flip_coin(faces = 1): | |
# returns a random number between 0 and faces | |
return random.randint(0, faces) if faces > 0 else random.randint(0, 1) | |
def remove_elements(list_1, list_2): | |
new_list = [] | |
for e in list_1: | |
if e not in list_2: | |
new_list.append(e) | |
return new_list | |
def in_gram_crossover(cmd_gram, move_command = 0): | |
gram_struct = cmd_gram['struct'] | |
if len(gram_struct) > 2: | |
start = 0 if move_command == 1 else 2 | |
i = random.randint(start, len(gram_struct) - 1) | |
j = random.randint(start, len(gram_struct) - 1) | |
gram_struct[i], gram_struct[j] = gram_struct[j], gram_struct[i] | |
return cmd_gram | |
def multi_gram_crossover(grams_set): | |
gram_1, gram_2 = random.choices(grams_set, k = 2) | |
arg_1 = random.choice(gram_1['arg']) | |
arg_2 = random.choice(gram_2['arg']) | |
gram_1[arg_1], gram_2[arg_2] = gram_2[arg_2], gram_1[arg_1] | |
def add_field(cmd_gram): | |
gram_struct = cmd_gram['struct'] | |
possible_elements = remove_elements(cmd_gram.keys(), [ 'struct', 'arg', 'separator', 'score' ]) | |
missing_elements = setdiff1d(possible_elements, gram_struct) | |
if len(missing_elements) > 0: | |
gram_struct.append(random.choice(missing_elements)) | |
else: | |
cmd_gram['struct'].append('random') | |
cmd_gram['arg'].append('random') | |
cmd_gram['random'] = { 'type': 'string', 'length': 5 } | |
if not cmd_gram.has_key('separator'): | |
# there is no separator, probably due to the fact that there is one ore less argument | |
cmd_gram['separator'] = ',' | |
def remove_field(cmd_gram, move_command = 0): | |
gram_struct = cmd_gram['struct'] | |
if len(gram_struct) > 2: | |
start = 0 if move_command == 1 else 2 | |
gram_struct.pop(random.randint(start, len(gram_struct) - 1)) | |
def gram_random_add_delete(cmd_gram, move_command = 0): | |
''' Takes list of elements and add/delete an element to/from that list. | |
''' | |
gram_struct = cmd_gram['struct'] | |
if len(gram_struct) < 2 or flip_coin() == 0: | |
# add field to the input | |
add_field(cmd_gram) | |
else: | |
# remove field from the imput | |
remove_field(cmd_gram, move_command) | |
def negate_condition(cmd_gram): | |
''' Negate the condition associated to a field. For instance, the field is an integer between 1 and 10, | |
the function changes the range below 1 or above 10. | |
''' | |
arg_in_struct = [] | |
for el in cmd_gram['arg']: | |
if el in cmd_gram['struct']: | |
arg_in_struct.append(el) | |
if len(arg_in_struct) > 0: | |
arg_name = random.choice(arg_in_struct) | |
arg = cmd_gram[arg_name] | |
type = arg['type'] | |
if type == 'digit' or type == 'letters' or type == 'string': | |
# length: int | |
half_len, double_len = (arg['length'] / 2), (arg['length'] * 2) | |
arg['length'] = double_len if half_len < 2 else random.choice([ half_len, double_len ]) | |
elif type == 'ranged': | |
start, end = arg['range'][0], arg['range'][1] | |
start_or_end = flip_coin() | |
if start_or_end == 1: | |
if start > MIN_INT: | |
# negate start | |
arg['range'] = [ start - 100, start - 1 ] if (start - 100 > MIN_INT) else [ MIN_INT, start - 1 ] | |
else: | |
if end < MAX_INT: | |
# engate end | |
arg['range'] = [ end + 1, end + 100 ] if (end + 100 < MAX_INT) else [ end + 1, MAX_INT ] | |
elif type == 'fixed': | |
# values: [ str_1, str_2, ... , str_N ] | |
new_value = [] | |
for x in range(10): | |
new_value.append(random.choice(string.ascii_letters + string.digits + string.punctuation)) | |
arg['values'].append(''.join(new_value)) | |
elif type == 'immutable': | |
# immutable value: str | |
pass | |
else: | |
raise Exception('Unknow argument type') | |
def try_restore_type(arg, type): | |
''' Try to restore a fixed or ranged type in the case it was negate in a previous iteration. | |
This leverages the fact that we do not delete the field values or range when negating the type. | |
''' | |
if flip_coin() == 1: | |
if arg.has_key('values'): | |
arg['type'] = random.choice([ 'string', 'letters', 'fixed' ]) | |
else: | |
if arg.has_key('range'): | |
arg['type'] = random.choice([ 'string', 'letters', 'ranged' ]) | |
else: | |
if type == 'digit': | |
arg['type'] = random.choice([ 'string', 'letters' ]) | |
elif type == 'letters': | |
arg['type'] = random.choice([ 'string', 'digit' ]) | |
elif type == 'string': | |
pass | |
else: | |
if type == 'digit': | |
arg['type'] = random.choice([ 'string', 'letters' ]) | |
elif type == 'letters': | |
arg['type'] = random.choice([ 'string', 'digit' ]) | |
elif type == 'string': | |
pass | |
def negate_type(cmd_gram): | |
''' Negates the type associated to a field. For instance, the field is an integer, | |
the function converts it into a string. | |
''' | |
arg_in_struct = [] | |
for el in cmd_gram['arg']: | |
if el in cmd_gram['struct']: | |
arg_in_struct.append(el) | |
if len(arg_in_struct) > 0: | |
arg_name = random.choice(arg_in_struct) | |
arg = cmd_gram[arg_name] | |
type = arg['type'] | |
if type == 'digit' or type == 'letters' or type == 'string': | |
try_restore_type(arg, type) | |
elif type == 'ranged' or type == 'fixed': | |
arg['type'] = random.choice(['string', 'digit', 'letters']) | |
arg['length'] = random.randint(0, 100) | |
elif type == 'immutable': | |
# cannot change immutable | |
pass | |
else: | |
raise Exception('Unknow argument type') | |
def fixed_integers(cmd_gram): | |
''' Negates the condition associated to a field and set value of the filed to corner case values. | |
''' | |
arg_in_struct = [] | |
for el in cmd_gram['arg']: | |
if el in cmd_gram['struct']: | |
arg_in_struct.append(el) | |
if len(arg_in_struct) > 0: | |
arg_name = random.choice(arg_in_struct) | |
arg = cmd_gram[arg_name] | |
type = arg['type'] | |
if type == 'digit': | |
arg['type'] = 'fixed' | |
arg['values'] = [ MIN_INT, MAX_INT ] | |
if type == 'fixed': | |
arg['values'].append([ MIN_INT, MAX_INT ]) | |
if type == 'ranged': | |
if flip_coin() == 0: | |
arg['range'] = [ MIN_INT, arg['range'][1] ] | |
else: | |
arg['range'] = [ arg['range'][0], MAX_INT ] | |
def alter_connectors(cmd_gram): | |
try: | |
# alters the symbols used for connecting grammars and fields with grammars | |
cmd_gram['separator'] = random.choice(string.punctuation) | |
except KeyError: | |
pass | |
def modify_grammar(cmd_gram, move_command = 0): | |
in_gram_crossover(cmd_gram, move_command) | |
if flip_coin() == 1: | |
gram_random_add_delete(cmd_gram, move_command) | |
if flip_coin() == 1: | |
negate_condition(cmd_gram) | |
if flip_coin() == 1: | |
negate_type(cmd_gram) | |
if flip_coin() == 1: | |
fixed_integers(cmd_gram) | |
if flip_coin() == 1: | |
alter_connectors(cmd_gram) | |
def random_semicolon(): | |
''' Produce a string with a random number of semicolon (1-5). | |
''' | |
semicolon_num = random.randint(1, 5) | |
return ''.join(semicolon for x in range(semicolon_num)) | |
def random_digits(value_length): | |
''' Produce a random number of maximum length of value_length. | |
''' | |
if value_length < 0: | |
raise Exception('Negative length') | |
if value_length == 0: | |
value_length = 1 | |
l = random.randint(1, value_length) | |
range_start = 10 ** (l - 1) | |
range_end = (10 ** l) - 1 | |
return str(random.randint(range_start, range_end)) | |
def random_letters(value_length): | |
''' Produce a random string of letters of length of value_length. | |
''' | |
if value_length < 0: | |
raise Exception('Negative length') | |
l = 1 if value_length == 1 or value_length == 0 else random.randint(1, value_length) | |
return ''.join(random.choice(string.lowercase) for x in range(l)) | |
def random_string(value_length): | |
''' Produce a random string of numbers, letters and symbols of length of value_length. | |
''' | |
if value_length < 0: | |
raise Exception('Negative length') | |
l = 1 if value_length == 1 or value_length == 0 else random.randint(1, value_length) | |
return ''.join(random.choice(string.ascii_letters + string.digits + string.punctuation) for x in range(l)) | |
def generate_value(singol_arg): | |
type = singol_arg['type'] | |
if type == 'digit': | |
return random_digits(singol_arg['length']) | |
elif type == 'letters': | |
return random_letters(singol_arg['length']) | |
elif type == 'string': | |
return random_string(singol_arg['length']) | |
elif type == 'ranged': | |
start, end = singol_arg['range'][0], singol_arg['range'][1] | |
return str(random.randint(start, end)) | |
elif type == 'fixed': | |
return str(random.choice(singol_arg['values'])) | |
elif type == 'immutable': | |
return singol_arg['immutable_value'] | |
else: | |
raise Exception('Unknown type') | |
def gen_terminal(cmd_info, elem): | |
try: | |
arg = cmd_info['arg'] | |
except: | |
arg = [] | |
return generate_value(cmd_info[elem]) if elem in arg else str(cmd_info[elem]) | |
def gen_command(command_gram): | |
''' Build one AT command based on the grammar structure and the command info. | |
''' | |
command, previous_was_arg = '', 0 | |
for elem in command_gram['struct']: | |
term = gen_terminal(command_gram, elem) | |
try: | |
arg = command_gram['arg'] | |
except: | |
arg = [] | |
if elem in arg: | |
if term != 'null': | |
command += (command_gram['separator'] + term) if len(arg) > 1 and previous_was_arg == 1 else term | |
previous_was_arg = 1 | |
else: | |
command += term | |
previous_was_arg = 0 | |
return command | |
def command_exec(command, quiet = False): | |
global m_device, m_count | |
reply = '' | |
# send command | |
m_device.read(DEVICE_RED_LEN) | |
m_device.write(command + '\r') | |
if not quiet: | |
print('[%.8x] >>> %s' % (m_count, repr(command))) | |
t = time.time() | |
try: | |
# receive reply | |
reply = m_device.readline().strip() | |
t = time.time() - t | |
if reply == command.strip(): | |
reply = m_device.readline().strip() | |
except serial.serialutil.SerialException: | |
pass | |
if not quiet: | |
reply_str = reply | |
if len(reply_str) == 0: | |
reply_str = None | |
if reply_str is None: | |
reply_str = color(C_RED, repr(reply_str)) | |
elif reply_str.endswith('OK'): | |
reply_str = color(C_GREEN, repr(reply_str)) | |
elif not reply_str.endswith('ERROR'): | |
reply_str = color(C_YELLOW, repr(reply_str)) | |
else: | |
reply_str = repr(reply_str) | |
print('[%.8x] <<< %s %s' % (m_count, reply_str, color(C_BLUE, '%.4f' % t))) | |
return reply | |
def command_eval(command): | |
global m_count | |
ok = False | |
for _ in range(0, 5): | |
# check connection | |
reply = command_exec('AT', quiet = True) | |
if reply.strip() == 'OK': | |
ok = True | |
break | |
else: | |
time.sleep(1) | |
if not ok: | |
raise(Exception('Bad reply')) | |
command_exec(command) | |
m_count += 1 | |
def set_eval(gram_set): | |
for _ in range(INPUT_NUMBER): | |
command = '' | |
for gram in gram_set: | |
# generate command data for given grammar | |
command += 'AT' + str(gen_command(gram)) + ';' | |
# execute command | |
command_eval(command[: -1]) | |
# accept a list of grammars and return a list of as new list as the diversification factor | |
def set_modify(gram_set, diversification_factor): | |
global m_standard_set | |
modified_set = [] | |
for _ in range(diversification_factor): | |
modified_set.append([]) | |
for gram in gram_set: | |
generated = 0 | |
gram_prev = {} | |
while generated < diversification_factor: | |
# copy and modify current grammar | |
gram_new = copy.deepcopy(gram) | |
modify_grammar(gram_new) | |
if gram_new not in m_standard_set and gram_new != gram_prev: | |
modified_set[generated].append(gram_new) | |
generated += 1 | |
gram_prev = copy.deepcopy(gram_new) | |
return modified_set | |
def population_create(gram_sets, diversification_factor = 1): | |
new_population = [] | |
for single_set in gram_sets: | |
for new_set in set_modify(single_set, diversification_factor): | |
new_population.append(new_set) | |
return new_population | |
def population_select(scores): | |
# randomly select 2 grammars | |
return random.sample(scores, 2) | |
def gram_fuzz(gram_set): | |
global m_standard_set | |
for gram in gram_set: | |
gram['score'] = 0 | |
if not gram.has_key('arg'): | |
# no argument is expected | |
gram['struct'].append('random') | |
gram['separator'] = '' | |
gram['arg'] = [ 'random' ] | |
gram['random'] = { 'type': 'string', 'length': 5 } | |
if len(gram['struct']) > 3 and gram['cmd'] != '+CMGS': | |
m_standard_set.append(gram) | |
set_population = population_create([ gram_set ], 10) | |
set_scores = [] | |
for _ in range(ATTEMPTS): | |
for gram_set in set_population: | |
# evaluate grammars | |
set_eval(gram_set) | |
set_scores.append(gram_set) | |
# create new population | |
selected_sets = population_select(set_scores) | |
set_population = population_create(selected_sets, 2) | |
def gram_eval(): | |
grams = read_conf() | |
while True: | |
current_set = random.sample(grams.values(), CMD_NUMBER) | |
gram_fuzz(current_set) | |
def main(): | |
global m_device | |
if len(sys.argv) < 2: | |
print('USAGE: at_fuzzer.py <serial_port_device>') | |
return 0 | |
# open serial port | |
m_device = serial.Serial(port = sys.argv[1], baudrate = DEVICE_BAUD, timeout = DEVICE_TIMEOUT) | |
# check connectivity | |
command_exec('AT', quiet = True) | |
try: | |
# run fuzzing | |
gram_eval() | |
except KeyboardInterrupt: | |
print('\nEXIT') | |
return 0 | |
if __name__ == '__main__': | |
exit(main()) | |
# | |
# EoF | |
# |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment