Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import redis.clients.jedis.{JedisPool, JedisPoolConfig}
/** Redis-backed helpers for a disjoint-set (union-find) structure.
  *
  * Keys written by [[apply]]:
  *  - `p<i>` holds the parent of element `i` (initially `i` itself),
  *  - `r<i>` holds the rank of element `i` (initially `"1"`).
  */
object redisOp {
  @transient lazy val log: Logger = org.apache.log4j.LogManager.getLogger("myLogger")

  /** Initializes the parent and rank key-values in Redis for every element of `set`.
    *
    * @param set the elements to register; each one starts as its own parent with rank 1
    */
  def apply(set: RDD[Int]): Unit = {
    val spark = SparkConstructor()
    val sc = spark.sparkContext
    // initialize Parents and Ranks key-values
    val parents = set.map(i => (s"p$i", i.toString))
    val ranks = set.map(i => (s"r$i", "1"))
    sc.toRedisKV(parents) // provided by the spark-redis package
    sc.toRedisKV(ranks)
    log.warn("***Initialized Redis***")
  }

  // Pool configuration shared by every connection handed out by `pool`.
  val jedisConfig: JedisPoolConfig = new JedisPoolConfig()
  jedisConfig.setMaxIdle(8000) //TODO: a better configuration?
  jedisConfig.setMaxTotal(8000)

  // Lazy so the pool is only created on first use of the object.
  lazy val pool: JedisPool = new JedisPool(jedisConfig, "localhost")

  /** Returns the leader of the set containing `u`.
    *
    * Follows the `p<node>` parent links starting at `u` until it reaches a node
    * that is its own parent.
    *
    * A single pooled connection is borrowed for the whole walk and is always
    * returned to the pool, even when a lookup or a number conversion throws.
    * (Borrowing one connection per hop while still holding the previous ones
    * can exhaust the pool on long parent chains.)
    *
    * @param u the element whose leader is requested
    * @return `Some(leader)`, or `None` when `u` or any node on its parent chain
    *         has no `p<node>` key in Redis
    * @throws NumberFormatException if a stored parent value is not a valid `Long`
    */
  def find(u: Long): Option[Long] = { // returns leader of the set containing u
    val r = pool.getResource
    try {
      // Tail-recursive so arbitrarily long parent chains cannot overflow the stack.
      @scala.annotation.tailrec
      def walk(node: Long): Option[Long] =
        Option(r.get(s"p$node")) match {
          case None => None // missing key: the chain is broken
          case Some(raw) =>
            val parent = raw.toLong
            if (parent == node) Some(node) else walk(parent)
        }
      walk(u)
    } finally r.close() // hands the connection back to the pool
  }
  // other methods are similar to find...
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement