#!/usr/bin/env python
import collections
import urlparse
import datetime
import boto3
import requests
import logging
import sys
import os
import click
import time
import csv
import cStringIO
import gevent
import psutil
import gc

from retrying import retry
from collections import namedtuple
from gevent.queue import Queue
from gevent.monkey import patch_all

from bi_db import bi_db_connection

from singular_api_client.singular_client import SingularClient
from singular_api_client.params import (Metrics, DiscrepancyMetrics, Format,
                                        CountryCodeFormat, Dimensions,
                                        TimeBreakdown)
from singular_api_client.exceptions import retry_if_unexpected_error

patch_all()

WHITE = "\033[0m"
PURPLE = "\033[0;35m"
RED = "\033[0;31m"
GREEN = "\033[0;32m"
YELLOW = "\033[0;33m"
BLUE = "\033[0;34m"
CYAN = "\033[0;36m"
RED_UNDERLINE = "\033[4;31m"
GREEN_UNDERLINE = "\033[4;32m"
YELLOW_UNDERLINE = "\033[4;33m"
BLUE_UNDERLINE = "\033[4;34m"
PURPLE_UNDERLINE = "\033[4;35m"
CYAN_UNDERLINE = "\033[4;36m"
WHITE_UNDERLINE = "\033[4;37m"

FIVE_MINUTES = 60 * 5
DATE_FORMAT = "%Y-%m-%d"
DEFAULT_START_DATE = datetime.datetime.strftime(
    datetime.datetime.today() - datetime.timedelta(days=32), "%Y-%m-%d")
DEFAULT_END_DATE = datetime.datetime.strftime(
    datetime.datetime.today() - datetime.timedelta(days=30), "%Y-%m-%d")

logger = logging.getLogger("etl_manager")
logger2 = logging.getLogger("client")

formatter = logging.Formatter(
    '[%(process)d :: %(thread)d] - %(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler = logging.FileHandler("sync.log")
file_handler.setFormatter(formatter)

error_handler = logging.FileHandler("error.log")
error_handler.setFormatter(formatter)
error_handler.setLevel(logging.ERROR)

ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)

for cur_logger in [logger, logger2]:
    cur_logger.setLevel(logging.DEBUG)
    cur_logger.addHandler(file_handler)
    cur_logger.addHandler(error_handler)


DEFAULT_DIMENSIONS = [Dimensions.APP, Dimensions.SOURCE, Dimensions.OS, Dimensions.PLATFORM,
                      Dimensions.COUNTRY_FIELD, Dimensions.ADN_SUB_CAMPAIGN_NAME,
                      Dimensions.ADN_SUB_ADNETWORK_NAME, Dimensions.ADN_ORIGINAL_CURRENCY,
                      Dimensions.ADN_CAMPAIGN_NAME
                      ]

DEFAULT_PUBLISHER_DIMENSIONS = [Dimensions.KEYWORD, Dimensions.PUBLISHER_ID,
                                Dimensions.PUBLISHER_SITE_NAME
                                ]

DEFAULT_CREATIVE_DIMENSIONS = [Dimensions.ADN_CREATIVE_NAME, Dimensions.ADN_CREATIVE_ID,
                               Dimensions.SINGULAR_CREATIVE_ID, Dimensions.CREATIVE_HASH,
                               Dimensions.CREATIVE_IMAGE_HASH, Dimensions.CREATIVE_IMAGE,
                               Dimensions.CREATIVE_WIDTH, Dimensions.CREATIVE_HEIGHT,
                               Dimensions.CREATIVE_IS_VIDEO, Dimensions.CREATIVE_TEXT,
                               Dimensions.CREATIVE_URL, Dimensions.KEYWORD
                               ]

DEFAULT_METRICS = [Metrics.ADN_COST, Metrics.ADN_IMPRESSIONS, Metrics.CUSTOM_CLICKS,
                   Metrics.CUSTOM_INSTALLS, Metrics.ADN_ORIGINAL_COST
                   ]

DEFAULT_DISCREPANCY_METRICS = [DiscrepancyMetrics.ADN_CLICKS, DiscrepancyMetrics.ADN_INSTALLS
                               ]

DEFAULT_COHORT_METRICS = ['revenue']  # create new table w/ addl cohort fields
DEFAULT_COHORT_PERIODS = [1, 7, 14, 30]

APIKey = namedtuple("APIKey", ["org_name", "api_key", "org_id"])


def memory_usage_psutil():
    # return the memory usage in MB
    return psutil.Process(os.getpid()).memory_info()[0] / float(2 ** 20)


def daterange(start_date, end_date):
    cur_date = start_date
    while cur_date <= end_date:
        yield cur_date
        cur_date = cur_date + datetime.timedelta(days=1)


def encode_date(date_obj):
    return date_obj.strftime(DATE_FORMAT)


class RedshiftSync(object):
    MAX_REPORTS_TO_QUEUE = 1
    DEFAULT_UPDATE_WINDOW_DAYS = 30
    REFRESH_TIMEOUT = 60 * 60 * 6  # 6 HOURS

    def __init__(self, s3_path, api_keys, dimensions=DEFAULT_DIMENSIONS,
                 metrics=DEFAULT_METRICS, cohort_metrics=DEFAULT_COHORT_METRICS,
                 cohort_periods=DEFAULT_COHORT_PERIODS, display_alignment=True,
                 format=Format.JSON, country_code_format=CountryCodeFormat.ISO3,
                 max_parallel_reports=MAX_REPORTS_TO_QUEUE,
                 discrepancy_metrics=DEFAULT_DISCREPANCY_METRICS):

        self.max_parallel_reports = max_parallel_reports
        self.api_keys = api_keys
        self.dimensions = dimensions
        self.metrics = metrics
        self.discrepancy_metrics = discrepancy_metrics
        self.cohort_metrics = cohort_metrics
        self.cohort_periods = cohort_periods
        self.display_alignment = display_alignment
        self.format = format
        self.country_code_format = country_code_format
        self.exchange_rate = {}

        self.columns = ["start_date", "org_id", "org_name", "inserted_at", "org_partner_cost",
                        "organization_cost", "uan_cost"] + \
            self.dimensions + self.metrics + self.discrepancy_metrics
        for period in self.cohort_periods:
            self.columns += ["{}_{}".format(cohort, period) for cohort in self.cohort_metrics]
            self.columns += ["{}_{}_{}".format(cohort, period, "original")
                             for cohort in self.cohort_metrics]

        parsed_s3_path = urlparse.urlparse(s3_path)
        self.bucket = parsed_s3_path.netloc
        self.s3_prefix = parsed_s3_path.path
        self.s3_path = s3_path
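
    # For reference: with the default dimensions/metrics/cohort settings above,
    # self.columns works out to roughly
    #   start_date, org_id, org_name, inserted_at, org_partner_cost,
    #   organization_cost, uan_cost, <dimensions>, <metrics>, <discrepancy metrics>,
    #   revenue_1, revenue_1_original, revenue_7, revenue_7_original,
    #   revenue_14, revenue_14_original, revenue_30, revenue_30_original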

    def download_to_s3(self, start_date, end_date):
        days = list(daterange(start_date, end_date))
        logger.info("Processing {} days of data".format(len(days)))

        for day in days:

            logger.info("Adding %d reports to the queue" % len(self.api_keys))
            tasks = Queue()
            for x in self.api_keys:
                if x.api_key:
                    tasks.put_nowait((x, day.strftime(DATE_FORMAT)))

            # This statement will not continue until all workers are done/cancelled/killed
            gevent.joinall([gevent.spawn(self.run_async_report_worker, i, tasks)
                            for i in xrange(self.max_parallel_reports)])

            logger.info("Finished all queries for {}".format(day.strftime(DATE_FORMAT)))

    def run_async_report_worker(self, worker_id, tasks):
        while not tasks.empty():
            self.run_async_report(*tasks.get())

    def run_async_report(self, api_key, date):
        try:
            logger.info("Pulling data for {}{}{}, date={}".format(
                YELLOW_UNDERLINE, api_key.org_name, WHITE, date))
            # time.sleep(3)
            self.run_async_report_inner(api_key, date)
        except Exception as e:
            logger.error("Exception while pulling data for {}, date={}: {}".format(
                api_key.org_name, date, e))

    @retry(wait_exponential_multiplier=1000, wait_exponential_max=60000,
           retry_on_exception=retry_if_unexpected_error, stop_max_attempt_number=5)
    def run_async_report_inner(self, api_key, date):
        """
        Enqueue and poll one report from the Singular API (using the async endpoint).
        Note: this method doesn't have any return value and should raise an
        Exception in case of failure.

        :param api_key: org api_key
        :param date: requested date formatted as "%Y-%m-%d"
        """
        client = SingularClient(api_key.api_key)

        client.BASE_API_URL = ""

        report_id = client.create_async_report(
            start_date=date,
            end_date=date,
            format=self.format,
            dimensions=self.dimensions,
            metrics=self.metrics,
            discrepancy_metrics=self.discrepancy_metrics,
            cohort_metrics=self.cohort_metrics,
            cohort_periods=[str(x) for x in self.cohort_periods],
            time_breakdown=TimeBreakdown.DAY,
            country_code_format=self.country_code_format,
            display_alignment=self.display_alignment
        )

        while True:
            report_status = client.get_report_status(report_id)
            if report_status.status not in [report_status.QUEUED, report_status.STARTED]:
                break
            time.sleep(10)

        if report_status.status != report_status.DONE:
            logger.error("async report failed -- org_name = %s, date = %s: %s" %
                         (api_key.org_name, date, repr(report_status)))
            return

        # Process the new report
        logger.info("async report finished successfully -- org_name = %s%s%s, date = %s: %s" %
                    (GREEN_UNDERLINE, api_key.org_name, WHITE, date, repr(report_status)))

        self.handle_report_data(api_key, date, report_status.download_url, report_id)

    def handle_report_data(self, api_key, date, download_url, report_id):

        # TODO: To conserve more memory, move from downloading everything to streaming...
        # Download result
        res = requests.get(download_url)
        if not res.ok:
            logger.error("unexpected error when downloading org_name = %s, report_id=%s, url=%s" %
                         (api_key.org_name, report_id, download_url))
            return

        membuf = cStringIO.StringIO()
        result_data = res.json()['results']

        # Our API returns the CSV output with a dictionary in the cohorted fields,
        # so we have to download and transform

        result_data = self.convert_currencies(result_data)
        result_data = self.remove_nas(result_data)
        if len(result_data) > 0:
            for row in result_data:
                del row["end_date"]
                row.update({
                    "org_id": api_key.org_id,
                    "org_name": api_key.org_name,
                    "inserted_at": datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
                })
            dw = csv.DictWriter(membuf, self.columns, restval='', doublequote=1,
                                quoting=csv.QUOTE_ALL)
            dw.writeheader()
            dw.writerows(RedshiftSync.convert(result_data))

        # Upload to S3
        key = os.path.join(self.s3_prefix, "date=%s" % date, api_key.org_name)
        if key.startswith("/"):
            key = key[1:]
        boto3.resource("s3").Bucket(self.bucket).put_object(Key=key, Body=membuf.getvalue(),
                                                            ServerSideEncryption='aws:kms')

        logger.info("finished uploading report data to: %s" % os.path.join(self.bucket, key))
        logger.info("MEMORY CONSUMPTION: {}{}{}".format(RED_UNDERLINE, memory_usage_psutil(),
                                                        WHITE))
        gc.collect()
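
    # Illustrative example of the resulting S3 layout (bucket, org, and date are
    # placeholders, not taken from the original): with s3_path
    # "s3://my-bucket/master_api/temp", the report for org "acme" on 2018-09-01
    # is written to key "master_api/temp/date=2018-09-01/acme" in bucket "my-bucket".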

    def remove_nas(self, data):
        # Normalize placeholder values returned by the API: map 'N/A' and empty
        # strings to None, map 'unknown' to None, and zero-fill missing numeric metrics.
        for row in data:
            cols_labeled_na = [
                'creative_hash', 'creative_image_hash', 'creative_image', 'creative_text',
                'creative_url', 'creative_width', 'creative_height', 'creative_is_video',
                'adn_creative_name', 'singular_creative_id', 'adn_creative_id',
                'adn_sub_campaign_name', 'adn_sub_adnetwork_name', 'adn_campaign_name', 'keyword',
                'publisher_id', 'publisher_site_name', 'unified_campaign_name', 'country_field'
            ]
            cols_labeled_unknown = [
                'unified_campaign_name'
            ]
            cols_to_fill_zero = [
                'tracker_installs', 'tracker_reengagements', 'adn_impressions', 'adn_clicks',
                'custom_clicks', 'custom_installs', 'adn_installs', 'revenue_1',
                'revenue_1_original', 'revenue_7', 'revenue_7_original', 'revenue_14',
                'revenue_14_original', 'revenue_30', 'revenue_30_original', 'retention_rate_1',
                'retention_rate_7', 'retention_rate_14', 'retention_rate_30'
            ]

            for cur_col in list(set(self.columns) & set(cols_labeled_na)):
                row[cur_col] = None if row.get(cur_col) == 'N/A' else row.get(cur_col)
                row[cur_col] = None if row.get(cur_col) == '' else row.get(cur_col)

            for cur_col in list(set(self.columns) & set(cols_labeled_unknown)):
                row[cur_col] = None if row.get(cur_col) == 'unknown' else row.get(cur_col)

            for cur_col in list(set(self.columns) & set(cols_to_fill_zero)):
                row[cur_col] = row.get(cur_col) if row.get(cur_col) else 0

        return data

    def convert_currencies(self, data):
        query_result = None
        uan_currency = None
        organization_currency = None
        currency_query = """
            select c1.factor, c2.code
            from currencies_dailycurrencyexchangerate c1
            left join currencies_currency c2 on c1.currency_id = c2.id
            where c1.date = '{}'
            and c2.code = '{}'
        """
        for ii, row in enumerate(data):
            start_date = row['start_date']
            row['adn_original_currency'] = row['adn_original_currency'] if (
                row.get('adn_original_currency') and 'N/A' not in row.get('adn_original_currency')
            ) else 'USD'

            row['organization_currency'] = row['organization_currency'] if \
                row.get('organization_currency') else 'USD'

            # cost transforming for Org
            if row['organization_currency'] != organization_currency:
                query_result = RedshiftSync.query_mobcnc(currency_query.format(
                    start_date, row['organization_currency']))
                if isinstance(query_result, list) and (
                        query_result[0].get('code', {}) == row['organization_currency']):
                    organization_currency = query_result[0]['code']
                    self.exchange_rate[row['organization_currency']] = float(
                        query_result[0].get('factor', 1))
                else:
                    logger.error('organization currency not found in mobcnc')

            row['organization_cost'] = 0
            if row['adn_cost'] > 0:
                row['organization_cost'] = row['adn_cost'] / self.exchange_rate[
                    row['organization_currency']]
            elif row['adn_cost'] == 'N/A' or not row['adn_cost'] or row['adn_cost'] < 0:
                row['adn_cost'] = 0
            else:
                # Unexpected adn_cost value; log it rather than dropping into a debugger.
                logger.error("unexpected adn_cost value %r in row %d" % (row['adn_cost'], ii))

            if not self.exchange_rate.get(row['adn_original_currency']) and \
                    row['adn_original_currency'] != uan_currency:
                query_result = RedshiftSync.query_mobcnc(currency_query.format(
                    start_date, row['adn_original_currency']))
                if isinstance(query_result, list) and (
                        query_result[0].get('code', {}) == row['adn_original_currency']):
                    uan_currency = query_result[0]['code']
                    self.exchange_rate[row['adn_original_currency']] = float(
                        query_result[0].get('factor', 1))
                else:
                    logger.error('uan currency code not found in mobcnc')

            row['uan_cost'] = 0
            if row['adn_original_cost'] > 0:
                row['uan_cost'] = row['adn_original_cost'] / self.exchange_rate[
                    row['adn_original_currency']]
            elif row['adn_original_cost'] == 'N/A' or not \
                    row['adn_original_cost'] or row['adn_original_cost'] < 0:
                row['adn_original_cost'] = 0

            row['org_partner_cost'] = 0
            if row['organization_cost'] and row['uan_cost'] > 0:
                row['org_partner_cost'] = row['organization_cost'] - row['uan_cost']
                if row['org_partner_cost'] < 0:
                    row['org_partner_cost'] = 0

            # revenue transforming
            for cohort in self.cohort_metrics:
                for period in self.cohort_periods:
                    if (row[cohort].get('{}d'.format(period), None) and (
                            row['organization_currency'] != 'USD')):
                        row["{}_{}".format(cohort, period)] = (
                            row[cohort].get('{}d'.format(period)) /
                            self.exchange_rate[row['organization_currency']])
                    else:
                        row["{}_{}".format(cohort, period)] = \
                            row[cohort].get('{}d'.format(period), None)
                    row["{}_{}_{}".format(cohort, period, "original")] = \
                        row[cohort].get('{}d'.format(period), None)
                del row[cohort]

        return data
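
    # Rough shape of a single result row before convert_currencies/remove_nas run,
    # inferred from the field accesses above (all values are made-up placeholders):
    #   {"start_date": "2018-09-01", "end_date": "2018-09-01",
    #    "adn_cost": 12.3, "adn_original_cost": 10.5,
    #    "adn_original_currency": "EUR", "organization_currency": "USD",
    #    "revenue": {"1d": 1.0, "7d": 2.5, "14d": 3.1, "30d": 4.2}, ...}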

    @staticmethod
    def convert(data):
        if isinstance(data, basestring):
            return data.encode('utf-8')
        elif isinstance(data, collections.Mapping):
            return dict(map(RedshiftSync.convert, data.iteritems()))
        elif isinstance(data, collections.Iterable):
            return type(data)(map(RedshiftSync.convert, data))
        else:
            return data

    @staticmethod
    def query_mobcnc(query):
        connection = bi_db_connection(
            db_host=os.getenv("BI_HOST"),
            db_port=int(os.getenv("BI_PORT")),
            db_user=os.getenv("BI_USER"),
            db_pass=os.getenv("BI_PASSWORD"),
            db_name=os.getenv("BI_NAME"))
        return connection.execute(query)


@click.group()
def cli():
    pass


def filter_api_keys(api_keys, cherry_pick, exclude_org_name, org_name):
    api_keys_filtering = []

    if cherry_pick:
        for ix, key in enumerate(api_keys):
            print("[{0}] - {1}".format(ix + 1, key.org_name))
        value = click.prompt('Enter all org indices you want to run '
                             'the script for (comma delimited)')
        values = [int(idx) - 1 for idx in value.split(',')]
        for picked_org in values:
            api_keys_filtering.append(api_keys[picked_org])
    else:
        # Keep explicitly requested orgs, otherwise drop explicitly excluded ones.
        if org_name:
            api_keys_filtering = [org for org in api_keys if org.org_name in org_name]
        elif exclude_org_name:
            api_keys_filtering = [org for org in api_keys
                                  if org.org_name not in exclude_org_name]

    api_keys = api_keys_filtering if api_keys_filtering else api_keys
    return api_keys


@cli.command()
@click.argument("s3_bucket")
@click.option('--start-date', '-s', default=DEFAULT_START_DATE, help="default is 32 days ago")
@click.option('--extra-dimensions', '-d', multiple=True, help="additional dimensions to pull")
@click.option('--end-date', '-e', default=DEFAULT_END_DATE, help="default is 30 days ago")
@click.option('--max-parallel-reports', '-m', default=5, help="# of reports running in parallel")
@click.option('--org-name', '-o', multiple=True, default=[], help="run script for specific orgs")
@click.option('--exclude-org-name', '-x', multiple=True, default=[], help="ignore specific orgs")
@click.option('--cherry-pick', '-c', is_flag=True, help="pick from an indexed list of orgs")
@click.option('--report-type', type=click.Choice(
    ['publisher', 'creative', 'generic']), default='generic')
def download_to_s3(s3_bucket, start_date, end_date, extra_dimensions,
                   max_parallel_reports, org_name, exclude_org_name, cherry_pick, report_type):
    query = """
        select o.name as org_name, ou.api_key, o.id as org_id
        from dashboard_organization o
        left join dashboard_organizationuser ou on o.id = ou.organization_id
        where ou.is_active = 1
        and o.name not like '%%_dev'
        and o.name not like '%%test%%'
        and o.name not like '%%exercise%%'
        and o.name not like '%%training%%'
        and o.name not like '%%mock%%'
        and o.name not like '%%demo%%'
        and o.name not like '%%(copy)%%'
        and o.account_type = 1
        and company_logo is not null
        and company_logo <> ''
        and min_stats_date is not null
        group by o.name;
    """
    # Get relevant orgs and API keys
    raw_api_keys = [APIKey(**kwargs) for kwargs in RedshiftSync.query_mobcnc(query)]
    api_keys = filter_api_keys(raw_api_keys, cherry_pick, exclude_org_name, org_name)

    # Start/End date
    start_date = datetime.datetime.strptime(start_date, DATE_FORMAT).date()
    end_date = datetime.datetime.strptime(end_date, DATE_FORMAT).date()

    # Define dimensions (copy so we don't mutate the module-level default list)
    dimensions = list(DEFAULT_DIMENSIONS)
    if report_type == 'publisher':
        dimensions += DEFAULT_PUBLISHER_DIMENSIONS
    elif report_type == 'creative':
        dimensions += DEFAULT_CREATIVE_DIMENSIONS
    dimensions += [x for x in extra_dimensions if x not in dimensions]

    # Download data
    RedshiftSync(s3_bucket, api_keys, dimensions=dimensions,
                 max_parallel_reports=max_parallel_reports) \
        .download_to_s3(start_date, end_date)


@cli.command()
@click.argument("s3_bucket")
@click.option('--start-date', '-s', default=DEFAULT_START_DATE, help="default is 32 days ago")
@click.option('--extra-dimensions', '-d', multiple=True, help="additional dimensions to pull")
@click.option('--end-date', '-e', default=DEFAULT_END_DATE, help="default is 30 days ago")
@click.option('--max-parallel-reports', '-m', default=5, help="# of reports running in parallel")
@click.option('--report-type', type=click.Choice(
    ['publisher', 'creative', 'generic']), default='generic')
def complete_missed_orgs(s3_bucket, start_date, end_date, extra_dimensions,
                         max_parallel_reports, report_type):
    query = """
        select o.name as org_name, ou.api_key, o.id as org_id
        from dashboard_organization o
        left join dashboard_organizationuser ou on o.id = ou.organization_id
        where ou.is_active = 1
        and o.name not like '%%_dev'
        and o.name not like '%%test%%'
        and o.name not like '%%exercise%%'
        and o.name not like '%%training%%'
        and o.name not like '%%mock%%'
        and o.name not like '%%demo%%'
        and o.name not like '%%(copy)%%'
        and o.account_type = 1
        and company_logo is not null
        and company_logo <> ''
        and min_stats_date is not null
        group by o.name;
    """
    # Get relevant orgs and API keys
    api_keys = [APIKey(**kwargs) for kwargs in RedshiftSync.query_mobcnc(query)]

    # Start/End date
    start_date = datetime.datetime.strptime(start_date, DATE_FORMAT).date()
    end_date = datetime.datetime.strptime(end_date, DATE_FORMAT).date()

    # Define dimensions (copy so we don't mutate the module-level default list)
    dimensions = list(DEFAULT_DIMENSIONS)
    if report_type == 'publisher':
        dimensions += DEFAULT_PUBLISHER_DIMENSIONS
    elif report_type == 'creative':
        dimensions += DEFAULT_CREATIVE_DIMENSIONS
    dimensions += [x for x in extra_dimensions if x not in dimensions]

    s3 = boto3.resource("s3")
    bucket = s3.Bucket(r"singular-store-30d-retention-us-west-2")
    for day in daterange(start_date, end_date):

        s3_path = "master_api/temp/date={}".format(day.strftime(DATE_FORMAT))
        existing_orgs = [os.path.split(x.key)[-1]
                         for x in bucket.objects.filter(Prefix=s3_path)]
        keys_for_date = [x for x in api_keys if x.org_name not in existing_orgs]

        if len(keys_for_date) == 0:
            continue

        logger.info("date={}, missing orgs: {}".format(day, keys_for_date))
        # Download data
        RedshiftSync(s3_bucket, keys_for_date, dimensions=dimensions,
                     max_parallel_reports=max_parallel_reports) \
            .download_to_s3(day, day)


if __name__ == "__main__":
    cli()
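
# Example invocations (illustrative only: the script filename "sync.py", bucket
# name, dates, and org name below are placeholders, not taken from the original;
# depending on the installed click version the subcommands may be exposed with
# dashes instead of underscores):
#   python sync.py download_to_s3 s3://my-bucket/master_api/temp \
#       -s 2018-09-01 -e 2018-09-30 --report-type creative -o "Some Org"
#   python sync.py complete_missed_orgs s3://my-bucket/master_api/temp \
#       -s 2018-09-01 -e 2018-09-30 -m 3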