The following scripts were used on 2018-07-09 to generate descriptive writing features for 2017 reporting. This was done on Kurt, using the pdia docker; the steps for running the scripts are listed at the end of this page.
# Functions for extracting the descriptive features for the 2017 operational analysis
# Changes from make2017FinalFeatures.py:
# - minJumpDistance for mapEditingFeatures raised from 2 to 5 (see mapStep)
# - minRunLength for reduceDeleteFeatures set to 1 (see reduceStep)
#############
import StringIO
import os
import traceback

from pdia.durSinceBlockStart import *
from pdia.writing2017.addFeatureTextChange import addTextChangeVars
from pdia.writing2017.addKeyPressVars import addKeyPressVars
from pdia.writing2017.burstFeatures import *
from pdia.writing2017.editingFeatures import mapEditingFeatures, reduceEditingFeatures, reduceDeleteFeatures
from pdia.writing2017.featuresConfig2017 import featureConfig2017
from pdia.writing2017.reduceFeatureInitialKeypress import reduceFeatureInitialKeypress
from pdia.writing2017.getData import getData
from pdia.writing2017.reducePauseFeatures import reducePauseFeatures
from pdia.writing2017.addWordTokens import *

# Configs
# parser config
from pdia.writing2017.reduceWordBasedFeatures import reduceWordBasedFeatures

# 2016 default configuration
# 2017 data config

# Read data
def getData2017(filename, featureConfig=featureConfig2017):
    """
    Simply a wrapper around getData with the 2017 config
    :param filename: the file name to process
    :param featureConfig: using the 2017 configuration
    :return: the parsed df
    """
    return getData(filename, featureConfig=featureConfig)

def mapStep(df, feaConfig, verbose=False):
    """
    MAP step: creating keystroke-level features, adding columns
    :param df: the data frame for a booklet, containing potentially multiple blocks
    :param feaConfig: the configuration for data import/parsing
    :param verbose: if True, saves the interim data
    """
    # asserts
    if df is None:
        logger.error("MapStep: input df is None; quitting")
        return None
    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # keyword missing
        return None

    studentID = df["BookletNumber"].unique()[0]

    # ##### MAP ####
    # to handle the feature functions in the featureMap object
    # ##############
    def mapBlock(d):
        # return None if no keystroke log is available
        if d.loc[d.Label == "Pilot Observables", :].shape[0] == 0:
            # print("mapBlock: No Observable data for the block")
            logger.debug("mapBlock: No Observable data for the block")
            return None
        d = durSinceBlockStart(d) if d is not None else None
        # d = addKeyPressVars(d) if d is not None else None
        # d = addTextChangeVars(d) if d is not None else None
        d = addFeatureIKI(d) if d is not None else None
        d = addWordTokens(d) if d is not None else None
        # garyfeng 2018-07-09: changing default minJumpDistance from 2 to 5
        d = mapEditingFeatures(d, verbose=False, minTextChangeEvents=5, minJumpDistance=5) if d is not None else None
        return d

    try:
        # the following groupby().apply() is causing occasional python crashes
        # df = df \
        #     .groupby(feaConfig["byVars"]) \
        #     .apply(mapBlock)
        # taking a stupid method here
        tmp = []
        for b in df["BlockCode"].unique():
            tmp.append(df.loc[df.BlockCode == b, :].pipe(mapBlock))
        df = pd.concat(tmp)
    except Exception as e:
        logger.error("Error in mapStep")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_MapStep.csv".format(studentID), encoding="utf-8")
        return

    # saving
    if verbose:
        outputFileName = "{}_mapStep.csv".format(
            df["BookletNumber"].unique()[0]
        )
        # remove duplicated columns before saving
        df.loc[:, ~df.columns.duplicated()].to_csv(outputFileName, encoding="utf-8")

        # simplified for human reading
        outputFileName = "{}_mapStep_simplified.csv".format(
            df["BookletNumber"].unique()[0]
        )
        rowsToKeep = df.keystrokeEvent.notnull() & ~df.keystrokeEvent.isin(["Keypress"])
        df.loc[rowsToKeep, "textLenReconText"] = df.loc[rowsToKeep, "reconstructedText"].str.len()
        colsToKeep = ['BookletNumber', 'BlockCode', 'AccessionNumber', 'rowCount',
                      'keystrokeEvent', 'keyName', 'durSinceBlockStart', 'IKI',
                      'reconCursorPosition', 'textLength', "textLenReconText",
                      'textContext', 'intendedWord', 'currentToken',
                      # 'interWord', 'wtf', 'isAtWordBoundary', 'isWordInitial',
                      'intraWord',
                      'focalWordNum', 'interWordRunNumber', 'interClauseRunNumber', 'isJump', 'isReplace',
                      'reconstructedText']
        # to get rid of duplicated columns, remove the MultiIndex first
        df.loc[rowsToKeep, colsToKeep]\
            .to_csv(outputFileName, encoding="utf-8")

    return df

# note we are not catching exceptions here, to save time.
# errors are caught at the highest level
def reduceStep(df, feaConfig, verbose=False):
    """
    REDUCE step: take the df after the MAP step and reduce it to features, one block per row.
    :param df: the df passed from the mapStep
    :param feaConfig: the configuration file with parameters setting the byVars
    :param verbose: to be passed to reduce functions to save interim data frame if True
    :return: a Pandas data frame, with # of rows as blocks, and features as columns
    """
    # asserts
    if df is None:
        logger.error("ReduceStep: input df is None; quitting")
        return None
    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # keyword missing
        return None

    studentID = df["BookletNumber"].unique()[0]

    # #### Reduce ####
    # here we begin to summarize the feature columns
    # ################

    # It is obviously wasteful to repeat some of the feature steps below;
    # will deal with this later. For now, this is pleasing to the eyes.
    try:
        dfFeaInitialKeypress = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceFeatureInitialKeypress(d, verbose=verbose)
        ).reset_index()
        # print dfFeaInitialKeypress
    except Exception as e:
        logger.error("Error in reduceStep: reduceFeatureInitialKeypress")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaWordBased = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceWordBasedFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaWordBased
    except Exception as e:
        logger.error("Error in reduceStep: reduceWordBasedFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaPauses = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reducePauseFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaPauses
    except Exception as e:
        logger.error("Error in reduceStep: reducePauseFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        # garyfeng 2018-07-09: changing minRunLength to 1 for deletions to get sum of time before deletions
        dfFeaDelete = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceDeleteFeatures(d, verbose=verbose, minRunLength=1)
        ).reset_index()
        # print dfFeaDelete
    except Exception as e:
        logger.error("Error in reduceStep: reduceDeleteFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaEditing = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceEditingFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaEditing
    except Exception as e:
        logger.error("Error in reduceStep: reduceEditingFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        nDiscrepancyMarkers = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d\
                .loc[d.reconstructedText.notnull()]\
                .reconstructedText.iloc[-1].count("`")
        ).rename("flagDiscrepancyMarkers").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: flagDiscrepancyMarkers")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        adminEventList = feaConfig['adminEventList']
        nAdminRaiseHandEvents = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d\
                .loc[(d.Label.isin(adminEventList)) | (d.AccessionNumber == "RaiseHand")] \
                .shape[0]
        ).rename("flagAdminRaiseHandEvents").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: flagAdminRaiseHandEvents")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfSummary = pd.concat([dfFeaInitialKeypress, dfFeaWordBased,
                               dfFeaPauses, dfFeaDelete, dfFeaEditing,
                               nDiscrepancyMarkers, nAdminRaiseHandEvents], axis=1)
    except Exception as e:
        logger.error("Error in reduceStep: merging all features")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    return dfSummary

def processBooklet(filename,
                   featureConfig,
                   verbose=False,
                   outputFeaturePath=".",
                   featureSetName="finalFeatures"):
    """
    Process a single booklet CSV file. Steps involve reading/QCing data, map, reduce, and saving.
    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """
    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
                                         os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName + ".csv")

    # debug
    logger.info("Processing %s", filename)

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except:
        df = None
    if df is None:
        logger.error("processBooklet: getData failed for %s", filename)
        return
    studentID = df["BookletNumber"].unique()[0]

    #############
    # Map
    # logger.info("Map %s", filename)
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in mapStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    if df is None:
        logger.error("processBooklet: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    # logger.info("Reduce %s", filename)
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in reduceStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    if df is None:
        logger.error("processBooklet: reduceStep failed for %s", filename)
        return

    #############
    # Save Data
    # debug
    logger.info("Saving %s", filename)
    try:
        # drop duplicated columns (from the MultiIndex), then drop duplicated rows (occasionally there are some)
        df \
            .loc[:, ~df.columns.duplicated()]\
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception as e:
        logger.error("Error writing to_csv: %s", outputFeatureFileName)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    logger.info("Done. Output= %s", outputFeatureFileName)
    return

def processBooklet_dask(filename,
                        featureConfig,
                        verbose=False,
                        outputFeaturePath=".",
                        featureSetName="finalFeatures"):
    """
    Process a writing CSV file, for dask parallel processing. Most logger references are removed here.
    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """
    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
                                         os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName + ".csv")

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except:
        return
    if df is None:
        logger.error("processBooklet: getData failed for %s", filename)
        return

    #############
    # Map
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
        return
    if df is None:
        # logger.error("processBooklet: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
        return
    if df is None:
        return

    #############
    # Save Data
    try:
        # drop duplicated columns (from the MultiIndex), then drop duplicated rows (occasionally there are some)
        df \
            .loc[:, ~df.columns.duplicated()]\
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception:
        return
    return

import sys

if __name__ == '__main__':

    if len(sys.argv) == 1:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()
    if sys.argv[1] not in ["Grade4", "Grade8", "test"]:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()

    import glob
    from pdia import *
    from pdia.writing2017.make2017Features import *
    import dask.bag as db
    from distributed import Client
    import datetime
    import time

    # paths
    today = time.strftime("%Y%m%d_%H", time.localtime())  # timestamp like 20170810_22 (date and hour, 24h) for this run
    # garyfeng: to resume from a run:
    # today = "20180709_21"
    ####

    grade = sys.argv[1]
    inputCSVPath = "{}/".format(grade)
    outputFeaturePath = "{}_descFeatures_{}/".format(grade, today)
    if not os.path.exists(outputFeaturePath):
        os.makedirs(outputFeaturePath)
    featureSetName = "descFeature{}".format(today)

    print "input folder: {}".format(inputCSVPath)
    print "output folder: {}".format(outputFeaturePath)
    print "featureSetName: {}".format(featureSetName)

    #########
    # getting the files to process
    print "======= Scanning for CSV files ============"
    print datetime.datetime.now()
    fileList = glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv"))
    if len(fileList) == 0:
        print "\nNo CSV files found in the directory\n"
        exit()

    ##########
    # garyfeng: to resume, skip files that already have output.
    finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
    finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_" + featureSetName, "") for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    ##########

    print "Total input CSV files: %i" % len(fileList)
    print datetime.datetime.now()

    import gc

    def processIt(filename):
        processBooklet_dask(filename,
                            featureConfig=featureConfig2017,
                            verbose=False,
                            outputFeaturePath=outputFeaturePath,
                            featureSetName=featureSetName)
        gc.collect()
        return

    print "======= Begin Processing ============"
    print datetime.datetime.now()
    print "====================================="

    # test with 1 file
    # processIt(fileList[0])

    # Using distributed clients
    client = Client()
    # run parallel with dask
    db.from_sequence(fileList).map(processIt).compute()

    print "======== End Processing ==========="
    print datetime.datetime.now()
    print "==================================="
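
The script above can also be run on a single booklet for debugging before launching a long batch. A minimal sketch, assuming the script is importable or pasted into the same session; the CSV path and output folder below are hypothetical examples:

# Usage sketch: single-booklet debugging run; the file path and output folder are hypothetical.
from pdia.writing2017.featuresConfig2017 import featureConfig2017

processBooklet("Grade8/Writing_Grade8_0123456789_ObservableData.csv",
               featureConfig=featureConfig2017,
               verbose=True,  # also writes the interim *_mapStep.csv files
               outputFeaturePath="./debug_out",
               featureSetName="testFeatures")

A second, slightly revised copy of the same script follows; it adds logic to skip booklets that already produced an Error_*.csv file and a note on auto-restarting the run.
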
# Functions for extracting the descriptive features for the 2017 operational analysis
# Changes from make2017FinalFeatures.py:
# - minJumpDistance for mapEditingFeatures raised from 2 to 5 (see mapStep)
# - minRunLength for reduceDeleteFeatures set to 1 (see reduceStep)
#############
import StringIO
import os
import traceback

from pdia.durSinceBlockStart import *
from pdia.writing2017.addFeatureTextChange import addTextChangeVars
from pdia.writing2017.addKeyPressVars import addKeyPressVars
from pdia.writing2017.burstFeatures import *
from pdia.writing2017.editingFeatures import mapEditingFeatures, reduceEditingFeatures, reduceDeleteFeatures
from pdia.writing2017.featuresConfig2017 import featureConfig2017
from pdia.writing2017.reduceFeatureInitialKeypress import reduceFeatureInitialKeypress
from pdia.writing2017.getData import getData
from pdia.writing2017.reducePauseFeatures import reducePauseFeatures
from pdia.writing2017.addWordTokens import *

# Configs
# parser config
from pdia.writing2017.reduceWordBasedFeatures import reduceWordBasedFeatures

# 2016 default configuration
# 2017 data config

# Read data
def getData2017(filename, featureConfig=featureConfig2017):
    """
    Simply a wrapper around getData with the 2017 config
    :param filename: the file name to process
    :param featureConfig: using the 2017 configuration
    :return: the parsed df
    """
    return getData(filename, featureConfig=featureConfig)

def mapStep(df, feaConfig, verbose=False):
    """
    MAP step: creating keystroke-level features, adding columns
    :param df: the data frame for a booklet, containing potentially multiple blocks
    :param feaConfig: the configuration for data import/parsing
    :param verbose: if True, saves the interim data
    """
    # asserts
    if df is None:
        logger.error("MapStep: input df is None; quitting")
        return None
    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # keyword missing
        return None

    studentID = df["BookletNumber"].unique()[0]

    # ##### MAP ####
    # to handle the feature functions in the featureMap object
    # ##############
    def mapBlock(d):
        # return None if no keystroke log is available
        if d.loc[d.Label == "Pilot Observables", :].shape[0] == 0:
            # print("mapBlock: No Observable data for the block")
            logger.debug("mapBlock: No Observable data for the block")
            return None
        d = durSinceBlockStart(d) if d is not None else None
        # d = addKeyPressVars(d) if d is not None else None
        # d = addTextChangeVars(d) if d is not None else None
        d = addFeatureIKI(d) if d is not None else None
        d = addWordTokens(d) if d is not None else None
        # garyfeng 2018-07-09: changing default minJumpDistance from 2 to 5
        d = mapEditingFeatures(d, verbose=False, minTextChangeEvents=5, minJumpDistance=5) if d is not None else None
        return d

    try:
        # the following groupby().apply() is causing occasional python crashes
        # df = df \
        #     .groupby(feaConfig["byVars"]) \
        #     .apply(mapBlock)
        # taking a stupid method here
        tmp = []
        for b in df["BlockCode"].unique():
            tmp.append(df.loc[df.BlockCode == b, :].pipe(mapBlock))
        df = pd.concat(tmp)
    except Exception as e:
        logger.error("Error in mapStep")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_MapStep.csv".format(studentID), encoding="utf-8")
        return

    # saving
    if verbose:
        outputFileName = "{}_mapStep.csv".format(
            df["BookletNumber"].unique()[0]
        )
        # remove duplicated columns before saving
        df.loc[:, ~df.columns.duplicated()].to_csv(outputFileName, encoding="utf-8")

        # simplified for human reading
        outputFileName = "{}_mapStep_simplified.csv".format(
            df["BookletNumber"].unique()[0]
        )
        rowsToKeep = df.keystrokeEvent.notnull() & ~df.keystrokeEvent.isin(["Keypress"])
        df.loc[rowsToKeep, "textLenReconText"] = df.loc[rowsToKeep, "reconstructedText"].str.len()
        colsToKeep = ['BookletNumber', 'BlockCode', 'AccessionNumber', 'rowCount',
                      'keystrokeEvent', 'keyName', 'durSinceBlockStart', 'IKI',
                      'reconCursorPosition', 'textLength', "textLenReconText",
                      'textContext', 'intendedWord', 'currentToken',
                      # 'interWord', 'wtf', 'isAtWordBoundary', 'isWordInitial',
                      'intraWord',
                      'focalWordNum', 'interWordRunNumber', 'interClauseRunNumber', 'isJump', 'isReplace',
                      'reconstructedText']
        # to get rid of duplicated columns, remove the MultiIndex first
        df.loc[rowsToKeep, colsToKeep]\
            .to_csv(outputFileName, encoding="utf-8")

    return df

# note we are not catching exceptions here, to save time.
# errors are caught at the highest level
def reduceStep(df, feaConfig, verbose=False):
    """
    REDUCE step: take the df after the MAP step and reduce it to features, one block per row.
    :param df: the df passed from the mapStep
    :param feaConfig: the configuration file with parameters setting the byVars
    :param verbose: to be passed to reduce functions to save interim data frame if True
    :return: a Pandas data frame, with # of rows as blocks, and features as columns
    """
    # asserts
    if df is None:
        logger.error("ReduceStep: input df is None; quitting")
        return None
    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # keyword missing
        return None

    studentID = df["BookletNumber"].unique()[0]

    # #### Reduce ####
    # here we begin to summarize the feature columns
    # ################

    # It is obviously wasteful to repeat some of the feature steps below;
    # will deal with this later. For now, this is pleasing to the eyes.
    try:
        dfFeaInitialKeypress = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceFeatureInitialKeypress(d, verbose=verbose)
        ).reset_index()
        # print dfFeaInitialKeypress
    except Exception as e:
        logger.error("Error in reduceStep: reduceFeatureInitialKeypress")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaWordBased = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceWordBasedFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaWordBased
    except Exception as e:
        logger.error("Error in reduceStep: reduceWordBasedFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaPauses = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reducePauseFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaPauses
    except Exception as e:
        logger.error("Error in reduceStep: reducePauseFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        # garyfeng 2018-07-09: changing minRunLength to 1 for deletions to get sum of time before deletions
        dfFeaDelete = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceDeleteFeatures(d, verbose=verbose, minRunLength=1)
        ).reset_index()
        # print dfFeaDelete
    except Exception as e:
        logger.error("Error in reduceStep: reduceDeleteFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaEditing = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceEditingFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaEditing
    except Exception as e:
        logger.error("Error in reduceStep: reduceEditingFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        nDiscrepancyMarkers = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d\
                .loc[d.reconstructedText.notnull()]\
                .reconstructedText.iloc[-1].count("`")
        ).rename("flagDiscrepancyMarkers").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: flagDiscrepancyMarkers")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        adminEventList = feaConfig['adminEventList']
        nAdminRaiseHandEvents = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d\
                .loc[(d.Label.isin(adminEventList)) | (d.AccessionNumber == "RaiseHand")] \
                .shape[0]
        ).rename("flagAdminRaiseHandEvents").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: flagAdminRaiseHandEvents")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfSummary = pd.concat([dfFeaInitialKeypress, dfFeaWordBased,
                               dfFeaPauses, dfFeaDelete, dfFeaEditing,
                               nDiscrepancyMarkers, nAdminRaiseHandEvents], axis=1)
    except Exception as e:
        logger.error("Error in reduceStep: merging all features")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    return dfSummary

def processBooklet(filename,
                   featureConfig,
                   verbose=False,
                   outputFeaturePath=".",
                   featureSetName="finalFeatures"):
    """
    Process a single booklet CSV file. Steps involve reading/QCing data, map, reduce, and saving.
    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """
    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
                                         os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName + ".csv")

    # debug
    logger.info("Processing %s", filename)

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except:
        df = None
    if df is None:
        logger.error("processBooklet: getData failed for %s", filename)
        return
    studentID = df["BookletNumber"].unique()[0]

    #############
    # Map
    # logger.info("Map %s", filename)
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in mapStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    if df is None:
        logger.error("processBooklet: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    # logger.info("Reduce %s", filename)
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in reduceStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    if df is None:
        logger.error("processBooklet: reduceStep failed for %s", filename)
        return

    #############
    # Save Data
    # debug
    logger.info("Saving %s", filename)
    try:
        # drop duplicated columns (from the MultiIndex), then drop duplicated rows (occasionally there are some)
        df \
            .loc[:, ~df.columns.duplicated()]\
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception as e:
        logger.error("Error writing to_csv: %s", outputFeatureFileName)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    logger.info("Done. Output= %s", outputFeatureFileName)
    return

def processBooklet_dask(filename,
                        featureConfig,
                        verbose=False,
                        outputFeaturePath=".",
                        featureSetName="finalFeatures"):
    """
    Process a writing CSV file, for dask parallel processing. Most logger references are removed here.
    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """
    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
                                         os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName + ".csv")

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except:
        return
    if df is None:
        logger.error("processBooklet: getData failed for %s", filename)
        return

    #############
    # Map
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
        return
    if df is None:
        # logger.error("processBooklet: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
        return
    if df is None:
        return

    #############
    # Save Data
    try:
        # drop duplicated columns (from the MultiIndex), then drop duplicated rows (occasionally there are some)
        df \
            .loc[:, ~df.columns.duplicated()]\
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception:
        return
    return

import sys

if __name__ == '__main__':

    if len(sys.argv) == 1:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()
    if sys.argv[1] not in ["Grade4", "Grade8", "test"]:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()

    import glob
    from pdia import *
    from pdia.writing2017.make2017Features import *
    import dask.bag as db
    from distributed import Client
    import datetime
    import time

    # paths
    today = time.strftime("%Y%m%d_%H", time.localtime())  # timestamp like 20170810_22 (date and hour, 24h) for this run
    # today = "20180711_04"

    grade = sys.argv[1]
    inputCSVPath = "{}/".format(grade)
    outputFeaturePath = "{}_descFeatures_{}/".format(grade, today)
    if not os.path.exists(outputFeaturePath):
        os.makedirs(outputFeaturePath)
    featureSetName = "descFeature{}".format(today)

    print "input folder: {}".format(inputCSVPath)
    print "output folder: {}".format(outputFeaturePath)
    print "featureSetName: {}".format(featureSetName)

    #########
    # getting the files to process
    print "======= Scanning for CSV files ============"
    print datetime.datetime.now()
    fileList = glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv"))
    if len(fileList) == 0:
        print "\nNo CSV files found in the directory\n"
        exit()

    ##########
    # garyfeng: to resume, skip files that already have output.
    finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
    finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_" + featureSetName, "") for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    # error files should not be repeated
    # (the file name pattern below is hard-coded for Grade8)
    finishedFiles = glob.glob("Error_*.csv")
    finishedFiles = ["{}/Writing_Grade8_{}_ObservableData.csv".format(grade, f.split("_")[1]) for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    ##########

    print "Total input CSV files: %i" % len(fileList)
    print datetime.datetime.now()

    import gc

    def processIt(filename):
        processBooklet_dask(filename,
                            featureConfig=featureConfig2017,
                            verbose=False,
                            outputFeaturePath=outputFeaturePath,
                            featureSetName=featureSetName)
        gc.collect()
        return

    print "======= Begin Processing ============"
    print datetime.datetime.now()
    print "====================================="

    # test with 1 file
    # processIt(fileList[0])

    # Using distributed clients
    client = Client()
    # run parallel with dask
    db.from_sequence(fileList).map(processIt).compute()

    print "======== End Processing ==========="
    print datetime.datetime.now()
    print "==================================="

# To restart automatically after a crash:
# until python run_grade8.py Grade8; do
#   echo "Program crashed with exit code $?. Respawning.." >&2
#   sleep 1
# done
REM Find the next free container index pdj-N
SET NUM=0
:LOOP
SET /A NUM = %NUM% + 1
FOR /f "usebackq" %%G in (`docker ps -q -f "name=pdj-%NUM%"`) DO set PDJID=%%G
IF [%PDJID%] == [] GOTO QUERY
SET PDJID=
GOTO LOOP

:QUERY
REM Ask which pdia image tag to run
set INPUT=
set /P INPUT=Which version of pdia would you like to use? (1=master, 2=2018xval): %=%
IF /I "%INPUT%"=="1" GOTO MASTER
IF /I "%INPUT%"=="2" GOTO 2018XVAL
GOTO QUERY

:MASTER
SET PDJTAG=master
GOTO RUN

:2018XVAL
SET PDJTAG=2018xval
GOTO RUN

:RUN
REM Map the container's port 8888 to a unique local port (8888, 8889, ...)
SET /A PORT = %NUM% + 8887
docker pull pdia/docked-jupyter:%PDJTAG%
REM Start Jupyter in the pdia container, mounting the current folder as /home/jovyan/work
docker run -p %PORT%:8888 -h CONTAINER -d -it --rm --name pdj-%NUM%-%PDJTAG% -v "%cd%":/home/jovyan/work pdia/docked-jupyter:%PDJTAG% jupyter notebook --NotebookApp.token='pdia'
timeout /t 5
start http://localhost:%PORT%/?token=pdia
REM Wait for a keypress, then stop the container
timeout /t -1
docker stop pdj-%NUM%-%PDJTAG%
Steps:

- `D:\2017Writing` on Kurt, where we have writing logs in the "Grade4" and "Grade8" folders.
- `activate pdia` to activate the python 2 env with pdia.
- `python xxxx.py Grade4` or `python xxx.py Grade8`.
- `until python xxxx.py Grade4; do sleep 1; done` to relaunch it automatically. This may take 10-20 hours per grade on Kurt.
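
For a quick sanity check before launching the full dask run, the same pipeline can be driven serially from a Python session inside the pdia container. A minimal sketch, assuming the script above is saved as a module named make2017DescFeatures (a hypothetical name) and the Grade4 folder layout described in the steps:

# Serial sanity check over a few Grade4 booklets; the module name is hypothetical.
import glob
import os

from make2017DescFeatures import processBooklet, featureConfig2017

inputCSVPath = "Grade4/"
outputFeaturePath = "Grade4_descFeatures_test/"
if not os.path.exists(outputFeaturePath):
    os.makedirs(outputFeaturePath)

# process just the first few booklets to confirm the pipeline runs end to end
for filename in sorted(glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv")))[:3]:
    processBooklet(filename,
                   featureConfig=featureConfig2017,
                   verbose=False,
                   outputFeaturePath=outputFeaturePath,
                   featureSetName="descFeatureTest")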