Advertisement
Guest User

Untitled

a guest
Oct 13th, 2019
166
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.79 KB | None | 0 0
  1. from celery import Celery
  2. from celery.schedules import crontab
  3. import MySQLdb
  4. import random
  5. import string
  6. import time
  7.  
  8. app = Celery('db_update', broker="pyamqp://guest@localhost//")
  9. # disable UTC to use local time
  10. app.conf.enable_utc = False
  11.  
  12.  
  13. @app.task
  14. def generate_data():
  15. start = time.time()
  16. try:
  17. print('check db to track updates.')
  18. db = MySQLdb.connect(user='root', passwd="qweqwe", db="celery_test")
  19. c = db.cursor()
  20. # c.execute("""SELECT * FROM `student_old`""")
  21. # print(c.fetchall())
  22. insert_query = """INSERT INTO `student_old` (`name`, `email`, `address`, `class`) VALUES (%s, %s, %s, %s)"""
  23.  
  24. addr_list = ['Dhaka', 'Rajshahi', 'Gazipur', 'Rangpur']
  25. letters = string.ascii_lowercase
  26. for i in range(200000):
  27. print('id: ', i + 1)
  28. rand_name = ''.join(random.choice(letters) for _ in range(10))
  29. query_data = (rand_name, rand_name + '@venturenxt.com', random.choice(addr_list), random.randint(1, 10))
  30. c.execute(insert_query, query_data)
  31. db.commit()
  32. db.close()
  33.  
  34. except Exception as e:
  35. print(str(e))
  36. print('Execution time taken: ', time.time() - start)
  37.  
  38.  
  39. @app.task
  40. def update_data():
  41. start = time.time()
  42. try:
  43. print('check db to track updates.')
  44. db = MySQLdb.connect(user='root', passwd="qweqwe", db="celery_test")
  45. c = db.cursor()
  46. offset, limit = 0, 30000
  47. truncate_query = """TRUNCATE TABLE `student_new`"""
  48. query_str = """SELECT * from `student_old` LIMIT %s , %s"""
  49. insert_query = """INSERT INTO `student_new` (`id`, `name`, `email`, `address`, `class`) VALUES (%s, %s, %s, %s, %s)"""
  50. try:
  51. # Truncate the table first
  52. c.execute(truncate_query)
  53. db.commit()
  54. while True:
  55. # get data from old table
  56.  
  57. results = c.execute(query_str, (offset, limit))
  58. print('Total retrieved data: ', results)
  59. offset += limit
  60.  
  61. # rollback test
  62. # if offset == 30000:
  63. # raise Exception('Roll back test')
  64.  
  65. if not results:
  66. print('Retrieved all data, exiting the function')
  67. break
  68.  
  69. # insert into new table
  70. data = c.fetchall()
  71. c.executemany(insert_query, data)
  72. db.commit()
  73.  
  74. except Exception as err:
  75. print(str(err))
  76. db.rollback()
  77.  
  78. db.close()
  79.  
  80. except Exception as e:
  81. print(str(e))
  82. print('Execution time taken: ', time.time() - start)
  83.  
  84.  
  85. # add "update_data" task to the beat schedule
  86. app.conf.beat_schedule = {
  87. "sync-db": {
  88. "task": "db_update.update_data",
  89. "schedule": crontab(minute='*'),
  90. }
  91. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement