Advertisement
Guest User

ngrams

a guest
Dec 1st, 2014
276
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.75 KB | None | 0 0
  1. import org.apache.spark.SparkContext
  2. import org.apache.spark.SparkContext._
  3. import org.apache.spark.SparkConf
  4.  
  /**
   * Serializable wrapper around an unanchored regular expression.
   *
   * The pattern is compiled once at construction; because it is unanchored,
   * a match anywhere inside the tested string counts as a match.
   *
   * @param str source text of the regular expression
   */
  class Regex(str: String) extends Serializable {
    val regex = str.r.unanchored

    /**
     * Tests whether the pattern occurs somewhere in `str`.
     *
     * @param str the candidate text (shadows the constructor parameter)
     * @return `true` when the extractor yields a match, `false` otherwise
     */
    def matches(str: String): Boolean =
      // Invoke the extractor directly; this is what `case regex(_*)` expands to.
      regex.unapplySeq(str).isDefined
  }
  13.  
  /**
   * One record parsed from a tab-separated n-gram line.
   *
   * The line is split on tab characters and the first four columns are read as
   * ngram text, year, volume count and match count, in that order.
   * NOTE(review): the column order is taken as written here; confirm it against
   * the actual dataset layout.
   *
   * Construction throws if the line has fewer than four columns
   * (`ArrayIndexOutOfBoundsException`) or if a numeric column is not an integer
   * (`NumberFormatException`).
   *
   * @param line raw tab-separated input line
   */
  class NgramRecord(line: String) {
    val field = line.split('\t')
    val ngram = field(0)
    val year = field(1).toInt
    val volumes = field(2).toInt
    // Stored under its own name: a `val matches` next to `def matches(r: Regex)`
    // is rejected by the compiler as a double definition.
    val matchCount = field(3).toInt

    /** The match count column; kept so `record.matches` still reads the count. */
    def matches: Int = matchCount

    /**
     * Tests this record's ngram text against a regular expression.
     *
     * @param r the pattern to apply
     * @return `true` when `r` matches the ngram text
     */
    def matches(r: Regex): Boolean = r matches ngram

    /** Comma-separated rendering: ngram, year, volumes, match count. */
    override def toString = s"$ngram,$year,$volumes,$matchCount"
  }
  25.  
  26. import org.apache.hadoop.mapred.SequenceFileInputFormat
  27. import org.apache.hadoop.io.{LongWritable, Text}
  28.  
  29. import com.esotericsoftware.kryo.Kryo
  30. import org.apache.spark.serializer.KryoRegistrator
  31.  
  /**
   * Kryo registrator that registers the Hadoop writable types used as the
   * key and value classes when reading the input files.
   */
  class Registrator extends KryoRegistrator {
    /**
     * Registers `LongWritable` and `Text` with the supplied Kryo instance.
     *
     * @param kryo the Kryo instance to register classes on
     */
    override def registerClasses(kryo: Kryo): Unit = {
      // Registration order is kept as-is: LongWritable first, then Text.
      val keyClass = classOf[LongWritable]
      val valueClass = classOf[Text]
      kryo.register(keyClass)
      kryo.register(valueClass)
    }
  }
  38.  
  /**
   * Spark job that finds ngram records whose ngram text matches a regular
   * expression and writes them out as text.
   *
   * Command-line arguments: `regex output input [input ..]`.
   */
  object NgramsAggregate {
    private val Usage = "usage: NgramsAggregate <regex> <output> <input> [<input> ...]"

    /**
     * Entry point.
     *
     * @param args `args(0)` is the regular expression, `args(1)` the output
     *             path, and every remaining element an input path
     */
    def main(args: Array[String]): Unit = {
      // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
      if (args.length < 3) {
        System.err.println(Usage)
        sys.exit(1)
      }

      // Compile the pattern before starting Spark so a bad regex fails fast.
      val regex = new Regex(args(0))
      val output = args(1)
      val inputPaths = args.drop(2)

      val conf = new SparkConf()
        .setAppName("ngrams")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "Registrator")
      val sc = new SparkContext(conf)
      try {
        /* if things were simple */
        /* val input = sc.union(inputPaths.map(sc.textFile(_))) */
        /* alas they are not: the inputs are read as (LongWritable, Text) SequenceFiles */
        val input = sc.union(inputPaths
          .map(sc.hadoopFile[LongWritable, Text,
            SequenceFileInputFormat[LongWritable, Text]](_)))
          // Convert the Text value to a String right away and parse it.
          .map(r => new NgramRecord(r._2.toString))
        input.filter(_ matches regex).saveAsTextFile(output)
      } finally {
        // Always release the context, even when the job throws.
        sc.stop()
      }
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement