Advertisement
Guest User

Untitled

a guest
Sep 19th, 2016
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.58 KB | None | 0 0
  1. package com.mavencode.clustering
  2.  
  3.  
  4. import java.util.Properties
  5.  
  6. import com.typesafe.config.ConfigFactory
  7. import org.apache.log4j.{Level, LogManager}
  8. import org.apache.spark.SparkConf
  9. import org.apache.spark.rdd.RDD
  10. import org.apache.spark.sql.{SaveMode, SparkSession}
  11. import org.apache.spark.storage.StorageLevel
  12.  
  13.  
  /**
   * Post-merge step for similarity clustering.
   *
   * Loads similarity rows from the source database over JDBC, pairs every [[Locator]] with each of
   * its ancestors (following `content_Locator_sha1_20c` to the locator whose `security` matches),
   * and appends the flattened pairs as [[MergedLocator]] rows to the merged table.
   *
   * Created by Philip K. Adetiloye on 2016-09-16.
   */
  object RecursivePostMerge {

    /** One similarity row as loaded from the source table (columns mapped by name via `.as[Locator]`). */
    case class Locator(id: Integer, security_1: String, content_Locator_sha1_20c: Option[String], security: String, raw_score: Double, relative_score: Double, size: Integer)

    /** Output row: the ancestor ("super") columns followed by the child locator's own columns. */
    case class MergedLocator(super_security_1: String, super_security: String, super_cluster_size: Integer, id: Integer, security_1: String, content_Locator_sha1_20c: String, security: String, raw_score: Double, relative_score: Double, size: Integer)

    val log = LogManager.getRootLogger
    log.setLevel(Level.INFO)

    /**
     * A locator paired with one of its ancestors (the locator itself at generation 0).
     *
     * @param child    the locator whose ancestry is being resolved
     * @param ancestor an ancestor of `child` reached so far
     */
    case class WithAncestor(child: Locator, ancestor: Option[Locator]) {

      /** `security` key of the current ancestor's parent, if the ancestor points to one. */
      def parentKey: Option[String] = ancestor.flatMap(_.content_Locator_sha1_20c)

      /**
       * Whether this pair can climb one more generation.
       *
       * Requires a non-null `child.security` (children without one are never climbed) and an
       * ancestor that actually points to a parent; an ancestor without a parent pointer ends the
       * climb for this pair instead of failing.
       */
      def hasGrandparent: Boolean = child.security != null && parentKey.isDefined
    }

    object RecursiveParentLookup {

      /** Storage level for the RDDs that are reused across generations. */
      private val storage = StorageLevel.MEMORY_AND_DISK

      /**
       * Pairs every locator with each of its ancestors, including itself.
       *
       * Generation 0 pairs each locator with itself. Each further generation joins the current
       * ancestor's `content_Locator_sha1_20c` against locators keyed by `security`. Only pairs not
       * produced by an earlier generation are carried forward, so the search stops as soon as a
       * generation yields nothing new. This also terminates on cycles, e.g. a locator whose
       * `content_Locator_sha1_20c` equals its own `security`.
       *
       * @param rdd source locators
       * @return distinct (child, ancestor) pairs over every generation
       */
      def findSimilarLocatores(rdd: RDD[Locator]): RDD[WithAncestor] = {
        val locators = rdd.coalesce(1000).persist(storage)

        // All locators keyed by the id that parent pointers refer to. A null key can never be
        // matched by a parent pointer, so such rows are left out of the lookup table.
        val byId = locators.filter(_.security != null).keyBy(_.security).persist(storage)

        // Generation 0: every locator is its own ancestor.
        val seed = locators.map(p => WithAncestor(p, Some(p))).distinct().persist(storage)

        // Extends every climbable pair by exactly one generation.
        def climbOneGeneration(frontier: RDD[WithAncestor]): RDD[WithAncestor] =
          frontier
            .filter(_.hasGrandparent)
            .flatMap(wa => wa.parentKey.map(key => (key, wa)).toList)
            .join(byId)
            .values
            .map { case (withAncestor, grandparent) => WithAncestor(withAncestor.child, Some(grandparent)) }
            .distinct()

        // `generations` holds every generation found so far (newest first); only `frontier`, the
        // newest one, needs to be climbed because older pairs were already extended.
        @scala.annotation.tailrec
        def loop(generations: List[RDD[WithAncestor]], frontier: RDD[WithAncestor], depth: Int): RDD[WithAncestor] = {
          // A flat union keeps the lineage shallow instead of nesting one union per generation.
          val seen = locators.sparkContext.union(generations)
          val next = climbOneGeneration(frontier).subtract(seen).persist(storage)

          if (next.isEmpty()) {
            next.unpersist(blocking = false)
            seen
          } else {
            log.info(s"Generation $depth produced new ancestor links (partitions: ${next.getNumPartitions})")
            loop(next :: generations, next, depth + 1)
          }
        }

        loop(List(seed), seed, 1)
      }
    }

    /**
     * Entry point: loads similarity rows over JDBC, resolves every locator's ancestors and appends
     * the flattened (ancestor, child) rows to the merged table.
     *
     * Reads `cpu` from the Typesafe config. The JDBC settings referenced below (`S_*`, `M_*`,
     * table names, partition bounds) are not defined in the visible code — presumably in the
     * elided section; verify.
     */
    def main(args: Array[String]): Unit = {

      val properties = ConfigFactory.load()

      //...

      val cpus = properties.getInt("cpu")

      val conf = new SparkConf()
        //.setMaster("local[2]")
        .setAppName("similarities-recursive-post-merge")
        //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // causes scratch space to grow out of disk space
        .set("spark.default.parallelism", (cpus * 3).toString)

      val sparkSession = SparkSession
        .builder()
        .config(conf)
        .getOrCreate()

      import sparkSession.implicits._

      try {
        val opts = Map(
          "url" -> s"jdbc:postgresql://$S_DB_HOST:$S_DB_PORT/$S_DATABASE",
          "driver" -> "org.postgresql.Driver",
          "dbtable" -> SIMILARITY_TABLE,
          "user" -> S_DB_USER,
          "password" -> S_DB_PASSWORD,
          "partitionColumn" -> PARTITION_COL,
          "lowerBound" -> LOWER_BOUND,
          "upperBound" -> UPPER_BOUND,
          "numPartitions" -> NUM_PARTITION
        )

        val similarityDs = sparkSession.read.format("jdbc").options(opts).load
        similarityDs.createOrReplaceTempView("trading_data")

        // A security column counts as present only when it is both non-null AND non-empty;
        // with `or`, every non-null empty string would slip through.
        val similarityDataset = sparkSession.sql("select * from trading_data where " +
          "relative_score >= 0.9 and " +
          "((security is not NULL and security <> '') " +
          "or (security_1 is not NULL and security_1 <> ''))").as[Locator]

        /*

        Super cluster Test data

        val entry1 = Locator(1, "11", Some("10"), "1",1.1, 2.1, 20)
        val entry2 = Locator(2, "12", Some("1"), "8",1.1, 2.1, 20)
        val entry3 = Locator(3, "13", Some("2"), "1",1.1, 2.1, 20)
        val entry4 = Locator(4, "14", Some("2"), "4" ,1.1, 2.1, 20)
        val entry5 = Locator(5, "15", Some("4"), "5", 1.1, 2.1, 20)
        val entry6 = Locator(6, "16", Some("2"), "2",1.1, 2.1, 20)
        val entry7 = Locator(7, "17", Some("4"), "4",1.1, 2.1, 20)

        val input = sparkSession.sparkContext.parallelize(Seq(entry1, entry2, entry3, entry4, entry5, entry6, entry7))

        */

        // Built and de-duplicated in Spark rather than collected to the driver: the full
        // ancestor closure can be far larger than driver memory.
        val superClusters = RecursiveParentLookup
          .findSimilarLocatores(similarityDataset.rdd)
          .map { pair =>
            // findSimilarLocatores always sets `ancestor`; fall back to the child defensively.
            val ancestor = pair.ancestor.getOrElse(pair.child)
            val child = pair.child
            MergedLocator(
              ancestor.security_1, ancestor.security, ancestor.size,
              child.id, child.security_1, child.content_Locator_sha1_20c.orNull, child.security,
              child.raw_score, child.relative_score, child.size)
          }
          .toDS()
          .distinct()

        val prop = new Properties()
        prop.setProperty("user", M_DB_USER)
        prop.setProperty("password", M_DB_PASSWORD)
        prop.setProperty("driver", "org.postgresql.Driver")

        superClusters.coalesce(100).write
          .mode(SaveMode.Append)
          .jdbc(s"jdbc:postgresql://$M_DB_HOST:$M_DB_PORT/$M_DATABASE", MERGED_TABLE, prop)
      } finally {
        sparkSession.stop()
      }
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement