Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
val dataset = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](inputFile)
printf("Input size: %d\n", dataset.count())

// Per-partition order check over adjacent records of `dataset`.
// Emits, per partition:
//   - a single (s"$inputFile:empty", "") marker when the partition has no records;
//   - otherwise, for every adjacent pair (prev, curr): ("error", "misorder in …") when the
//     comparator flags the pair, else ("SUCCESS", "order in …").
// A partition with exactly one record has no adjacent pair and therefore emits nothing.
val output = dataset.mapPartitions { iter =>
  val ERROR = new Text("error")
  val compare = new Comparator()

  if (!iter.hasNext) {
    Iterator((new Text(s"$inputFile:empty"), new Text("")))
  } else {
    // Hadoop RecordReaders may hand back the same Text instances for every record, so copy
    // each key/value before holding it across iterations; otherwise `prev` and `curr`
    // could alias the same mutable objects and every comparison would be meaningless.
    val records = iter.map { case (k, v) => (new Text(k), new Text(v)) }

    // withPartial(false): a trailing group shorter than 2 (i.e. a one-record partition) is
    // dropped instead of reaching the two-element pattern below, which would throw MatchError.
    records.sliding(2).withPartial(false).map { case Seq(prev, curr) =>
      // NOTE(review): this reports prev < curr as a misorder, i.e. it expects keys to be
      // non-increasing under `Comparator` — confirm that direction is intended.
      if (compare.compare(prev._1, curr._1) < 0)
        (ERROR, new Text(s"misorder in $inputFile between $prev and $curr"))
      else
        (new Text("SUCCESS"), new Text(s"order in $inputFile between $prev and $curr"))
    }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement