SHARE
TWEET

Untitled

a guest Oct 13th, 2019 123 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. from celery import Celery
  2. from celery.schedules import crontab
  3. import MySQLdb
  4. import random
  5. import string
  6. import time
  7.  
  8. app = Celery('db_update', broker="pyamqp://guest@localhost//")
  9. # disable UTC to use local time
  10. app.conf.enable_utc = False
  11.  
  12.  
  13. @app.task
  14. def generate_data():
  15.     start = time.time()
  16.     try:
  17.         print('check db to track updates.')
  18.         db = MySQLdb.connect(user='root', passwd="qweqwe", db="celery_test")
  19.         c = db.cursor()
  20.         # c.execute("""SELECT * FROM `student_old`""")
  21.         # print(c.fetchall())
  22.         insert_query = """INSERT INTO `student_old` (`name`, `email`, `address`, `class`) VALUES (%s, %s, %s, %s)"""
  23.  
  24.         addr_list = ['Dhaka', 'Rajshahi', 'Gazipur', 'Rangpur']
  25.         letters = string.ascii_lowercase
  26.         for i in range(200000):
  27.             print('id: ', i + 1)
  28.             rand_name = ''.join(random.choice(letters) for _ in range(10))
  29.             query_data = (rand_name, rand_name + '@venturenxt.com', random.choice(addr_list), random.randint(1, 10))
  30.             c.execute(insert_query, query_data)
  31.             db.commit()
  32.         db.close()
  33.  
  34.     except Exception as e:
  35.         print(str(e))
  36.     print('Execution time taken: ', time.time() - start)
  37.  
  38.  
  39. @app.task
  40. def update_data():
  41.     start = time.time()
  42.     try:
  43.         print('check db to track updates.')
  44.         db = MySQLdb.connect(user='root', passwd="qweqwe", db="celery_test")
  45.         c = db.cursor()
  46.         offset, limit = 0, 30000
  47.         truncate_query = """TRUNCATE TABLE `student_new`"""
  48.         query_str = """SELECT * from `student_old` LIMIT %s , %s"""
  49.         insert_query = """INSERT INTO `student_new` (`id`, `name`, `email`, `address`, `class`) VALUES (%s, %s, %s, %s, %s)"""
  50.         try:
  51.             # Truncate the table first
  52.             c.execute(truncate_query)
  53.             db.commit()
  54.             while True:
  55.                 # get data from old table
  56.  
  57.                 results = c.execute(query_str, (offset, limit))
  58.                 print('Total retrieved data: ', results)
  59.                 offset += limit
  60.  
  61.                 # rollback test
  62.                 # if offset == 30000:
  63.                 #     raise Exception('Roll back test')
  64.  
  65.                 if not results:
  66.                     print('Retrieved all data, exiting the function')
  67.                     break
  68.  
  69.                 # insert into new table
  70.                 data = c.fetchall()
  71.                 c.executemany(insert_query, data)
  72.                 db.commit()
  73.  
  74.         except Exception as err:
  75.             print(str(err))
  76.             db.rollback()
  77.  
  78.         db.close()
  79.  
  80.     except Exception as e:
  81.         print(str(e))
  82.     print('Execution time taken: ', time.time() - start)
  83.  
  84.  
  85. # add "update_data" task to the beat schedule
  86. app.conf.beat_schedule = {
  87.     "sync-db": {
  88.         "task": "db_update.update_data",
  89.         "schedule": crontab(minute='*'),
  90.     }
  91. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top