Advertisement
Guest User

Untitled

a guest
Oct 23rd, 2018
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 13.53 KB | None | 0 0
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import os
  4. import gzip
  5. import sys
  6. import glob
  7. import logging
  8. import collections
  9. import time
  10. from optparse import OptionParser
  11. import appsinstalled_pb2
  12. import memcache
  13. import threading
  14. import multiprocessing as mp
  15. from queue import Queue
  16. # R: нужно две линии отступа от импортов
  17. CONNECTION_RETRY_TIMEOUT = 20
  18. CONNECTION_TIMEOUT = 5
  19. NORMAL_ERR_RATE = 0.01
  20. # R: нужен отступ после констант
  21. AppsInstalled = collections.namedtuple("AppsInstalled", ["dev_type", "dev_id", "lat", "lon", "apps"])
  22.  
  23.  
  24. def parse_appsinstalled(line):
  25.     line_parts = line.decode("utf-8").strip().split("\t")
  26.     if len(line_parts) < 5:
  27.         return
  28.     dev_type, dev_id, lat, lon, raw_apps = line_parts
  29.     if not dev_type or not dev_id:
  30.         return
  31.     try:
  32.         apps = [int(a.strip()) for a in raw_apps.split(",")]
  33.     except ValueError:
  34.         apps = [int(a.strip()) for a in raw_apps.split(",") if a.isidigit()]
  35.         # R: Форматирование библиотека Logging будет делать сама. Параметры нужно передать просто аргументами
  36.         logging.info("Not all user apps are digits: `%s`", line)
  37.     try:
  38.         lat, lon = float(lat), float(lon)
  39.     except ValueError:
  40.         logging.info("Invalid geo coords: `%s`", line)  # R: L35 (исправил)
  41.     return AppsInstalled(dev_type, dev_id, lat, lon, apps)
  42.  
  43.  
  44. def insert_appsinstalled(appsinstalled, device_memc, dry_run=False, line_num=0):
  45.     errors = 0
  46.     memc_addr = device_memc.get(appsinstalled.dev_type)
  47.     if not memc_addr:
  48.         errors += 1
  49.         logging.error("Unknown device type: %s", appsinstalled.dev_type)  # R: L35 (исправил)
  50.         return errors
  51.  
  52.     ua = appsinstalled_pb2.UserApps()
  53.     ua.lat = appsinstalled.lat
  54.     ua.lon = appsinstalled.lon
  55.     key = "%s:%s" % (appsinstalled.dev_type, appsinstalled.dev_id)
  56.     ua.apps.extend(appsinstalled.apps)
  57.     packed = ua.SerializeToString()
  58.  
  59.     # @TODO persistent connection
  60.     # @TODO retry and timeouts!
  61.  
  62.     if dry_run:
  63.         logging.debug("%s - %s -> %s" % (memc_addr, key, str(ua).replace("\n", " ")))
  64.     else:
  65.         # R: закоменченный код нужно удалять. После решётки должен быть пробел
  66.         #result = memc_client.write(appsinstalled.dev_type, memc_addr, key, packed)
  67.         return errors, appsinstalled.dev_type, memc_addr, key, packed
  68.  
  69.  
  70. def prototest():
  71.     sample = "idfa\t1rfw452y52g2gq4g\t55.55\t42.42\t1423,43,567,3,7,23\ngaid\t7rfw452y52g2gq4g\t55.55\t42.42\t7423,424"
  72.     for line in sample.splitlines():
  73.         dev_type, dev_id, lat, lon, raw_apps = line.strip().split("\t")
  74.         apps = [int(a) for a in raw_apps.split(",") if a.isdigit()]
  75.         lat, lon = float(lat), float(lon)
  76.         ua = appsinstalled_pb2.UserApps()
  77.         ua.lat = lat
  78.         ua.lon = lon
  79.         ua.apps.extend(apps)
  80.         packed = ua.SerializeToString()
  81.         unpacked = appsinstalled_pb2.UserApps()
  82.         unpacked.ParseFromString(packed)
  83.         assert ua == unpacked
  84.  
  85.  
  86. class MemcClient():
  87.  
  88.     def __init__(self):
  89.         self.memc_pool = {}
  90.  
  91.     def write(self, dev_type, memc_addr, key, packed):
  92.         # R: проверка на None должна быть is None
  93.         if self.memc_pool.get(dev_type) == None:
  94.             self.memc_pool[dev_type] = memcache.Client([memc_addr], dead_retry=CONNECTION_RETRY_TIMEOUT, socket_timeout=CONNECTION_TIMEOUT)
  95.             logging.info("Opening Memcache connection, dev_type={} addr={}".format(dev_type, memc_addr))
  96.  
  97.         if self.memc_pool[dev_type].servers[0]._get_socket():  # connection established
  98.             result = self.memc_pool[dev_type].set(key, packed)
  99.             if not result:
  100.                 # тут нужно не .exception, а .error. Плюс переменной `e` нет в этом скоупе. и отсылаю к строке 35
  101.                 logging.exception("Cannot write to memc %s: %s" % (memc_addr, e))
  102.                 return False
  103.         else:
  104.             logging.exception("Error connecting to %s" % (memc_addr))  # R: L35
  105.             return False
  106.         return True
  107.  
  108.  
  109. def prepare_protobuf(work_queue, memc_queue, result_queue, device_memc):
  110.     '''Prepares protobuf pack for Memcache load'''
  111.     tries = 0
  112.     while True:
  113.         line_num, appsinstalled = work_queue.get()
  114.         errors, dev_type, memc_addr, key, packed = insert_appsinstalled(
  115.             appsinstalled, device_memc, line_num=line_num)  # R: слишком длинная строка, можно поставить перенос
  116.         memc_queue.put((dev_type, memc_addr, key, packed))
  117.         result_queue.put({'errors': errors, 'processed': 0})
  118.         if tries == 2:
  119.             break
  120.         if work_queue.empty():
  121.             logging.info("Protobuf process: work queue is empty, waiting ...")
  122.             tries += 1
  123.             time.sleep(3)
  124.         else:
  125.             tries = 0
  126.     logging.info('Protobuf process: work finished')
  127.  
  128.  
  129. class MemcWorker(threading.Thread):
  130.  
  131.     def __init__(self, memc_client, memc_queue, result_queue):
  132.         threading.Thread.__init__(self)
  133.         self.memc_queue = memc_queue
  134.         self.result_queue = result_queue
  135.         self.memc_client = memc_client
  136.         self.running = True
  137.         self.errors = 0
  138.         self.processed = 0
  139.  
  140.     def disable(self):
  141.         self.running = False
  142.  
  143.     def send_results(self):
  144.         self.result_queue.put({'errors': self.errors, 'processed': self.processed})
  145.         self.errors = 0
  146.         self.processed = 0
  147.  
  148.     def run(self):
  149.         # R: Здесь докстринг обернут двойными кавычками с переносами, а на строке 110 одинарными кавычками в одну строку
  150.         # R: Необходимо придерживаться единого стиля
  151.         """
  152.        Thread run method. Reads packages from queue and sends them to Memcache server
  153.        """
  154.         while self.running:
  155.             dev_type, memc_addr, key, packed = self.memc_queue.get()
  156.             ok = self.memc_client.write(dev_type, memc_addr, key, packed)
  157.             if ok:
  158.                 self.processed += 1
  159.             else:
  160.                 self.errors += 1
  161.             print('Memc added: {} {}, queue: {} {}'.format(key, ok, self.memc_queue.qsize(), self.result_queue.unfinished_tasks))  # R: L115
  162.             if self.errors > 1000 or self.processed > 1000:     # Saving preliminary results
  163.                 self.send_results()
  164.         # R: Теперь форматирование через .format, а не проценты, но оно всё ещё тут не нужно. Ещё и намешано получается
  165.         logging.info("MemcWorker {} finished task".format(self.name))
  166.         self.send_results()
  167.  
  168.  
  169. class LineWorker(threading.Thread):
  170.  
  171.     def __init__(self, work_queue, result_queue,  fn):  # R: Лишний пробел перед последним аргументом
  172.         threading.Thread.__init__(self)
  173.         self.work_queue = work_queue
  174.         self.result_queue = result_queue
  175.         self.fn = fn
  176.         self.errors = 0
  177.  
  178.     def run(self):
  179.         # R: L149
  180.         """
  181.        Thread run method. Reads file line by line and sends them to queue
  182.        """
  183.         if self.fn is not None:
  184.             try:
  185.                 with gzip.open(self.fn, 'rb') as fd:
  186.                     for line_num, line in enumerate(fd):
  187.                         line = line.strip()
  188.                         if not line:
  189.                             continue
  190.                         appsinstalled = parse_appsinstalled(line)
  191.                         if not appsinstalled:
  192.                             self.errors += 1
  193.                             continue
  194.                         self.work_queue.put((line_num, appsinstalled))
  195.                 self.result_queue.put({'errors': self.errors, 'processed': 0})
  196.             except:  # R: Необходимо указать, какое исключение мы ловим, не надо использовать пустой except. хотя бы except Exception
  197.                 logging.exception("Error reading file: %s" % (self.fn))  # R: L35
  198.             logging.info("LineWorker finished task: file {}, errors={}".format(self.fn, self.errors))  # R: L35, L164
  199.  
  200.  
  201. def dot_rename(path):
  202.     head, fn = os.path.split(path)
  203.     # atomic in most cases
  204.     os.rename(path, os.path.join(head, "." + fn))
  205.  
  206.  
  207. def main(options):
  208.     device_memc = {
  209.         "idfa": options.idfa,
  210.         "gaid": options.gaid,
  211.         "adid": options.adid,
  212.         "dvid": options.dvid,
  213.     }
  214.  
  215.     # Init queues
  216.     work_queue = mp.Queue(maxsize=300000)
  217.     memc_queue = mp.Queue(maxsize=300000)
  218.     result_queue = Queue()
  219.  
  220.     processed = 0
  221.     errors = 0
  222.  
  223.     # Line workers: parse files to strings and send them to work_queue
  224.     for fn in glob.iglob(options.pattern):
  225.         logging.info('Processing %s' % fn)  # R: L35
  226.         producer = LineWorker(work_queue, result_queue, fn)
  227.         producer.start()
  228.  
  229.     # Protobuf process: prepares packages from work_queue strings and sends them to memc_queue
  230.     proto_processes = [mp.Process(target=prepare_protobuf, args=(work_queue, memc_queue, result_queue, device_memc)) for x in range(2)]  # R: L115
  231.     for p in proto_processes:
  232.         p.start()
  233.  
  234.     # Memc workers: read packages from memc_queue and send them to Memcache servers
  235.     memc_client = MemcClient()
  236.     memc_workers = []
  237.     for w in range(0, opts.workers):
  238.         logging.info("Starting memc worker %s" % w)  # R: L35
  239.         memc_worker = MemcWorker(memc_client, memc_queue, result_queue)
  240.         memc_worker.start()
  241.         memc_workers.append(memc_worker)
  242.  
  243.     # Waiting until all workers finish their tasks
  244.     while not work_queue.empty() or not memc_queue.empty():
  245.         logging.info("Work queue: {}, memc queue: {}, result queue: {}".format(work_queue.qsize(), memc_queue.qsize(), result_queue.unfinished_tasks))  # R: L35
  246.         time.sleep(10)
  247.  
  248.     # Closing MemcWorkers. Protobuf process and LineWorkers are closed by themselves
  249.     for memc_worker in memc_workers:
  250.         memc_worker.disable()
  251.         logging.info("MemcWorker {} stopped".format(memc_worker))  # R: L35
  252.  
  253.     # Calculating stats
  254.     while not result_queue.empty():
  255.         result_worker = result_queue.get()
  256.         processed += result_worker['processed']
  257.         errors += result_worker['errors']
  258.     logging.info("Total processed={} errors={}".format(processed, errors))  # R: L35
  259.     err_rate = float(errors) / processed
  260.     if err_rate < NORMAL_ERR_RATE:
  261.         logging.info("Acceptable error rate ({:.5f}). Successfull load".format(err_rate))  # R: L35, L115
  262.     else:
  263.         logging.error("High error rate ({:.5f} > {:.5f}). Failed load".format(err_rate, NORMAL_ERR_RATE))  # R: L35, L115
  264.  
  265.     for fn in glob.iglob(options.pattern):
  266.         dot_rename(fn)
  267.  
  268.     return True
  269.  
  270.  
  271. if __name__ == '__main__':
  272.     op = OptionParser()
  273.     op.add_option("--workers", action="store", default=1)
  274.     op.add_option("-t", "--test", action="store_true", default=False)
  275.     op.add_option("-l", "--log", action="store", default=None)
  276.     op.add_option("--dry", action="store_true", default=False)
  277.     op.add_option("--pattern", action="store", default="/data/appsinstalled/*.tsv.gz")
  278.     op.add_option("--idfa", action="store", default="35.226.182.234:11211")
  279.     op.add_option("--gaid", action="store", default="35.232.4.163:11211")
  280.     op.add_option("--adid", action="store", default="35.226.182.234:11211")
  281.     op.add_option("--dvid", action="store", default="35.232.4.163:11211")
  282.     (opts, args) = op.parse_args()
  283.     logging.basicConfig(filename=opts.log, level=logging.INFO if not opts.dry else logging.DEBUG,
  284.                         format='[%(asctime)s] %(levelname).1s %(message)s', datefmt='%Y.%m.%d %H:%M:%S')  # R: L115
  285.     if opts.test:
  286.         prototest()
  287.         sys.exit(0)
  288.  
  289.     logging.info("Starting Memc loader with options: %s" % opts)  # R: L35
  290.     start_time = time.time()
  291.     try:
  292.         opts.workers = int(opts.workers)
  293.         main(opts)
  294.     except Exception as e:
  295.         logging.exception("Unexpected error: %s" % e)  # R: L35
  296.         sys.exit(1)
  297.     finally:
  298.         elapsed_time = time.time() - start_time
  299.         logging.info("Time elapsed: %s sec" % elapsed_time)  # R: L35
  300.         # R: Если это второй питон, скобки не нужны. Если третий, то не нужен пробел перед скобкой
  301.         print ("Work finished")
  302.         sys.exit(0)
  303.  
  304. '''
  305. Намешаны двойные и одинарные кавычки, стоит придерживаться единого стиля
  306. Форматировать строки при передаче в логгинг не нужно, достаточно оставить %s (преобразование str) или %r (преобразование repr),
  307. а аргументы для форматирования передавать следом через запятую
  308. Намешано использование процентного форматирования и .format - Тоже нужно к одному стилю
  309.  
  310. На строке 230 создаётся 2 процесса. Если уж хардкодить, стоит в начало вынести константу с говорящим названием, иначе остаётся только догадываться что за range(2)
  311. '''
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement