Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import time
- import multiprocessing
- from cassandra.cluster import Cluster
- from cassandra.policies import DCAwareRoundRobinPolicy
# Running total of rows seen; PagedResultHandler.handle_page increments it and
# the benchmark code further down resets/prints it.
count = 0


class PagedResultHandler(object):
    """Drive a paged query to completion through response-future callbacks.

    On construction the handler registers ``handle_page`` and ``handle_error``
    on *future*.  Each delivered page is counted into the module-level
    ``count``; further pages are requested until the future reports none are
    left.  ``finished_event`` is set when paging ends, either normally or on
    failure, and ``error`` holds the failure (``None`` on success).
    """

    def __init__(self, future):
        """Register this handler's callbacks on *future*.

        :param future: object exposing ``add_callbacks``, ``has_more_pages``
            and ``start_fetching_next_page`` (presumably the driver's
            response future returned by ``execute_async`` -- confirm).
        """
        self.error = None
        self.finished_event = multiprocessing.Event()
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        """Count the rows of one page, then fetch the next page or finish.

        :param rows: iterable of rows for the page that just arrived.
        """
        global count
        try:
            for _row in rows:
                # Per-row processing would go here; the benchmark only counts.
                count += 1

            if self.future.has_more_pages:
                self.future.start_fetching_next_page()
            else:
                self.finished_event.set()
        except Exception as exc:
            # Without this, a failure inside the callback would never set
            # finished_event and anyone blocked in wait() would hang forever.
            self.handle_error(exc)

    def handle_error(self, exc):
        """Record *exc* and release whoever is waiting on ``finished_event``.

        :param exc: the exception describing why paging stopped.
        """
        self.error = exc
        self.finished_event.set()
def get_cassandra_session(contact_points=('10.41.55.56',), keyspace='cams'):
    """Connect to a Cassandra cluster and return a session bound to *keyspace*.

    The cluster is created with a ``DCAwareRoundRobinPolicy`` load-balancing
    policy and no authentication settings.  The returned session keeps the
    connection open; nothing in this function shuts the cluster down.

    :param contact_points: iterable of host addresses used to reach the
        cluster.  Defaults to the single node the script was written against.
    :param keyspace: keyspace selected on the session before it is returned.
    :returns: the connected session.
    """
    print("Initializing cassandra session for process")
    cluster = Cluster(
        list(contact_points),
        load_balancing_policy=DCAwareRoundRobinPolicy())
    session = cluster.connect()
    session.set_keyspace(keyspace)
    return session
# Benchmark: count every row of `product_filter_mapping` twice over the same
# session -- first with callback-driven async paging, then with plain blocking
# iteration -- and print the elapsed wall-clock time and row count for each.
session = get_cassandra_session()

# --- Async paging ---------------------------------------------------------
# The timer starts only after the session exists, so connection setup is not
# billed to the async run (the blocking run below reuses the same session).
_start = time.time()
future = session.execute_async("select * from product_filter_mapping")
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
    raise handler.error
# Single-argument print() calls behave the same on Python 2 and Python 3.
print("Time Taken Using Async :  %s" % (time.time() - _start))
print(count)

# --- Blocking iteration ---------------------------------------------------
_start = time.time()
count = 0  # reset the counter the async handler was incrementing
rs = session.execute("select * from product_filter_mapping")
for row in rs:
    count += 1
print("Time Taken Without Async :  %s" % (time.time() - _start))
print(count)
Add Comment
Please, Sign In to add comment