Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- /**
- This class allows usage of CoreNLP in Spark, creating an instance of the pipeline on each worker so that the
- code can run in parallel.
- @param annotators: the CoreNLP annotator pipeline
- @param params: the parameters desired for the annotators
- */
- class NLPPipeline(annotators: String, params: Tuple2[String, String]*) extends Serializable {
- import edu.stanford.nlp.pipeline._
- import java.util.Properties
- @transient private var nlpPipeline: StanfordCoreNLP = _
- /**
- Returns a CoreNLP pipeline local to the worker, using the constructor parameters
- */
- private def getOrCreatePipeline(): StanfordCoreNLP = {
- if (nlpPipeline == null) {
- val props = new Properties()
- props.setProperty("annotators", annotators)
- if (params.nonEmpty) params.map{p => props.setProperty(p._1, p._2)}
- nlpPipeline = new StanfordCoreNLP(props)
- }
- nlpPipeline
- }
- /**
- Basic step of the pipeline, transforming any text into a CoreNLP document.
- @param keyword: the text to be transformed
- */
- def transform(keyword: String) = {
- val pipeline = getOrCreatePipeline()
- pipeline.process(keyword)
- }
- }
/**
 * Example object implementing the lemmatization pipeline.
 */
object Lemma extends NLPPipeline("tokenize, ssplit, pos, lemma") {
  import edu.stanford.nlp.ling.CoreAnnotations._
  // Explicit .asScala via JavaConverters replaces the deprecated implicit JavaConversions.
  import scala.collection.JavaConverters._

  /**
   * Helper class to give nice structure to the results in a DataFrame.
   *
   * @param tokens the original token texts
   * @param lemmas the corresponding lemmas, aligned index-for-index with `tokens`
   */
  case class Lemmas(tokens: Seq[String], lemmas: Seq[String])

  /**
   * UDF running the lemmatization pipeline on a DataFrame string column,
   * producing a struct of the tokens and their lemmas.
   */
  def lemmatize = udf((keyword: String) => {
    val doc = transform(keyword)
    val tokens = doc.get(classOf[SentencesAnnotation]).asScala
      .flatMap(sentence => sentence.get(classOf[TokensAnnotation]).asScala)
    Lemmas(
      tokens.map(_.get(classOf[TextAnnotation])),
      tokens.map(_.get(classOf[LemmaAnnotation]))
    )
  })
}
Add Comment
Please sign in to add a comment.