Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save azbesthu/3781318 to your computer and use it in GitHub Desktop.
Save azbesthu/3781318 to your computer and use it in GitHub Desktop.
coverage_result_processor.py
#!/usr/bin/env python
import csv
import getopt
import glob
import MySQLdb as mdb
import os
import readline
import sys
import time
def main():
    """Run the interactive, menu-driven coverage database tool.

    Expects exactly four command-line arguments: database host address,
    database username, password and database name. With any other argument
    count the program exits with a usage message. Otherwise the menu is
    shown in a loop until the user selects "0" or types "exit".

    Every menu action opens its own connection, which is always closed
    again, including when the user leaves a submenu with 'exit'. A MySQL
    error is printed and terminates the program with exit status 1.
    """
    if len(sys.argv) != 5:
        sys.exit("Mandatory parameters: [database host address], [database username], [password], [database]!")
    # (host, username, password, database) in the order create_connection expects.
    credentials = tuple(sys.argv[1:5])
    # Menu entries that export one query result to a user-chosen file.
    reports = {
        "4": query_functions_order_by_coverage,
        "5": query_tests,
        "6": query_uncovered_functions,
    }
    while True:
        _print_menu()
        var = raw_input("Selected menu: ")
        if var == "0" or var == "exit":
            sys.exit(0)
        elif var == "1":
            _with_connection(credentials, _create_tables_if_empty)
            print("\n")
        elif var == "2":
            # No trailing blank line when the submenu was left with 'exit'.
            if _with_connection(credentials, _fill_database):
                print("\n")
        elif var == "3":
            _with_connection(credentials, _clear_database)
            print("\n")
        elif var in reports:
            file_name = raw_input("The output file name: ")
            if file_name == 'exit':
                continue
            report = reports[var]
            _with_connection(credentials, lambda connection: report(connection, file_name))
            print("\n")
        elif var == "9":
            print("---------------------------------------------------")
            print("You can use the TAB key for filepath autocomletion.")
            print("You can exit from a submenu with 'exit' command.")
            print("---------------------------------------------------")
            raw_input("Press any key to continue...")
            print("\n")
        else:
            print("Wrong number!\n")


def _print_menu():
    """Print the numbered main menu."""
    print("1. Create tables.")
    print("2. Fill the database.")
    print("3. Clear the database.")
    print("4. Query the functions order by coverage. The result will be in an output file.")
    print("5. Query the number of tests that contain the current function. The result will be in an output file.")
    print("6. Query the uncovered functions.")
    print("9. Help.")
    print("0. Exit.")


def _with_connection(credentials, action):
    """Call action(connection) on a fresh connection and always close it.

    credentials is the (host, username, password, database) tuple passed to
    create_connection. Returns whatever action returns. A MySQL error is
    printed and ends the program with exit status 1.
    """
    try:
        connection = create_connection(*credentials)
        try:
            return action(connection)
        finally:
            # Runs on normal return, on errors and on sys.exit alike.
            connection.close()
    except mdb.Error as e:
        print("Error %d: %s" % (e.args[0], e.args[1]))
        sys.exit(1)


def _create_tables_if_empty(connection):
    """Menu 1: create the tables unless the database already holds records."""
    if is_empty_db(connection):
        create_tables(connection)
    else:
        print("The database has already tables.")


def _fill_database(connection):
    """Menu 2: ask for the input paths and load the coverage results.

    Returns False when the user aborted a path prompt with 'exit' and True
    in every other case.
    """
    if not is_empty_db(connection):
        print("The database is not empty! You can only use this menu with empty database!")
        return True
    if query_yes_no("It will take hours. Are you sure you want to continue?", default="no") != "yes":
        return True

    def complete(text, state):
        # readline completer: offer filesystem matches for the typed prefix.
        if text == '~/':
            text = os.path.expanduser('~') + "/"
        return (glob.glob(text + '*') + [None])[state]

    readline.set_completer_delims(' \t\n;')
    readline.parse_and_bind("tab: complete")
    readline.set_completer(complete)
    dirpath = raw_input("Directory path of coverage results: ")
    if dirpath == 'exit':
        return False
    testcaseslist_path = raw_input("Testcases path: ")
    if testcaseslist_path == 'exit':
        return False
    save_testcaseslist(connection, dirpath, testcaseslist_path)
    return True


def _clear_database(connection):
    """Menu 3: empty all tables after a double confirmation."""
    if is_empty_db(connection):
        print("The database is empty.")
        return
    if query_yes_no("It will clear the database and leave the table structure unmodified! Are you sure you want to continue?", default="no") == "yes":
        if query_yes_no("Are you sure you want to continue!?", default="no") == "yes":
            clear_db(connection)
def create_tables(connection):
    """Create the FileNames, Functions, Tests and Results tables if missing.

    Each statement is executed on its own and referenced tables are created
    before the tables that declare foreign keys to them (FileNames before
    Functions, Functions and Tests before Results). Engines that enforce
    foreign keys reject a reference to a table that does not exist yet.

    Prints the elapsed time when done. Database errors propagate to the
    caller.
    """
    t0 = time.time()
    statements = (
        """CREATE TABLE IF NOT EXISTS FileNames (
            id INTEGER PRIMARY KEY auto_increment,
            name varchar(300),
            UNIQUE(name)
        )""",
        """CREATE TABLE IF NOT EXISTS Functions (
            id INTEGER PRIMARY KEY auto_increment,
            name varchar(400),
            mangle varchar(400),
            file_name_id INTEGER,
            linenum INTEGER,
            mem_addr varchar(200),
            FOREIGN KEY(file_name_id) REFERENCES FileNames(id),
            CONSTRAINT uc_functions UNIQUE (mangle, file_name_id, linenum)
        )""",
        """CREATE TABLE IF NOT EXISTS Tests (
            id INTEGER PRIMARY KEY auto_increment,
            name varchar(200),
            name_2 varchar(200)
        )""",
        """CREATE TABLE IF NOT EXISTS Results (
            id INTEGER PRIMARY KEY auto_increment,
            test_id INTEGER,
            function_id INTEGER,
            count INTEGER,
            FOREIGN KEY(test_id) REFERENCES Tests(id),
            FOREIGN KEY(function_id) REFERENCES Functions(id)
        )""",
    )
    cursor = connection.cursor()
    try:
        for statement in statements:
            cursor.execute(statement)
    finally:
        cursor.close()
    print("Total time: %d:%02d:%02d" % timer(t0))
def clear_db(connection):
    """Remove every row from all four tables, keeping the table structure.

    Uses TRUNCATE TABLE, one statement per table. Foreign key checks are
    switched off for the duration because MySQL refuses to truncate an
    InnoDB table that other tables reference, and are switched back on
    afterwards even if a statement fails.

    Prints the elapsed time when done. Database errors propagate to the
    caller.
    """
    t0 = time.time()
    cursor = connection.cursor()
    try:
        cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
        try:
            for table in ("Results", "Functions", "FileNames", "Tests"):
                cursor.execute("TRUNCATE TABLE " + table)
        finally:
            cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
    finally:
        cursor.close()
    print("Total time: %d:%02d:%02d" % timer(t0))
def is_empty_db(connection):
    """Return True when none of the four tables contains a record.

    The row counts of FileNames, Functions, Results and Tests are added up
    in one query. UNION ALL keeps every count, whereas plain UNION would
    merge equal counts before summing.

    A database whose tables have not been created yet is reported as empty
    (MySQL error 1146, "table doesn't exist"), so the tables can be created
    on a fresh database. Any other database error propagates to the caller.
    """
    cursor = connection.cursor()
    try:
        cursor.execute("""SELECT sum(records) FROM (SELECT count(*) AS records FROM FileNames
            UNION ALL
            SELECT count(*) AS records FROM Functions
            UNION ALL
            SELECT count(*) AS records FROM Results
            UNION ALL
            SELECT count(*) AS records FROM Tests) AS sumRecords;""")
        total = cursor.fetchone()[0]
    except mdb.Error as e:
        # 1146 is ER_NO_SUCH_TABLE: no tables means no records either.
        if e.args and e.args[0] == 1146:
            return True
        raise
    finally:
        cursor.close()
    return int(total or 0) == 0
def save_testcaseslist(connection, dirpath, filepath):
    """Load every test case named in the list file into the database.

    filepath is a ';'-separated file. Column 1 and column 2 of each row
    (zero-based) are stored as Tests.name and Tests.name_2. When the second
    '!'-separated component of column 1 is "home", the first six components
    are dropped from both names ('!'-separated for column 1, '/'-separated
    for column 2). The shortened name, joined onto dirpath, is the result
    file handed to save_testcase.

    Reading stops at the second row whose first column is "1".
    Rows with fewer than three columns (for example blank lines) are
    skipped. The work is committed after each test case so the inserted
    rows persist on transactional storage engines.

    Prints the elapsed time when done. Database and file errors propagate
    to the caller.
    """
    t0 = time.time()
    cursor = connection.cursor()
    try:
        with open(filepath, 'r') as listing:
            seen_marker = False
            for row in csv.reader(listing, delimiter=';'):
                if len(row) < 3:
                    continue
                if row[0] == "1":
                    # NOTE(review): presumably the list repeats from here on; confirm.
                    if seen_marker:
                        break
                    seen_marker = True
                name_parts = row[1].split("!")
                # The length guard avoids an IndexError on names without '!'.
                if len(name_parts) > 1 and name_parts[1] == "home":
                    db_row = ("!".join(name_parts[6:]), "/".join(row[2].split("/")[6:]))
                else:
                    db_row = (row[1], row[2])
                cursor.execute("INSERT INTO Tests(name, name_2) VALUES (%s, %s)", db_row)
                save_testcase(connection, os.path.join(dirpath, db_row[0]), cursor.lastrowid)
                connection.commit()
    finally:
        cursor.close()
    print("Total time: %d:%02d:%02d" % timer(t0))
def save_testcase(connection, filepath, last_testcase_id):
    """Store the per-function results of one test case.

    filepath is a ';'-separated file. For every row whose column 3
    (zero-based) is not 'NO LOCATION', that column is split at the first
    ':' into a file name and a line number. The file name is inserted into
    FileNames, and columns 2 and 1 are inserted into Functions as name and
    mangle. Both inserts reuse the existing row id on a duplicate key.
    Column 0 is then stored as the count for last_testcase_id through
    save_test.

    Rows with fewer than four columns are skipped. Nothing is committed
    here; that is left to the caller.
    """
    cursor = connection.cursor()
    try:
        with open(filepath, 'r') as results:
            for row in csv.reader(results, delimiter=';'):
                if len(row) < 4 or row[3] == 'NO LOCATION':
                    continue
                file_name, _, linenum = row[3].partition(":")
                # Query parameters must be a sequence, hence the 1-tuple.
                cursor.execute("INSERT INTO FileNames(name) VALUES (%s) ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id)", (file_name,))
                file_name_id = cursor.lastrowid
                db_row = (row[2], row[1], file_name_id, linenum)
                cursor.execute("INSERT INTO Functions(name, mangle, file_name_id, linenum) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id)", db_row)
                save_test(connection, last_testcase_id, cursor.lastrowid, row[0])
    finally:
        cursor.close()
def save_test(connection, test_id, function_id, count):
    """Insert one (test_id, function_id, count) row into the Results table.

    A dedicated cursor is opened for the insert and closed afterwards.
    Nothing is committed here.
    """
    insert_sql = "INSERT INTO Results(test_id, function_id, count) VALUES (%s, %s, %s)"
    results_cursor = connection.cursor()
    results_cursor.execute(insert_sql, (test_id, function_id, count))
    results_cursor.close()
def query_functions_order_by_coverage(connection, file_name):
    """Export the functions ordered by how many Results rows reference them.

    Writes one ';'-separated line per function (function id, function name,
    number of Results rows with a test id) to file_name + ".csv", highest
    count first. The file is opened once and receives every row. It is
    created even when the query returns nothing.

    Prints the elapsed time when done. Database and file errors propagate
    to the caller.
    """
    t0 = time.time()
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT function_id, Functions.name, count(test_id) as count_test_id FROM Results, Functions WHERE Results.function_id=Functions.id GROUP BY function_id ORDER BY count_test_id DESC")
        rows = cursor.fetchall()
    finally:
        cursor.close()
    with open(file_name + ".csv", "w") as output:
        csv.writer(output, delimiter=';').writerows(rows)
    print("Total time: %d:%02d:%02d" % timer(t0))
def query_tests(connection, file_name):
    """Export, per function, the number of Results rows that reference it.

    Writes one ';'-separated line per function (function id, function name,
    number of Results rows with a test id) to file_name + ".csv" in the
    order the database returns them. The file is opened once and receives
    every row. It is created even when the query returns nothing.

    Prints the elapsed time when done. Database and file errors propagate
    to the caller.
    """
    t0 = time.time()
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT function_id, Functions.name, count(test_id) FROM Results, Functions WHERE Results.function_id=Functions.id GROUP BY function_id")
        rows = cursor.fetchall()
    finally:
        cursor.close()
    with open(file_name + ".csv", "w") as output:
        csv.writer(output, delimiter=';').writerows(rows)
    print("Total time: %d:%02d:%02d" % timer(t0))
def query_uncovered_functions(connection, file_name):
    """Export the FileNames entries that no Functions row refers to.

    Writes one name per line to file_name + ".csv". The file is opened once
    and receives every row. It is created even when the query returns
    nothing.

    NOTE(review): despite the function name, the query lists file names
    without functions rather than functions without coverage. Confirm that
    this is the intended report.

    Prints the elapsed time when done. Database and file errors propagate
    to the caller.
    """
    t0 = time.time()
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT FileNames.name FROM FileNames WHERE FileNames.id NOT IN (SELECT Functions.file_name_id FROM Functions)")
        rows = cursor.fetchall()
    finally:
        cursor.close()
    with open(file_name + ".csv", "w") as output:
        csv.writer(output, delimiter=';').writerows(rows)
    print("Total time: %d:%02d:%02d" % timer(t0))
def create_connection(host, db_username, password, database):
    """Open and return a MySQL connection for the given credentials.

    When connecting fails with a MySQL error, the error number and message
    are printed and the program exits with status 1.
    """
    try:
        return mdb.connect(host, db_username, password, database)
    except mdb.Error as error:
        print("Error %d: %s" % (error.args[0], error.args[1]))
        sys.exit(1)
def timer(t0):
    """Return the time elapsed since t0 as an (hours, minutes, seconds) tuple.

    t0 is a timestamp as returned by time.time(). Fractions of a second are
    dropped before splitting.
    """
    elapsed = int(time.time() - t0)
    hours, remainder = divmod(elapsed, 3600)
    minutes, seconds = divmod(remainder, 60)
    return (hours, minutes, seconds)
def query_yes_no(question, default="yes"):
    """Ask a yes/no question on the console and return "yes" or "no".

    question is the text shown to the user. default is the answer assumed
    when the user just presses <Enter>: "yes", "no", or None to insist on
    an explicit answer. Any other default raises ValueError. The question
    is repeated until the reply is one of yes/ye/y/no/n, in any letter
    case.
    """
    answers = {"yes": "yes", "y": "yes", "ye": "yes",
               "no": "no", "n": "no"}
    # The capital letter in the hint marks the answer chosen by <Enter>.
    hints = ((None, " [y/n] "), ("yes", " [Y/n] "), ("no", " [y/N] "))
    for candidate, hint in hints:
        if default == candidate:
            prompt = hint
            break
    else:
        raise ValueError("invalid default answer: '%s'" % default)
    while True:
        sys.stdout.write(question + prompt)
        reply = raw_input().lower()
        if reply == '' and default is not None:
            return default
        if reply in answers:
            return answers[reply]
        sys.stdout.write("Please respond with 'yes' or 'no' "
                         "(or 'y' or 'n').\n")
# Start the interactive menu only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment