Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import random
- import time
- import traceback
- from string import ascii_uppercase
- from cassandra import ConsistencyLevel
- from cassandra.cluster import Cluster
- from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
- from google.apputils import stopwatch
- count = 0  # Module-level counter initialised to zero; none of the visible code reads or updates it.
- # Test for async callback with retries
def get_cassandra_session():
    """Connect to the Cassandra cluster and return a session on keyspace ``test``.

    Each call builds a new ``Cluster`` against the three hard-coded contact
    points, using ``DCAwareRoundRobinPolicy`` for load balancing, opens a
    session on it and switches that session to the ``test`` keyspace.

    Returns:
        The newly connected session.
    """
    print("Initializing cassandra session for process")
    contact_points = ['192.168.3.41', '192.168.3.42', '192.168.3.43']
    balancing_policy = DCAwareRoundRobinPolicy()
    new_session = Cluster(
        contact_points,
        load_balancing_policy=balancing_policy,
    ).connect()
    new_session.set_keyspace('test')
    return new_session
- cassandra_session = get_cassandra_session()  # Shared module-level session; errback_fn rebinds it through `global` when it is None or shut down.
def errback_fn(error, prepared_statement, statement_params):
    """Retry a failed asynchronous statement with blocking executes.

    Registered through ``future.add_errback(errback_fn, statement, params)``.
    The statement is re-run synchronously until it succeeds or has failed
    more than five times in a row; every failure is printed and nothing is
    re-raised to the caller.

    Args:
        error: The exception the failed request was completed with. It is
            only reported, never inspected.
        prepared_statement: The statement to execute again.
        statement_params: The values to bind to ``prepared_statement``.

    NOTE(review): the driver may invoke errbacks on its event-loop thread;
    the blocking ``time.sleep`` and ``execute`` calls below would then stall
    it. Confirm, and consider handing retries to a worker thread.
    """
    global cassandra_session
    try:
        # Reconnect only when the shared session is unusable. The previous
        # condition also tested ``error``, which is always truthy inside an
        # errback, so every failed request built a brand-new Cluster/Session
        # and never released the old one.
        if cassandra_session is None or cassandra_session.is_shutdown:
            cassandra_session = get_cassandra_session()

        # Take one snapshot of the pool state instead of re-querying it for
        # every host; this also avoids a KeyError if a pool disappears
        # between two calls.
        for pool_state in cassandra_session.get_pool_state().values():
            for in_flight in pool_state['in_flights']:
                print("in_flight count is  %s" % (in_flight,))
                if in_flight > 1000:
                    # Crude back-pressure: give the connection time to drain.
                    print("Future thread Sleeping for 500ms, in_flight is  %s" % (in_flight,))
                    time.sleep(.5)
            print("Open Count is  %s" % (pool_state['open_count'],))

        failures = 0
        while True:
            try:
                print("inside errback, after encountering error:  %s" % (error,))
                print("Statement is : %s" % (prepared_statement,))
                print("Params are : %s" % (statement_params,))
                cassandra_session.execute(prepared_statement, statement_params)
                break
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that
                # KeyboardInterrupt / SystemExit are not swallowed.
                print(traceback.format_exc())
                failures += 1
                if failures > 5:
                    print("Severe Error Occured for statement:  %s  Params:  %s  Breaking out of loop"
                          % (prepared_statement, statement_params))
                    break
                # Back off before the next attempt (skipped once we give up).
                time.sleep(5)
                # Get a new cassandra session
                if cassandra_session is None or cassandra_session.is_shutdown:
                    cassandra_session = get_cassandra_session()
    except Exception:
        # Last line of defence: an exception must not escape the callback.
        print(traceback.format_exc())
# ---------------------------------------------------------------------------
# Asynchronous insert benchmark: fire prepared INSERTs with execute_async and
# let errback_fn retry any request that fails.
# ---------------------------------------------------------------------------
cassandra_session.execute("truncate table test.test_async;")
print("Truncated table test_async")
future = None
num_count_test = 1000000
_start_async = time.time()
query = "INSERT INTO test_async(key,value,str) VALUES (?, ?, ?)"
prepared_query = cassandra_session.prepare(query)
prepared_query.consistency_level = ConsistencyLevel.LOCAL_ONE
print("Query Prepared for test_async")
sw = stopwatch.StopWatch()
sw.start('complete-async')
# NOTE: range(1, n) performs n - 1 iterations; the synchronous benchmark uses
# the same bounds, so both runs issue the same number of inserts.
for j in range(1, num_count_test):
    try:
        sw.start('async-randomgen')
        param1 = random.randint(1, 1000000)
        param2 = random.randint(1, 1000000)
        param3 = ''.join(random.choice(ascii_uppercase) for i in range(20))
        sw.stop('async-randomgen')
        # Crude back-pressure: pause whenever a connection has more than 1000
        # requests in flight. The pool state is snapshotted once per
        # iteration instead of being re-queried for every host.
        for pool_state in cassandra_session.get_pool_state().values():
            for in_flight in pool_state['in_flights']:
                if in_flight > 1000:
                    print("Future thread Sleeping for 500ms, in_flight is  %s" % (in_flight,))
                    time.sleep(.5)
        future = cassandra_session.execute_async(prepared_query, (param1, param2, param3))
        future.add_errback(errback_fn, prepared_query, (param1, param2, param3))
    except Exception:
        # ``Exception`` rather than a bare ``except`` so Ctrl-C can still
        # stop this very long loop.
        print(traceback.format_exc())
        if cassandra_session is None or cassandra_session.is_shutdown:
            print("Reinitializing Cassandra Session")
            cassandra_session = get_cassandra_session()
sw.stop('complete-async')
# Block on the most recently submitted future. Earlier requests may still be
# in flight, so the elapsed time below is only an approximation.
if future is not None:
    try:
        future.result()
    except Exception:
        # A failure of the last request is already retried by errback_fn; do
        # not let it abort the rest of the script.
        print(traceback.format_exc())
print("Time Taken For Async :  %s" % (time.time() - _start_async,))
# ---------------------------------------------------------------------------
# Synchronous insert benchmark: the same workload, one blocking execute per
# row, for comparison with the asynchronous run.
# ---------------------------------------------------------------------------
cassandra_session.execute("truncate table test.test_non_async")
print("Truncated table test_non_async")
_start_non_async = time.time()
query = "INSERT INTO test_non_async(key,value,str) VALUES (?, ?, ?)"
prepared_query = cassandra_session.prepare(query)
prepared_query.consistency_level = ConsistencyLevel.LOCAL_ONE
print("Query Prepared for test_non_async")
sw.start('complete-non-async')
for l in range(1, num_count_test):
    sw.start('non-async-randomgen')
    param1 = random.randint(1, 1000000)
    param2 = random.randint(1, 1000000)
    param3 = ''.join(random.choice(ascii_uppercase) for m in range(20))
    sw.stop('non-async-randomgen')
    try:
        cassandra_session.execute(prepared_query, (param1, param2, param3))
    except Exception:
        # Best effort: report the failed insert and move on. ``Exception``
        # rather than a bare ``except`` so Ctrl-C can still stop the loop.
        print(traceback.format_exc())
sw.stop('complete-non-async')
# Accumulated timings for every named stopwatch section.
print(sw.accum)
print("Time Taken For Non Async :  %s" % (time.time() - _start_non_async,))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement