Guest User

Untitled

a guest
Oct 21st, 2017
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.97 KB | None | 0 0
  1. // Package ddblock provides a...
  2. // TODO:
  3. package ddblock
  4.  
  5. import (
  6. "errors"
  7. "fmt"
  8. //"runtime"
  9. "strconv"
  10. "sync"
  11. "time"
  12.  
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/awserr"
  15. "github.com/aws/aws-sdk-go/aws/session"
  16. "github.com/aws/aws-sdk-go/service/dynamodb"
  17.  
  18. "golang.org/x/net/context"
  19. )
  20.  
  21. var (
  22. // ErrConflict is returned when trying to get a lock, but
  23. // someone else already has it. The caller should wait and try again.
  24. ErrConflict = errors.New("ddbmutex: conflict, lock held by another")
  25. )
  26.  
  27. // default values set when creating a the Mutex.
  28. var (
  29. DefaultTableName = "locks"
  30. DefaultTTL = time.Minute
  31. //DefaultTTL = 10 * time.Second
  32.  
  33. nameString = "name"
  34. uuidString = "uuid"
  35. expiresString = "expires"
  36. )
  37.  
  38. // Mutex creates a lock using aws dynamodb. It uses
  39. // credential and region information from the standard sources
  40. // such as a config file or env variables.
  41. type Mutex struct {
  42. lk sync.Mutex
  43.  
  44. ctx context.Context
  45. cancel func()
  46.  
  47. TableName string
  48. TTL time.Duration
  49.  
  50. name string
  51. fullname string
  52. uuid string
  53. }
  54.  
  55. // New creates a new mutex using dynamodb as the distributed store.
  56. // If context is canceled the lock will be released.
  57. func New(ctx context.Context, name string) *Mutex {
  58. if ctx == nil {
  59. ctx = context.Background()
  60. }
  61. ctx, cancel := context.WithCancel(ctx)
  62. return &Mutex{
  63. ctx: ctx,
  64. cancel: cancel,
  65.  
  66. TableName: DefaultTableName,
  67. TTL: DefaultTTL,
  68.  
  69. name: name,
  70. fullname: "ddblock-" + name,
  71. uuid: fmt.Sprintf("%d", time.Now().UnixNano()),
  72. }
  73. }
  74.  
  75. // Name returns the name of the mutex which should uniquely identify
  76. // it on dynamodb.
  77. func (m *Mutex) Name() string {
  78. return m.name
  79. }
  80.  
  81. // Lock creates the lock item on dynamodb. The lock is renewed every TTL/2
  82. // to make sure the lock is kept. A nil error indicates success. An error
  83. // of ErrConflict means someone else already has the lock. Another error
  84. // indicates an network or dynamo error.
  85. func (m *Mutex) Lock() error {
  86. //var GoNum int = runtime.NumGoroutine()
  87. go func() {
  88. for m.ctx.Err() == nil {
  89. select {
  90. case <-time.After(m.cleanTTL() / 2):
  91. //fmt.Printf("Renewing... " + strconv.Itoa(GoNum) + "\n")
  92. //m.ctx.Done()
  93. case <-m.ctx.Done():
  94. //fmt.Printf("GOROUTINE LOCK RELEASED " + strconv.Itoa(GoNum) + "\n")
  95. m.Unlock()
  96. return
  97. }
  98.  
  99. //fmt.Printf("Num Goroutines At Update: " + strconv.Itoa(GoNum) + "\n")
  100. m.update()
  101. }
  102. }()
  103.  
  104. //fmt.Printf("Num Goroutines At Create: " + strconv.Itoa(GoNum) + "\n")
  105. return m.create()
  106. }
  107.  
  108. // Unlock deletes the lock from dynamodb and allows other go get it.
  109. func (m *Mutex) Unlock() error {
  110. m.cancel()
  111. return m.delete()
  112. }
  113.  
  114. func (m *Mutex) create() error {
  115. m.lk.Lock()
  116. defer m.lk.Unlock()
  117.  
  118. now := time.Now()
  119. params := &dynamodb.PutItemInput{
  120. TableName: &m.TableName,
  121. Item: map[string]*dynamodb.AttributeValue{
  122. "name": {
  123. S: &m.fullname,
  124. },
  125. "expires": {
  126. N: aws.String(strconv.FormatInt(now.Add(m.cleanTTL()).UnixNano(), 10)),
  127. },
  128. "uuid": {
  129. S: &m.uuid,
  130. },
  131. },
  132. ConditionExpression: aws.String("#name <> :name OR (#name = :name AND #exp < :exp)"),
  133. ExpressionAttributeNames: map[string]*string{
  134. "#name": &nameString,
  135. "#exp": &expiresString,
  136. },
  137. ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
  138. ":name": {
  139. S: &m.fullname,
  140. },
  141. ":exp": {
  142. N: aws.String(strconv.FormatInt(now.UnixNano(), 10)),
  143. },
  144. },
  145. }
  146.  
  147. _, err := getSvc().PutItem(params)
  148. return err
  149. }
  150.  
  151. func (m *Mutex) update() error {
  152. m.lk.Lock()
  153. defer m.lk.Unlock()
  154.  
  155. if m.uuid == "" {
  156. // has already been unlocked
  157. return nil
  158. }
  159.  
  160. now := time.Now()
  161. params := &dynamodb.PutItemInput{
  162. TableName: &m.TableName,
  163. Item: map[string]*dynamodb.AttributeValue{
  164. "name": {
  165. S: &m.fullname,
  166. },
  167. "expires": {
  168. N: aws.String(strconv.FormatInt(now.Add(m.cleanTTL()).UnixNano(), 10)),
  169. },
  170. "uuid": {
  171. S: &m.uuid,
  172. },
  173. },
  174. ConditionExpression: aws.String("#name = :name AND #uuid = :uuid"),
  175. ExpressionAttributeNames: map[string]*string{
  176. "#name": &nameString,
  177. "#uuid": &uuidString,
  178. },
  179. ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
  180. ":name": {
  181. S: &m.fullname,
  182. },
  183. ":uuid": {
  184. S: &m.uuid,
  185. },
  186. },
  187. }
  188.  
  189. _, err := getSvc().PutItem(params)
  190. if err != nil {
  191. //if e, ok := err.(awserr.Error); ok {
  192. // //return e.Code() == "ConditionalCheckFailedException"
  193. // fmt.Println(e.Code())
  194. //}
  195. panic(err)
  196. }
  197. return err
  198. }
  199.  
  200. func (m *Mutex) delete() error {
  201. m.lk.Lock()
  202. defer m.lk.Unlock()
  203.  
  204. if m.uuid == "" {
  205. // has already been unlocked successfully
  206. return nil
  207. }
  208.  
  209. params := &dynamodb.DeleteItemInput{
  210. TableName: &m.TableName,
  211. Key: map[string]*dynamodb.AttributeValue{
  212. "name": {
  213. S: &m.fullname,
  214. },
  215. },
  216. ConditionExpression: aws.String("#name = :name AND #uuid = :uuid"),
  217. ExpressionAttributeNames: map[string]*string{
  218. "#name": aws.String("name"),
  219. "#uuid": aws.String("uuid"),
  220. },
  221. ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
  222. ":name": {
  223. S: &m.fullname,
  224. },
  225. ":uuid": {
  226. S: &m.uuid,
  227. },
  228. },
  229. }
  230.  
  231. _, err := getSvc().DeleteItem(params)
  232. if IsAquireError(err) || err == nil {
  233. m.uuid = ""
  234. return nil
  235. }
  236.  
  237. return err
  238. }
  239.  
  240. // IsAquireError checks to see if the error returned by Lock
  241. // is the result of someone else holding the lock. If false
  242. // and err != nil, there was some sort of config or network issue.
  243. func IsAquireError(err error) bool {
  244. if e, ok := err.(awserr.Error); ok {
  245. return e.Code() == "ConditionalCheckFailedException"
  246. }
  247.  
  248. return false
  249. }
  250.  
  251. func (m *Mutex) cleanTTL() time.Duration {
  252. ttl := m.TTL
  253. if ttl == 0 {
  254. ttl = DefaultTTL
  255. }
  256.  
  257. if ttl == 0 {
  258. panic("ttl can not be zero")
  259. }
  260.  
  261. return ttl
  262. }
  263.  
  264. var (
  265. svc *dynamodb.DynamoDB
  266. svcLk sync.Mutex
  267. )
  268.  
  269. // getSvc enables the initialization on first read (ie. after config has been parsed),
  270. // kind of like a singleton class.
  271. func getSvc() *dynamodb.DynamoDB {
  272. svcLk.Lock()
  273. defer svcLk.Unlock()
  274.  
  275. if svc == nil {
  276. c := aws.NewConfig().
  277. WithMaxRetries(3).
  278. WithRegion("us-east-1")
  279.  
  280. svc = dynamodb.New(session.New(c))
  281. }
  282.  
  283. return svc
  284. }
Add Comment
Please, Sign In to add comment