Created
July 19, 2018 14:11
-
-
Save deconstructionalism/9c375222b9defcaf65b4c274bf9abf59 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import os | |
| import pandas as pd | |
| import sys | |
| from crayons import white, green, blue, red | |
| from datetime import datetime as dt | |
| from pandas.errors import EmptyDataError | |
| from tabulate import tabulate | |
| try: | |
| input = raw_input | |
| except NameError: | |
| pass | |
| FILE = { | |
| 'name': 'data.csv', | |
| } | |
| DATA = {} | |
| QUESTIONS = [ | |
| { | |
| 'query': 'Enter date (leave blank to use today\'s date)', | |
| 'key_name': 'date', | |
| 'validate_func': lambda x: dt.strptime(x, '%Y-%m-%d').strftime('%Y-%m-%d'), | |
| 'if_blank_func': lambda: dt.today().strftime('%Y-%m-%d') | |
| }, | |
| { | |
| 'query': 'Enter time (leave blank to use time right now)', | |
| 'key_name': 'time', | |
| 'validate_func': lambda x: dt.strptime(x, '%I:%M %p').strftime('%I:%M %p'), | |
| 'if_blank_func': lambda: dt.today().strftime('%I:%M %p') | |
| }, | |
| { | |
| 'query': 'Description of event', | |
| 'key_name': 'description' | |
| }, | |
| { | |
| 'query': 'Physical state', | |
| 'key_name': 'physical_state' | |
| } | |
| ] | |
| def print_t(x): print(white('{}\n{}\n'.format(x.upper(), '_' * len(x)), bold=True)) | |
| def print_q(x): print(green(x.upper(), bold=True)) | |
| def print_e(x): print(red(x, bold=True)) | |
| def print_r(x): print(blue(x, bold=True)) | |
| def ask_question(q_index=-1, query='', key_name=None, validate_func=False, if_blank_func=False, add_to_data=True): | |
| while True: | |
| print_q('{}{}'.format(str(q_index) + ') ' if q_index > -1 else '', query)) | |
| val = input(': ') | |
| if val == '': | |
| if if_blank_func: | |
| val = if_blank_func() | |
| break | |
| else: | |
| print_e('blank value not allowed!\n') | |
| continue | |
| else: | |
| try: | |
| if validate_func: | |
| val = validate_func(val) | |
| break | |
| except ValueError as e: | |
| print_e('ValueError: {}\n'.format(e)) | |
| continue | |
| break | |
| if add_to_data: | |
| print_r('Added {}: "{}"\n'.format(key_name, val)) | |
| DATA[key_name] = val | |
| else: | |
| return val | |
| def validate_yes_no(x): | |
| if x not in ['Y', 'y', 'n', 'N']: | |
| raise ValueError('You must choose "Y" or "N"') | |
| else: | |
| return True if x in ['Y', 'y'] else False | |
| def confirm_add(): | |
| print('') | |
| print_t('add data to database') | |
| for key, val in DATA.items(): | |
| print_r('{}:\t"{}"'.format(key, val)) | |
| print('') | |
| return ask_question(**{ | |
| 'query': 'Do you want to add this data?', | |
| 'validate_func': validate_yes_no, | |
| 'add_to_data': False | |
| }) | |
| def confirm_make_new_db(): | |
| print('') | |
| print_t('you have a "data.csv" file which contains no csv columns') | |
| return ask_question(**{ | |
| 'query': 'Do you want to make a new "data.csv" (your old "data.csv" will be renamed, not overwritten)?', 'validate_func': validate_yes_no, | |
| 'add_to_data': False | |
| }) | |
| def make_new_db(move_old_file=False): | |
| if move_old_file and os.path.exists(FILE['path']): | |
| dt_string = dt.today().strftime('%Y%m%d%H%M') | |
| new_name = '{}_{}'.format(dt_string, FILE['name']) | |
| new_path = os.path.join(FILE['dir'], new_name) | |
| os.rename(FILE['path'], new_path) | |
| print_r('Renamed "{}" to "{}"'.format(FILE['path'], new_path)) | |
| with open(FILE['path'], 'w') as f: | |
| keys = get_keys_from_questions() | |
| f.write(','.join(keys)) | |
| print_r('Made file "{}"'.format(FILE['path'])) | |
| def get_keys_from_questions(): | |
| return list(map(lambda x: x['key_name'], QUESTIONS)) | |
| # def load_csv(create_if_missing=False): | |
| # try: | |
| # df = pd.read_csv(FILE['path']) | |
| # return df | |
| # except (OSError, FileNotFoundError) as e: | |
| # if create_if_missing: | |
| # make_new_db() | |
| # df = pd.read_csv(FILE['path']) | |
| # return df | |
| # except (EmptyDataError) as e: | |
| # print_e('EmptyDataError: {}\n'.format(e)) | |
| # if create_if_missing: | |
| # make_new = confirm_make_new_db() | |
| # if make_new: | |
| # make_new_db(move_old_file=True) | |
| # df = pd.read_csv(FILE['path']) | |
| # return df | |
| # return False | |
| def add_data_to_csv(): | |
| try: | |
| df = pd.read_csv(FILE['path']) | |
| except (OSError, FileNotFoundError) as e: | |
| make_new_db() | |
| df = pd.read_csv(FILE['path']) | |
| except (EmptyDataError) as e: | |
| print_e('EmptyDataError: {}\n'.format(e)) | |
| make_new = confirm_make_new_db() | |
| if not make_new: | |
| print_e('\nNot able to add data to {}!'.format(FILE['path'])) | |
| sys.exit(1) | |
| else: | |
| make_new_db(move_old_file=True) | |
| df = pd.read_csv(FILE['path']) | |
| keys = get_keys_from_questions() | |
| columns = df.columns.values | |
| all_keys_in_df = all([key in columns for key in keys]) | |
| keys_same_len = len(columns) == len(keys) | |
| if not (all_keys_in_df and keys_same_len): | |
| print_e('Columns in "{}": {} do not match question keys: {}\n'.format(FILE['path'], columns, keys)) | |
| make_new = confirm_make_new_db() | |
| if not make_new: | |
| print_e('\nNot able to add data to {}!'.format(FILE['path'])) | |
| sys.exit(1) | |
| else: | |
| make_new_db(move_old_file=True) | |
| df = pd.read_csv(FILE['path']) | |
| df = df.append(DATA, ignore_index=True) | |
| df.to_csv(FILE['path'], index=False) | |
| def main(read=False): | |
| os.system('cls') # For Windows | |
| os.system('clear') # For Linux/OS X | |
| script_dir = os.path.dirname(os.path.realpath(__file__)) | |
| FILE['path'] = os.path.join(script_dir, FILE['name']) | |
| FILE['dir'], FILE['name'] = os.path.split(FILE['path']) | |
| if read: | |
| df = pd.read_csv(FILE['path']) | |
| print(tabulate(df, headers='keys', tablefmt='fancy_grid')) | |
| sys.exit(0) | |
| try: | |
| print_t('Input your data') | |
| for i, question in enumerate(QUESTIONS): | |
| ask_question(i + 1, **question) | |
| add_data = confirm_add() | |
| if not add_data: | |
| print_e('\nEXITING') | |
| sys.exit(0) | |
| else: | |
| add_data_to_csv() | |
| except KeyboardInterrupt: | |
| print_e('\nEXITING') | |
| sys.exit(0) | |
| if __name__ == '__main__': | |
| read = True if len(sys.argv) > 1 and sys.argv[1] in ['-r', '--read'] else False | |
| main(read) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment