# Functions for extracting the descriptive features for the 2017 operational analysis
# Changes from make2017FinalFeatures.py:
# - min
#############
import os
import sys
import traceback
import StringIO

import pandas as pd

from pdia.durSinceBlockStart import *
from pdia.writing2017.addFeatureTextChange import addTextChangeVars
from pdia.writing2017.addKeyPressVars import addKeyPressVars
from pdia.writing2017.burstFeatures import *
from pdia.writing2017.editingFeatures import mapEditingFeatures, reduceEditingFeatures, reduceDeleteFeatures
from pdia.writing2017.featuresConfig2017 import featureConfig2017
from pdia.writing2017.reduceFeatureInitialKeypress import reduceFeatureInitialKeypress
from pdia.writing2017.getData import getData
from pdia.writing2017.reducePauseFeatures import reducePauseFeatures
from pdia.writing2017.addWordTokens import *
from pdia.writing2017.reduceWordBasedFeatures import reduceWordBasedFeatures

# Configs
# The parser config and the 2017 data config come with featureConfig2017;
# `logger` is expected to be provided by the pdia wildcard imports above.

# Read data
def getData2017(filename, featureConfig=featureConfig2017):
    """
    A simple wrapper of getData with the 2017 config.

    :param filename: the file name to process
    :param featureConfig: the parsing configuration; defaults to the 2017 configuration
    :return: the parsed data frame
    """
    return getData(filename, featureConfig=featureConfig)
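
# A minimal usage sketch for getData2017 (illustrative only; the default file
# name below is hypothetical and not part of the pipeline):
def demoGetData2017(filename="booklet_ObservableData.csv"):
    """Parse one booklet CSV with the 2017 config and report its size."""
    df = getData2017(filename)
    if df is None:
        logger.error("demoGetData2017: could not parse %s", filename)
        return None
    logger.info("demoGetData2017: %d events for booklet %s",
                df.shape[0], df["BookletNumber"].unique()[0])
    return df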
def mapStep(df, feaConfig, verbose=False):
    """
    MAP step: creates keystroke-level features, adding columns to the event log.

    :param df: the data frame for a booklet, potentially containing multiple blocks
    :param feaConfig: the configuration for data import/parsing
    :param verbose: if True, saves the interim data
    :return: the augmented data frame, or None on error
    """
    # asserts
    if df is None:
        logger.error("mapStep: input df is None; quitting")
        return None
    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # none of the grouping variables is present in the data
        logger.error("mapStep: no byVars column found in df; quitting")
        return None
    studentID = df["BookletNumber"].unique()[0]

    # ##### MAP ####
    # Apply the feature functions to each block in turn.
    # ##############
    def mapBlock(d):
        # return None if no keystroke log is available for the block
        if d.loc[d.Label == "Pilot Observables", :].shape[0] == 0:
            logger.debug("mapBlock: no Observable data for the block")
            return None
        d = durSinceBlockStart(d) if d is not None else None
        # d = addKeyPressVars(d) if d is not None else None
        # d = addTextChangeVars(d) if d is not None else None
        d = addFeatureIKI(d) if d is not None else None
        d = addWordTokens(d) if d is not None else None
        # garyfeng 2018-07-09: changed the default minJumpDistance from 2 to 5
        d = mapEditingFeatures(d, verbose=False, minTextChangeEvents=5, minJumpDistance=5) if d is not None else None
        return d
    try:
        # the following groupby().apply() caused occasional Python crashes:
        # df = df.groupby(feaConfig["byVars"]).apply(mapBlock)
        # so we loop over the blocks explicitly instead
        tmp = []
        for b in df["BlockCode"].unique():
            tmp.append(df.loc[df.BlockCode == b, :].pipe(mapBlock))
        df = pd.concat(tmp)
    except Exception as e:
        logger.error("Error in mapStep")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_MapStep.csv".format(studentID), encoding="utf-8")
        return
    # saving interim data
    if verbose:
        outputFileName = "{}_mapStep.csv".format(
            df["BookletNumber"].unique()[0]
        )
        # drop duplicated columns before saving
        df.loc[:, ~df.columns.duplicated()].to_csv(outputFileName, encoding="utf-8")

        # a simplified version for human reading
        outputFileName = "{}_mapStep_simplified.csv".format(
            df["BookletNumber"].unique()[0]
        )
        rowsToKeep = df.keystrokeEvent.notnull() & ~df.keystrokeEvent.isin(["Keypress"])
        df.loc[rowsToKeep, "textLenReconText"] = df.loc[rowsToKeep, "reconstructedText"].str.len()
        colsToKeep = ['BookletNumber', 'BlockCode', 'AccessionNumber', 'rowCount',
                      'keystrokeEvent', 'keyName', 'durSinceBlockStart', 'IKI',
                      'reconCursorPosition', 'textLength', "textLenReconText",
                      'textContext', 'intendedWord', 'currentToken',
                      # 'interWord', 'wtf', 'isAtWordBoundary', 'isWordInitial',
                      'intraWord',
                      'focalWordNum', 'interWordRunNumber', 'interClauseRunNumber', 'isJump', 'isReplace',
                      'reconstructedText']
        df.loc[rowsToKeep, colsToKeep]\
            .to_csv(outputFileName, encoding="utf-8")
    return df
# note: we are not catching exceptions everywhere below, to save time;
# errors are caught at the highest level
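
# A minimal sketch of running just the MAP step on a single booklet with the
# 2017 config (illustrative only; demoMapStep is not part of the pipeline):
def demoMapStep(filename):
    """Parse one booklet CSV and compute its keystroke-level features."""
    df = getData2017(filename)
    # with verbose=True, mapStep also writes interim per-keystroke CSVs to the cwd
    return mapStep(df, feaConfig=featureConfig2017, verbose=True) if df is not None else None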
def reduceStep(df, feaConfig, verbose=False):
    """
    REDUCE step: takes the df from the MAP step and reduces it to features, one block per row.

    :param df: the df passed from mapStep
    :param feaConfig: the configuration dict whose "byVars" entry defines the grouping variables
    :param verbose: passed to the reduce functions to save interim data frames if True
    :return: a Pandas data frame with one row per block and features as columns
    """
    # asserts
    if df is None:
        logger.error("reduceStep: input df is None; quitting")
        return None
    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # none of the grouping variables is present in the data
        logger.error("reduceStep: no byVars column found in df; quitting")
        return None
    studentID = df["BookletNumber"].unique()[0]

    # #### REDUCE ####
    # Here we summarize the feature columns, one feature group at a time.
    # ################
    # It is obviously wasteful to repeat some feature steps in these blocks;
    # we will deal with this later. For now, this is pleasing to the eyes.
    try:
        dfFeaInitialKeypress = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceFeatureInitialKeypress(d, verbose=verbose)
        ).reset_index()
        # print dfFeaInitialKeypress
    except Exception as e:
        logger.error("Error in reduceStep: reduceFeatureInitialKeypress")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return
    try:
        dfFeaWordBased = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceWordBasedFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaWordBased
    except Exception as e:
        logger.error("Error in reduceStep: reduceWordBasedFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return
    try:
        dfFeaPauses = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reducePauseFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaPauses
    except Exception as e:
        logger.error("Error in reduceStep: reducePauseFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return
    try:
        # garyfeng 2018-07-09: changed minRunLength to 1 for deletions, to get the sum of time before deletions
        dfFeaDelete = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceDeleteFeatures(d, verbose=verbose, minRunLength=1)
        ).reset_index()
        # print dfFeaDelete
    except Exception as e:
        logger.error("Error in reduceStep: reduceDeleteFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return
    try:
        dfFeaEditing = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceEditingFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaEditing
    except Exception as e:
        logger.error("Error in reduceStep: reduceEditingFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return
    try:
        nDiscrepancyMarkers = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d\
                .loc[d.reconstructedText.notnull()]\
                .reconstructedText.iloc[-1].count("`")
        ).rename("flagDiscrepancyMarkers").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: flagDiscrepancyMarkers")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return
    try:
        adminEventList = feaConfig['adminEventList']
        nAdminRaiseHandEvents = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d\
                .loc[(d.Label.isin(adminEventList)) | (d.AccessionNumber == "RaiseHand")] \
                .shape[0]
        ).rename("flagAdminRaiseHandEvents").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: flagAdminRaiseHandEvents")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return
    try:
        dfSummary = pd.concat([dfFeaInitialKeypress, dfFeaWordBased,
                               dfFeaPauses, dfFeaDelete, dfFeaEditing,
                               nDiscrepancyMarkers, nAdminRaiseHandEvents], axis=1)
    except Exception as e:
        logger.error("Error in reduceStep: merging all features")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return
    return dfSummary
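
# The try/except blocks in mapStep and reduceStep repeat one error-dump
# pattern. A sketch of a helper that could replace them (hypothetical; the
# code above does not use it):
def logAndDumpOnError(df, studentID, stepName):
    """Log the current traceback and dump the offending df to a CSV for debugging."""
    exc_buffer = StringIO.StringIO()
    traceback.print_exc(file=exc_buffer)
    logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
    df.to_csv("Error_{}_{}.csv".format(studentID, stepName), encoding="utf-8")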
def processBooklet(filename,
                   featureConfig,
                   verbose=False,
                   outputFeaturePath=".",
                   featureSetName="finalFeatures"):
    """
    Process a single booklet CSV file: read/QC the data, map, reduce, and save.

    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if True, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; becomes the last part of the output csv file name
    :return: None
    """
    # output file path and name
    outputFeatureFileName = os.path.join(
        outputFeaturePath,
        os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName + ".csv")

    # debug
    logger.info("Processing %s", filename)
    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except Exception:
        df = None
    if df is None:
        logger.error("processBooklet: getData failed for %s", filename)
        return
    studentID = df["BookletNumber"].unique()[0]

    #############
    # Map
    # logger.info("Map %s", filename)
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in mapStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    if df is None:
        logger.error("processBooklet: mapStep failed for %s", filename)
        return
    #############
    # Reduce
    # logger.info("Reduce %s", filename)
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in reduceStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    if df is None:
        logger.error("processBooklet: reduceStep failed for %s", filename)
        return

    #############
    # Save Data
    # debug
    logger.info("Saving %s", filename)
    try:
        # first drop duplicated columns (from the groupby multi-index) with a
        # boolean mask, then drop duplicated rows (occasionally there are some)
        df \
            .loc[:, ~df.columns.duplicated()]\
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception as e:
        logger.error("Error writing to_csv: %s", outputFeatureFileName)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return
    logger.info("Done. Output = %s", outputFeatureFileName)
    return
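
# A minimal usage sketch for processBooklet, assuming a Grade4 input CSV and
# an existing output folder (both names are hypothetical):
#
#   processBooklet("Grade4/booklet_ObservableData.csv",
#                  featureConfig=featureConfig2017,
#                  verbose=False,
#                  outputFeaturePath="Grade4_features",
#                  featureSetName="finalFeatures")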
def processBooklet_dask(filename,
                        featureConfig,
                        verbose=False,
                        outputFeaturePath=".",
                        featureSetName="finalFeatures"):
    """
    Process a writing CSV file, for dask parallel processing. Logger references
    are removed (or commented out) here, as dask workers may not have a
    configured logger.

    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if True, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; becomes the last part of the output csv file name
    :return: None
    """
    # output file path and name
    outputFeatureFileName = os.path.join(
        outputFeaturePath,
        os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName + ".csv")

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except Exception:
        return
    if df is None:
        # logger.error("processBooklet_dask: getData failed for %s", filename)
        return
    #############
    # Map
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception:
        return
    if df is None:
        # logger.error("processBooklet_dask: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception:
        return
    if df is None:
        return

    #############
    # Save Data
    try:
        # first drop duplicated columns (from the groupby multi-index) with a
        # boolean mask, then drop duplicated rows (occasionally there are some)
        df \
            .loc[:, ~df.columns.duplicated()]\
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception:
        return
    return
if __name__ == '__main__':
    if len(sys.argv) == 1 or sys.argv[1] not in ["Grade4", "Grade8", "test"]:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()

    import glob
    import datetime
    import time
    import gc

    from pdia import *
    from pdia.writing2017.make2017Features import *
    import dask.bag as db
    from distributed import Client

    # paths
    today = time.strftime("%Y%m%d_%H", time.localtime())  # timestamp as 20170810_22, i.e., date and hour (24h) of this run
    # garyfeng: to resume from a previous run, hard-code its timestamp:
    # today = "20180709_21"

    ####
    grade = sys.argv[1]
    inputCSVPath = "{}/".format(grade)
    outputFeaturePath = "{}_descFeatures_{}/".format(grade, today)
    if not os.path.exists(outputFeaturePath):
        os.makedirs(outputFeaturePath)
    featureSetName = "descFeature{}".format(today)

    print "input folder: {}".format(inputCSVPath)
    print "output folder: {}".format(outputFeaturePath)
    print "featureSetName: {}".format(featureSetName)

    #########
    # getting the files to process
    print "======= Scanning for CSV files ============"
    print datetime.datetime.now()
    fileList = glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv"))
    if len(fileList) == 0:
        print "\nNo CSV files found in the directory\n"
        exit()

    ##########
    # garyfeng: to resume a run, skip input files that already have output
    finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
    finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_" + featureSetName, "") for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    ##########

    print "Total input CSV files: %i" % len(fileList)
    print datetime.datetime.now()

    def processIt(filename):
        """Wrapper for dask: process one booklet, then force garbage collection."""
        processBooklet_dask(filename,
                            featureConfig=featureConfig2017,
                            verbose=False,
                            outputFeaturePath=outputFeaturePath,
                            featureSetName=featureSetName)
        gc.collect()
        return

    print "======= Begin Processing ============"
    print datetime.datetime.now()
    print "====================================="

    # to test with a single file:
    # processIt(fileList[0])

    # start a local dask.distributed client and process the files in parallel
    client = Client()
    db.from_sequence(fileList).map(processIt).compute()

    print "======== End Processing ==========="
    print datetime.datetime.now()
    print "==================================="