Forked from anonymous/coverage_result_processor.py
Created
September 25, 2012 11:53
-
-
Save azbesthu/3781318 to your computer and use it in GitHub Desktop.
coverage_result_processor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import csv | |
import getopt | |
import glob | |
import MySQLdb as mdb | |
import os | |
import readline | |
import sys | |
import time | |
def main(): | |
if len(sys.argv) != 5: | |
sys.exit("Mandatory parameters: [database host address], [database username], [password], [database]!") | |
else: | |
host = sys.argv[1] | |
db_username = sys.argv[2] | |
password = sys.argv[3] | |
database = sys.argv[4] | |
while True: | |
print "1. Create tables." | |
print "2. Fill the database." | |
print "3. Clear the database." | |
print "4. Query the functions order by coverage. The result will be in an output file." | |
print "5. Query the number of tests that contain the current function. The result will be in an output file." | |
print "6. Query the uncovered functions." | |
print "9. Help." | |
print "0. Exit." | |
var = raw_input("Selected menu: ") | |
if var == "0" or var == "exit": | |
sys.exit(0) | |
elif var == "1": | |
try: | |
connection = create_connection(host, db_username, password ,database) | |
if is_empty_db(connection): | |
create_tables(connection) | |
else: | |
print "The database has already tables." | |
connection.close() | |
except mdb.Error, e: | |
print "Error %d: %s" % (e.args[0],e.args[1]) | |
sys.exit(1) | |
print "\n" | |
elif var == "2": | |
try: | |
connection = create_connection(host, db_username, password ,database) | |
if is_empty_db(connection): | |
if query_yes_no("It will take hours. Are you sure you want to continue?", default="no") == "yes": | |
def complete(text, state): | |
if text == '~/': | |
text = os.path.expanduser('~') + "/" | |
return (glob.glob(text+'*')+[None])[state] | |
readline.set_completer_delims(' \t\n;') | |
readline.parse_and_bind("tab: complete") | |
readline.set_completer(complete) | |
dirpath = raw_input("Directory path of coverage results: ") | |
if dirpath == 'exit': | |
continue | |
testcaseslist_path = raw_input("Testcases path: ") | |
if testcaseslist_path == 'exit': | |
continue | |
save_testcaseslist(connection, dirpath, testcaseslist_path) | |
else: | |
print "The database is not empty! You can only use this menu with empty database!" | |
connection.close() | |
except mdb.Error, e: | |
print "Error %d: %s" % (e.args[0],e.args[1]) | |
sys.exit(1) | |
print "\n" | |
elif var == "3": | |
try: | |
connection = create_connection(host, db_username, password ,database) | |
if is_empty_db(connection): | |
print "The database is empty." | |
else: | |
if query_yes_no("It will clear the database and leave the table structure unmodified! Are you sure you want to continue?", default="no") == "yes": | |
if query_yes_no("Are you sure you want to continue!?", default="no") == "yes": | |
clear_db(connection) | |
connection.close() | |
except mdb.Error, e: | |
print "Error %d: %s" % (e.args[0],e.args[1]) | |
sys.exit(1) | |
print "\n" | |
elif var == "4": | |
file_name = raw_input("The output file name: ") | |
if file_name == 'exit': | |
continue | |
try: | |
connection = create_connection(host, db_username, password ,database) | |
query_functions_order_by_coverage(connection, file_name) | |
connection.close() | |
except mdb.Error, e: | |
print "Error %d: %s" % (e.args[0],e.args[1]) | |
sys.exit(1) | |
print "\n" | |
elif var == "5": | |
file_name = raw_input("The output file name: ") | |
if file_name == 'exit': | |
continue | |
try: | |
connection = create_connection(host, db_username, password ,database) | |
query_tests(connection, file_name) | |
connection.close() | |
except mdb.Error, e: | |
print "Error %d: %s" % (e.args[0],e.args[1]) | |
sys.exit(1) | |
print "\n" | |
elif var == "6": | |
file_name = raw_input("The output file name: ") | |
if file_name == 'exit': | |
continue | |
try: | |
connection = create_connection(host, db_username, password ,database) | |
query_uncovered_functions(connection, file_name) | |
connection.close() | |
except mdb.Error, e: | |
print "Error %d: %s" % (e.args[0],e.args[1]) | |
sys.exit(1) | |
print "\n" | |
elif var == "9": | |
print "---------------------------------------------------" | |
print "You can use the TAB key for filepath autocomletion." | |
print "You can exit from a submenu with 'exit' command." | |
print "---------------------------------------------------" | |
raw_input("Press any key to continue...") | |
print "\n" | |
else: | |
print "Wrong number!\n" | |
def create_tables(connection): | |
t0 = time.time() | |
cursor = connection.cursor() | |
cursor.execute("""CREATE TABLE IF NOT EXISTS Functions ( | |
id INTEGER PRIMARY KEY auto_increment, | |
name varchar(400), | |
mangle varchar(400), | |
file_name_id INTEGER, | |
linenum INTEGER, | |
mem_addr varchar(200), | |
FOREIGN KEY(file_name_id) REFERENCES FileNames(id), | |
CONSTRAINT uc_functions UNIQUE (mangle, file_name_id, linenum) | |
); | |
CREATE TABLE IF NOT EXISTS Tests ( | |
id INTEGER PRIMARY KEY auto_increment, | |
name varchar(200), | |
name_2 varchar(200) | |
); | |
CREATE TABLE IF NOT EXISTS Results ( | |
id INTEGER PRIMARY KEY auto_increment, | |
test_id INTEGER, | |
function_id INTEGER, | |
count INTEGER, | |
FOREIGN KEY(test_id) REFERENCES Tests(id), | |
FOREIGN KEY(function_id) REFERENCES Functions(id) | |
); | |
CREATE TABLE IF NOT EXISTS FileNames ( | |
id INTEGER PRIMARY KEY auto_increment, | |
name varchar(300), | |
UNIQUE(name) | |
);""") | |
cursor.close() | |
print "Total time: %d:%02d:%02d" % timer(t0) | |
def clear_db(connection): | |
t0 = time.time() | |
cursor = connection.cursor() | |
cursor.execute("""TRUNCAT Results; | |
TRUNCAT Functions; | |
TRUNCAT FileNames; | |
TRUNCAT Tests;""") | |
cursor.close() | |
print "Total time: %d:%02d:%02d" % timer(t0) | |
def is_empty_db(connection): | |
cursor = connection.cursor() | |
cursor.execute("""SELECT sum(records) FROM (SELECT count(*) AS records FROM FileNames | |
union | |
SELECT count(*) AS records FROM Functions | |
union | |
SELECT count(*) AS records FROM Results | |
union | |
SELECT count(*) AS records FROM Tests) AS sumRecords;""") | |
value = int(cursor.fetchone()[0]) | |
cursor.close() | |
if value > 0: | |
return False | |
return True | |
def save_testcaseslist(connection, dirpath, filepath): | |
t0 = time.time() | |
cursor = connection.cursor() | |
repeat = False | |
for x in csv.reader(open(filepath,'r'), delimiter=';'): | |
if(x[0] == "1"): | |
if repeat: | |
break | |
repeat = True | |
db_row = "" | |
split = x[1].split("!") | |
split2 = x[2].split("/") | |
if split[1] == "home": | |
db_row = ("!".join(split[6:]), "/".join(split2[6:])) | |
else: | |
db_row = (x[1], x[2]) | |
cursor.execute("INSERT INTO Tests(name, name_2) VALUES (%s, %s)", db_row) | |
last_id = cursor.lastrowid | |
save_testcase(connection, os.path.join(dirpath, db_row[0]), last_id) | |
cursor.close() | |
print "Total time: %d:%02d:%02d" % timer(t0) | |
def save_testcase(connection, filepath, last_testcase_id): | |
cursor = connection.cursor() | |
for x in csv.reader(open(filepath,'r'), delimiter=';'): | |
if(x[3] != 'NO LOCATION'): | |
split = x[3].partition(":") | |
cursor.execute("INSERT INTO FileNames(name) VALUES (%s) ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id)", split[0]) | |
last_id = cursor.lastrowid | |
db_row = (x[2], x[1], last_id, split[2]) | |
cursor.execute("INSERT INTO Functions(name, mangle, file_name_id, linenum) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id)", db_row) | |
last_id = cursor.lastrowid | |
save_test(connection, last_testcase_id, last_id, x[0]) | |
cursor.close() | |
def save_test(connection, test_id, function_id, count): | |
cursor = connection.cursor() | |
db_row = (test_id, function_id, count) | |
cursor.execute("INSERT INTO Results(test_id, function_id, count) VALUES (%s, %s, %s)", db_row) | |
cursor.close() | |
def query_functions_order_by_coverage(connection, file_name): | |
t0 = time.time() | |
cursor = connection.cursor() | |
cursor.execute("SELECT function_id, Functions.name, count(test_id) as count_test_id FROM Results, Functions WHERE Results.function_id=Functions.id GROUP BY function_id ORDER BY count_test_id DESC") | |
rows = cursor.fetchall() | |
for row in rows: | |
writer = csv.writer(open(file_name + ".csv", "w+"), delimiter=';') | |
writer.writerow(row) | |
cursor.close() | |
print "Total time: %d:%02d:%02d" % timer(t0) | |
def query_tests(connection, file_name): | |
t0 = time.time() | |
cursor = connection.cursor() | |
cursor.execute("SELECT function_id, Functions.name, count(test_id) FROM Results, Functions WHERE Results.function_id=Functions.id GROUP BY function_id") | |
rows = cursor.fetchall() | |
for row in rows: | |
writer = csv.writer(open(file_name + ".csv", "w+"), delimiter=';') | |
writer.writerow(row) | |
cursor.close() | |
print "Total time: %d:%02d:%02d" % timer(t0) | |
def query_uncovered_functions(connection, file_name): | |
t0 = time.time() | |
cursor = connection.cursor() | |
cursor.execute("SELECT FileNames.name FROM FileNames WHERE FileNames.id NOT IN (SELECT Functions.file_name_id FROM Functions)") | |
rows = cursor.fetchall() | |
for row in rows: | |
writer = csv.writer(open(file_name + ".csv", "w+"), delimiter=';') | |
writer.writerow(row) | |
cursor.close() | |
print "Total time: %d:%02d:%02d" % timer(t0) | |
def create_connection(host, db_username, password, database): | |
try: | |
connection = mdb.connect(host, db_username, password, database) | |
except mdb.Error, e: | |
print "Error %d: %s" % (e.args[0],e.args[1]) | |
sys.exit(1) | |
return connection | |
def timer(t0): | |
m, s = divmod(int(time.time() - t0), 60) | |
h, m = divmod(m, 60) | |
return (h, m, s) | |
def query_yes_no(question, default="yes"): | |
"""Ask a yes/no question via raw_input() and return their answer. | |
"question" is a string that is presented to the user. | |
"default" is the presumed answer if the user just hits <Enter>. | |
It must be "yes" (the default), "no" or None (meaning | |
an answer is required of the user). | |
The "answer" return value is one of "yes" or "no". | |
""" | |
valid = {"yes":"yes", "y":"yes", "ye":"yes", | |
"no":"no", "n":"no"} | |
if default == None: | |
prompt = " [y/n] " | |
elif default == "yes": | |
prompt = " [Y/n] " | |
elif default == "no": | |
prompt = " [y/N] " | |
else: | |
raise ValueError("invalid default answer: '%s'" % default) | |
while 1: | |
sys.stdout.write(question + prompt) | |
choice = raw_input().lower() | |
if default is not None and choice == '': | |
return default | |
elif choice in valid.keys(): | |
return valid[choice] | |
else: | |
sys.stdout.write("Please respond with 'yes' or 'no' "\ | |
"(or 'y' or 'n').\n") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment