Reading an XML file with spark-xml, selecting all of its columns, and writing the resulting DataFrame back out as XML fails with a NullPointerException:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder.master("local[*]").getOrCreate()
// SEAConstant holds project-specific constants; STR_IMPORT_SPARK_DATA_BRICK_XML
// presumably resolves to "com.databricks.spark.xml" and STR_ROW_TAG to "rowTag".
val xmldf = sparkSession.read.format(SEAConstant.STR_IMPORT_SPARK_DATA_BRICK_XML)
  .option(SEAConstant.STR_ROW_TAG, "Employee")
  .option("nullValue", "")
  .load("demo.xml")
val columnNames = xmldf.columns.toSeq
val sdf = xmldf.select(columnNames.map(c => xmldf.col(c)): _*)
sdf.write.format("com.databricks.spark.xml")
  .option("rootTag", "Company")
  .option("rowTag", "Employee")
  .save("Rel")
The input demo.xml:

<?xml version="1.0"?>
<Company>
  <Employee id="id47" masterRef="#id53" revision="" nomenclature="">
    <ApplicationRef version="J.0" application="Teamcenter"></ApplicationRef>
    <UserData id="id52">
      <UserValue valueRef="#id4" value="" title="_CONFIG_CONTEXT"></UserValue>
    </UserData>
  </Employee>
  <Employee id="id47" masterRef="#id53" revision="" nomenclature="">
    <ApplicationRef version="B.0" application="Teamcenter"></ApplicationRef>
    <UserData id="id63">
      <UserValue valueRef="#id5" value="" title="_CONFIG_CONTEXT"></UserValue>
    </UserData>
  </Employee>
</Company>
The write aborts with:

19/06/25 14:45:14 ERROR Utils: Aborting task
java.lang.NullPointerException
    at com.databricks.spark.xml.parsers.StaxXmlGenerator$$anonfun$apply$4.apply(StaxXmlGenerator.scala:131)
    at com.databricks.spark.xml.parsers.StaxXmlGenerator$$anonfun$apply$4.apply(StaxXmlGenerator.scala:129)
    at scala.collection.immutable.List.foreach(List.scala:383)
    at com.databricks.spark.xml.parsers.StaxXmlGenerator$.apply(StaxXmlGenerator.scala:129)
    at com.databricks.spark.xml.util.XmlFile$$anonfun$1$$anon$1.next(XmlFile.scala:108)
    at com.databricks.spark.xml.util.XmlFile$$anonfun$1$$anon$1.next(XmlFile.scala:96)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:363)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:125)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/06/25 14:45:14 ERROR SparkHadoopWriter: Task attempt_20190625144513_0012_m_000000_0 aborted.
19/06/25 14:45:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
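The top frames point into StaxXmlGenerator, the class that serializes each row back to XML. My reading of this (an inference, not verified against the 0.4.1 source): because the read used nullValue "", the empty attribute strings in the input (revision="", nomenclature="", value="") come back as nulls, and the generator then trips over those nulls while emitting the row. Nothing is wrong with the DataFrame itself; it is the write path that cannot handle them.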
I am using spark-xml 0.4.1:

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_2.11</artifactId>
    <version>0.4.1</version>
</dependency>
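Side note (my addition, not from the original post): if you build with sbt instead of Maven, the equivalent dependency line for Scala 2.11 would be:

libraryDependencies += "com.databricks" %% "spark-xml" % "0.4.1"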
Here is a self-contained example, first with a simpler document, that reads the XML, writes it back out, and reads the result again:

package com.examples

import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.log4j.Level
import org.apache.spark.sql.{SQLContext, SparkSession}

/**
  * Created by Ram Ghadiyaram
  */
object SparkXmlTest {
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]) {
    val spark = SparkSession.builder
      .master("local")
      .appName(this.getClass.getName)
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    val sc = spark.sparkContext
    val sqlContext = new SQLContext(sc)
    val str =
      """
        |<?xml version="1.0"?>
        |<Company>
        |  <Employee id="1">
        |    <Email>tp@xyz.com</Email>
        |    <UserData id="id32" type="AttributesInContext">
        |      <UserValue value="7in" title="Height"></UserValue>
        |      <UserValue value="23lb" title="Weight"></UserValue>
        |    </UserData>
        |  </Employee>
        |  <Measures id="1">
        |    <Email>newdata@rty.com</Email>
        |    <UserData id="id32" type="SitesInContext">
        |    </UserData>
        |  </Measures>
        |  <Employee id="2">
        |    <Email>tp@xyz.com</Email>
        |    <UserData id="id33" type="AttributesInContext">
        |      <UserValue value="7in" title="Height"></UserValue>
        |      <UserValue value="34lb" title="Weight"></UserValue>
        |    </UserData>
        |  </Employee>
        |  <Measures id="2">
        |    <Email>nextrow@rty.com</Email>
        |    <UserData id="id35" type="SitesInContext">
        |    </UserData>
        |  </Measures>
        |  <Employee id="3">
        |    <Email>tp@xyz.com</Email>
        |    <UserData id="id34" type="AttributesInContext">
        |      <UserValue value="7in" title="Height"></UserValue>
        |      <UserValue value="" title="Weight"></UserValue>
        |    </UserData>
        |  </Employee>
        |</Company>
      """.stripMargin
    println("save to file ")

    val f = new File("xmltest.xml")
    FileUtils.writeStringToFile(f, str)

    val xmldf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load(f.getAbsolutePath)
    val columnNames = xmldf.columns.toSeq
    val sdf = xmldf.select(columnNames.map(c => xmldf.col(c)): _*)
    sdf.write.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .mode("overwrite")
      .save("./src/main/resources/Rel1")

    println("read back from saved file ....")
    val readbackdf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load("./src/main/resources/Rel1")
    readbackdf.show(false)
  }
}
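(My addition, not part of the original answer:) printing the schema after the read shows which columns came from XML attributes; spark-xml gives those the "_" prefix by default. For the simple document above the schema is roughly:

xmldf.printSchema()
// root
//  |-- Email: string (nullable = true)
//  |-- UserData: struct (nullable = true)
//  |    |-- UserValue: array (nullable = true)
//  |    |    |-- element: struct (containsNull = true)
//  |    |-- _id: string (nullable = true)
//  |    |-- _type: string (nullable = true)
//  |-- _id: long (nullable = true)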
Output:

save to file 
read back from saved file ....
+----------+------------------------------------------------------------------------------+---+
|Email     |UserData                                                                      |_id|
+----------+------------------------------------------------------------------------------+---+
|tp@xyz.com|[WrappedArray([null,Height,7in], [null,Weight,23lb]),id32,AttributesInContext]|1  |
|tp@xyz.com|[WrappedArray([null,Height,7in], [null,Weight,34lb]),id33,AttributesInContext]|2  |
|tp@xyz.com|[WrappedArray([null,Height,7in], [null,Weight,null]),id34,AttributesInContext]|3  |
+----------+------------------------------------------------------------------------------+---+
So the round trip works for a simple document. For your XML, the write additionally needs these two options:

.option("attributePrefix", "_Att")
.option("valueTag", "_VALUE")
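A note on what these do (my gloss, based on my understanding of the spark-xml documentation): attributePrefix decides which struct fields are treated as XML attributes; on read, attribute-derived columns get the prefix (default "_"), and on write, only fields starting with the prefix are emitted as attributes. valueTag names the field that carries an element's character data when the element also has attributes; "_VALUE" should already be the default, so it is set here mostly for explicitness. Because the DataFrame was read with the default "_" prefix, writing with "_Att" means none of the columns match the attribute prefix any more, so the formerly null attribute values are written as optional child elements instead, which appears to be why the crash goes away.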
The same program with your document and the two extra write options:

package com.examples

import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{SQLContext, SparkSession}

/**
  * Created by Ram Ghadiyaram
  */
object SparkXmlTest {
  // org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)
  def main(args: Array[String]) {
    val spark = SparkSession.builder
      .master("local")
      .appName(this.getClass.getName)
      .getOrCreate()
    // spark.sparkContext.setLogLevel("ERROR")
    val sc = spark.sparkContext
    val sqlContext = new SQLContext(sc)
    // (the simpler test document from the previous run is omitted here)
    val str =
      """
        |<Company>
        |  <Employee id="id47" masterRef="#id53" revision="" nomenclature="">
        |    <ApplicationRef version="J.0" application="Teamcenter"></ApplicationRef>
        |    <UserData id="id52">
        |      <UserValue valueRef="#id4" value="" title="_CONFIG_CONTEXT"></UserValue>
        |    </UserData>
        |  </Employee>
        |  <Employee id="id47" masterRef="#id53" revision="" nomenclature="">
        |    <ApplicationRef version="B.0" application="Teamcenter"></ApplicationRef>
        |    <UserData id="id63">
        |      <UserValue valueRef="#id5" value="" title="_CONFIG_CONTEXT"></UserValue>
        |    </UserData>
        |  </Employee>
        |</Company>
      """.stripMargin
    println("save to file ")

    val f = new File("xmltest.xml")
    FileUtils.writeStringToFile(f, str)

    val xmldf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load(f.getAbsolutePath)
    val columnNames = xmldf.columns.toSeq
    val sdf = xmldf.select(columnNames.map(c => xmldf.col(c)): _*)
    sdf.write.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("attributePrefix", "_Att")
      .option("valueTag", "_VALUE")
      .mode("overwrite")
      .save("./src/main/resources/Rel1")

    println("read back from saved file ....")
    val readbackdf = spark.read.format("com.databricks.spark.xml")
      .option("rootTag", "Company")
      .option("rowTag", "Employee")
      .option("nullValue", "")
      .load("./src/main/resources/Rel1")
    readbackdf.show(false)
  }
}
Output:

save to file 
read back from saved file ....
+-----------------+-------------------------------+----+----------+
|ApplicationRef   |UserData                       |_id |_masterRef|
+-----------------+-------------------------------+----+----------+
|[Teamcenter, J.0]|[[_CONFIG_CONTEXT, #id4], id52]|id47|#id53     |
|[Teamcenter, B.0]|[[_CONFIG_CONTEXT, #id5], id63]|id47|#id53     |
+-----------------+-------------------------------+----+----------+
So in your original code the write becomes:

xmldf.write.format("com.databricks.spark.xml")
  .option("rootTag", "Company")
  .option("rowTag", "Employee")
  .option("attributePrefix", "_Att")
  .option("valueTag", "_VALUE")
  .save("Rel")
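To double-check the result, you can read the saved directory back the same way the example above does ("Rel" is simply the output path from the snippet):

val check = sparkSession.read.format("com.databricks.spark.xml")
  .option("rowTag", "Employee")
  .option("nullValue", "")
  .load("Rel")
check.show(false)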