# Functions for extracting the descriptive features for the 2017 operational analysis
# Changes from make2017FinalFeatures.py:
# - min

#############

import StringIO
import os
import traceback

from pdia.durSinceBlockStart import *
from pdia.writing2017.addFeatureTextChange import addTextChangeVars
from pdia.writing2017.addKeyPressVars import addKeyPressVars
from pdia.writing2017.burstFeatures import *
from pdia.writing2017.editingFeatures import mapEditingFeatures, reduceEditingFeatures, reduceDeleteFeatures
from pdia.writing2017.featuresConfig2017 import featureConfig2017
from pdia.writing2017.reduceFeatureInitialKeypress import reduceFeatureInitialKeypress
from pdia.writing2017.getData import getData
from pdia.writing2017.reducePauseFeatures import reducePauseFeatures
from pdia.writing2017.addWordTokens import *
# Configs
# parser config
from pdia.writing2017.reduceWordBasedFeatures import reduceWordBasedFeatures


# 2016 default configuration

# 2017 data config

# Read data
def getData2017(filename, featureConfig=featureConfig2017):
    """
    A thin wrapper around getData using the 2017 config.

    :param filename: the file name to process
    :param featureConfig: the feature configuration (defaults to featureConfig2017)
    :return: the parsed df
    """
    return getData(filename, featureConfig=featureConfig)

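# Example (hypothetical usage; the file path below is an assumption, not part of pdia):
#
#   df = getData2017("Grade4/VH123456_ObservableData.csv")
#   # df should be the parsed keystroke-log data frame, one row per logged event
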
def mapStep(df, feaConfig, verbose=False):
    """
    MAP step: creating keystroke-level features, adding columns.

    :param df: the data frame for a booklet, potentially containing multiple blocks
    :param feaConfig: the configuration for data import/parsing
    :param verbose: if True, saves the interim data
    :return: the df with added feature columns, or None on failure
    """

    # asserts
    if df is None:
        logger.error("MapStep: input df is None; quitting")
        return None

    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # keyword missing
        return None

    studentID = df["BookletNumber"].unique()[0]

    # ##### MAP ####
    # to handle the feature functions in the featureMap object
    # ##############
    def mapBlock(d):
        # return None if no keystroke log is available
        if d.loc[d.Label == "Pilot Observables", :].shape[0] == 0:
            # print("mapBlock: No Observable data for the block")
            logger.debug("mapBlock: No Observable data for the block")
            return None
        d = durSinceBlockStart(d) if d is not None else None
        # d = addKeyPressVars(d) if d is not None else None
        # d = addTextChangeVars(d) if d is not None else None
        d = addFeatureIKI(d) if d is not None else None
        d = addWordTokens(d) if d is not None else None
        # garyfeng 2018-07-09: changing default minJumpDistance from 2 to 5
        d = mapEditingFeatures(d, verbose=False, minTextChangeEvents=5, minJumpDistance=5) if d is not None else None
        return d

    try:
        # the following groupby().apply() is causing occasional python crashes
        # df = df \
        #     .groupby(feaConfig["byVars"]) \
        #     .apply(mapBlock)
        # so we fall back to an explicit loop over blocks instead
        tmp = []
        for b in df["BlockCode"].unique():
            tmp.append(df.loc[df.BlockCode == b, :].pipe(mapBlock))
        df = pd.concat(tmp)
    except Exception as e:
        logger.error("Error in mapStep")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_MapStep.csv".format(studentID), encoding="utf-8")
        return

    # saving
    if verbose:
        outputFileName = "{}_mapStep.csv".format(
            df["BookletNumber"].unique()[0]
        )
        # remove duplicated columns before saving
        df.loc[:, ~df.columns.duplicated()].to_csv(outputFileName, encoding="utf-8")

        # simplified for human reading
        outputFileName = "{}_mapStep_simplified.csv".format(
            df["BookletNumber"].unique()[0]
        )
        rowsToKeep = df.keystrokeEvent.notnull() & ~df.keystrokeEvent.isin(["Keypress"])
        df.loc[rowsToKeep, "textLenReconText"] = df.loc[rowsToKeep, "reconstructedText"].str.len()
        colsToKeep = ['BookletNumber', 'BlockCode', 'AccessionNumber', 'rowCount',
                      'keystrokeEvent', 'keyName', 'durSinceBlockStart', 'IKI',
                      'reconCursorPosition', 'textLength', "textLenReconText",
                      'textContext', 'intendedWord', 'currentToken',
                      # 'interWord', 'wtf', 'isAtWordBoundary', 'isWordInitial',
                      'intraWord',
                      'focalWordNum', 'interWordRunNumber', 'interClauseRunNumber', 'isJump', 'isReplace',
                      'reconstructedText']
        # to get rid of duplicated columns, remove the multiple index first
        df.loc[rowsToKeep, colsToKeep]\
            .to_csv(outputFileName, encoding="utf-8")

    return df

# note we are not catching exceptions here, to save time.
# errors are caught at the highest level

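# Example (hypothetical): running the MAP step on one parsed booklet. The file
# path is an assumption; featureConfig2017 is the config imported above.
#
#   df = getData2017("Grade4/VH123456_ObservableData.csv")
#   dfMapped = mapStep(df, feaConfig=featureConfig2017, verbose=True)
#   # dfMapped should keep one row per logged event, with IKI, word-token, and
#   # editing columns added; verbose=True also writes *_mapStep*.csv snapshots.
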
def reduceStep(df, feaConfig, verbose=False):
    """
    REDUCE step: takes the df after the MAP step and reduces it to features, one block per row.

    :param df: the df passed from the mapStep
    :param feaConfig: the configuration with parameters setting the byVars
    :param verbose: to be passed to reduce functions to save interim data frames if True
    :return: a Pandas data frame, with one row per block and features as columns
    """

    # asserts
    if df is None:
        logger.error("ReduceStep: input df is None; quitting")
        return None

    if not any([(k in df.columns) for k in feaConfig["byVars"]]):
        # keyword missing
        return None

    studentID = df["BookletNumber"].unique()[0]

    # #### Reduce ####
    # here we begin to summarize the feature columns
    # ################

    # This is obviously a waste of time to repeat some feature steps in these;
    # will deal with this later. For now, this is pleasing to the eyes.
    try:
        dfFeaInitialKeypress = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceFeatureInitialKeypress(d, verbose=verbose)
        ).reset_index()
        # print dfFeaInitialKeypress
    except Exception as e:
        logger.error("Error in reduceStep: reduceFeatureInitialKeypress")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaWordBased = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceWordBasedFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaWordBased
    except Exception as e:
        logger.error("Error in reduceStep: reduceWordBasedFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaPauses = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reducePauseFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaPauses
    except Exception as e:
        logger.error("Error in reduceStep: reducePauseFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        # garyfeng 2018-07-09: changing minRunLength to 1 for deletions to get sum of time before deletions
        dfFeaDelete = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceDeleteFeatures(d, verbose=verbose, minRunLength=1)
        ).reset_index()
        # print dfFeaDelete
    except Exception as e:
        logger.error("Error in reduceStep: reduceDeleteFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfFeaEditing = df.groupby(feaConfig["byVars"]).apply(
            lambda d: reduceEditingFeatures(d, verbose=verbose)
        ).reset_index()
        # print dfFeaEditing
    except Exception as e:
        logger.error("Error in reduceStep: reduceEditingFeatures")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        nDiscrepancyMarkers = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d
                .loc[d.reconstructedText.notnull()]
                .reconstructedText.iloc[-1].count("`")
        ).rename("flagDiscrepancyMarkers").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: flagDiscrepancyMarkers")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        adminEventList = feaConfig['adminEventList']
        nAdminRaiseHandEvents = df.groupby(feaConfig["byVars"]).apply(
            lambda d: d
                .loc[(d.Label.isin(adminEventList)) | (d.AccessionNumber == "RaiseHand")]
                .shape[0]
        ).rename("flagAdminRaiseHandEvents").reset_index()
    except Exception as e:
        logger.error("Error in reduceStep: flagAdminRaiseHandEvents")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    try:
        dfSummary = pd.concat([dfFeaInitialKeypress, dfFeaWordBased,
                               dfFeaPauses, dfFeaDelete, dfFeaEditing,
                               nDiscrepancyMarkers, nAdminRaiseHandEvents], axis=1)
    except Exception as e:
        logger.error("Error in reduceStep: merging all features")
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
        return

    return dfSummary

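# Example (hypothetical): reducing the mapped keystroke data to block-level features.
#
#   dfFeatures = reduceStep(dfMapped, feaConfig=featureConfig2017)
#   # dfFeatures should have one row per group defined by featureConfig2017["byVars"]
#   # (booklet/block), with the pause, word-based, deletion, and editing features,
#   # plus the flagDiscrepancyMarkers and flagAdminRaiseHandEvents counts.
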
def processBooklet(filename,
                   featureConfig,
                   verbose=False,
                   outputFeaturePath=".",
                   featureSetName="finalFeatures"):
    """
    Process a single booklet CSV file. Steps involve reading/QCing data, map, reduce, and saving.

    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """

    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
                                         os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName + ".csv")
    # debug
    logger.info("Processing %s", filename)

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except:
        df = None
    if df is None:
        logger.error("processBooklet: getData failed for %s", filename)
        return

    studentID = df["BookletNumber"].unique()[0]

    #############
    # Map
    # logger.info("Map %s", filename)
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in mapStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return

    if df is None:
        logger.error("processBooklet: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    # logger.info("Reduce %s", filename)
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except Exception as e:
        logger.error("Error in reduceStep: %s", filename)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return

    if df is None:
        logger.error("processBooklet: reduceStep failed for %s", filename)
        return

    #############
    # Save Data
    # debug
    logger.info("Saving %s", filename)
    try:
        # first drop duplicated columns (from the groupby multiIndex),
        # then drop duplicated rows (occasionally there are some)
        df \
            .loc[:, ~df.columns.duplicated()] \
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception as e:
        logger.error("Error writing to_csv: %s", outputFeatureFileName)
        logger.exception(e)
        exc_buffer = StringIO.StringIO()
        traceback.print_exc(file=exc_buffer)
        logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
        return

    logger.info("Done. Output= %s", outputFeatureFileName)
    return

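# Example (hypothetical): processing one booklet end-to-end; the paths and names
# below are assumptions for illustration.
#
#   processBooklet("Grade4/VH123456_ObservableData.csv",
#                  featureConfig=featureConfig2017,
#                  outputFeaturePath="Grade4_descFeatures_test/",
#                  featureSetName="descFeatureTest")
#   # would write Grade4_descFeatures_test/VH123456_ObservableData_descFeatureTest.csv
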
def processBooklet_dask(filename,
                        featureConfig,
                        verbose=False,
                        outputFeaturePath=".",
                        featureSetName="finalFeatures"):
    """
    Process a writing CSV file, for dask parallel processing. Logger references are kept to a minimum here.

    :param filename: full path to the CSV file
    :param featureConfig: the dict with config info
    :param verbose: if true, save intermediate data frames to the current directory
    :param outputFeaturePath: output path
    :param featureSetName: name of the final feature set; will be the last part of the output csv file name
    :return: none
    """

    # output file path and name
    outputFeatureFileName = os.path.join(outputFeaturePath,
                                         os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName + ".csv")

    #############
    # Get Data
    try:
        df = getData(filename, featureConfig=featureConfig)
    except:
        return

    if df is None:
        logger.error("processBooklet_dask: getData failed for %s", filename)
        return

    #############
    # Map
    try:
        df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
        return

    if df is None:
        # logger.error("processBooklet_dask: mapStep failed for %s", filename)
        return

    #############
    # Reduce
    try:
        df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
    except:
        return

    if df is None:
        return

    #############
    # Save Data
    try:
        # drop duplicated columns (from the groupby multiIndex), then duplicated rows
        df \
            .loc[:, ~df.columns.duplicated()] \
            .drop_duplicates() \
            .to_csv(outputFeatureFileName, encoding='utf-8')
    except Exception:
        return
    return

import sys

if __name__ == '__main__':

    if len(sys.argv) == 1:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()
    if sys.argv[1] not in ["Grade4", "Grade8", "test"]:
        print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
        exit()

    import glob
    from pdia import *
    from pdia.writing2017.make2017Features import *
    import dask.bag as db
    from distributed import Client
    import datetime
    import time

    # paths
    today = time.strftime("%Y%m%d_%H", time.localtime())  # timestamp such as 20170810_22: date and hour (24h) of this run

    # garyfeng: to resume from a run:
    # today = "20180709_21"
    ####

    grade = sys.argv[1]

    inputCSVPath = "{}/".format(grade)
    outputFeaturePath = "{}_descFeatures_{}/".format(grade, today)
    if not os.path.exists(outputFeaturePath):
        os.makedirs(outputFeaturePath)
    featureSetName = "descFeature{}".format(today)
    print "input folder: {}".format(inputCSVPath)
    print "output folder: {}".format(outputFeaturePath)
    print "featureSetName: {}".format(featureSetName)

    #########
    # getting the files to process
    print "======= Scanning for CSV files ============"
    print datetime.datetime.now()
    fileList = glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv"))
    if len(fileList) == 0:
        print "\nNo CSV files found in the directory\n"
        exit()

    ##########
    # garyfeng: to resume by ignoring ones with output already.
    finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
    finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_" + featureSetName, "") for f in finishedFiles]
    fileList = list(set(fileList) - set(finishedFiles))
    ##########
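    # Illustration of the resume mapping above (hypothetical file names):
    # an existing output "Grade4_descFeatures_20180709_21/VH123456_ObservableData_descFeature20180709_21.csv"
    # maps back to the input "Grade4/VH123456_ObservableData.csv", so any booklet
    # that already has an output file is removed from fileList and skipped.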

    print "Total input CSV files: %i" % len(fileList)
    print datetime.datetime.now()


    import gc

    def processIt(filename):
        processBooklet_dask(filename,
                            featureConfig=featureConfig2017,
                            verbose=False,
                            outputFeaturePath=outputFeaturePath,
                            featureSetName=featureSetName)
        gc.collect()
        return


    print "======= Begin Processing ============"
    print datetime.datetime.now()
    print "====================================="
    # test with 1 file
    # processIt(fileList[0])

    # Using distributed clients
    client = Client()
    # run parallel with dask
    db.from_sequence(fileList).map(processIt).compute()

    print "======== End Processing ==========="
    print datetime.datetime.now()
    print "==================================="