Guest User

Untitled

a guest
Jan 18th, 2019
150
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.95 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "github.com/go-redis/redis"
  5. "github.com/jmoiron/sqlx"
  6. _ "github.com/lib/pq"
  7. "github.com/robfig/cron"
  8. "github.com/sirupsen/logrus"
  9. "time"
  10. )
  11.  
  12. func init() {
  13. logrus.SetFormatter(&logrus.TextFormatter{})
  14. logrus.SetLevel(logrus.DebugLevel)
  15. }
  16.  
  17.  
  18. func main() {
  19. db := sqlx.MustConnect("postgres", "host=localhost port=5432 user=ben password=password dbname=globalcron sslmode=disable")
  20. _, err := db.Exec(` delete from task_log; `)
  21. if err != nil {
  22. panic(err)
  23. }
  24.  
  25. client := redis.NewClient(&redis.Options{
  26. Addr: "localhost:6379",
  27. Password: "", // no password set
  28. DB: 0, // use default DB
  29. })
  30.  
  31. c := cron.New()
  32. err = c.AddJob("*/5 * * * * *", newAtomicCronTask(
  33. "print",
  34. client,
  35. func() {
  36. logrus.Infof("Running job: the time is %s", time.Now().Format(time.RFC3339))
  37. },
  38. 2 * time.Second))
  39.  
  40. err = c.AddJob("*/5 * * * * *", newAtomicCronTask(
  41. "daily",
  42. client,
  43. onceDailyCron(db, "daily", func() {
  44. logrus.Println("Running once daily cron")
  45. }),
  46. 2 * time.Second))
  47.  
  48. if err != nil {
  49. panic(err)
  50. }
  51.  
  52. c.Run()
  53. }
  54.  
  55. // AtomicCronTask is a cron task that uses a schedule lock to
  56. // ensure that it only runs 1 time across the entire cluster.
  57. type AtomicCronTask struct {
  58. redis *redis.Client
  59. name string
  60. lockDur time.Duration
  61. fn func()
  62. }
  63.  
  64. func newAtomicCronTask(name string, r *redis.Client, fun func(), lockDur time.Duration) *AtomicCronTask {
  65. return &AtomicCronTask{
  66. redis: r,
  67. name: "atomic_job:" + name,
  68. fn: fun,
  69. lockDur: lockDur,
  70. }
  71. }
  72.  
  73. func (s *AtomicCronTask) Run() {
  74. err := s.redis.Watch(func(tx *redis.Tx) error {
  75. res := tx.Get(s.name)
  76. _, err := res.Result()
  77. if err == redis.Nil {
  78. pipe := tx.TxPipeline()
  79. pipe.Set(s.name, time.Now().Format(time.RFC3339Nano), s.lockDur)
  80. _, err := pipe.Exec()
  81. if err != nil {
  82. return err
  83. }
  84. logrus.Debugf("set key running task %s", s.name)
  85.  
  86. s.fn()
  87.  
  88. return nil
  89. } else if err != nil {
  90. logrus.Errorf("got err while getting key from redis: %v", err)
  91. return nil
  92. } else {
  93. logrus.Debugf("key exists, skipping task %s", s.name)
  94. }
  95.  
  96. return nil
  97. }, s.name)
  98. if err != nil {
  99. if err == redis.TxFailedErr {
  100. logrus.Debugln("race condition avoided via transaction")
  101. } else {
  102. logrus.Errorf("got error during redis transaction: %v", err)
  103. }
  104. }
  105. }
  106.  
  107. var _ cron.Job = &AtomicCronTask{}
  108.  
  109. func onceDailyCron(db *sqlx.DB, name string, fn func()) func() {
  110. return func() {
  111. var n int
  112. err := db.Get(&n, `
  113. select count(*)
  114. from task_log
  115. where task_name = $1
  116. and run_time > $2
  117. `, name, time.Now().Truncate(24*time.Hour))
  118. if err != nil {
  119. logrus.Errorf("Got err while getting task log entry: %v", err)
  120. return
  121. }
  122.  
  123. if n < 1 {
  124. _, execErr := db.Exec(`
  125. insert into task_log (task_name, run_time)
  126. values ($1, $2);
  127. `, name, time.Now())
  128. if execErr != nil {
  129. logrus.Errorln("Failed to insert daily cron entry: %v", execErr)
  130. return
  131. }
  132.  
  133. fn()
  134. return
  135. }
  136.  
  137. logrus.Debugln("Skipping daily task due to existing entry")
  138. }
  139. }
Add Comment
Please, Sign In to add comment