Advertisement
Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
/** Serializable wrapper around an unanchored regular expression, so the
  * pattern can be shipped to Spark executors inside closures.
  *
  * @param str the pattern source; compiled once, unanchored, so it may
  *            match anywhere inside the tested string
  */
class Regex(str: String) extends Serializable {
  val regex = str.r.unanchored

  /** True when the pattern occurs anywhere in `str`. */
  def matches(str: String) = regex.findFirstIn(str).isDefined
}
/** One parsed row of a tab-separated ngram dataset.
  *
  * Expected layout: ngram TAB year TAB volumes TAB matches.
  * NOTE(review): the public Google Books ngram format is
  * ngram TAB year TAB match_count TAB volume_count, which would mean
  * `volumes` and `matches` here are swapped — confirm against the data.
  *
  * @param line one raw tab-separated record
  */
class NgramRecord(line: String) {
  val field = line.split('\t')
  val ngram = field(0)
  val year = field(1).toInt
  val volumes = field(2).toInt
  val matches = field(3).toInt

  /** True when `r` matches this record's ngram text. */
  def matches(r: Regex) = r.matches(ngram)

  /** Comma-separated form, used when the RDD is saved as text. */
  override def toString = s"$ngram,$year,$volumes,$matches"
}
- import org.apache.hadoop.mapred.SequenceFileInputFormat
- import org.apache.hadoop.io.{LongWritable, Text}
- import com.esotericsoftware.kryo.Kryo
- import org.apache.spark.serializer.KryoRegistrator
/** Registers the Hadoop writable types that appear in the input RDDs with
  * Kryo, so the KryoSerializer configured in [[NgramsAggregate]] can encode
  * them efficiently (unregistered classes fall back to writing full names).
  */
class Registrator extends KryoRegistrator {
  // Explicit `: Unit =` — procedure syntax (`def ... {`) is deprecated.
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[LongWritable])
    kryo.register(classOf[Text])
  }
}
/** Spark job: find ngram records whose text matches a regex.
  *
  * Usage: regex output input [input ...]
  * Inputs are Hadoop sequence files of (LongWritable, Text); matching
  * records are written to `output` via their comma-separated toString.
  */
object NgramsAggregate {
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message instead of an index-out-of-bounds.
    require(args.length >= 3, "usage: regex output input [input ...]")
    val conf = new SparkConf()
      .setAppName("ngrams")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "Registrator")
    val sc = new SparkContext(conf)
    try {
      val regex = new Regex(args(0))
      val output = args(1)
      /* if things were simple:
       *   val input = sc.union(args.drop(2).map(sc.textFile(_)))
       * alas, the inputs are sequence files, so read them as
       * (LongWritable, Text) pairs and parse the Text value. */
      val input = sc.union(args.drop(2)
        .map(sc.hadoopFile[LongWritable, Text,
          SequenceFileInputFormat[LongWritable, Text]](_)))
        .map(r => new NgramRecord(r._2.toString))
      input.filter(_ matches regex).saveAsTextFile(output)
    } finally {
      // Always release cluster resources, even when the job fails.
      sc.stop()
    }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement