Advertisement
Guest User

Untitled

a guest
Apr 5th, 2016
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.24 KB | None | 0 0
  1.  
  2. import sys
  3. import time
  4. import boto3
  5. import logging
  6. import argparse
  7. from faker import Factory
  8. from random import choice, shuffle
  9. from multiprocessing import Process, JoinableQueue
  10. from mysql.connector import connect
  11.  
  12. logging.basicConfig(
  13. stream=sys.stdout,
  14. level=logging.INFO,
  15. format='%(asctime)s %(levelname)s %(message)s'
  16. )
  17.  
  18. parser = argparse.ArgumentParser()
  19.  
  20. parser.add_argument("--host", default=None, help="The hostname of the MySQL node to connect to")
  21. parser.add_argument("--port", default=None, type=int, help="The port of the MySQL node to connect to")
  22. parser.add_argument("--user", default="root", help="The user of the MySQL node to connect to")
  23. parser.add_argument("--password", default="", help="The password of the MySQL node to connect to")
  24. parser.add_argument("--database", default="matterhorn", help="The database to use")
  25. parser.add_argument("--table", default="mh_user_action", help="The user action table to insert to")
  26. parser.add_argument("--aws-profile", default="test", help="AWS profile to use for cloudwatch metrics")
  27. parser.add_argument("--num-workers", type=int, default=10, help="The number of insert threads")
  28. parser.add_argument("--num-inserts", type=int, default=10000, help="The total number of insert to execute")
  29. parser.add_argument("--interval", type=int, default=30, help="The number of seconds each worker waits between inserts")
  30.  
  31. options = parser.parse_args()
  32.  
  33.  
  34. class Worker(Process):
  35.  
  36. def __init__(self, work_queue, metric):
  37. Process.__init__(self)
  38. self.work_queue = work_queue
  39. self.metric = metric
  40. self.con = connect(
  41. user=options.user,
  42. password=options.password,
  43. host=options.host,
  44. database=options.database
  45. )
  46.  
  47. self.fake = Factory.create()
  48. self.session_id = self.fake.pystr(max_chars=25)
  49.  
  50. session_insert = ("INSERT INTO mh_user_session "
  51. "(session_id,user_ip,user_agent,user_id) "
  52. "VALUES (%(session_id)s, %(user_ip)s, %(user_agent)s, 'anonymous')")
  53. c = self.con.cursor()
  54. c.execute(session_insert, {
  55. 'session_id': self.session_id,
  56. 'user_ip': self.fake.ipv4(),
  57. 'user_agent': self.fake.user_agent()
  58. })
  59. self.con.commit()
  60. c.close()
  61.  
  62.  
  63. def run(self):
  64. log = logging.getLogger()
  65. while True:
  66. action_id = self.work_queue.get()
  67. if action_id is None:
  68. log.info("Nothing left to do for worker %s", self.name)
  69. self.work_queue.task_done()
  70. self.con.close()
  71. break
  72. try:
  73. action_insert = "INSERT INTO " + options.table + " " \
  74. + "(id,inpoint,outpoint,mediapackage,session_id,created,length,type,playing) " \
  75. + "VALUES (%(id)s, %(inpoint)s, %(outpoint)s, %(mediapackage)s, %(session_id)s, %(created)s, 0, %(type)s, 1)"
  76.  
  77. c = self.con.cursor()
  78. start = time.time()
  79. c.execute(action_insert, {
  80. 'id': action_id,
  81. 'inpoint': self.fake.pyint(),
  82. 'outpoint': self.fake.pyint(),
  83. 'mediapackage': self.fake.uuid4(),
  84. 'session_id': self.session_id,
  85. 'created': self.fake.date_time(),
  86. 'type': choice(['PLAY','PAUSE','SEEK','HEARTBEAT'])
  87. })
  88. log.info("%s inserting action %d", self.name, action_id)
  89. self.con.commit()
  90. end = time.time()
  91. self.metric.put_data(
  92. MetricData=[
  93. dict(
  94. MetricName='UserActionInsert-%s' % options.table,
  95. Unit='Seconds',
  96. Value=round(end - start, 3)
  97. )
  98. ])
  99. c.close()
  100. time.sleep(choice(range(options.interval)) + 1)
  101. finally:
  102. self.work_queue.task_done()
  103.  
  104.  
  105. def main():
  106.  
  107. log = logging.getLogger()
  108.  
  109. con = connect(
  110. user=options.user,
  111. password=options.password,
  112. host=options.host,
  113. database=options.database
  114. )
  115. c = con.cursor()
  116. c.execute("SELECT MAX(id) + 1 FROM %s" % options.table)
  117. (next_id,) = c.fetchone()
  118. next_id = next_id or 1
  119. con.close()
  120.  
  121. boto3.setup_default_session(profile_name=options.aws_profile)
  122. cloudwatch = boto3.resource('cloudwatch')
  123. metric = cloudwatch.Metric('mh-user-action-bench', 'UserActionInsert-%s' % options.table)
  124.  
  125. work_queue = JoinableQueue()
  126.  
  127. log.info("starting %d workers", options.num_workers)
  128. workers = [Worker(work_queue, metric) for x in xrange(options.num_workers)]
  129. for w in workers:
  130. w.start()
  131.  
  132. action_ids = range(next_id, next_id + options.num_inserts)
  133. shuffle(action_ids)
  134. for id in action_ids:
  135. work_queue.put(id)
  136.  
  137. log.info("poisoning the work queue")
  138. for i in xrange(options.num_workers):
  139. work_queue.put(None)
  140. log.info("joining the work queue")
  141. work_queue.join()
  142.  
  143. log.info("joining the work threads")
  144. for w in workers:
  145. w.join()
  146.  
  147. log.info("all work complete")
  148.  
  149. if __name__ == '__main__':
  150. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement