Forked from anonymous/coverage_result_processor.py
Created
September 26, 2012 14:05
-
-
Save azbesthu/3788253 to your computer and use it in GitHub Desktop.
coverage_result_processor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# sudo apt-get install python-mysqldb | |
import csv | |
import getopt | |
import glob | |
import MySQLdb as mdb | |
import os | |
import readline | |
import sys | |
import time | |
def _complete_path(text, state):
    """Readline completer that offers filesystem paths matching *text*.

    A bare '~/' is expanded to the user's home directory before globbing.
    Returns the *state*-th match, or None once the matches are exhausted
    (the trailing None is the sentinel readline stops on).
    """
    if text == '~/':
        text = os.path.expanduser('~') + "/"
    return (glob.glob(text + '*') + [None])[state]


def main():
    """Run the interactive menu of the coverage result processor.

    Requires exactly four command-line arguments: database host, database
    username, password and database name. The menu loops until the user
    selects "0" or types "exit". Every database action opens its own
    connection, which is closed again even when the user backs out of a
    sub-prompt with 'exit' or when the action fails. A MySQL error is
    printed and terminates the process with exit status 1.
    """
    if len(sys.argv) != 5:
        sys.exit("Mandatory parameters: [database host address], [database username], [password], [database]!")
    host, db_username, password, database = sys.argv[1:5]

    # Menu entries 4-6 share one flow and differ only in the export function.
    exporters = {
        "4": query_functions_order_by_coverage,
        "5": query_tests,
        "6": query_uncovered_functions,
    }

    while True:
        print("1. Create tables.")
        print("2. Fill the database.")
        print("3. Clear the database.")
        print("4. Query the functions order by coverage.")
        print("5. Query the number of tests that contain the current function.")
        print("6. Query the uncovered functions.")
        print("9. Help.")
        print("0. Exit.")
        var = raw_input("Selected menu: ")

        if var == "0" or var == "exit":
            sys.exit(0)
        if var == "9":
            print("---------------------------------------------------")
            print("You can use the TAB key for filepath autocomletion.")
            print("You can exit from a submenu with 'exit' command.")
            print("All the query results will be in an output csv file.")
            print("---------------------------------------------------")
            raw_input("Press any key to continue...")
            print("\n")
            continue
        if var not in ("1", "2", "3") and var not in exporters:
            print("Wrong number!\n")
            continue

        try:
            connection = create_connection(host, db_username, password, database)
            try:
                if var == "1":
                    if not there_are_tables(connection, database):
                        create_tables(connection)
                    else:
                        print("The database has already tables.")
                elif var == "2":
                    if there_are_tables(connection, database) and is_empty_db(connection):
                        if query_yes_no("It will take hours. Are you sure you want to continue?", default="no") == "yes":
                            readline.set_completer_delims(' \t\n;')
                            readline.parse_and_bind("tab: complete")
                            readline.set_completer(_complete_path)
                            dirpath = raw_input("Directory path of coverage results: ")
                            if dirpath == 'exit':
                                # The finally clause below still closes the connection.
                                continue
                            testcaseslist_path = raw_input("Testcases path: ")
                            if testcaseslist_path == 'exit':
                                continue
                            save_testcaseslist(connection, dirpath, testcaseslist_path)
                    else:
                        print("The database is not empty! You can only use this menu with empty database!")
                elif var == "3":
                    if there_are_tables(connection, database) and is_empty_db(connection):
                        print("The database has already been empty.")
                    elif (query_yes_no("It will clear the database and leave the table structure unmodified! Are you sure you want to continue?", default="no") == "yes"
                          and query_yes_no("Are you sure you want to continue!?", default="no") == "yes"):
                        # The second question is only asked after a "yes" to the first.
                        clear_db(connection)
                else:
                    if there_are_tables(connection, database) and not is_empty_db(connection):
                        file_name = raw_input("The output file name: ")
                        if file_name == 'exit':
                            continue
                        exporters[var](connection, file_name)
                    else:
                        print("There are no datas in the database!")
            finally:
                # Runs on normal completion, on 'continue' and on errors, so
                # no connection is left open between menu iterations.
                connection.close()
        except mdb.Error as e:
            print("Error %d: %s" % (e.args[0], e.args[1]))
            sys.exit(1)
        print("\n")
def create_tables(connection):
    """Create the FileNames, Tests, Functions and Results tables if missing.

    Each statement is executed on its own so that a failure is raised by the
    statement that caused it, rather than being hidden behind the first result
    of a multi-statement batch. Referenced tables are created before the
    tables whose foreign keys point at them. Prints the elapsed time and
    "Success!" when done; database errors propagate to the caller.
    """
    statements = (
        # Parents first: Functions references FileNames, Results references
        # Tests and Functions.
        """CREATE TABLE IF NOT EXISTS FileNames (
            id INTEGER PRIMARY KEY auto_increment,
            name varchar(300),
            UNIQUE(name)
        )""",
        """CREATE TABLE IF NOT EXISTS Tests (
            id INTEGER PRIMARY KEY auto_increment,
            name varchar(200),
            name_2 varchar(200)
        )""",
        """CREATE TABLE IF NOT EXISTS Functions (
            id INTEGER PRIMARY KEY auto_increment,
            name varchar(400),
            mangle varchar(400),
            file_name_id INTEGER,
            linenum INTEGER,
            mem_addr varchar(200),
            FOREIGN KEY(file_name_id) REFERENCES FileNames(id),
            CONSTRAINT uc_functions UNIQUE (mangle, file_name_id, linenum)
        )""",
        """CREATE TABLE IF NOT EXISTS Results (
            id INTEGER PRIMARY KEY auto_increment,
            test_id INTEGER,
            function_id INTEGER,
            count INTEGER,
            FOREIGN KEY(test_id) REFERENCES Tests(id),
            FOREIGN KEY(function_id) REFERENCES Functions(id)
        )""",
    )
    t0 = time.time()
    cursor = connection.cursor()
    try:
        for statement in statements:
            cursor.execute(statement)
    finally:
        cursor.close()
    print("Total time: %d:%02d:%02d" % timer(t0))
    print("Success!")
def clear_db(connection):
    """Remove all rows from the four tables, keeping the table structure.

    The tables are truncated one statement at a time so that any failure is
    reported by the statement that caused it. Foreign key checks are switched
    off for the duration because MySQL refuses to TRUNCATE an InnoDB table
    that is referenced by a foreign key; they are switched back on even if a
    truncate fails. Prints the elapsed time and "Success!" when done.
    """
    statements = (
        "TRUNCATE Results",
        "TRUNCATE Functions",
        "TRUNCATE FileNames",
        "TRUNCATE Tests",
    )
    t0 = time.time()
    cursor = connection.cursor()
    try:
        cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
        try:
            for statement in statements:
                cursor.execute(statement)
        finally:
            cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
    finally:
        cursor.close()
    print("Total time: %d:%02d:%02d" % timer(t0))
    print("Success!")
def there_are_tables(connection, database):
    """Return True if the schema named *database* contains at least one table.

    The schema name is passed as a query parameter instead of being
    interpolated into the SQL text, so quotes in the name cannot alter the
    statement.
    """
    cursor = connection.cursor()
    try:
        cursor.execute(
            "SELECT COUNT(DISTINCT `table_name`) FROM `information_schema`.`columns` WHERE `table_schema` = %s",
            (database,))
        table_count = int(cursor.fetchone()[0])
    finally:
        cursor.close()
    return table_count != 0
def is_empty_db(connection):
    """Return True when FileNames, Functions, Results and Tests hold no rows.

    The query sums the per-table row counts; any positive total means the
    database is not empty.
    """
    db_cursor = connection.cursor()
    db_cursor.execute("""SELECT sum(records) FROM (SELECT count(*) AS records FROM FileNames
        union
        SELECT count(*) AS records FROM Functions
        union
        SELECT count(*) AS records FROM Results
        union
        SELECT count(*) AS records FROM Tests) AS sumRecords;""")
    first_row = db_cursor.fetchone()
    total_records = int(first_row[0])
    db_cursor.close()
    return not total_records > 0
def save_testcaseslist(connection, dirpath, filepath):
    """Import every test case listed in the ';'-separated file *filepath*.

    For each row a record is inserted into Tests and the matching coverage
    result file (looked up under *dirpath* by the stored test name) is loaded
    through save_testcase(). Work is committed after each test case so the
    rows are actually persisted on transactional storage engines. The list
    file is closed when the import ends, whether it succeeds or fails.

    Reading stops at the second row whose first column is "1".
    NOTE(review): presumably the list repeats from that point; confirm
    against the tool that produces the list.
    """
    t0 = time.time()
    cursor = connection.cursor()
    try:
        with open(filepath, 'r') as listing:
            seen_first = False
            for row in csv.reader(listing, delimiter=';'):
                if not row:
                    # Blank line in the list file; nothing to import.
                    continue
                if row[0] == "1":
                    if seen_first:
                        break
                    seen_first = True
                name_parts = row[1].split("!")
                path_parts = row[2].split("/")
                # Names whose second '!'-component is "home" lose their first
                # six components. NOTE(review): looks like a machine-specific
                # absolute prefix being stripped; confirm.
                if len(name_parts) > 1 and name_parts[1] == "home":
                    db_row = ("!".join(name_parts[6:]), "/".join(path_parts[6:]))
                else:
                    db_row = (row[1], row[2])
                cursor.execute("INSERT INTO Tests(name, name_2) VALUES (%s, %s)", db_row)
                test_id = cursor.lastrowid
                save_testcase(connection, os.path.join(dirpath, db_row[0]), test_id)
                connection.commit()
    finally:
        cursor.close()
    print("Total time: %d:%02d:%02d" % timer(t0))
    print("Success!")
def save_testcase(connection, filepath, last_testcase_id):
    """Load one ';'-separated coverage result file into the database.

    Each row is read as (count, mangled name, name, location), where the
    location is "file:line" or the literal 'NO LOCATION'. Rows without a
    location are skipped. For the others the file name and the function are
    inserted (or their existing ids reused through ON DUPLICATE KEY UPDATE)
    and a Results row linking the function to test *last_testcase_id* is
    stored via save_test(). The result file is closed when loading ends.
    """
    cursor = connection.cursor()
    try:
        with open(filepath, 'r') as results:
            for row in csv.reader(results, delimiter=';'):
                if not row:
                    # Blank line in the result file.
                    continue
                if row[3] == 'NO LOCATION':
                    continue
                file_name, _, line_number = row[3].partition(":")
                # Parameters must be a sequence; a bare string is not a valid
                # DB-API parameter container.
                cursor.execute("INSERT INTO FileNames(name) VALUES (%s) ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id)", (file_name,))
                file_name_id = cursor.lastrowid
                db_row = (row[2], row[1], file_name_id, line_number)
                cursor.execute("INSERT INTO Functions(name, mangle, file_name_id, linenum) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id)", db_row)
                function_id = cursor.lastrowid
                save_test(connection, last_testcase_id, function_id, row[0])
    finally:
        cursor.close()
def save_test(connection, test_id, function_id, count):
    """Insert one Results row tying *function_id* to *test_id* with *count*."""
    insert_sql = "INSERT INTO Results(test_id, function_id, count) VALUES (%s, %s, %s)"
    db_cursor = connection.cursor()
    db_cursor.execute(insert_sql, (test_id, function_id, count))
    db_cursor.close()
