Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Reads every line of every S3 object named by `s3Paths` into a single Spark RDD,
// `inputLinesRDD_raw`. Meant to be pasted into spark-shell, which predefines the
// SparkContext `sc` used at the bottom of this script.
//
// Each comma-separated entry is used as an S3 key *prefix*: "s3://bucket/path/to/directory"
// also matches keys such as "path/to/directory2/..." — end the entry with "/" to restrict
// it to a single directory.
val s3Paths = "s3://yourbucket/path/to/file1.txt,s3://yourbucket/path/to/directory"
// Max keys requested per ListObjects call.
val pageLength = 100
// NOTE(review): placeholder credentials embedded in the script; consider a credentials
// provider chain / instance role instead of literal keys.
val key = "YOURKEY"
val secret = "YOUR_SECRET"

import com.amazonaws.services.s3._, model._
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.model.ObjectListing
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.io.{Codec, Source}
import java.io.InputStream
import org.apache.spark.rdd.RDD

// Kept as a `def` so every caller (driver or executor) builds its own client from the
// plain `key`/`secret` strings instead of shipping a client object inside a closure.
def s3 = new AmazonS3Client(new BasicAWSCredentials(key, secret))

// s3://, s3a:// or s3n:// followed by the bucket; everything after the first "/"
// (possibly empty, e.g. "s3://bucket") is the key prefix.
val S3PathPattern = """(?i)^s3[an]?://([^/]+)/?(.*)$""".r

/** Splits an S3 URL into (bucket, prefix); throws IllegalArgumentException if it is not one. */
def parseS3Path(s3Path: String): (String, String) = s3Path match {
  case S3PathPattern(bucket, prefix) => (bucket, prefix)
  case _ => throw new IllegalArgumentException(s"Not an S3 path: '$s3Path'")
}

/** Lists every object key under `prefix` in `bucket`, following pagination until the
  * listing is no longer truncated. */
def listKeys(client: AmazonS3, bucket: String, prefix: String): Vector[String] = {
  @tailrec
  def drain(listing: ObjectListing, acc: Vector[String]): Vector[String] = {
    val keys = acc ++ listing.getObjectSummaries.asScala.map(_.getKey)
    if (listing.isTruncated) drain(client.listNextBatchOfObjects(listing), keys) else keys
  }
  val request = new ListObjectsRequest()
  request.setBucketName(bucket)
  request.setPrefix(prefix)
  request.setMaxKeys(pageLength)
  drain(client.listObjects(request), Vector.empty)
}

/** Lazily yields the lines of one S3 object and closes its content stream once the lines
  * are exhausted (`Source.getLines` on its own never closes it, leaking the connection).
  * NOTE(review): decodes as UTF-8 explicitly instead of the JVM default charset — confirm
  * the input files are UTF-8. */
def readLines(client: AmazonS3, bucket: String, objectKey: String): Iterator[String] = {
  val content: InputStream = client.getObject(bucket, objectKey).getObjectContent
  val source = Source.fromInputStream(content)(Codec.UTF8)
  val lines = source.getLines()
  new Iterator[String] {
    private var open = true
    def hasNext: Boolean = open && {
      val more = lines.hasNext
      if (!more) {
        source.close()
        open = false
      }
      more
    }
    def next(): String = lines.next()
  }
}

// (bucket, key) for every object under every configured path, listed on the driver.
val s3Objects: Vector[(String, String)] = {
  val client = s3
  s3Paths.split(",").map(_.trim).filter(_.nonEmpty).toVector.flatMap { s3Path =>
    val (bucket, prefix) = parseS3Path(s3Path)
    println("Processing s3 resource: bucket '%s', prefix '%s'".format(bucket, prefix))
    listKeys(client, bucket, prefix).map(objectKey => (bucket, objectKey))
  }
}

// One parallelize over all objects, rather than one RDD per listing page chained with
// `union`, keeps the lineage flat. The result is an empty RDD (not null) when nothing matches.
val inputLinesRDD_raw: RDD[String] =
  sc.parallelize(s3Objects).mapPartitions { objects =>
    val client = s3 // one client per partition instead of one per object
    objects.flatMap { case (bucket, objectKey) => readLines(client, bucket, objectKey) }
  }

// TODO do something with inputLinesRDD_raw
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement