Advertisement
Guest User

Untitled

a guest
May 21st, 2016
121
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.08 KB | None | 0 0
  1. import random
  2. import time
  3. import traceback
  4. from string import ascii_uppercase
  5.  
  6. from cassandra import ConsistencyLevel
  7. from cassandra.cluster import Cluster
  8. from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
  9. from google.apputils import stopwatch
  10.  
  11. count = 0
  12.  
  13. # Test for async callback with retries
  14. def get_cassandra_session():
  15. #print "I GOT CALLED"
  16. print ("Initializing cassandra session for process")
  17. #username=config_map['cassandra_username'], password=config_map['cassandra_password'])
  18. cluster = Cluster(['127.0.0.1'], load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()))
  19. #cluster = Cluster(['10.41.55.56', '10.41.55.57', '10.41.55.58'], load_balancing_policy=DCAwareRoundRobinPolicy())
  20. # metadata = cluster.metadata
  21. session = cluster.connect()
  22. session.set_keyspace('test')
  23. return session
  24.  
  25. cassandra_session = get_cassandra_session()
  26. def errback_fn(error ,prepared_statement, statement_params):
  27. try:
  28. exception_count = 0
  29. global cassandra_session
  30. if cassandra_session is None or cassandra_session.is_shutdown or error:
  31. cassandra_session = get_cassandra_session()
  32. for key in cassandra_session.get_pool_state():
  33. for in_flight in cassandra_session.get_pool_state()[key]['in_flights']:
  34. print "in_flight count is ", in_flight
  35. if in_flight > 1000:
  36. print "Future thread Sleeping for 500ms, in_flight is ", in_flight
  37. time.sleep(.5)
  38. print "Open Count is ", cassandra_session.get_pool_state()[key]['open_count']
  39.  
  40. while True:
  41. try:
  42. print "inside errback, after encountering error: ",error
  43. print "Statement is :",prepared_statement
  44. print "Params are :", statement_params
  45. cassandra_session.execute(prepared_statement, statement_params)
  46. exception_count = 0
  47. break
  48. except:
  49. print traceback.format_exc()
  50. time.sleep(5)
  51. # Get a new cassandra session
  52. if cassandra_session is None or cassandra_session.is_shutdown:
  53. cassandra_session = get_cassandra_session()
  54. exception_count += 1
  55. if exception_count > 5:
  56. print "Severe Error Occured for statement: ", prepared_statement, " Params: ", statement_params, " Breaking out of loop"
  57. break
  58. continue
  59. except:
  60. print traceback.format_exc()
  61.  
  62.  
  63. cassandra_session.execute("truncate table test.test_async;")
  64. print "Truncated table test_async"
  65. future = None
  66. num_count_test = 1000000
  67. _start_async = time.time()
  68. query = "INSERT INTO test_async(key,value,str) VALUES (?, ?, ?)"
  69. prepared_query = cassandra_session.prepare(query)
  70. prepared_query.consistency_level = ConsistencyLevel.LOCAL_ONE
  71.  
  72. print "Query Prepared for test_async"
  73. sw = stopwatch.StopWatch()
  74. sw.start('complete-async')
  75. for j in range(1, num_count_test):
  76. try:
  77. sw.start('async-randomgen')
  78. param1 = random.randint(1, 1000000)
  79. param2 = random.randint(1, 1000000)
  80. param3 = ''.join(random.choice(ascii_uppercase) for i in range(20))
  81. sw.stop('async-randomgen')
  82. for key in cassandra_session.get_pool_state():
  83. for in_flight in cassandra_session.get_pool_state()[key]['in_flights']:
  84. if in_flight > 1000:
  85. print "Future thread Sleeping for 500ms, in_flight is ", in_flight
  86. time.sleep(.5)
  87. # if cassandra_session.get_pool_state()[key]['open_count'] > 20:
  88. # print "Future thread Sleeping for 500ms"
  89. # time.sleep(.5)
  90.  
  91. future = cassandra_session.execute_async(prepared_query,(param1,param2,param3))
  92. future.add_errback(errback_fn, prepared_query, (param1, param2, param3))
  93. except:
  94. print traceback.format_exc()
  95. if cassandra_session is None or cassandra_session.is_shutdown:
  96. print "Reinitializing Cassandra Session"
  97. cassandra_session = get_cassandra_session()
  98. continue
  99.  
  100. sw.stop('complete-async')
  101. # Block the future
  102. future.result()
  103. print "Time Taken For Async : ", time.time() - _start_async
  104.  
  105. cassandra_session.execute("truncate table test.test_non_async")
  106. print "Truncated table test_non_async"
  107.  
  108. _start_non_async = time.time()
  109. query = "INSERT INTO test_non_async(key,value,str) VALUES (?, ?, ?)"
  110. prepared_query = cassandra_session.prepare(query)
  111. prepared_query.consistency_level = ConsistencyLevel.LOCAL_ONE
  112. print "Query Prepared for test_non_async"
  113. sw.start('complete-non-async')
  114. for l in range(1, num_count_test):
  115. sw.start('non-async-randomgen')
  116. param1 = random.randint(1, 1000000)
  117. param2 = random.randint(1, 1000000)
  118. param3 = ''.join(random.choice(ascii_uppercase) for m in range(20))
  119. sw.stop('non-async-randomgen')
  120. try:
  121. cassandra_session.execute(prepared_query, (param1, param2, param3))
  122. except:
  123. print traceback.format_exc()
  124.  
  125. sw.stop('complete-non-async')
  126.  
  127. print sw.accum
  128.  
  129. print "Time Taken For Non Async : ", time.time() - _start_non_async
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement