Advertisement
Guest User

Untitled

a guest
Jul 20th, 2019
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Lua 5.54 KB | None | 0 0
  1.  
  2. function WLock:acquire(done_ch, info)
  3.     -- [todo] delete lock key if there are no session keys
  4.     -- "Done channel" must be closed if the lock is released or lost (probably due
  5.     -- to Consul session invalidation or a network error).
  6.     -- "Done channel" can also be closed by user. The function should return immediately
  7.     -- in this case.
  8.     assert(done_ch, "'done channel' must be passed as 1st parameter")
  9.     goto wait
  10.  
  11.     ::retry_wait::
  12.     fiber.sleep(RETRY_TIMEOUT)
  13.  
  14.     ::wait::
  15.  
  16.     local _
  17.     local key
  18.     local value
  19.     local cas
  20.     local acquire
  21.     local stop_watching
  22.     local put_ok
  23.     local session
  24.     local ready_to_lock
  25.     local watchdog
  26.     local chan
  27.     local lock_key
  28.     local lock_kv
  29.     local kvs
  30.  
  31.     if done_ch:is_closed() then goto done end
  32.  
  33.     ----------------------------------------------------------------
  34.     -- * create and renew session
  35.     ----------------------------------------------------------------
  36.  
  37.     -- We need to create session so that other contenders could take into
  38.     -- account our weight.
  39.     session = util.ok_or_log_error(self.consul.session, self.consul, CONSUL_SESSION_TTL, "delete")
  40.     if not session then goto retry_wait end
  41.     log.info("created Consul session %q", session.id)
  42.  
  43.     -- put contender key with acquire
  44.     key = util.urljoin(self.prefix, session.id)
  45.     value = json.encode({weight = self.weight, info = info})
  46.     cas = nil -- no need to perform cas
  47.     acquire = session.id
  48.     put_ok = util.ok_or_log_error(self.consul.put, self.consul, key, value, cas, acquire)
  49.     if not put_ok then goto retry_wait end
  50.     log.info("put Consul session %q contender key", session.id)
  51.  
  52.     -- renew session in the background
  53.     -- if renew fails then we release the lock and return
  54.     fiber.create(function()
  55.         local timeout = 0.66 * CONSUL_SESSION_TTL
  56.         while true do
  57.             done_ch:get(timeout)
  58.             if done_ch:is_closed() then break end
  59.             if not util.ok_or_log_error(session.renew, session) then
  60.                 log.error("could not renew Consul session %q:", session.id)
  61.                 done_ch:close()
  62.             end
  63.         end
  64.         if util.ok_or_log_error(session.delete, session) then
  65.             log.info("released lock and deleted Consul session %q", session.id)
  66.         end
  67.     end)
  68.  
  69.     ----------------------------------------------------------------
  70.     -- * watch kv prefix and check if we can should attempt to acquire the lock
  71.     ----------------------------------------------------------------
  72.  
  73.     ready_to_lock = fiber.channel()
  74.  
  75.     _, stop_watching = self.consul:watch{
  76.         key = self.prefix,
  77.         prefix = true,
  78.         on_change = function(kvs)
  79.             -- check if we should preceed with the lock
  80.             local contender_weights, holder, max_weight = parse_kvs(kvs, self.prefix)
  81.             local should_lock = (contender_weights[session.id] or 0) >= max_weight and
  82.                                     (not holder or (contender_weights[holder] or 0) < max_weight)
  83.  
  84.             if should_lock then
  85.                 log.info("ready to lock with Consul session %q", session.id)
  86.                 ready_to_lock:put(kvs)
  87.             end
  88.         end,
  89.         consistent = true,
  90.     }
  91.  
  92.     -- stop watching when we are done
  93.     watchdog = fiber.create(function()
  94.         done_ch:get()
  95.         stop_watching()
  96.     end)
  97.  
  98.     -- wait until we are ready to lock or until the acquire is cancelled
  99.     chan, kvs = util.select({done_ch, ready_to_lock})
  100.     stop_watching()
  101.     pcall(watchdog.cancel, watchdog)
  102.  
  103.     if chan == done_ch then goto done end
  104.  
  105.     ----------------------------------------------------------------
  106.     -- * acquire the lock
  107.     ----------------------------------------------------------------
  108.  
  109.     -- put lock key
  110.     lock_key = util.urljoin(self.prefix, CONSUL_LOCK_KEY)
  111.     value = json.encode({holder = session.id})
  112.     lock_kv = get_lock_kv(kvs, self.prefix)
  113.     cas = 0
  114.     if lock_kv then cas = lock_kv.modify_index end
  115.     put_ok = util.ok_or_log_error(self.consul.put, self.consul, lock_key, value, cas)
  116.     if not put_ok then goto retry_wait end
  117.     log.info("acquired lock for Consul session %q", session.id)
  118.  
  119.     ----------------------------------------------------------------
  120.     -- * The lock is acquired. Now we need to "hold" the lock.
  121.     ----------------------------------------------------------------
  122.  
  123.     -- ? maybe merge this watch section into the previous one
  124.     -- [todo]: delete lock key on cleanup
  125.     -- [todo]: retry when monitoring lock
  126.     -- watch lock key
  127.     _, stop_watching = self.consul:watch{
  128.         key = self.prefix,
  129.         prefix = true,
  130.         on_change = function(kvs)
  131.             -- wait until the lock session is invalidated or the lock key is changed
  132.             local _, holder, _ = parse_kvs(kvs, self.prefix)
  133.             -- check if we are still the holder
  134.             if holder ~= session.id then
  135.                 log.info("lost lock for Consul session %q: holder changed", session.id)
  136.                 done_ch:close()
  137.             end
  138.         end,
  139.         on_error = function(err)
  140.             log.error("Consul lock key %q watch error: %s", lock_key, err)
  141.             done_ch:close()
  142.         end,
  143.         consistent = true,
  144.     }
  145.  
  146.     -- stop watching when we are done
  147.     fiber.create(function()
  148.         done_ch:get()
  149.         stop_watching()
  150.     end)
  151.  
  152.     ::done::
  153.     -- * All done. Return hold() and stop() functions.
  154.     return function() done_ch:get() end, function() done_ch:close() end
  155. end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement