SHARE
TWEET

kek.py

a guest Dec 3rd, 2019 76 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. from pyspark import SparkContext, SparkConf
  2. import re
  3.  
  4. config = SparkConf().setAppName("bigram").setMaster("yarn")
  5. spark_context = SparkContext(conf=config)
  6.  
  7.  
  8. stop_words_input = spark_context.textFile("/data/wiki/stop_words_en-xpo6.txt")
  9. stop_words = stop_words_input.map(lambda x: x.strip().lower())
  10. broadcast_stop_words = spark_context.broadcast(set(stop_words.collect()))
  11.  
  12.  
  13. def get_words(line):
  14.     article_id, text = unicode(line.rstrip()).split('\t', 1)
  15.     text = re.sub("^\W+|\W+$", "", text, flags=re.UNICODE)
  16.     return filter(None, re.split("\W*\s+\W*", text, flags=re.UNICODE))
  17.  
  18.  
  19. def make_bigrams(line):
  20.     words = get_words(line)
  21.     bigrams = [word1 + '_' + word2 for word1, word2 in zip(words[:-1], words[1:])]
  22.     return bigrams
  23.  
  24.  
  25. def find_bigram(bigram):
  26.     word1, word2 = bigram.split('_', 1)
  27.     return word2 not in broadcast_stop_words.value and word1 not in broadcast_stop_words.value
  28.  
  29.  
  30. articles = spark_context.textFile("/data/wiki/en_articles_part", 16).map(lambda x: x.strip().lower())
  31. rdd2 = articles.flatMap(get_words).filter(lambda x: x not in broadcast_stop_words.value)
  32. rdd3 = articles.flatMap(make_bigrams).filter(find_bigram)
  33. words = rdd2.map(lambda x: (x, 1))
  34. bigrams = rdd3.map(lambda x: (x, 1))
  35.  
  36. words_count = spark_context.broadcast(words.count())
  37. # bigrams_count = bigrams.count()
  38.  
  39. words = spark_context.broadcast(dict(words.reduceByKey(lambda a, b: a + b).collect()))
  40. from math import log
  41.  
  42.  
  43. def calc_npmi(el):
  44.     bigram, count = el
  45.     pab = float(count) / words_count.value
  46.     word1, word2 = bigram.split('_', 1)
  47.     pa, pb = float(words.value[word1]) / words_count.value, float(words.value[word2]) / words_count.value
  48.     npmi = -log(pab / pa / pb) / log(pab)
  49.     return bigram, npmi
  50.  
  51.  
  52. bigrams = bigrams.reduceByKey(lambda a, b: a + b).filter(lambda x: x[1] >= 500).map(calc_npmi).sortBy(lambda x: -x[1])
  53.  
  54. for bigram, _ in bigrams.take(39):
  55.     print(bigram.encode('utf8'))
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top