Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import sys
- import threading
- import random
- import string
- import time
- from datetime import datetime, timezone, timedelta
- from cassandra.cluster import Cluster
- from cassandra.cluster import NoHostAvailable
- from cassandra.policies import WhiteListRoundRobinPolicy
- from cassandra.query import dict_factory, SimpleStatement
- from cassandra.auth import PlainTextAuthProvider
- from cassandra.connection import ConsistencyLevel
- from cassandra import InvalidRequest
# Shared result list: each reader thread (worker_read) appends the timestamp,
# in microseconds since the Unix epoch, at which its replica first returned
# the test key. The main script reads it once the threads have been joined.
query_times = []
class CassandraCheck(object):
    """One authenticated Cassandra session pinned to a fixed list of hosts.

    The session is restricted to ``hosts`` through a whitelist round-robin
    load-balancing policy and operates on the ``test`` table (columns ``key``
    and ``value``) of ``keyspace``. All statements run at consistency level
    LOCAL_ONE.

    Connection problems are reported on stdout and terminate the process via
    ``sys.exit(1)``.
    """

    def __init__(self, hosts, keyspace="replication_check",
                 username="cassandra", password="cassandra", port=9042):
        """Connect to ``hosts`` and switch the session to ``keyspace``.

        Args:
            hosts: list of contact points; anything other than a ``list``
                aborts the process.
            keyspace: keyspace that holds the ``test`` table.
            username: user name for plain-text authentication.
            password: password for plain-text authentication.
            port: native-protocol port of the hosts.

        Raises:
            SystemExit: if ``hosts`` is not a list, no host is reachable, or
                the keyspace cannot be selected.
        """
        self.query_time = 0
        self.hosts = hosts
        self.keyspace = keyspace
        # Kept on the instance so close() can release the driver's
        # connections and background threads.
        self.cluster = None

        if not isinstance(hosts, list):
            print("[!] List of hosts has to be a list")
            sys.exit(1)

        try:
            auth_provider = PlainTextAuthProvider(username=username, password=password)
            self.cluster = Cluster(
                hosts,
                load_balancing_policy=WhiteListRoundRobinPolicy(hosts),
                auth_provider=auth_provider,
                port=port,
            )
            self.session = self.cluster.connect()
            self.metadata = self.cluster.metadata
        except NoHostAvailable as e:
            print("[!] Error connecting to Cassandra hosts:\n%s" % e)
            self.close()
            sys.exit(1)

        try:
            self.session.set_keyspace(keyspace)
            self.session.row_factory = dict_factory
        except InvalidRequest as e:
            print("[!] Error switching to keyspace '%s':\n%s" % (keyspace, e))
            self.close()
            sys.exit(1)

    def close(self):
        """Shut down the underlying cluster, if any. Safe to call repeatedly."""
        cluster = getattr(self, "cluster", None)
        if cluster is not None:
            cluster.shutdown()
            self.cluster = None

    def get_replicas(self, key):
        """Return the hosts that the cluster metadata lists as replicas of ``key``."""
        return list(self.metadata.get_replicas(self.keyspace, key))

    def test(self, key):
        """Read ``key`` once and report when it was seen.

        Returns:
            The current time in whole microseconds since the Unix epoch
            (also stored in ``self.query_time``) when the query returned
            exactly one row, otherwise 0.
        """
        query = "SELECT value FROM test where key = %s"
        stmt = SimpleStatement(query, consistency_level=ConsistencyLevel.LOCAL_ONE)
        rows = self.session.execute(stmt, [key], trace=True)
        if len(rows.current_rows) != 1:
            return 0

        # Timestamp is taken on the client, after the read has returned.
        now = datetime.now(timezone.utc)
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        self.query_time = (now - epoch) // timedelta(microseconds=1)
        return self.query_time

    def insert(self, key):
        """Insert a row whose ``key`` and ``value`` are both ``key``."""
        query = "INSERT INTO test (key, value) VALUES (%s, %s)"
        stmt = SimpleStatement(query, consistency_level=ConsistencyLevel.LOCAL_ONE)
        self.session.execute(stmt, [key, key], trace=True)

    def remove(self, key):
        """Delete the row stored under ``key``."""
        query = "DELETE FROM test WHERE key = %s"
        stmt = SimpleStatement(query, consistency_level=ConsistencyLevel.LOCAL_ONE)
        self.session.execute(stmt, [key], trace=True)
def worker_read(cassandra, key, timeout=10.0):
    """Poll one replica until ``key`` is readable and record when that happened.

    After a short initial pause the function calls ``cassandra.test(key)``
    back to back (no sleep between attempts, to keep the measurement
    resolution high) until it returns a non-zero timestamp. That timestamp is
    appended to the module-level ``query_times`` list and printed.

    Args:
        cassandra: object exposing ``test(key)`` and ``hosts``.
        key: key to look for.
        timeout: maximum number of seconds to keep polling; ``None`` polls
            without limit. When the limit is reached nothing is recorded,
            so the thread cannot spin forever on a key that never shows up.
    """
    time.sleep(0.1)
    deadline = None if timeout is None else time.monotonic() + timeout

    query_time = cassandra.test(key)
    while query_time == 0:
        if deadline is not None and time.monotonic() >= deadline:
            print("[!] Host: %s, key not seen within %s seconds" % (cassandra.hosts[0], timeout))
            return
        query_time = cassandra.test(key)

    query_times.append(int(query_time))
    print("[+] Host: %s, Time: %s" % (cassandra.hosts[0], query_time))
def worker_insert(cassandra, key, write_delay=3, remove_delay=2):
    """Write ``key`` after a delay, then delete it again after a second delay.

    Args:
        cassandra: object exposing ``insert(key)`` and ``remove(key)``.
        key: key to write and subsequently remove.
        write_delay: seconds to wait before the insert.
        remove_delay: seconds to leave the key in place before removing it.
    """
    time.sleep(write_delay)
    print("[+] Writing key: %s" % key)
    cassandra.insert(key)
    time.sleep(remove_delay)
    cassandra.remove(key)
def replication_spread(times):
    """Return ``(best, worst)`` gaps relative to the earliest timestamp.

    ``best`` is the gap between the two earliest timestamps and ``worst`` the
    gap between the earliest and the latest. Timestamps are sorted first, so
    the result does not depend on the order in which they were recorded.
    Returns ``None`` when fewer than two timestamps are available.
    """
    if len(times) < 2:
        return None
    ordered = sorted(times)
    earliest = ordered[0]
    return ordered[1] - earliest, ordered[-1] - earliest


def main(argv=None):
    """Measure how long a freshly written key takes to show up on each replica.

    Contact points are taken from ``argv`` (default: the command line) and
    fall back to the built-in address when none are given. One reader thread
    is started per replica of a random key, plus a single writer thread that
    inserts and later removes the key.
    """
    contact_points = list(sys.argv[1:] if argv is None else argv) or ['172.31.28.150']
    c = CassandraCheck(contact_points)
    key = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(16))
    print("[+] Generating random key: %s" % key)

    replicas = c.get_replicas(key)
    if not replicas:
        print("[!] No replicas found for key: %s" % key)
        sys.exit(1)

    readers = []
    writer = None
    for replica in replicas:
        # Use the bare address: the string form of a host may include the
        # port, which is not usable as a contact point.
        address = replica.address
        reader = threading.Thread(target=worker_read, args=(CassandraCheck([address]), key))
        # Daemon threads: a reader still polling after the join timeout must
        # not keep the process alive once the report has been printed.
        reader.daemon = True
        reader.start()
        readers.append(reader)
        print("[+] Reader thread for replica: %s started" % address)
        if writer is None:
            # Only one writer is needed; it goes through the first replica.
            writer = threading.Thread(target=worker_insert, args=(CassandraCheck([address]), key))
            writer.daemon = True
            writer.start()

    # Wait for every reader, not just the last one started.
    for reader in readers:
        reader.join(timeout=10)
    writer.join(timeout=10)

    spread = replication_spread(list(query_times))
    if spread is None:
        print("[!] Need timestamps from at least two replicas, got %d" % len(query_times))
        sys.exit(1)
    print("==\nAprox replication times (microseconds)\nBest: %s\nWorst: %s\n" % spread)


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement