Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from cassandra.cluster import Cluster
- from confluent_kafka import Consumer
- class WriteToCassandra(object):
- def __init__(self):
- self.kafka_consumer = Consumer({'bootstrap.servers': broker_addr, 'group.id': const.CONSUMER_GROUP_CASSANDRA})
- self.kafka_consumer.subscribe([const.CASSANDRA_TOPIC])
- self.cluster = Cluster(cassandra_addr)
- self.session = self.cluster.connect(const.CASSANDRA_KEYSPACE)
- def session_execute(self, query):
- future = self.session.execute_async(query)
- try:
- result = future.result()
- except Exception as e:
- self.log.info(query)
- raise Exception(e) # Intentially added here to stop the script for testing.
- def get_insert_query(self, message):
- data_received = message.value()
- data = json.loads(data_received)
- query = "insert into " + data["tablename"] + " json '" + json.dumps(data["data"]) + "';"
- return query
- def run(self):
- while True:
- message = self.kafka_consumer.poll(0)
- query = self.get_insert_query(message)
- self.session_execute(self.session, query)
- if __name__ == '__main__':
- cassandra_write = WriteToCassandra()
- cassandrawrite.run()
Add Comment
Please, Sign In to add comment