Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- function WLock:acquire(done_ch, info)
- -- [todo] delete lock key if there are no session keys
- -- "Done channel" must be closed if the lock is released or lost (probably due
- -- to Consul session invalidation or a network error).
- -- "Done channel" can also be closed by user. The function should return immediately
- -- in this case.
- assert(done_ch, "'done channel' must be passed as 1st parameter")
- goto wait
- ::retry_wait::
- fiber.sleep(RETRY_TIMEOUT)
- ::wait::
- local _
- local key
- local value
- local cas
- local acquire
- local stop_watching
- local put_ok
- local session
- local ready_to_lock
- local watchdog
- local chan
- local lock_key
- local lock_kv
- local kvs
- if done_ch:is_closed() then goto done end
- ----------------------------------------------------------------
- -- * create and renew session
- ----------------------------------------------------------------
- -- We need to create session so that other contenders could take into
- -- account our weight.
- session = util.ok_or_log_error(self.consul.session, self.consul, CONSUL_SESSION_TTL, "delete")
- if not session then goto retry_wait end
- log.info("created Consul session %q", session.id)
- -- put contender key with acquire
- key = util.urljoin(self.prefix, session.id)
- value = json.encode({weight = self.weight, info = info})
- cas = nil -- no need to perform cas
- acquire = session.id
- put_ok = util.ok_or_log_error(self.consul.put, self.consul, key, value, cas, acquire)
- if not put_ok then goto retry_wait end
- log.info("put Consul session %q contender key", session.id)
- -- renew session in the background
- -- if renew fails then we release the lock and return
- fiber.create(function()
- local timeout = 0.66 * CONSUL_SESSION_TTL
- while true do
- done_ch:get(timeout)
- if done_ch:is_closed() then break end
- if not util.ok_or_log_error(session.renew, session) then
- log.error("could not renew Consul session %q:", session.id)
- done_ch:close()
- end
- end
- if util.ok_or_log_error(session.delete, session) then
- log.info("released lock and deleted Consul session %q", session.id)
- end
- end)
- ----------------------------------------------------------------
- -- * watch kv prefix and check if we can should attempt to acquire the lock
- ----------------------------------------------------------------
- ready_to_lock = fiber.channel()
- _, stop_watching = self.consul:watch{
- key = self.prefix,
- prefix = true,
- on_change = function(kvs)
- -- check if we should preceed with the lock
- local contender_weights, holder, max_weight = parse_kvs(kvs, self.prefix)
- local should_lock = (contender_weights[session.id] or 0) >= max_weight and
- (not holder or (contender_weights[holder] or 0) < max_weight)
- if should_lock then
- log.info("ready to lock with Consul session %q", session.id)
- ready_to_lock:put(kvs)
- end
- end,
- consistent = true,
- }
- -- stop watching when we are done
- watchdog = fiber.create(function()
- done_ch:get()
- stop_watching()
- end)
- -- wait until we are ready to lock or until the acquire is cancelled
- chan, kvs = util.select({done_ch, ready_to_lock})
- stop_watching()
- pcall(watchdog.cancel, watchdog)
- if chan == done_ch then goto done end
- ----------------------------------------------------------------
- -- * acquire the lock
- ----------------------------------------------------------------
- -- put lock key
- lock_key = util.urljoin(self.prefix, CONSUL_LOCK_KEY)
- value = json.encode({holder = session.id})
- lock_kv = get_lock_kv(kvs, self.prefix)
- cas = 0
- if lock_kv then cas = lock_kv.modify_index end
- put_ok = util.ok_or_log_error(self.consul.put, self.consul, lock_key, value, cas)
- if not put_ok then goto retry_wait end
- log.info("acquired lock for Consul session %q", session.id)
- ----------------------------------------------------------------
- -- * The lock is acquired. Now we need to "hold" the lock.
- ----------------------------------------------------------------
- -- ? maybe merge this watch section into the previous one
- -- [todo]: delete lock key on cleanup
- -- [todo]: retry when monitoring lock
- -- watch lock key
- _, stop_watching = self.consul:watch{
- key = self.prefix,
- prefix = true,
- on_change = function(kvs)
- -- wait until the lock session is invalidated or the lock key is changed
- local _, holder, _ = parse_kvs(kvs, self.prefix)
- -- check if we are still the holder
- if holder ~= session.id then
- log.info("lost lock for Consul session %q: holder changed", session.id)
- done_ch:close()
- end
- end,
- on_error = function(err)
- log.error("Consul lock key %q watch error: %s", lock_key, err)
- done_ch:close()
- end,
- consistent = true,
- }
- -- stop watching when we are done
- fiber.create(function()
- done_ch:get()
- stop_watching()
- end)
- ::done::
- -- * All done. Return hold() and stop() functions.
- return function() done_ch:get() end, function() done_ch:close() end
- end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement