Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #TODO Реализовать обвноление данных с использованием Ваниного скрипта автоматом.
- #!/usr/bin/python
- # -*- coding: utf-8 -*-
- import os
- import pandas as pd
- import psycopg2
- import requests
- import time
- import pickle
- import csv
- import logging
- from zipfile import ZipFile
- from urllib.request import urlopen
- from datetime import datetime, timedelta
- from binascii import b2a_base64
- from hashlib import sha1
- from hmac import new
- from io import BytesIO
- from urllib.parse import quote
- from sqlalchemy import create_engine
- from time import gmtime, strftime
- #SETTINGS
- path_to_paid_users_folder = r'/Users/a.eryomin/PycharmProjects/DataDownloader/onelink_paid_users'
- onelink_pure_data = r'/Users/a.eryomin/PycharmProjects/DataDownloader/onelink_pure_data'
- # #Variables for Postgresql settings
- PG = {
- 'host': 'youlahdp-dev-statdb1.devmail.ru',
- 'user': 'stat',
- 'password': 'stat',
- 'database': 'statdb',
- }
- db_connection = psycopg2.connect(dbname=PG['database'], user=PG['user'], password=PG['password'], host=PG['host'])
- # #Query for gettinbg data from Postgresql
- # pg_query = '''
- # SELECT
- # y_users.user_id,
- # lower(y_users.user_adv_id) AS adv_user_id,
- # y_users.date_registered,
- # y_users.date_authorized,
- # y_users.push_token_current
- # FROM aeryomin.y_users AS y_users
- #
- # JOIN
- # (SELECT sender_id, press_chat_button, press_call_button, product_id FROM tmp_tables.y_user_user_counter)
- # AS y_usr_usr
- # ON y_users.user_id = y_usr_usr.sender_id
- # WHERE
- # date_trunc('day', TO_TIMESTAMP(y_users.date_registered)) >= date_trunc('day', now()) - INTERVAL '7 day'
- # AND (y_usr_usr.press_call_button IS NOT NULL
- # OR y_usr_usr.press_chat_button IS NOT NULL
- # OR y_usr_usr.product_id IS NOT NULL)
- # AND (y_users.platform = 'android'
- # OR y_users.platform = 'ios')
- # ;
- # '''
- #
- # updated_query = """`
- # SELECT
- # y_users.user_id,
- # lower(y_users.user_adv_id) AS adv_user_id,
- # y_users.date_registered,
- # y_users.date_authorized,
- # y_users.push_token_current
- #
- # FROM aeryomin.y_users AS y_users
- # WHERE
- # date_trunc('day', TO_TIMESTAMP(y_users.date_registered)) >= date_trunc('day', now()) - INTERVAL '7 day'
- # AND y_users.user_id NOT IN (SELECT sender_id FROM aeryomin.y_user_user_counter)
- # AND (y_users.platform = 'android'
- # OR y_users.platform = 'ios')
- # ;
- # """
- #
- # paid_user_id.to_sql('aeryomin.from_1link', engine)
- # cur = db_connection.cursor()
- # cur.execute(updated_query)
- # y_users_df = pd.DataFrame(cur.fetchall())
- def one_link_downloader():
- logging.info("One Link downloader has been started.")
- projects = {
- '100260': 'ios',
- '101318': 'android'
- }
- now = str(datetime.now().date())
- dates = [str((datetime.now() - timedelta(days=i)).date()) for i in range(1, 10)]
- result_projects = []
- getliststart_idfa_idAd_customEvent_1_8_9 = "http://1l-api.mail.ru/v1/profile/getliststart.json?" + \
- "idProject={idProject}&" + \
- "fields%5B%5D=idfa&" + \
- "fields%5B%5D=tsLastCustomEventRepeatable2&" + \
- "fields%5B%5D=tsLastCustomEventRepeatable13&" + \
- "fields%5B%5D=tsLastCustomEventRepeatable15&" + \
- "fields%5B%5D=idAd&" + \
- "fields%5B%5D=idUser&" + \
- "fields%5B%5D=tsReg&" + \
- "filters%5BtsReg%5D%5Bbetween%5D%5Bstart%5D={date1}&" + \
- "filters%5BtsReg%5D%5Bbetween%5D%5Bend%5D={date2}"
- getliststart = 'http://1l-api.mail.ru/v1/profile/getliststart.json?idProject={idProject}&fields=*'
- getliststatus = "http://1l-api.mail.ru/v1/profile/getliststatus.json?uidReport={uidReport}"
- get_campaign_name = 'http://1l-api.mail.ru/v1/campaign/gettitle.json?id={id}'
- def get_info(url, params={}):
- def get_signature(access_id, private_key, url, method='GET', post_data=None):
- method = method.upper()
- # post_data must be in URL-encoded query string format
- data = post_data if post_data else ''
- string = '%s&%s&%s' % (
- method,
- quote(url, safe='~'),
- quote(data, safe='~')
- )
- key_bytes = bytes(private_key, 'latin-1')
- signature = b2a_base64(
- new(key_bytes, string.encode(), sha1).digest()
- ).decode().rstrip('\n')
- return 'AuthHMAC %s:%s' % (access_id, signature)
- url = url.format(**params)
- sign = get_signature('a.eryomin', 'y;+DmDRwf64Yb}2bnYZ', url)
- return requests.get(url, headers={'Authorization': sign}).json()
- def get_report(link):
- url = urlopen(link)
- print(type(url))
- print(url)
- zipfile = ZipFile(BytesIO(url.read()))
- f = open(list(zipfile.NameToInfo.keys())[0], 'wb')
- f.write(zipfile.open(list(zipfile.NameToInfo.keys())[0]).read())
- f.close()
- return pd.read_csv(list(zipfile.NameToInfo.keys())[0], sep=';')
- now = str(datetime.now().date())
- dates = [str((datetime.now() - timedelta(days=i)).date()) for i in range(1, 8)]
- result_projects = []
- for project_id in projects:
- print(project_id, dates)
- print(get_info(getliststart_idfa_idAd_customEvent_1_8_9,
- {'date1': min(dates), 'date2': max(dates), 'idProject': project_id}))
- result_projects.append(
- get_info(getliststart_idfa_idAd_customEvent_1_8_9,
- {'date1': min(dates), 'date2': max(dates), 'idProject': project_id})['data']['uidReport']
- )
- links = []
- for project_id in result_projects:
- link = get_info(getliststatus, {'uidReport': project_id})['data'].get('link')
- if link is not None:
- links.append(link)
- while True:
- links = []
- for project_id in result_projects:
- link = get_info(getliststatus, {'uidReport': project_id})['data'].get('link')
- if link is not None:
- links.append(link)
- if len(links) == len(result_projects):
- break
- time.sleep(6 * 10)
- print('-')
- dfs = []
- for link, params in zip(links, list(projects.values())):
- df = get_report(link)
- df.columns = ['idfa', 'goal1', 'goal8', 'goal9', 'campaign', 'id_user', 'date_reg']
- df['platform'] = [params] * df.shape[0]
- df['date_check'] = [now] * df.shape[0]
- dfs.append(df)
- result = pd.concat(dfs)
- result['campaign'] = result['campaign'].astype(str)
- need_campaign = set(map(lambda x: x, result['campaign']))
- campaign = pickle.load(open('campaign.pckl', 'rb'))
- for i in need_campaign:
- if i not in set(campaign.keys()):
- resp = get_info(get_campaign_name, {'id': i})['data']
- if type(resp) == list and len(resp) == 1:
- campaign = dict({i: resp[0]}, **campaign)
- else:
- campaign = dict(resp, **campaign)
- pickle.dump(campaign, open('campaign.pckl', 'wb'), protocol=0)
- df_campaign = pd.DataFrame(list(campaign.items()))
- df_campaign.columns = ['campaign_id', 'campaign_name']
- result = pd.merge(result, df_campaign, how='outer', left_on='campaign', right_on='campaign_id')
- result['campaign_name'] = result['campaign_name'].map(lambda x: x.replace('\t', ' '))
- result['goal1'] = pd.to_datetime(result['goal1'] * 1000000000)
- result['goal8'] = pd.to_datetime(result['goal8'] * 1000000000)
- result['goal9'] = pd.to_datetime(result['goal9'] * 1000000000)
- result['date_reg'] = pd.to_datetime(result['date_reg'] * 1000000000)
- result['date_reg'] = result['date_reg'].map(lambda x: str(x.date()))
- result = result[
- ['idfa', 'goal1', 'goal8', 'goal9', 'campaign', 'id_user',
- 'platform', 'date_reg', 'date_check', 'campaign_id',
- 'campaign_name']
- ]
- result.to_csv(os.path.join(onelink_pure_data, 'onelink_pure_data_{}.csv'.format(now)), encoding='utf-8', sep='\t', index=False, header=False)
- def get_only_paid_traffic():
- path_to_csv = os.path.join(onelink_pure_data, 'onelink_pure_data_{}.csv'.format(str(datetime.now().date())))
- date_from_file = pd.read_csv(path_to_csv, sep='\t')
- df = pd.DataFrame(date_from_file)
- paid_df = df[df.iloc[:, 10] != 'Organic: Empty tracking id']
- paid_user_id = paid_df.iloc[:, 0]
- #Rename old csv file
- os.rename(os.path.join(path_to_paid_users_folder, 'paid_users.csv'),
- os.path.join(path_to_paid_users_folder, 'paid_users_{0}.csv'.format(strftime("%Y-%m-%d", gmtime()))))
- #Store data into paid_users.csv
- paid_user_id.to_csv(os.path.join(path_to_paid_users_folder, 'paid_users.csv'), header=False, index=False)
- def add_one_link_to_pg():
- #Drop table in PG
- drop_query = '''
- DROP TABLE aeryomin.onelink_paid_users;
- '''
- create_query = '''
- CREATE TABLE aeryomin.onelink_paid_users (device_id VARCHAR)
- '''
- db_connection = psycopg2.connect(dbname=PG['database'], user=PG['user'], password=PG['password'], host=PG['host'])
- drop_cursor = db_connection.cursor()
- drop_cursor.execute(drop_query)
- db_connection.commit()
- create_cursor = db_connection.cursor()
- create_cursor.execute(create_query)
- db_connection.commit()
- insert_cursor = db_connection.cursor()
- with open(os.path.join(path_to_paid_users_folder, 'paid_users.csv'), 'r') as f:
- next(f)
- insert_cursor.copy_from(f, 'aeryomin.onelink_paid_users')
- db_connection.commit()
- db_connection.close()
- def create_tmp_users_table():
- #Getting users from y_users by condition.
- pg_select_query = '''
- SELECT
- users.user_id
- date_trunc('day', TO_TIMESTAMP(users.date_registered)),
- FROM aeryomin.y_users as users
- WHERE
- upper(user_adv_id) IN (SELECT * FROM aeryomin.onelink_paid_users)
- AND date_trunc('day', TO_TIMESTAMP(users.date_registered)) >= date_trunc('day', now()) - INTERVAL '7 day'
- AND
- GROUP BY
- date_trunc('day', TO_TIMESTAMP(users.date_registered)),
- users.user_id
- ;
- '''
- cur = db_connection.cursor()
- cur.execute(pg_select_query)
- y_users_df = pd.DataFrame(cur.fetchall())
- y_users_df.to_csv('final_users_.csv', header=False, index=False)
- if __name__ == '__main__':
- #Выкачивание данных из 1линка. По результату работы будет сохранён файл .csv с выгрузкой.
- one_link_downloader()
- add_one_link_to_pg()
- get_only_paid_traffic()
- create_tmp_users_table()
Add Comment
Please, Sign In to add comment