Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import redis
- from kafka import KafkaConsumer
- from kafka import TopicPartition
- import pymysql
- from collections import Counter
- def news(event, context):
- kafkaServer = os.environ['KAFKASERVER']
- kafkaTopic = os.environ['KAFKATOPIC']
- redisKey = os.environ['REDISKEY']
- redisServer = os.environ['REDISSERVER']
- redisPort = os.environ['REDISPORT']
- redisDB = os.environ['REDISDB']
- hostwrite = os.environ['HOSTWRITE']
- name = os.environ['USER']
- password = os.environ['PW']
- db_name = os.environ['DB']
- dbtable = os.environ['DBTABLE']
- dbcolumn = os.environ['DBCOLUMN']
- try:
- connwrite = pymysql.connect(hostwrite, user=name, passwd=password, db=db_name,charset='utf8',cursorclass=pymysql.cursors.DictCursor,connect_timeout=5)
- except Exception as e:
- raise e
- logger.error("ERROR: Unexpected error: Could not connect to MySql instance.")
- sys.exit()
- listids = []
- consumer = KafkaConsumer(bootstrap_servers=kafkaServer)
- partition = TopicPartition(kafkaTopic, 0)
- r = redis.StrictRedis(host=redisServer, port=int(redisPort), db=int(redisDB))
- start = r.get(redisKey)
- if start == None:
- start = 0
- consumer.assign([partition])
- consumer.seek_to_end(partition)
- lastOffset = consumer.position(partition)
- consumer.seek(partition, int(start))
- print lastOffset
- print start
- if lastOffset > int(start):
- for msg in consumer:
- if msg.offset == lastOffset-1:
- r.set(redisKey,lastOffset)
- break
- item_dict = json.loads(msg.value)
- listids.append(item_dict['id'])
- finaldict = Counter(listids)
- for k, v in finaldict.iteritems():
- with connwrite.cursor() as cur:
- if (int(v)>0):
- cur.execute ("UPDATE %s set %s = %s + %s where id = %s",(dbtable,dbcolumn,dbcolumn,v,k))
- connwrite.commit()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement