Advertisement
Guest User

Untitled

a guest
Jul 18th, 2021
171
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.10 KB | None | 0 0
  1. import redis.clients.jedis.{JedisPool, JedisPoolConfig}
  2.  
  3. object redisOp{
  4.   @transient lazy val log: Logger = org.apache.log4j.LogManager.getLogger("myLogger")
  5.   def apply(set: RDD[Int]): Unit = {
  6.     val spark = SparkConstructor()
  7.     val sc = spark.sparkContext
  8.     // initialize Parents and Ranks key-values
  9.     val parents = set.map(i => ("p"+i, i.toString))
  10.     val ranks = set.map(i => ("r"+i, 1.toString))
  11.     sc.toRedisKV(parents) // using spark-redis packege, ignore it.
  12.     sc.toRedisKV(ranks)
  13.     log.warn("***Initialized Redis***")
  14.  
  15.   }
  16.  
  17.   val jedisConfig = new JedisPoolConfig()          // Check from here (object's values and variables)
  18.   jedisConfig.setMaxIdle(8000)                    //TODO: a better configuration?
  19.   jedisConfig.setMaxTotal(8000)
  20.   lazy val pool = new JedisPool(jedisConfig, "localhost")
  21.   def find(u: Long): Option[Long] = { // returns leader of the set containing u
  22.     val r = pool.getResource
  23.     val res = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) {
  24.       Some(u)
  25.     } else find(p.toLong))
  26.     r.close()
  27.     res
  28.   }
  29. // other methods are similar to find...
  30. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement