package com.mavencode.clustering

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, LogManager}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
  * Created by Philip K. Adetiloye on 2016-09-16.
  */
object RecursivePostMerge {

  case class Locator(id: Integer, security_1: String, content_Locator_sha1_20c: Option[String],
                     security: String, raw_score: Double, relative_score: Double, size: Integer)

  case class MergedLocator(super_security_1: String, super_security: String, super_cluster_size: Integer,
                           id: Integer, security_1: String, content_Locator_sha1_20c: String,
                           security: String, raw_score: Double, relative_score: Double, size: Integer)

  val log = LogManager.getRootLogger
  log.setLevel(Level.INFO)
  case class WithAncestor(child: Locator, ancestor: Option[Locator]) {
    // a record can keep climbing as long as its security field is set
    // (note that the check is on the child record, not on the ancestor)
    def hasGrandparent: Boolean = {
      child.security != null
    }
  }
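  // Illustration (hypothetical values): given WithAncestor(a, Some(b)), where b's
  // content_Locator_sha1_20c equals c's security field, the lookup below rewrites the
  // pair to WithAncestor(a, Some(c)), i.e. the child climbs one generation per pass.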
  object RecursiveParentLookup {

    def findSimilarLocatores(rdd: RDD[Locator]): RDD[WithAncestor] = {

      val persistRDD = rdd.coalesce(1000)

      // all locators keyed by their security field (the join below matches this key
      // against the ancestor's content_Locator_sha1_20c)
      def byId = persistRDD.map(x => (x.security, x))

      // recursive function that "climbs" one generation at each iteration
      def climbOneGeneration(locatorsParent: Option[RDD[WithAncestor]],
                             locatorsGrandParent: RDD[WithAncestor]): RDD[WithAncestor] = {
        //val cached = locatorsGrandParent.cache()
        val cached = locatorsGrandParent
        log.info("Locators grandparent partitions: " + cached.getNumPartitions)

        // find which locators can climb further up the family tree
        val haveGrandparents = cached.filter(_.hasGrandparent)

        // termination check: stop once every child carried over from the previous
        // generation can still climb (nothing new resolved in the last pass)
        val finished = locatorsParent.exists { parents =>
          log.info("Locators parent partitions: " + parents.getNumPartitions)
          parents.coalesce(1000).map(_.child).subtract(haveGrandparents.map(_.child)).isEmpty()
        }

        if (finished) {
          cached // we're done, return result
        } else {
          val done = cached.filter(!_.hasGrandparent) // these can climb no further, return them as-is
          // for the rest, look up the grandparent via the ancestor's content hash and
          // re-pair each child with it (inner join: records whose hash has no match are dropped)
          val withGrandparents = haveGrandparents
            .coalesce(1000)
            .map(r => (r.ancestor.get.content_Locator_sha1_20c.get, r)) // grandparent id
            .join(byId)
            .values
            .map { case (withAncestor, grandparent) => WithAncestor(withAncestor.child, Some(grandparent)) }
          // `done` already carries the finished records; the climbable ones come back
          // from the recursive call paired with a higher ancestor
          done ++ climbOneGeneration(Some(haveGrandparents), withGrandparents)
        }
      }

      // call recursive method - start by assuming each locator is its own parent, if it has one:
      climbOneGeneration(None, persistRDD.map(p => WithAncestor(p, Some(p))))
    }
  }
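  // Note: each recursive climb extends the RDD lineage of the result. For very deep
  // hierarchies one could persist or checkpoint between generations, e.g. (sketch,
  // assuming a checkpoint directory has been configured via sc.setCheckpointDir):
  //   val cached = locatorsGrandParent.persist(StorageLevel.MEMORY_AND_DISK)
  //   cached.checkpoint()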
  def main(args: Array[String]) {

    val properties = ConfigFactory.load()
    //...
    val CPU = properties.getInt("cpu")
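    // The connection constants used below (S_DB_HOST, S_DB_PORT, S_DATABASE, S_DB_USER,
    // S_DB_PASSWORD, SIMILARITY_TABLE, PARTITION_COL, LOWER_BOUND, UPPER_BOUND, NUM_PARTITION,
    // plus the M_* / MERGED_TABLE counterparts) are not defined in this paste; presumably they
    // are read from the same Typesafe config, e.g. (hypothetical key):
    //   val S_DB_HOST = properties.getString("similarity.db.host")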
    val conf = new SparkConf()
      //.setMaster("local[2]")
      .setAppName("similarities-recursive-post-merge")
      //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // causes scratch space to grow out of disk space
      .set("spark.default.parallelism", (CPU * 3).toString)

    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val opts = Map(
      "url"             -> s"jdbc:postgresql://$S_DB_HOST:$S_DB_PORT/$S_DATABASE",
      "driver"          -> "org.postgresql.Driver",
      "dbtable"         -> SIMILARITY_TABLE,
      "user"            -> S_DB_USER,
      "password"        -> S_DB_PASSWORD,
      "partitionColumn" -> PARTITION_COL,
      "lowerBound"      -> LOWER_BOUND,
      "upperBound"      -> UPPER_BOUND,
      "numPartitions"   -> NUM_PARTITION
    )
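    // With partitionColumn, lowerBound, upperBound and numPartitions set, Spark splits the
    // JDBC read into numPartitions parallel range queries over the partition column instead
    // of a single full-table scan; note that every value in this options map must be a String.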
    val sc = sparkSession.sparkContext
    import sparkSession.implicits._

    val similarityDs = sparkSession.read.format("jdbc").options(opts).load
    similarityDs.createOrReplaceTempView("trading_data")

    // keep only high-confidence matches that carry at least one non-empty security key
    val similarityDataset = sparkSession.sql("select * from trading_data where " +
      "relative_score >= 0.9 and " +
      "((security is not NULL and security <> '') " +
      "or (security_1 is not NULL and security_1 <> ''))").as[Locator]
    /*
    Super cluster test data

    val entry1 = Locator(1, "11", Some("10"), "1", 1.1, 2.1, 20)
    val entry2 = Locator(2, "12", Some("1"), "8", 1.1, 2.1, 20)
    val entry3 = Locator(3, "13", Some("2"), "1", 1.1, 2.1, 20)
    val entry4 = Locator(4, "14", Some("2"), "4", 1.1, 2.1, 20)
    val entry5 = Locator(5, "15", Some("4"), "5", 1.1, 2.1, 20)
    val entry6 = Locator(6, "16", Some("2"), "2", 1.1, 2.1, 20)
    val entry7 = Locator(7, "17", Some("4"), "4", 1.1, 2.1, 20)

    val input = sc.parallelize(Seq(entry1, entry2, entry3, entry4, entry5, entry6, entry7))
    */
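    /*
    To exercise the lookup on the toy data above instead of the JDBC table, one could
    (as a sketch) feed `input` straight into the same method:

    val superClusterTmp = RecursiveParentLookup.findSimilarLocatores(input).collect()
    */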
    // climb each locator up to its top-most reachable ancestor, then collect to the driver
    val superClusterTmp = RecursiveParentLookup.findSimilarLocatores(similarityDataset.rdd).collect()

    // key each child by its ancestor, group the children of the same ancestor together, then flatten
    val superClusterTmp1 = superClusterTmp
      .map(r => (r.ancestor.get.security_1, (r.ancestor.get.security, r.ancestor.get.size, r.child)))
      .groupBy(_._1)
      .map { case (_, group) => group.toSeq }
      .flatten

    val superClusters = superClusterTmp1.map(x =>
      MergedLocator(x._1, x._2._1, x._2._2, x._2._3.id, x._2._3.security_1,
        x._2._3.content_Locator_sha1_20c.get, x._2._3.security,
        x._2._3.raw_score, x._2._3.relative_score, x._2._3.size)).toSeq.distinct
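    // The grouping above happens on the driver after collect(). As a sketch (assuming the same
    // schema and that content_Locator_sha1_20c is always present), the merge could equally stay
    // distributed:
    //   val superClustersDs = RecursiveParentLookup.findSimilarLocatores(similarityDataset.rdd)
    //     .map(r => MergedLocator(r.ancestor.get.security_1, r.ancestor.get.security, r.ancestor.get.size,
    //       r.child.id, r.child.security_1, r.child.content_Locator_sha1_20c.get, r.child.security,
    //       r.child.raw_score, r.child.relative_score, r.child.size))
    //     .toDS().distinct()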
    val prop = new Properties()
    prop.setProperty("user", M_DB_USER)
    prop.setProperty("password", M_DB_PASSWORD)
    prop.setProperty("driver", "org.postgresql.Driver")

    // append the merged super clusters to the target Postgres table
    superClusters.toDS().coalesce(100).write
      .mode(SaveMode.Append)
      .jdbc(s"jdbc:postgresql://$M_DB_HOST:$M_DB_PORT/$M_DATABASE", MERGED_TABLE, prop)

    sparkSession.stop()
  }
}
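// A typical launch might look like the following (master, resources and jar name are
// deployment-specific placeholders):
//   spark-submit \
//     --class com.mavencode.clustering.RecursivePostMerge \
//     --master yarn \
//     clustering-assembly.jar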