Guest User

pagedresulthandler

a guest
May 18th, 2016
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.87 KB | None | 0 0
  1. import time
  2.  
  3. import multiprocessing
  4. from cassandra.cluster import Cluster
  5. from cassandra.policies import DCAwareRoundRobinPolicy
  6.  
  7. count = 0
  8.  
  9. class PagedResultHandler(object):
  10.  
  11.     def __init__(self, future):
  12.         self.error = None
  13.         self.finished_event = multiprocessing.Event()
  14.         self.future = future
  15.         self.future.add_callbacks(
  16.             callback=self.handle_page,
  17.             errback=self.handle_error)
  18.  
  19.     def handle_page(self, rows):
  20.         global count
  21.         for row in rows:
  22.             #process_row(row)
  23.             count += 1
  24.             #print row
  25.  
  26.         if self.future.has_more_pages:
  27.             self.future.start_fetching_next_page()
  28.         else:
  29.             self.finished_event.set()
  30.  
  31.     def handle_error(self, exc):
  32.         self.error = exc
  33.         self.finished_event.set()
  34.  
  35. def get_cassandra_session():
  36.         #print "I GOT CALLED"
  37.     print ("Initializing cassandra session for process")
  38.     #username=config_map['cassandra_username'], password=config_map['cassandra_password'])
  39.     #cluster = Cluster(['127.0.0.1'], load_balancing_policy=DCAwareRoundRobinPolicy())
  40.     cluster = Cluster(['10.41.55.56'], load_balancing_policy=DCAwareRoundRobinPolicy())
  41.  
  42.     session = cluster.connect()
  43.     session.set_keyspace('cams')
  44.     return session
  45.  
  46.  
  47. #future = session.execute_async("SELECT * FROM users")
  48. _start = time.time()
  49.  
  50. session = get_cassandra_session()
  51. future = session.execute_async("select * from product_filter_mapping")
  52. handler = PagedResultHandler(future)
  53. handler.finished_event.wait()
  54.  
  55. if handler.error:
  56.     raise handler.error
  57.  
  58. print "Time Taken Using Async : ", time.time() - _start
  59.  
  60. print count
  61.  
  62. _start = time.time()
  63. count = 0
  64. rs = session.execute("select * from product_filter_mapping")
  65.  
  66. for row in rs:
  67.     count += 1
  68.  
  69. print "Time Taken Without Async : ", time.time() - _start
  70.  
  71. print count
Add Comment
Please, Sign In to add comment