Advertisement
Guest User

Untitled

a guest
Jun 29th, 2016
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.94 KB | None | 0 0
  1. #!/usr/bin/env python
  2. import sys
  3. import threading
  4. import random
  5. import string
  6. import time
  7. from datetime import datetime, timezone, timedelta
  8. from cassandra.cluster import Cluster
  9. from cassandra.cluster import NoHostAvailable
  10. from cassandra.policies import WhiteListRoundRobinPolicy
  11. from cassandra.query import dict_factory, SimpleStatement
  12. from cassandra.auth import PlainTextAuthProvider
  13. from cassandra.connection import ConsistencyLevel
  14. from cassandra import InvalidRequest
  15.  
  16. query_times = []
  17.  
  18.  
  19. class CassandraCheck(object):
  20. def __init__(self, hosts, keyspace="replication_check"):
  21. self.query_time = 0
  22. self.hosts = hosts
  23. if not isinstance(hosts, list):
  24. print("[!] List of hosts has to be a list")
  25. sys.exit(1)
  26. try:
  27. auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
  28. cluster = Cluster(
  29. hosts,
  30. load_balancing_policy=WhiteListRoundRobinPolicy(hosts),
  31. auth_provider=auth_provider,
  32. port=9042
  33. )
  34. self.session = cluster.connect()
  35. self.metadata = cluster.metadata
  36. self.keyspace = keyspace
  37. except NoHostAvailable as e:
  38. print("[!] Error connecting to Cassandra hosts:\n%s" % e)
  39. sys.exit(1)
  40.  
  41. try:
  42. self.session.set_keyspace(keyspace)
  43. self.session.row_factory = dict_factory
  44. except InvalidRequest as e:
  45. print("[!] Error switching to keyspace '%s':\n%s" % (keyspace, e))
  46. sys.exit(1)
  47.  
  48. def get_replicas(self, key):
  49. return [host for host in self.metadata.get_replicas(self.keyspace, key)]
  50.  
  51. def test(self, key):
  52. query = "SELECT value FROM test where key = %s"
  53. stmt = SimpleStatement(query, consistency_level=ConsistencyLevel.LOCAL_ONE)
  54. rows = self.session.execute(stmt, [key], trace=True)
  55. if len(rows.current_rows) == 1:
  56. #print(rows.get_query_trace())
  57. now = datetime.now(timezone.utc)
  58. epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
  59. self.query_time = (now - epoch) // timedelta(microseconds=1)
  60. return self.query_time
  61. else:
  62. return 0
  63.  
  64. def insert(self, key):
  65. query = "INSERT INTO test (key, value) VALUES (%s, %s)"
  66. stmt = SimpleStatement(query, consistency_level=ConsistencyLevel.LOCAL_ONE)
  67. self.session.execute(stmt, [key, key], trace=True)
  68.  
  69. def remove(self, key):
  70. query = "DELETE FROM test WHERE key = %s"
  71. stmt = SimpleStatement(query, consistency_level=ConsistencyLevel.LOCAL_ONE)
  72. self.session.execute(stmt, [key], trace=True)
  73.  
  74.  
  75. def worker_read(cassandra, key):
  76. query_time = 0
  77. time.sleep(0.1)
  78. while query_time == 0:
  79. query_time = cassandra.test(key)
  80. query_times.append(int(cassandra.query_time))
  81. print("[+] Host: %s, Time: %s" % (cassandra.hosts[0], cassandra.query_time))
  82.  
  83.  
  84. def worker_insert(cassandra, key):
  85. time.sleep(3)
  86. print("[+] Writing key: %s" % key)
  87. cassandra.insert(key)
  88. time.sleep(2)
  89. cassandra.remove(key)
  90.  
  91.  
  92. if __name__ == '__main__':
  93. c = CassandraCheck(['172.31.28.150'])
  94. key = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(16))
  95. print("[+] Generating random key: %s" % key)
  96. n = 0
  97. for replica in c.get_replicas(key):
  98. replica = replica.__str__()
  99. reader = threading.Thread(target=worker_read, args=(CassandraCheck([replica]), key,))
  100. reader.start()
  101. print("[+] Reader thread for replica: %s started" % replica)
  102. if n == 0:
  103. writer = threading.Thread(target=worker_insert, args=(CassandraCheck([replica]), key,))
  104. writer.start()
  105. n += 1
  106.  
  107. reader.join(timeout=10)
  108. writer.join(timeout=10)
  109.  
  110. l = len(query_times)
  111. print("==\nAprox replication times (microseconds)\nBest: %s\nWorst: %s\n" % (query_times[1] - query_times[0], query_times[l - 1] - query_times[0]))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement