Last active
October 25, 2018 17:40
-
-
Save lgessler/051c419f69c7292c166ff0fe21250d76 to your computer and use it in GitHub Desktop.
For LING 463, HW4 at Georgetown
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import argparse | |
| from openpyxl import load_workbook | |
| from collections import defaultdict | |
def xlsx_files(directory):
    """Return the names of the Excel workbooks directly inside ``directory``.

    Only bare file names are returned (not full paths), in the order
    ``os.listdir`` yields them.

    A name qualifies when it ends with the ``.xlsx`` extension. Names that
    start with ``~$`` are skipped: Excel creates such owner/lock files next
    to a workbook while it is open, and they are not loadable workbooks.
    """
    return [
        name
        for name in os.listdir(directory)
        if name.endswith(".xlsx") and not name.startswith("~$")
    ]
def get_tags(dir1, dir2, tag_column):
    """Collect position-aligned tag lists from same-named workbooks.

    Workbooks are matched by file name across ``dir1`` and ``dir2``. A
    warning is printed for every workbook that exists in only one of the two
    directories; such files are ignored.

    For each common file, the column ``tag_column`` (a column letter such as
    ``"J"``) of the first worksheet is read from both copies. The first cell
    of the column is skipped (presumably a header row -- confirm against the
    spreadsheets). If the two columns differ in length a warning is printed
    and only the overlapping prefix is compared.

    Positions where both cells are empty are dropped. Where exactly one cell
    is empty, the missing value is recorded as the placeholder ``"<EMPTY>"``.

    Returns:
        A pair ``(all_tags_1, all_tags_2)`` of equally long lists; element
        ``i`` of each list comes from the same file and row.
    """
    dir1_files = xlsx_files(dir1)
    dir2_files = xlsx_files(dir2)
    # Sets give O(1) membership tests; the lists keep the listing order.
    dir1_names = set(dir1_files)
    dir2_names = set(dir2_files)

    for f in dir1_files:
        if f not in dir2_names:
            print("Warning: file {} exists in {} but not in {}".format(f, dir1, dir2))

    common_files = []
    for f in dir2_files:
        if f not in dir1_names:
            print("Warning: file {} exists in {} but not in {}".format(f, dir2, dir1))
        else:
            common_files.append(f)

    all_tags_1 = []
    all_tags_2 = []
    for f in common_files:
        book1 = load_workbook(os.path.join(dir1, f))
        book2 = load_workbook(os.path.join(dir2, f))
        column1 = book1[book1.sheetnames[0]][tag_column]
        column2 = book2[book2.sheetnames[0]][tag_column]
        # [1:] skips the first row of the column.
        tags1 = [cell.value for cell in column1][1:]
        tags2 = [cell.value for cell in column2][1:]

        if len(tags1) != len(tags2):
            print("Warning: {} has {} tags in {}, but {} tags in {}."
                  .format(f, len(tags1), dir1, len(tags2), dir2))

        # zip() stops at the shorter column, so only the overlap is compared.
        for tag1, tag2 in zip(tags1, tags2):
            if tag1 is None and tag2 is None:
                # Neither annotator tagged this row; nothing to compare.
                continue
            all_tags_1.append("<EMPTY>" if tag1 is None else tag1)
            all_tags_2.append("<EMPTY>" if tag2 is None else tag2)

    return all_tags_1, all_tags_2
def raw_agreement(tags1, tags2):
    """Return the fraction of positions where both tag sequences agree.

    The result is the number of indices ``i`` with ``tags1[i] == tags2[i]``
    divided by ``len(tags1)``. ``tags2`` must be at least as long as
    ``tags1``; an empty ``tags1`` raises ``ZeroDivisionError``.
    """
    matches = 0
    for position, tag in enumerate(tags1):
        if tags2[position] == tag:
            matches += 1
    return matches / len(tags1)
def tagset(tags1, tags2):
    """Return the set of distinct tags that occur in either sequence."""
    labels = set(tags1)
    labels.update(tags2)
    return labels
def calc_freqs(tags1, tags2):
    """Count how often each tag occurs in each of the two sequences.

    Returns:
        A pair ``(freqs1, freqs2)`` of ``defaultdict(int)`` objects mapping
        tag -> occurrence count for ``tags1`` and ``tags2`` respectively.
        Looking up a tag that never occurred yields 0.
    """
    counts = (defaultdict(int), defaultdict(int))
    for table, tags in zip(counts, (tags1, tags2)):
        for tag in tags:
            table[tag] += 1
    return counts
def expected_agreement(tags1, tags2):
    """Return the agreement expected by chance for the two tag sequences.

    For every tag occurring in either sequence, the number of times it
    appears in ``tags1`` is multiplied by the number of times it appears in
    ``tags2``; the products are summed and divided by ``len(tags1) ** 2``.
    An empty ``tags1`` raises ``ZeroDivisionError``.
    """
    freqs1, freqs2 = calc_freqs(tags1, tags2)
    products = [freqs1[tag] * freqs2[tag] for tag in tagset(tags1, tags2)]
    return sum(products) / len(tags1) ** 2
def cohens_kappa(tags1, tags2):
    """Return Cohen's kappa for two position-aligned tag sequences.

    Kappa is ``(raw - expected) / (1 - expected)``, where ``raw`` is the
    observed agreement (``raw_agreement``) and ``expected`` is the chance
    agreement (``expected_agreement``).

    When the expected agreement is exactly 1 -- both annotators used one and
    the same tag for every item -- the formula is 0/0 and kappa is undefined.
    ``float("nan")`` is returned in that case instead of raising
    ``ZeroDivisionError``.
    """
    raw = raw_agreement(tags1, tags2)
    expected = expected_agreement(tags1, tags2)
    if expected == 1:
        # Degenerate case: a single shared label leaves no room for
        # agreement beyond chance, so the statistic is undefined.
        return float("nan")
    return (raw - expected) / (1 - expected)
def write_confusion_matrix(tags1, tags2, outfile="confusion.csv"):
    """Write the confusion matrix of two aligned tag sequences as CSV.

    The first line of ``outfile`` lists all tags occurring in either
    sequence, in sorted order. Each following line is one matrix row: row
    ``i``, column ``j`` holds the number of positions where ``tags1`` has the
    ``i``-th tag and ``tags2`` has the ``j``-th tag. Rows follow the same tag
    order as the header line but carry no label column of their own.

    Args:
        tags1: Tags of the first annotator (matrix rows).
        tags2: Tags of the second annotator (matrix columns).
        outfile: Path of the CSV file to create or overwrite.
    """
    # Local import: csv is not among the file's top-level imports.
    import csv

    # key=str keeps sorting well-defined if a cell held a non-string value
    # (e.g. a number) next to ordinary string tags.
    tags = sorted(tagset(tags1, tags2), key=str)
    tag_index = {tag: i for i, tag in enumerate(tags)}

    m = [[0] * len(tags) for _ in tags]
    for tag1, tag2 in zip(tags1, tags2):
        m[tag_index[tag1]][tag_index[tag2]] += 1

    with open(outfile, 'w') as f:
        # csv.writer quotes tags containing commas or quotes and stringifies
        # non-string tags. lineterminator="\n" keeps the output identical to
        # the previous hand-joined format for plain tags.
        writer = csv.writer(f, lineterminator="\n")
        writer.writerow(tags)
        writer.writerows(m)
    print("Wrote confusion matrix to {}".format(outfile))
def main(dir1, dir2, tagcol="J"):
    """Print agreement statistics for two annotation directories.

    Reads the tags from column ``tagcol`` of the workbooks common to ``dir1``
    and ``dir2``, prints the raw agreement and Cohen's kappa, and writes the
    confusion matrix to the default output file of
    ``write_confusion_matrix``.

    If no tags were collected (for example because the directories share no
    workbook), an error is printed and nothing else is computed, since the
    agreement measures divide by the number of tags.
    """
    tags1, tags2 = get_tags(dir1, dir2, tagcol)
    if not tags1:
        # get_tags appends to both lists in lockstep, so checking one is
        # enough.
        print("Error: no tags found in files common to {} and {}.".format(dir1, dir2))
        return
    print("Raw agreement: {}".format(raw_agreement(tags1, tags2)))
    print("Cohen's kappa: {}".format(cohens_kappa(tags1, tags2)))
    write_confusion_matrix(tags1, tags2)
# Command-line entry point: validate the two directories, then run main().
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Compute agreement statistics and confusion matrix for spreadsheets across two directories")
    parser.add_argument("dir1")
    parser.add_argument("dir2")
    parser.add_argument("--tagcol", help="Letter of column that contains tags")
    args = parser.parse_args()
    dir1 = args.dir1
    dir2 = args.dir2

    # os.path.isdir() is already False for paths that do not exist, so a
    # separate os.path.exists() check is unnecessary.
    for directory in (dir1, dir2):
        if not os.path.isdir(directory):
            print("Error: directory '{}' does not exist.".format(directory))
            # SystemExit is raised directly rather than through exit(),
            # which is a helper installed by the site module.
            raise SystemExit(-1)

    if args.tagcol:
        # Pass the parsed option; the bare name `tagcol` is not defined here.
        main(dir1, dir2, args.tagcol)
    else:
        main(dir1, dir2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment