Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // Local run: the trace file is read from a path on the author's laptop.
- val rawFileStream = new FileInputStream(basepath + fileName)
- // The file is gzip-compressed, so the raw bytes are wrapped in a decompressing stream.
- val is = new GZIPInputStream(rawFileStream)
- // The legacy Java reader takes the decompressed stream together with the file name.
- val reader = new E4GTraceFileReader(is, fileName)
- // Invoking the legacy code this way yields the correct result.
- // NOTE(review): neither stream is closed in this snippet — confirm whether
- // readTraces() or later code is expected to release them.
- val result = reader.readTraces()
- // List the entries of the HDFS folder and distribute their paths as an RDD.
- val hdfs = FileSystem.get(new URI("hdfs://HDFS_IP_PORT/"), new Configuration())
- val listedPaths = hdfs.listStatus(new Path("SOME_PATH")).map(status => status.getPath)
- val hdfsFiles = spark.sparkContext.parallelize(listedPaths)
- // Create an input stream from each file in the folder, paired with the file's path.
- // The FileSystem handle is looked up again inside the closure rather than
- // capturing the outer `hdfs` value.
- // NOTE(review): the opened streams become RDD elements and are never closed here;
- // java.io.InputStream is not Serializable, so confirm downstream stages can
- // actually consume these elements.
- val inputStreamsRDD = hdfsFiles.map { filePath =>
- val closureFs = FileSystem.get(new URI("hdfs://HDFS_IP_PORT/"), new Configuration())
- val wrappedStream = closureFs.open(filePath).getWrappedStream
- (wrappedStream, filePath)
- }
Add Comment
Please, Sign In to add comment