Advertisement
Not a member of Pastebin yet?
Sign Up — it unlocks many cool features!
- import com.cloudera.datascience.common.XmlInputFormat
- import org.apache.spark.SparkContext
- import scala.xml._
- import org.apache.hadoop.io.{Text, LongWritable}
- import org.apache.hadoop.conf.Configuration
- import org.apache.spark.rdd.RDD
/**
 * Loads a directory of MEDLINE XML files as an RDD of single-citation XML strings.
 *
 * Uses the custom `XmlInputFormat` to split the raw files on `<MedlineCitation ...>` /
 * `</MedlineCitation>` boundaries, so each RDD element is one complete citation record.
 *
 * @param sc   the active SparkContext
 * @param path input path (any Hadoop-readable URI)
 * @return     an RDD where each element is the XML text of one MedlineCitation
 */
def loadMedline(sc: SparkContext, path: String): RDD[String] = {
  // @transient: Hadoop's Configuration is not Serializable and must not be
  // captured in task closures shipped to executors.
  @transient val conf = new Configuration()
  // The trailing space after "<MedlineCitation " is deliberate: it matches the
  // opening tag even when it carries attributes (e.g. Owner="NLM").
  conf.set(XmlInputFormat.START_TAG_KEY, "<MedlineCitation ")
  conf.set(XmlInputFormat.END_TAG_KEY, "</MedlineCitation>")
  val in = sc.newAPIHadoopFile(path, classOf[XmlInputFormat],
    classOf[LongWritable], classOf[Text], conf)
  // Keep only the Text payload; the LongWritable key is a byte offset we don't need.
  in.map { case (_, xmlText) => xmlText.toString }
}
// Raw citation XML strings for the whole corpus.
// NOTE(review): hard-coded local path — only works on a single machine where this
// directory exists; point at HDFS/S3 for a real cluster run.
val medline_raw = loadMedline(sc, "file:///home/licencep/tpgraphes/medline_data")
/**
 * Extracts the major MeSH topics of one parsed citation.
 *
 * Scans every `DescriptorName` element (at any depth) and keeps the text of
 * those flagged as a major topic (`MajorTopicYN="Y"`).
 *
 * @param elem the root element of a parsed MedlineCitation
 * @return the names of the citation's major topics
 */
def majorTopics(elem: Elem): Seq[String] = {
  val descriptors = elem \\ "DescriptorName"
  descriptors.collect {
    case node if (node \ "@MajorTopicYN").text == "Y" => node.text
  }
}
// Parse each citation string into an XML tree.
val mxml: RDD[Elem] = medline_raw.map(xmlString => XML.loadString(xmlString))
// One Seq of major-topic names per citation; cached because it is reused below.
val medline: RDD[Seq[String]] = mxml.map(elem => majorTopics(elem)).cache()
// Flattened stream of every major-topic occurrence across the corpus.
val topics: RDD[String] = medline.flatMap(identity)
// All unordered topic pairs within a citation; sorting first makes each pair canonical
// so (A, B) and (B, A) count as the same co-occurrence.
val topicPairs = medline.flatMap(topicList => topicList.sorted.combinations(2))
// Co-occurrence counts per canonical pair.
val cooccurs = topicPairs.map(pair => (pair, 1)).reduceByKey((a, b) => a + b)
cooccurs.cache()
- import com.google.common.hash.Hashing
/**
 * Derives a stable 64-bit vertex id from a topic name.
 *
 * Takes the first 8 bytes of the MD5 digest of the UTF-8 encoded string.
 * The charset is passed explicitly: Guava's `hashString(CharSequence)` overload
 * is deprecated and hashes the raw UTF-16 chars, which is easy to mismatch
 * across JVMs/tools; pinning UTF-8 makes the ids reproducible.
 *
 * @param str topic name to hash
 * @return a deterministic Long id for the topic
 */
def hashId(str: String): Long = {
  Hashing.md5().hashString(str, java.nio.charset.StandardCharsets.UTF_8).asLong()
}
- import org.apache.spark.graphx._
// Vertex table: (hashed id, topic name). Duplicate topics yield duplicate vertex
// entries; Graph() deduplicates ids when building the graph.
val vertices = topics.map(topic => (hashId(topic), topic))
// Edge table: one Edge per co-occurring topic pair, weighted by its count.
// Ids are sorted so the smaller id is always the source endpoint.
val edges = cooccurs.map { case (pairTopics, cnt) =>
  val sortedIds = pairTopics.map(hashId).sorted
  Edge(sortedIds(0), sortedIds(1), cnt)
}
val topicGraph = Graph(vertices, edges)
topicGraph.cache()
/**
 * Joins a degree table back to the graph's vertex names and returns the
 * highest-degree topics.
 *
 * @param degrees    per-vertex degree values (e.g. `topicGraph.degrees`)
 * @param topicGraph the graph whose vertex attributes are topic names
 * @param numResults how many top entries to return (defaults to 10, preserving
 *                   the original behavior for existing callers)
 * @return the `numResults` (topic name, degree) pairs with the largest degrees
 */
def topNamesAndDegrees(degrees: VertexRDD[Int], topicGraph: Graph[String, Int],
    numResults: Int = 10): Array[(String, Int)] = {
  // innerJoin keeps only vertices present in both RDDs and pairs name with degree.
  val namesAndDegrees = degrees.innerJoin(topicGraph.vertices) {
    (topicId, degree, name) => (name, degree)
  }
  // Rank by the degree component of the pair.
  val ord = Ordering.by[(String, Int), Int](_._2)
  namesAndDegrees.map(_._2).top(numResults)(ord)
}
/**
 * Summarizes a connected-components graph as component sizes, largest first.
 *
 * @param connectedComponents a graph whose vertex attribute is the component id
 *                            (as produced by `Graph.connectedComponents`)
 * @return (component id, vertex count) pairs sorted by descending count
 */
def sortedConnectedComponents(connectedComponents: Graph[VertexId, _]): Seq[(VertexId, Long)] = {
  // countByValue collects component sizes to the driver as a local Map.
  val componentCounts = connectedComponents.vertices
    .map { case (_, componentId) => componentId }
    .countByValue
  componentCounts.toSeq
    .sortBy { case (_, size) => size }
    .reverse
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement