Guest User

Untitled

a guest
Mar 22nd, 2018
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.24 KB | None | 0 0
import json
import logging

from cassandra.cluster import Cluster
from confluent_kafka import Consumer
  3.  
  4.  
  5. class WriteToCassandra(object):
  6. def __init__(self):
  7. self.kafka_consumer = Consumer({'bootstrap.servers': broker_addr, 'group.id': const.CONSUMER_GROUP_CASSANDRA})
  8. self.kafka_consumer.subscribe([const.CASSANDRA_TOPIC])
  9.  
  10. self.cluster = Cluster(cassandra_addr)
  11. self.session = self.cluster.connect(const.CASSANDRA_KEYSPACE)
  12.  
  13. def session_execute(self, query):
  14. future = self.session.execute_async(query)
  15. try:
  16. result = future.result()
  17. except Exception as e:
  18. self.log.info(query)
  19. raise Exception(e) # Intentially added here to stop the script for testing.
  20.  
  21. def get_insert_query(self, message):
  22. data_received = message.value()
  23. data = json.loads(data_received)
  24. query = "insert into " + data["tablename"] + " json '" + json.dumps(data["data"]) + "';"
  25. return query
  26.  
  27. def run(self):
  28. while True:
  29. message = self.kafka_consumer.poll(0)
  30. query = self.get_insert_query(message)
  31. self.session_execute(self.session, query)
  32.  
  33. if __name__ == '__main__':
  34. cassandra_write = WriteToCassandra()
  35. cassandrawrite.run()
Add Comment
Please, Sign In to add comment