# __author__ minion

import csv
import datetime
import multiprocessing
import os
from collections import defaultdict

from bitarray import bitarray
from gevent import queue
from pymongo import MongoClient
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

from adcuratio.settings import DATABASES
from engine.constants import EXPERIAN_COLUMN_LIST as column_list
# These star imports are expected to supply the Django models
# (CustomizedMessagingTrade, Segment, TradeAdspots), the tuning constants
# (POOL_SIZE, BATCH_SIZE, DEMOGRAPHICS_DB_SIZE, NUM_WORKER_THREADS, DB_NAME)
# and the helpers (aggregate_all_filters, group_adc, map_stb_ids_to_trade_id)
# used below.
from engine.helper.etl_helper import *
from engine.helper.debug import *

USERNAME = DATABASES['default']['USER']
PASSWORD = DATABASES['default']['PASSWORD']
HOST = DATABASES['default']['HOST']
PORT = DATABASES['default']['PORT']

segment_heirarchy_map = {}
trade_meta_data_map = {}


def get_active_CMtrades_meta(processing_date):
    active_trade_set = set()
    trade_meta_data_map_temp = {}
    active_cmt_trade = CustomizedMessagingTrade.objects.filter(status=1)
    for cmt_trade in active_cmt_trade:
        # Reset per trade so each entry lists only its own adspots.
        adspot_set = set()
        for adspot in cmt_trade.campaign_adspot.all():
            if adspot.start_timestamp == processing_date:
                active_trade_set.add(cmt_trade)
                adspot_set.add(adspot)
        trade_meta_data_map_temp[str(cmt_trade.id)] = {
            "adspot": list(adspot_set)}
    return list(active_trade_set), trade_meta_data_map_temp


def get_trade_meta_data(processing_date):
    """Return (trade_meta_data_map_temp, segment_to_bvector_map).

    Each trade entry ends up roughly as:
        {"adspot": [...], "ad_id": <id>, "bvector": "010...1"}  # 41-bit string
    """
    active_trade_list, trade_meta_data_map_temp = get_active_CMtrades_meta(
        processing_date)
    segment_to_bvector_map = {}
    for trade in active_trade_list:
        group_list = trade.segment.group.all()
        json_map = []
        bit_vector = bitarray('0' * 41)
        for group_list_item in group_list:
            json_map.append(aggregate_all_filters(
                group_list_item.group.filter_json))
            bit_vector |= group_adc(group_list_item.id)
        # calculating adspots may need some modification
        segment_to_bvector_map[trade.segment.id] = bit_vector.to01()
        wanted_ad_spots = TradeAdspots.objects.filter(
            trade_ad_id__trade=trade).values_list('pk', flat=True)
        trade_meta_data_map_temp[str(trade.id)].update(
            {"ad_id": trade.ad_id, "bvector": bit_vector.to01()})
    return trade_meta_data_map_temp, segment_to_bvector_map


def get_segment_group_data(company=None, brand=None, sub_brand=None):
    data = {}
    if company:
        segments = Segment.objects.filter(company=company)
    elif brand:
        segments = Segment.objects.filter(brand=brand)
    elif sub_brand:
        segments = Segment.objects.filter(sub_brand=sub_brand)
    else:
        # No filter supplied; avoid referencing an undefined `segments`.
        return data
    for segment in segments:
        wanted_vector = bitarray('0' * 41)
        wanted_json_map = []
        for wanted_group in segment.groups.all():
            wanted_json_map.append(
                aggregate_all_filters(wanted_group.filter_json))
            wanted_vector |= group_adc(wanted_group.id)
        data[str(segment)] = {"wanted_vector": wanted_vector,
                              "wanted_json_map": wanted_json_map}
    return data


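# get_segment_heirarchy() below fills the module-level segment_heirarchy_map
# with a nested structure of roughly this shape (illustrative):
#
#     {"<company>": {
#         "group_data": {...},                      # company-level segments
#         "<brand>": {
#             "group_data": {...},                  # brand-level segments
#             "<sub_brand>": {"group_data": {...}}  # sub-brand-level segments
#         }}}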
def get_segment_heirarchy():
    trades = CustomizedMessagingTrade.objects.filter(status=1)
    for trade in trades:
        adv_company = trade.adv_company
        if str(adv_company) not in segment_heirarchy_map:
            segment_heirarchy_map[str(adv_company)] = {}
            # Get all segment data
            segment_heirarchy_map[str(adv_company)][
                "group_data"] = get_segment_group_data(company=adv_company)
        for brand in adv_company.brand_set.filter(parent=None):
            if str(brand) not in segment_heirarchy_map[str(adv_company)]:
                segment_heirarchy_map[str(adv_company)][str(brand)] = {}
                # Get all segment data
                segment_heirarchy_map[str(adv_company)][str(brand)][
                    "group_data"] = get_segment_group_data(brand=brand)

            for sub_brand in brand.sub_brand.all():
                if str(sub_brand) not in segment_heirarchy_map[str(adv_company)][str(brand)]:
                    segment_heirarchy_map[str(adv_company)][str(brand)][
                        str(sub_brand)] = {}
                    # Get all segment data
                    segment_heirarchy_map[str(adv_company)][str(brand)][str(sub_brand)][
                        "group_data"] = get_segment_group_data(sub_brand=sub_brand)


def generate_stb_parallel_gevent(processing_start_date, processing_trades, database):

    from datetime import datetime as dt
    from multiprocessing import Pool

    print "Starting execution", dt.now()

    # Build per-trade metadata from the active CM trades.
    # (processing_trades is accepted but not used here.)
    trade_meta_data_map, segment_to_bvector_map = get_trade_meta_data(
        processing_start_date)
    stb_map = map_stb_ids_to_trade_id()
    pools = Pool(POOL_SIZE)

    process_db_size = BATCH_SIZE
    num_full_batches = DEMOGRAPHICS_DB_SIZE // BATCH_SIZE
    data = []
    for i in range(num_full_batches):
        data.append({"limit": process_db_size, "skip": process_db_size * i,
                     "trade_meta_data_map": trade_meta_data_map,
                     "database": database, "stb_map": stb_map})
    # Tail batch starting right after the last full batch, covering any
    # remainder rows.
    data.append({"limit": process_db_size,
                 "skip": process_db_size * num_full_batches,
                 "trade_meta_data_map": trade_meta_data_map,
                 "database": database, "stb_map": stb_map})
    start_time = dt.now()
    pools.map(stb_generator_process, data)
    pools.close()
    pools.join()
    print dt.now() - start_time, "Time taken"


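# Each worker process below creates its own SQLAlchemy engine with NullPool,
# so no pooled Postgres connections are shared across the fork boundary.
# Note that gevent's monkey.patch_all() does not make psycopg2's C-level I/O
# cooperative; a gevent-aware wait callback (e.g. psycogreen) would be needed
# for the producer's reads to truly yield to the consumer greenlets.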
def stb_generator_process(data):
    try:
        import gevent
        from gevent.queue import JoinableQueue
        from gevent import monkey
        monkey.patch_all()

        database = data["database"]
        client = MongoClient(connect=False)
        db = client[database]
        num_worker_threads = NUM_WORKER_THREADS
        joinable_queue = JoinableQueue()

        trade_meta_data_map = data["trade_meta_data_map"]
        stb_map = data["stb_map"]
        limit = data["limit"]
        skip = data["skip"]
        # TODO: Break single split into multiple producer greenlets
        # skip/limit are integers, so plain interpolation is safe here.
        final_query = "SELECT * FROM experian WHERE id >= %d AND id < %d" % (
            skip, skip + limit)
        print multiprocessing.current_process(), final_query
        monkey_engine = create_engine('postgresql://%s:%s@%s:%s/%s' %
                                      (USERNAME, PASSWORD, HOST, PORT, DB_NAME),
                                      poolclass=NullPool)
        monkey_connection = monkey_engine.connect()
        result = monkey_connection.execute(final_query)
        # Spawn the consumer greenlets before filling the queue.
        for i in range(num_worker_threads):
            gevent.spawn(process_stb_thread_gevent, i,
                         trade_meta_data_map, stb_map, joinable_queue, db)
        # Produce: stream rows from Postgres into the joinable queue.
        for row in result:
            joinable_queue.put_nowait(tuple(row))
        monkey_connection.close()
        joinable_queue.join()
    except Exception:
        import traceback
        print traceback.format_exc()


def process_stb_thread_gevent(greenlet_id, trade_meta_data_map, stb_map, joinable_queue, db):
    while True:
        try:
            stb = joinable_queue.get(timeout=2)
            # Zip the raw Experian row onto its column names.
            stb_obj = dict(zip(column_list, stb))
            # TODO: Options work here
            joinable_queue.task_done()
        except queue.Empty:
            print "Greenlet exiting, queue empty"
            break
        except Exception:
            import traceback
            print traceback.format_exc()
            break
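

# Minimal usage sketch (illustrative): wire the pieces together for one
# processing day. The Mongo database name "demographics" is an assumption;
# substitute whatever database the downstream writers expect.
if __name__ == "__main__":
    processing_date = datetime.datetime.now().date()
    get_segment_heirarchy()  # populate the module-level hierarchy map
    active_trades, _ = get_active_CMtrades_meta(processing_date)
    generate_stb_parallel_gevent(processing_date, active_trades, "demographics")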