Advertisement
Guest User

Untitled

a guest
Jun 17th, 2019
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.89 KB | None | 0 0
  /**
   * Test-suite mixin that sets up the SQL test fixtures before the suite runs and
   * offers a helper for building a DataFrame from an inline CSV string.
   */
  trait DataFrameSuiteBase extends TestSuite
    with SharedSparkContext with DataFrameSuiteBaseLike { self: Suite =>
    import spark.implicits._

    /** Runs the parent setup, then the SQL-specific setup from the mixed-in traits. */
    override def beforeAll(): Unit = {
      super.beforeAll()
      super.sqlBeforeAllTestCases()
    }

    /**
     * Runs the parent teardown and clears the cached session reference unless
     * `reuseContextIfPossible` is set.
     */
    override def afterAll(): Unit = {
      super.afterAll()
      if (!reuseContextIfPossible) {
        SparkSessionProvider._sparkSession = null
      }
    }

    /**
     * Parses an inline CSV string (first non-blank line is the header) into a DataFrame.
     *
     * The text is passed through `stripMargin`, so callers may write it as a
     * `|`-prefixed triple-quoted string. Blank lines are dropped before parsing.
     *
     * @param csv    CSV text, header row first
     * @param schema schema to apply; when `null` (the default) the schema is inferred
     * @return the DataFrame read from the CSV lines
     */
    def csvStringToDataFrame(csv: String, schema: StructType = null): DataFrame = {
      // `linesIterator` rather than `lines`: on JDK 11+ `lines` resolves to
      // java.lang.String#lines, which returns a java.util.stream.Stream.
      val csvLines = csv.stripMargin.linesIterator.filterNot(_.trim.isEmpty).toList
      val lineDataset = context.parallelize(csvLines).toDS
      val headerReader = spark.read.option("header", true)
      // Option(...) turns the nullable default into an explicit two-way choice
      // without relying on the test framework's `!==` operator for a null check.
      val configuredReader = Option(schema) match {
        case Some(explicitSchema) => headerReader.schema(explicitSchema)
        case None                 => headerReader.option("inferSchema", true)
      }
      configuredReader.csv(lineDataset)
    }

  }
  30.  
  /**
   * DataFrame helper routines bound to a Spark session.
   *
   * @param session session whose implicits are imported for the helpers below
   */
  case class Toolbox(session: SparkSession) extends Serializable {

    import session.implicits._

    /**
     * Adds a boolean `<colName>_flag` column marking values outside the given bounds.
     *
     * The flag is `true` when the value of `colName` is not between `bounds.lower`
     * and `bounds.upper`, and `false` otherwise.
     *
     * @param dataframe input frame
     * @param colName   name of the column to check
     * @param bounds    lower and upper limits of the accepted range
     * @return the input frame with the flag column appended, or the input frame
     *         unchanged when `hasColumn` reports that `colName` is absent
     */
    def checkExtremes(dataframe: DataFrame, colName: String, bounds: Bounds): DataFrame = {
      if (hasColumn(dataframe, colName)) {
        val flagColumnName = s"${colName}_flag"
        // The flagged frame must be the last expression of this branch; binding it
        // to an unused local made the branch evaluate to Unit.
        dataframe.withColumn(
          flagColumnName,
          when(!col(colName).between(bounds.lower, bounds.upper), true).otherwise(false))
      } else dataframe
    }
  }
  45.  
  46.  
  /**
   * Spec for [[Toolbox]]: builds input and expected frames from inline CSV and
   * runs `checkExtremes` against them.
   */
  class ToolboxSpec extends FunSpec with DataFrameSuiteBase
    with DataFrameComparer
    with BeforeAndAfter {

    // Lower-case name so the field does not shadow the `Toolbox` companion object;
    // with the field named `Toolbox`, `Toolbox(spark)` resolved to the field itself.
    // Kept as a var because it is (re)assigned in the `before` hook.
    private var toolbox: Toolbox = _

    before {
      toolbox = Toolbox(spark)
    }

    describe("Toolbox") {

      describe("checkExtremes") {
        it("should be checking for extreme values") {
          // NOTE(review): the header declares 13 columns but the data rows below carry
          // 20-21 comma-separated fields. Possibly the numeric values were written with
          // decimal commas (e.g. 66,12) -- confirm and quote or re-format the fixture.
          val inputCSV =
            """
              |"id","time","code","emi","v","t1","t2","t3","t4","t5","x_acc","y_acc","z_acc"
              |"46","2019-04-01 00:00:57","1",1444,"1",66,12,34,5,29,31,64,56,38,31,67,32,9,64,31,53
              |"46","2019-04-01 00:00:52","1",1515,"1",66,34,5,29,31,64,56,38,31,69,08,24,91,36,7
              |"46","2019-04-01 00:00:46","1",1452,"1",66,12,34,5,29,31,64,5,38,31,66,88,11,12,34,43
              |"47","2019-04-01 00:00:46","1",1452,"1",100,12,34,5,29,31,64,5,38,31,66,88,11,12,34,43
              |"77","2019-04-01 00:00:41","1",1319,"1",66,19,34,5,29,31,64,5,38,31,67,82,8,66,34,79
            """
          val inputColName = "t1"
          val flagColName = s"${inputColName}_flag"
          val expectedCSV =
            s"""
              |"id","time","code","emi","v","t1","t2","t3","t4","t5","x_acc","y_acc","z_acc","$flagColName"
              |"46","2019-04-01 00:00:57","1",1444,"1",66,12,34,5,29,31,64,56,38,31,67,32,9,64,31,53,false
              |"46","2019-04-01 00:00:52","1",1515,"1",66,34,5,29,31,64,56,38,31,69,08,24,91,36,7,false
              |"46","2019-04-01 00:00:46","1",1452,"1",66,12,34,5,29,31,64,5,38,31,66,88,11,12,34,43,false
              |"47","2019-04-01 00:00:46","1",1452,"1",100,12,34,5,29,31,64,5,38,31,66,88,11,12,34,43,true
              |"77","2019-04-01 00:00:41","1",1319,"1",66,19,34,5,29,31,64,5,38,31,67,82,8,66,34,79,false
            """
          val inputSchema = StructType(
            Array(
              StructField("id", StringType, false),
              StructField("time", TimestampType, false),
              StructField("code", StringType, true),
              StructField("emi", IntegerType, true),
              StructField("v", StringType, true),
              StructField("t1", DoubleType, true),
              StructField("t2", DoubleType, true),
              StructField("t3", DoubleType, true),
              StructField("t4", DoubleType, true),
              StructField("t5", DoubleType, true),
              StructField("x_acc", DoubleType, true),
              StructField("y_acc", DoubleType, true),
              StructField("z_acc", DoubleType, true)
            )
          )
          // Same columns as the input schema plus the boolean flag column.
          val expectedSchema = StructType(
            Array(
              StructField("id", StringType, false),
              StructField("time", TimestampType, false),
              StructField("code", StringType, true),
              StructField("emi", IntegerType, true),
              StructField("v", StringType, true),
              StructField("t1", DoubleType, true),
              StructField("t2", DoubleType, true),
              StructField("t3", DoubleType, true),
              StructField("t4", DoubleType, true),
              StructField("t5", DoubleType, true),
              StructField("x_acc", DoubleType, true),
              StructField("y_acc", DoubleType, true),
              StructField("z_acc", DoubleType, true),
              StructField(flagColName, BooleanType, true)
            )
          )
          val input = csvStringToDataFrame(inputCSV, inputSchema)
          val bounds = Bounds(10, 70)
          val output = toolbox.checkExtremes(input, inputColName, bounds)
          output.show(5)
          output.printSchema()
          val expected = csvStringToDataFrame(expectedCSV, expectedSchema)
          // expected.printSchema()
          // TODO(review): both comparisons below are disabled, so this test currently
          // verifies nothing beyond "does not throw". Re-enable one once the fixture
          // rows above are corrected.
          // assertSmallDatasetEquality(output, expected) // log 1
          // assertDataFrameEquals(expected, output) // log 2
        }
      }
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement