Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # __author__ minion
- import datetime
- import multiprocessing
- from sqlalchemy import create_engine
- from pymongo import MongoClient
- from adcuratio.settings import DATABASES
- from gevent import queue
- from collections import defaultdict
- from sqlalchemy.pool import NullPool
- import os
- import csv
- from engine.constants import EXPERIAN_COLUMN_LIST as column_list
- from engine.helper.etl_helper import *
- from engine.helper.debug import *
# Connection credentials for the default relational database, read once
# from the Django settings at import time.
_default_db = DATABASES['default']
USERNAME = _default_db['USER']
PASSWORD = _default_db['PASSWORD']
HOST = _default_db['HOST']
PORT = _default_db['PORT']

# Module-level caches shared by the helpers below.
segment_heirarchy_map = {}  # advertiser -> brand -> sub-brand -> group data
trade_meta_data_map = {}    # str(trade.id) -> per-trade metadata
def get_active_CMtrades_meta(processing_date):
    """Collect the active customized-messaging trades airing on a date.

    A trade (status == 1) is included when at least one of its campaign
    ad-spots starts exactly at ``processing_date``.

    Args:
        processing_date: value compared against ``adspot.start_timestamp``.

    Returns:
        (list, dict): the matching trades, and a map of
        ``str(trade.id) -> {"adspot": [matching adspots]}``.
    """
    active_trade_set = set()
    trade_meta_data_map_temp = {}
    active_cmt_trade = CustomizedMessagingTrade.objects.filter(status=1)
    for cmtTrade in active_cmt_trade:
        # Bug fix: keep a per-trade ad-spot set; the original reused one
        # set for every trade, so each entry accumulated all earlier
        # trades' ad-spots as well.
        adspot_set = set()
        for adspot in cmtTrade.campaign_adspot.all():
            if adspot.start_timestamp == processing_date:
                active_trade_set.add(cmtTrade)
                adspot_set.add(adspot)
        if adspot_set:
            # Bug fix: populate the returned temp map; the original wrote
            # into the module-level trade_meta_data_map and returned the
            # temp map permanently empty.
            trade_meta_data_map_temp[str(cmtTrade.id)] = {
                "adspot": list(adspot_set)}
    # Bug fix: the original returned the undefined name
    # ``active_trade_list`` (NameError at runtime).
    return list(active_trade_set), trade_meta_data_map_temp
def get_trade_meta_data(processing_date):
    """Build per-trade metadata (ad id + targeting bit-vector) for trades
    active on *processing_date*.

    Each trade's segment groups are OR-ed into a single 41-bit vector via
    ``group_adc``.

    Returns:
        (dict, dict): the trade metadata map keyed by ``str(trade.id)``
        (each entry gains ``"ad_id"`` and ``"bvector"``), and a map of
        segment id -> bit-vector string.
    """
    active_trade_list, trade_meta_data_map_temp = get_active_CMtrades_meta(
        processing_date)
    segment_to_bvector_map = {}
    # Bug fix: the original iterated the undefined name ``active_CMtrades``.
    for trade in active_trade_list:
        json_map = []
        bit_vector = bitarray('0' * 41)
        for group_list_item in trade.segment.group.all():
            json_map.append(aggregate_all_filters(
                group_list_item.group.filter_json))
            bit_vector |= group_adc(group_list_item.id)
        # calculating adspots may need some modification
        # Bug fix: ``segment`` was an undefined name; the segment under
        # inspection is the trade's own segment.
        segment_to_bvector_map[trade.segment.id] = bit_vector.to01()
        trade_meta_data_map_temp[str(trade.id)].update(
            {"ad_id": trade.ad_id, "bvector": bit_vector.to01()})
    return trade_meta_data_map_temp, segment_to_bvector_map
def get_segment_group_data(company=None, brand=None, sub_brand=None):
    """Aggregate group filters and targeting vectors for segments at one
    hierarchy level.

    Exactly one of the keyword arguments is expected; they are checked in
    priority order company > brand > sub_brand.

    Returns:
        dict: ``str(segment) -> {"wanted_vector": bitarray,
        "wanted_json_map": [aggregated filters]}``; empty when no level
        is given.
    """
    data = {}
    if company:
        segments = Segment.objects.filter(company=company)
    elif brand:
        segments = Segment.objects.filter(brand=brand)
    elif sub_brand:
        segments = Segment.objects.filter(sub_brand=sub_brand)
    else:
        # Bug fix: with no filter given, ``segments`` was unbound and the
        # loop below raised UnboundLocalError.
        return data
    for segment in segments:
        wanted_vector = bitarray('0' * 41)
        wanted_json_map = []
        for wanted_group in segment.groups.all():
            wanted_json_map.append(
                aggregate_all_filters(wanted_group.filter_json))
            wanted_vector |= group_adc(wanted_group.id)
        data[str(segment)] = {"wanted_vector": wanted_vector,
                              "wanted_json_map": wanted_json_map}
    return data
def get_segment_heirarchy():
    """Populate the module-level ``segment_heirarchy_map`` with group data
    for every active trade's advertiser, its top-level brands, and their
    sub-brands.

    The map has the shape
    ``{company: {"group_data": ..., brand: {"group_data": ...,
    sub_brand: {"group_data": ...}}}}`` with string keys.
    """
    for trade in CustomizedMessagingTrade.objects.filter(status=1):
        adv_company = trade.adv_company
        company_node = segment_heirarchy_map.setdefault(str(adv_company), {})
        # Refresh segment data at the company level on every pass.
        company_node["group_data"] = get_segment_group_data(
            company=adv_company)
        for brand in adv_company.brand_set.filter(parent=None):
            brand_node = company_node.setdefault(str(brand), {})
            brand_node["group_data"] = get_segment_group_data(brand=brand)
            for sub_brand in brand.sub_brand.all():
                sub_node = brand_node.setdefault(str(sub_brand), {})
                sub_node["group_data"] = get_segment_group_data(
                    sub_brand=sub_brand)
- def generate_stb_parallel_gevent(processing_start_date, processing_trades, database):
- from datetime import datetime as dt
- from multiprocessing import Pool
- print "Starting execution", dt.now()
- # Get from active CMtrades
- trade_meta_data_map = get_trade_meta_data(
- processing_trades, processing_start_date)
- stb_map = map_stb_ids_to_trade_id()
- pools = Pool(POOL_SIZE)
- process_db_size = BATCH_SIZE
- data = []
- for i in range(DEMOGRAPHICS_DB_SIZE / BATCH_SIZE):
- data.append({"limit": process_db_size, "skip": process_db_size * i,
- "trade_meta_data_map": trade_meta_data_map, "database": database, "stb_map": stb_map})
- data.append({"limit": process_db_size, "skip": process_db_size * (DEMOGRAPHICS_DB_SIZE / BATCH_SIZE + 1),
- "trade_meta_data_map": trade_meta_data_map, "database": database, "stb_map": stb_map})
- start_time = dt.now()
- pools.map(stb_generator_process, data)
- pools.close()
- print dt.now() - start_time, "Time taken"
- def stb_generator_process(data):
- try:
- import gevent
- from gevent.queue import JoinableQueue
- from gevent import monkey
- monkey.patch_all()
- database = data["database"]
- client = MongoClient(connect=False)
- db = client[database]
- num_worker_threads = NUM_WORKER_THREADS
- joinable_queue = JoinableQueue()
- trade_meta_data_map = data["trade_meta_data_map"]
- stb_map = data["stb_map"]
- # print data["trade_meta_data_map"]
- limit = data["limit"]
- skip = data["skip"]
- # TODO: Break single split into multiple producer greenlets
- final_query = "SELECT * from experian where id >= " + \
- str(skip) + " and id < " + str(skip + limit)
- print multiprocessing.current_process(), final_query
- monkey_engine = create_engine('postgresql://%s:%s@%s:%s/%s' %
- (USERNAME, PASSWORD, HOST, PORT, DB_NAME), poolclass=NullPool)
- monkey_connection = monkey_engine.connect()
- con_obj = monkey_connection.execute(final_query)
- for i in range(num_worker_threads):
- gevent.spawn(process_stb_thread_gevent, i,
- trade_meta_data_map, stb_map, joinable_queue, db)
- for i in range(limit):
- # if i%100 == 0:
- # sleep(5)
- try:
- stb_tuple = con_obj.cursor.next()
- joinable_queue.put_nowait(stb_tuple)
- except:
- pass
- monkey_connection.close()
- joinable_queue.join()
- # print "Sleeping"
- # gevent.sleep(10)
- except:
- import traceback
- print traceback.format_exc()
- def process_stb_thread_gevent(greenlet_id, trade_meta_data_map, stb_map, joinable_queue, db):
- while True:
- try:
- stb = joinable_queue.get(timeout=2)
- stb_obj = dict(zip(column_list, stb))
- # TODO: Options work here
- joinable_queue.task_done()
- except queue.Empty:
- print "Greenlet exiting queue empty"
- break
- except:
- import traceback
- print traceback.format_exc()
- break
Add Comment
Please, Sign In to add comment