Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import re

from kafka import KafkaConsumer

# Matches key=value pairs such as user=abc or id="123": group 1 is the key,
# group 2 is the value (word characters and hyphens). An optional opening
# quote in front of the value is skipped.
PAIR_PATTERN = re.compile(r"""(\w+)=\s*['"]?([\w-]+)""")


def _parse_pairs(message):
    """Split *message* into parallel lists of keys and values.

    Args:
        message: Decoded message text containing ``key=value`` pairs.

    Returns:
        A ``(headers, values)`` tuple of lists in order of appearance;
        both lists are empty when the message contains no pairs.
    """
    pairs = PAIR_PATTERN.findall(message)
    headers = [key for key, _ in pairs]
    values = [value for _, value in pairs]
    return headers, values


# connect to kafka topic
kaf = KafkaConsumer(
    'kafka.consumer.topic',
    auto_offset_reset='earliest',
    bootstrap_servers=['consumer-kafka.hdw.r53.deap.tv'],
)

# Raw string: in a normal literal the "\f" of this Windows path would be
# read as a form-feed character and the file would land somewhere else.
outputfile = r'Z:\KafkaConsum\file.csv'

# True until the header line has been written; the header is taken from the
# keys of the first matching message only.
header = True

# The context manager closes the file even when the loop is left through an
# exception or Ctrl-C. NOTE(review): with no consumer timeout configured the
# iteration below is expected to block waiting for new messages - confirm.
# UTF-8 is explicit because messages are decoded as UTF-8 and values may
# contain non-ASCII word characters.
with open(outputfile, mode='w', newline='', encoding='utf-8') as outfile:
    for row in kaf:
        a = row.value.decode("utf-8")
        if 'Twitter' not in a:
            continue

        headers, array = _parse_pairs(a)

        if header:
            h = '|'.join(headers)
            print(h)
            outfile.write(h + '\n')
            header = False

        # One pipe-delimited line of values per matching message.
        r = '|'.join(array)
        print(r)
        outfile.write(r + '\n')
Add Comment
Please, Sign In to add comment