Advertisement
aproxtime

Customer With Header

May 22nd, 2019
117
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.30 KB | None | 0 0
  1. from kafka import KafkaConsumer
  2. from kafka import KafkaProducer
  3. import json,time
  4. from json import loads
  5.  
  6. def chunkIt(seq, num):
  7.     avg = len(seq) / float(num)
  8.     out = []
  9.     last = 0.0
  10.  
  11.     while last < len(seq):
  12.         out.append(seq[int(last):int(last + avg)])
  13.         last += avg
  14.  
  15.     return out
  16.  
  17.  
  18. filename="namafile"
  19. extension="txt"
  20. topic="test1"
  21. beginRec=False
  22. counter=0
  23. msg=[]
  24.  
  25. consumer = KafkaConsumer(bootstrap_servers='localhost:9092',value_deserializer=lambda x: loads(x.decode('utf-8')))
  26. KafkaConsumer()
  27. consumer.subscribe(topic)
  28. print('Start subscribe {}\n'.format(topic))
  29.  
  30.  
  31.  
  32. for message in consumer:
  33.     if (beginRec == True):
  34.         if (message.value == u'END'):
  35.             break
  36.         else:
  37.             msg.append(message.value)
  38.             print('Receive Data : {}'.format(message.value))
  39.             counter += 1
  40.        
  41.    
  42.     if (message.value == u'BEGIN'):
  43.         beginRec = True
  44.         print("Begin Rec")
  45.  
  46. print("\nStart processing data")
  47. print("Total data: {}".format(len(msg)))
  48. chunked = chunkIt(msg, 3)
  49.  
  50. cnt=0
  51. for data in chunked:
  52.     cnt += 1
  53.     f = open("{}{}.{}".format(filename, cnt, extension), "w")
  54.     for row in data:
  55.         f.write(row)
  56.    
  57.     f.close()
  58.     print("Done saving {}{}.{}".format(filename, cnt, extension))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement