Advertisement
Guest User

Untitled

a guest
Nov 27th, 2018
193
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
SPARK 2.08 KB | None | 0 0
  1. import com.cloudera.datascience.common.XmlInputFormat
  2. import org.apache.spark.SparkContext
  3. import scala.xml._
  4. import org.apache.hadoop.io.{Text, LongWritable}
  5. import org.apache.hadoop.conf.Configuration
  6. import org.apache.spark.rdd.RDD
  7.  
  /**
   * Reads the data at `path` through `XmlInputFormat` and returns the text of every record.
   *
   * The input format is configured with the start tag `<MedlineCitation ` and the end tag
   * `</MedlineCitation>`; presumably each record is therefore one citation element
   * (depends on XmlInputFormat's behaviour -- confirm).
   *
   * @param sc   Spark context used to open the Hadoop file
   * @param path location of the input data
   * @return an RDD holding the string form of each record's value
   */
  def loadMedline(sc: SparkContext, path: String) = {
    // NOTE(review): @transient on a local val likely has no effect -- confirm before removing.
    @transient val hadoopConf = new Configuration()
    hadoopConf.set(XmlInputFormat.START_TAG_KEY, "<MedlineCitation ")
    hadoopConf.set(XmlInputFormat.END_TAG_KEY, "</MedlineCitation>")

    val records = sc.newAPIHadoopFile(
      path,
      classOf[XmlInputFormat],
      classOf[LongWritable],
      classOf[Text],
      hadoopConf)

    // Only the value of each (key, value) record is kept, converted to a String right away.
    records.map { case (_, xmlText) => xmlText.toString }
  }
  15.  
  // Record strings loaded with loadMedline from the local medline_data location.
  val medline_raw = loadMedline(
    sc,
    path = "file:///home/licencep/tpgraphes/medline_data"
  )
  17.  
  /**
   * Extracts the text of every `DescriptorName` descendant of `elem` whose
   * `MajorTopicYN` attribute equals "Y".
   *
   * @param elem XML element to search (all descendants are considered)
   * @return the text of each matching `DescriptorName` node, in document order
   */
  def majorTopics(elem: Elem): Seq[String] =
    (elem \\ "DescriptorName").collect {
      case descriptor if (descriptor \ "@MajorTopicYN").text == "Y" => descriptor.text
    }
  23.  
  // Parse every raw record string into an XML element.
  val mxml: RDD[Elem] = medline_raw.map(xmlText => XML.loadString(xmlText))

  // Major topics of each record; cached because several later RDDs derive from it.
  val medline: RDD[Seq[String]] = mxml.map(elem => majorTopics(elem)).cache()

  // All topic occurrences, flattened across records (duplicates are kept).
  val topics: RDD[String] = medline.flatMap(recordTopics => recordTopics)

  // Every 2-element combination of a record's topics; sorting first gives each
  // pair a fixed element order so equal pairs compare equal as keys.
  val topicPairs = medline.flatMap { recordTopics =>
    val ordered = recordTopics.sorted
    ordered.combinations(2)
  }

  // Number of times each topic pair was produced, cached for reuse.
  val cooccurs = topicPairs
    .map(pair => pair -> 1)
    .reduceByKey((left, right) => left + right)
    .cache()
  32.  
  33. import com.google.common.hash.Hashing
  /**
   * Derives a 64-bit id from a string using MD5.
   *
   * The string is hashed as its sequence of UTF-16 code units (two bytes per
   * unit, low byte first) and the first eight bytes of the digest are read as
   * a little-endian `Long`.
   *
   * This replaces `Hashing.md5().hashString(str).asLong()`: the one-argument
   * `hashString(CharSequence)` was deprecated and later removed from Guava, so
   * the previous form only compiles against old Guava versions.
   * NOTE(review): the byte layout above is intended to reproduce the ids the
   * Guava call produced -- confirm against the Guava version in use if
   * previously stored ids must stay stable.
   *
   * @param str text to hash; the empty string is accepted
   * @return the id built from the first eight MD5 digest bytes
   */
  def hashId(str: String): Long = {
    import java.security.MessageDigest

    // Two bytes per UTF-16 code unit, low byte first.
    val input: Array[Byte] =
      str.toCharArray.flatMap(c => Seq(c.toByte, (c >> 8).toByte))

    val digest = MessageDigest.getInstance("MD5").digest(input)

    // Little-endian: digest(0) ends up as the least significant byte.
    digest.take(8).foldRight(0L)((b, acc) => (acc << 8) | (b & 0xFFL))
  }
  37.  
  38. import org.apache.spark.graphx._
  39.  
  // One (id, name) entry per topic occurrence; the id is the topic's hashId.
  val vertices = topics.map(name => hashId(name) -> name)

  // One edge per co-occurring topic pair, carrying the pair's count as its attribute.
  // The two hashed ids are sorted so the smaller id is always the source.
  val edges = cooccurs.map { case (pair, count) =>
    val endpointIds = pair.map(hashId).sorted
    Edge(endpointIds(0), endpointIds(1), count)
  }

  // Graph of topics linked by co-occurrence; cached for repeated use.
  val topicGraph = Graph(vertices, edges)
  topicGraph.cache()
  48.  
  /**
   * Pairs each vertex degree with the vertex's name and returns the ten
   * entries with the largest degree.
   *
   * @param degrees    degree per vertex id
   * @param topicGraph graph whose vertex attribute is the topic name
   * @return up to ten (name, degree) pairs selected by `top` using degree as the ordering
   */
  def topNamesAndDegrees(degrees: VertexRDD[Int], topicGraph: Graph[String, Int]): Array[(String, Int)] = {
    // Compare entries by their degree only.
    val byDegree = Ordering.by[(String, Int), Int] { case (_, degree) => degree }

    // Inner join: only vertex ids present on both sides are kept.
    val joined = degrees.innerJoin(topicGraph.vertices) { (_, degree, name) =>
      (name, degree)
    }

    joined
      .map { case (_, nameAndDegree) => nameAndDegree }
      .top(10)(byDegree)
  }
  56.  
  /**
   * Counts how many vertices carry each vertex attribute value (the component id)
   * and returns the counts ordered from largest to smallest.
   *
   * @param connectedComponents graph whose vertex attribute is a component id
   * @return (component id, vertex count) pairs, largest count first
   */
  def sortedConnectedComponents(connectedComponents: Graph[VertexId, _]): Seq[(VertexId, Long)] = {
    val componentIds = connectedComponents.vertices.map { case (_, componentId) => componentId }
    val sizeByComponent = componentIds.countByValue()

    // Ascending sort followed by reverse yields descending order by count.
    val ascending = sizeByComponent.toSeq.sortBy { case (_, size) => size }
    ascending.reverse
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement