Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "github.com/go-redis/redis"
- "github.com/jmoiron/sqlx"
- _ "github.com/lib/pq"
- "github.com/robfig/cron"
- "github.com/sirupsen/logrus"
- "time"
- )
- func init() {
- logrus.SetFormatter(&logrus.TextFormatter{})
- logrus.SetLevel(logrus.DebugLevel)
- }
- func main() {
- db := sqlx.MustConnect("postgres", "host=localhost port=5432 user=ben password=password dbname=globalcron sslmode=disable")
- _, err := db.Exec(` delete from task_log; `)
- if err != nil {
- panic(err)
- }
- client := redis.NewClient(&redis.Options{
- Addr: "localhost:6379",
- Password: "", // no password set
- DB: 0, // use default DB
- })
- c := cron.New()
- err = c.AddJob("*/5 * * * * *", newAtomicCronTask(
- "print",
- client,
- func() {
- logrus.Infof("Running job: the time is %s", time.Now().Format(time.RFC3339))
- },
- 2 * time.Second))
- err = c.AddJob("*/5 * * * * *", newAtomicCronTask(
- "daily",
- client,
- onceDailyCron(db, "daily", func() {
- logrus.Println("Running once daily cron")
- }),
- 2 * time.Second))
- if err != nil {
- panic(err)
- }
- c.Run()
- }
- // AtomicCronTask is a cron task that uses a schedule lock to
- // ensure that it only runs 1 time across the entire cluster.
- type AtomicCronTask struct {
- redis *redis.Client
- name string
- lockDur time.Duration
- fn func()
- }
- func newAtomicCronTask(name string, r *redis.Client, fun func(), lockDur time.Duration) *AtomicCronTask {
- return &AtomicCronTask{
- redis: r,
- name: "atomic_job:" + name,
- fn: fun,
- lockDur: lockDur,
- }
- }
- func (s *AtomicCronTask) Run() {
- err := s.redis.Watch(func(tx *redis.Tx) error {
- res := tx.Get(s.name)
- _, err := res.Result()
- if err == redis.Nil {
- pipe := tx.TxPipeline()
- pipe.Set(s.name, time.Now().Format(time.RFC3339Nano), s.lockDur)
- _, err := pipe.Exec()
- if err != nil {
- return err
- }
- logrus.Debugf("set key running task %s", s.name)
- s.fn()
- return nil
- } else if err != nil {
- logrus.Errorf("got err while getting key from redis: %v", err)
- return nil
- } else {
- logrus.Debugf("key exists, skipping task %s", s.name)
- }
- return nil
- }, s.name)
- if err != nil {
- if err == redis.TxFailedErr {
- logrus.Debugln("race condition avoided via transaction")
- } else {
- logrus.Errorf("got error during redis transaction: %v", err)
- }
- }
- }
- var _ cron.Job = &AtomicCronTask{}
- func onceDailyCron(db *sqlx.DB, name string, fn func()) func() {
- return func() {
- var n int
- err := db.Get(&n, `
- select count(*)
- from task_log
- where task_name = $1
- and run_time > $2
- `, name, time.Now().Truncate(24*time.Hour))
- if err != nil {
- logrus.Errorf("Got err while getting task log entry: %v", err)
- return
- }
- if n < 1 {
- _, execErr := db.Exec(`
- insert into task_log (task_name, run_time)
- values ($1, $2);
- `, name, time.Now())
- if execErr != nil {
- logrus.Errorln("Failed to insert daily cron entry: %v", execErr)
- return
- }
- fn()
- return
- }
- logrus.Debugln("Skipping daily task due to existing entry")
- }
- }
Add Comment
Please, Sign In to add comment