def query_functions_order_by_coverage(connection, file_name):
    """Export functions ordered by the number of Results rows they appear in.

    Writes (function id, function name, row count) records, highest count
    first, to "<file_name>.csv" using ';' as the delimiter. The output file
    is closed, and therefore flushed, before "Success!" is reported.
    """
    t0 = time.time()
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT Results.function_id, Functions.name, COUNT(Results.Function_id) as count_test from Results, Functions WHERE Results.function_id=Functions.id GROUP BY Results.function_id ORDER BY count_test DESC")
        rows = cursor.fetchall()
    finally:
        cursor.close()
    with open(file_name + ".csv", "w") as out:
        csv.writer(out, delimiter=';').writerows(rows)
    print("Total time: %d:%02d:%02d" % timer(t0))
    print("Success!")
def query_tests(connection, file_name):
    """Export, per function, the number of Results rows that reference it.

    Writes (function id, function name, test count) records to
    "<file_name>.csv" using ';' as the delimiter. The output file is closed,
    and therefore flushed, before "Success!" is reported.
    """
    t0 = time.time()
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT function_id, Functions.name, count(test_id) FROM Results, Functions WHERE Results.function_id=Functions.id GROUP BY function_id")
        rows = cursor.fetchall()
    finally:
        cursor.close()
    with open(file_name + ".csv", "w") as out:
        csv.writer(out, delimiter=';').writerows(rows)
    print("Total time: %d:%02d:%02d" % timer(t0))
    print("Success!")
def query_uncovered_functions(connection, file_name):
    """Export the FileNames entries that no Functions row refers to.

    Writes one file name per record to "<file_name>.csv" using ';' as the
    delimiter. The output file is closed, and therefore flushed, before
    "Success!" is reported.

    NOTE(review): despite the function's name, the query lists file names
    without any function, not functions without coverage; confirm that this
    is the intended report.
    """
    t0 = time.time()
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT FileNames.name FROM FileNames WHERE FileNames.id NOT IN (SELECT Functions.file_name_id FROM Functions)")
        rows = cursor.fetchall()
    finally:
        cursor.close()
    with open(file_name + ".csv", "w") as out:
        csv.writer(out, delimiter=';').writerows(rows)
    print("Total time: %d:%02d:%02d" % timer(t0))
    print("Success!")
def create_connection(host, db_username, password, database):
    """Open and return a MySQL connection to *database* on *host*.

    If the connection attempt raises a MySQL error, the error code and
    message are printed and the process exits with status 1.
    """
    try:
        return mdb.connect(host, db_username, password, database)
    except mdb.Error as err:
        code, message = err.args[0], err.args[1]
        print("Error %d: %s" % (code, message))
        sys.exit(1)
def timer(t0):
    """Return the whole seconds elapsed since *t0* as (hours, minutes, seconds).

    *t0* is a timestamp previously obtained from time.time().
    """
    elapsed_seconds = int(time.time() - t0)
    hours, remainder = divmod(elapsed_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return (hours, minutes, seconds)
def query_yes_no(question, default="yes"):
    """Ask a yes/no question via raw_input() and return the answer.

    "question" is a string that is presented to the user.
    "default" is the presumed answer if the user just hits <Enter>.
        It must be "yes" (the default), "no" or None (meaning
        an answer is required of the user).

    The return value is one of "yes" or "no". The question is repeated
    until a recognised answer is given; answers are case-insensitive and
    surrounding whitespace is ignored.

    Raises ValueError, before anything is printed, if "default" is not one
    of the accepted values.
    """
    valid = {"yes": "yes", "y": "yes", "ye": "yes",
             "no": "no", "n": "no"}
    prompts = {None: " [y/n] ", "yes": " [Y/n] ", "no": " [y/N] "}
    if default not in prompts:
        raise ValueError("invalid default answer: '%s'" % default)
    prompt = prompts[default]

    while True:
        sys.stdout.write(question + prompt)
        choice = raw_input().strip().lower()
        if default is not None and choice == '':
            return default
        if choice in valid:
            return valid[choice]
        sys.stdout.write("Please respond with 'yes' or 'no' "
                         "(or 'y' or 'n').\n")
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment