Advertisement
Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
import json
import time
from json import loads

from kafka import KafkaConsumer
from kafka import KafkaProducer
def chunkIt(seq, num):
    """Split *seq* into *num* contiguous, nearly equal-sized slices.

    Slice boundaries are computed with integer arithmetic, so a non-empty
    *seq* always yields exactly *num* slices whose lengths differ by at most
    one; concatenating them reproduces *seq*. When ``len(seq) < num`` some
    of the slices are empty.

    Args:
        seq: Any sliceable sequence supporting ``len()`` (list, str, ...).
        num: Number of chunks to produce; must be a positive integer.

    Returns:
        A list of slices of *seq*, or an empty list when *seq* is empty.

    Raises:
        ValueError: If *num* is less than 1 (a non-positive count would
            otherwise divide by zero or never terminate).
    """
    if num < 1:
        raise ValueError("num must be a positive integer, got {!r}".format(num))
    size = len(seq)
    if size == 0:
        # Nothing to split: return no chunks at all, as callers saw before.
        return []
    # Boundary i is floor(i * size / num). Pure integer math avoids the
    # drift of accumulating a float step, which could shift a boundary or
    # produce an extra trailing chunk.
    bounds = [i * size // num for i in range(num + 1)]
    return [seq[start:stop] for start, stop in zip(bounds, bounds[1:])]
# Output files are written as <filename><n>.<extension>, n = 1..3.
filename = "namafile"
extension = "txt"
topic = "test1"
beginRec = False  # becomes True once the BEGIN marker has been seen
counter = 0  # number of data messages recorded
msg = []  # recorded message values, in arrival order

# Values are JSON-decoded by the deserializer, so each message.value may be
# any JSON type (str, number, dict, ...), not only a string.
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda x: loads(x.decode('utf-8')),
)
consumer.subscribe([topic])
print('Start subscribe {}\n'.format(topic))
try:
    # Blocks until an END marker arrives after BEGIN; everything between
    # the two markers is collected into msg.
    for message in consumer:
        value = message.value
        if beginRec:
            if value == 'END':
                break
            msg.append(value)
            print('Receive Data : {}'.format(value))
            counter += 1
        # Checked independently of the branch above, so a repeated BEGIN
        # while already recording is stored as data and announced again.
        if value == 'BEGIN':
            beginRec = True
            print("Begin Rec")
finally:
    # Release the consumer's connections even if the loop is interrupted.
    consumer.close()

print("\nStart processing data")
print("Total data: {}".format(len(msg)))
chunked = chunkIt(msg, 3)
cnt = 0
for data in chunked:
    cnt += 1
    outname = "{}{}.{}".format(filename, cnt, extension)
    # `with` guarantees the file is closed even if a write fails; explicit
    # UTF-8 so decoded non-ASCII text is written the same on every platform.
    with open(outname, "w", encoding="utf-8") as f:
        for row in data:
            # file.write() accepts only str; re-serialize other JSON values.
            # NOTE(review): rows are written back-to-back with no separator,
            # presumably because producer values carry their own newlines —
            # confirm against the producer.
            f.write(row if isinstance(row, str) else json.dumps(row))
    print("Done saving {}".format(outname))
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement