Advertisement
Guest User

Untitled

a guest
Dec 3rd, 2017
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.96 KB | None | 0 0
  1. import json
  2. import redis
  3. from kafka import KafkaConsumer
  4. from kafka import TopicPartition
  5. import pymysql
  6.  
  7. from collections import Counter
  8. def news(event, context):
  9.     kafkaServer  = os.environ['KAFKASERVER']
  10.     kafkaTopic  = os.environ['KAFKATOPIC']
  11.     redisKey = os.environ['REDISKEY']
  12.     redisServer = os.environ['REDISSERVER']
  13.     redisPort = os.environ['REDISPORT']
  14.     redisDB = os.environ['REDISDB']
  15.     hostwrite  = os.environ['HOSTWRITE']
  16.     name = os.environ['USER']
  17.     password = os.environ['PW']
  18.     db_name = os.environ['DB']
  19.     dbtable = os.environ['DBTABLE']
  20.     dbcolumn = os.environ['DBCOLUMN']
  21.     try:
  22.         connwrite = pymysql.connect(hostwrite, user=name, passwd=password, db=db_name,charset='utf8',cursorclass=pymysql.cursors.DictCursor,connect_timeout=5)
  23.     except Exception as e:
  24.         raise e
  25.         logger.error("ERROR: Unexpected error: Could not connect to MySql instance.")
  26.         sys.exit()
  27.  
  28.     listids = []
  29.     consumer = KafkaConsumer(bootstrap_servers=kafkaServer)
  30.     partition = TopicPartition(kafkaTopic, 0)
  31.     r = redis.StrictRedis(host=redisServer, port=int(redisPort), db=int(redisDB))
  32.     start = r.get(redisKey)
  33.     if start == None:
  34.         start = 0
  35.     consumer.assign([partition])
  36.     consumer.seek_to_end(partition)
  37.     lastOffset = consumer.position(partition)
  38.     consumer.seek(partition, int(start))
  39.     print lastOffset
  40.     print start
  41.     if lastOffset > int(start):
  42.         for msg in consumer:
  43.             if msg.offset == lastOffset-1:
  44.                 r.set(redisKey,lastOffset)
  45.                 break
  46.             item_dict = json.loads(msg.value)
  47.             listids.append(item_dict['id'])
  48.     finaldict = Counter(listids)
  49.     for k, v in finaldict.iteritems():
  50.             with connwrite.cursor() as cur:
  51.                 if (int(v)>0):
  52.                     cur.execute ("UPDATE %s set %s = %s + %s where id = %s",(dbtable,dbcolumn,dbcolumn,v,k))
  53.                     connwrite.commit()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement