Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- from googleads import ad_manager
- from googleads import oauth2
- from google.oauth2 import service_account
- from google.cloud import bigquery
- from os import getenv
- import pymysql
- import pytz
def handle_request(data, context):
    """Background Cloud Function to be triggered by Pub/Sub topic <fetch_line_items>.

    Fetches line items from the Ad Manager ``LineItemService`` and upserts
    them into the ``LineItems`` MySQL table. When the table already has rows,
    only line items modified after (latest stored ``lastModifiedDateTime``
    minus one day) are requested; otherwise an unfiltered statement is used.

    Configuration is read from the environment: ``KEY_FILE``,
    ``APPLICATION_NAME``, ``NETWORK_CODE``, ``CLOUDSQL_INSTANCE_CONNECTION``,
    ``CLOUDSQL_USER``, ``CLOUDSQL_PASSWORD`` and ``CLOUDSQL_DATABASE``.

    Args:
        data: Trigger payload. Not read by this function.
        context: Trigger metadata. Not read by this function.

    Returns:
        A dict with a single ``'message'`` key reporting the
        ``totalResultSetSize`` of the last Ad Manager response.
    """
    # Step 1: Authenticate and load LineItemService
    oauth2_client = oauth2.GoogleServiceAccountClient(
        getenv('KEY_FILE'), oauth2.GetAPIScope("ad_manager"))
    ad_manager_client = ad_manager.AdManagerClient(
        oauth2_client, getenv('APPLICATION_NAME'), getenv('NETWORK_CODE'))
    line_item_service = ad_manager_client.GetService("LineItemService")

    # Step 2: Specify SQL config and connect
    connection_name = getenv('CLOUDSQL_INSTANCE_CONNECTION')
    mysql_config = {
        'unix_socket': f'/cloudsql/{connection_name}',
        'user': getenv('CLOUDSQL_USER'),
        'password': getenv('CLOUDSQL_PASSWORD'),
        'db': getenv('CLOUDSQL_DATABASE'),
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor,
        'autocommit': True,
    }
    mysql_conn = pymysql.connect(**mysql_config)

    # The cursor context manager only closes the cursor; close the connection
    # explicitly so it is released even when a query or API call fails.
    try:
        with mysql_conn.cursor() as cursor:
            # Step 3: Query database for the latest recorded modified date time
            cursor.execute("SELECT lastModifiedDateTime FROM LineItems " +
                           "ORDER BY lastModifiedDateTime DESC LIMIT 1;")
            result = cursor.fetchone()
            if result is not None:
                # Stored value is treated as naive UTC; step back one day so
                # the requested window overlaps what is already stored.
                latest_modified = (
                    pytz.timezone('UTC').localize(result["lastModifiedDateTime"])
                    + datetime.timedelta(days=-1)
                )
                statement = (ad_manager.StatementBuilder()
                             .Where('lastModifiedDateTime > :lastModifiedDateTime')
                             .WithBindVariable('lastModifiedDateTime', latest_modified))
            else:
                statement = ad_manager.StatementBuilder()

            # Step 4: Query database for the columns.
            # Restrict to the connected schema: without TABLE_SCHEMA a table
            # named LineItems in any other database would contribute columns.
            cursor.execute("SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
                           "WHERE TABLE_NAME='LineItems' AND TABLE_SCHEMA=DATABASE();")
            columns = [row["COLUMN_NAME"] for row in cursor.fetchall()]

            # Step 5: Fetch recently updated LineItems and update database
            while True:
                response = line_item_service.getLineItemsByStatement(statement.ToStatement())
                print("Response: " + str(response))
                if 'results' in response and len(response['results']):
                    line_items = response['results']
                    # NOTE(review): this statement is assembled by string
                    # concatenation from API-supplied values; consider moving
                    # to parameterized execution (cursor.executemany).
                    cursor.execute(
                        "INSERT INTO LineItems (" + ",".join(columns) + ") "
                        + "VALUES " + generate_sql_values(line_items, columns) + " "
                        + "ON DUPLICATE KEY UPDATE "
                        + generate_sql_columns(columns) + ";"
                    )
                    # Advance to the next page of results.
                    statement.offset += statement.limit
                else:
                    break
    finally:
        mysql_conn.close()

    return {'message': 'Number of LineItems updated: ' + str(response['totalResultSetSize'])}
def generate_sql_columns(columns):
    """Build the assignment list for an ``ON DUPLICATE KEY UPDATE`` clause.

    Each column name becomes ``<name> = VALUES(<name>)``; the assignments are
    joined with ``", "``.

    Args:
        columns: Iterable of column-name strings.

    Returns:
        The comma-separated assignment string (empty for no columns).
    """
    assignments = []
    for name in columns:
        assignments.append(name + " = VALUES(" + name + ")")
    return ", ".join(assignments)
def generate_sql_values(line_items, columns):
    """Render line items as the tuple list of a multi-row ``VALUES`` clause.

    For every line item, each requested column is looked up by key and
    converted with ``convert_value_for_sql_query``; the converted values are
    joined with ``", "`` and the rows are wrapped as ``(row1),(row2),...``.

    Args:
        line_items: Sequence of mappings indexable by column name.
        columns: Column names, in the order the values must appear.

    Returns:
        The parenthesised, comma-separated row tuples as one string.
    """
    rendered_rows = []
    for item in line_items:
        cells = [convert_value_for_sql_query(item[name]) for name in columns]
        rendered_rows.append(", ".join(cells))
    return "(" + "),(".join(rendered_rows) + ")"
def convert_value_for_sql_query(ad_manager_value):
    """Render one Ad Manager field value as a literal for a string-built SQL query.

    Conversion rules:
      * ``None`` becomes ``NULL``.
      * Strings are wrapped in double quotes, with backslashes and double
        quotes backslash-escaped.
      * ``zeep.objects.Date`` values become a quoted ISO date.
      * ``zeep.objects.DateTime`` values are interpreted in their own
        ``timeZoneId``, converted to UTC and emitted as a quoted, naive ISO
        timestamp.
      * Anything else is emitted unquoted via ``str()`` with double quotes
        escaped.

    Args:
        ad_manager_value: The raw value taken from a line item.

    Returns:
        The SQL text for the value.
    """
    if ad_manager_value is None:
        return "NULL"

    if isinstance(ad_manager_value, str):
        # Escape backslashes first so a value ending in "\" cannot swallow
        # the closing quote, then escape the quote character itself.
        escaped = ad_manager_value.replace("\\", "\\\\").replace("\"", "\\\"")
        return "\"" + escaped + "\""

    # zeep types are matched by their printed class name, so this module does
    # not have to import zeep itself.
    type_name = str(type(ad_manager_value))

    if type_name == "<class 'zeep.objects.Date'>":
        date_keys = ['year', 'month', 'day']
        date_value = datetime.date(*[ad_manager_value[key] for key in date_keys])
        return "\"" + date_value.isoformat() + "\""

    if type_name == "<class 'zeep.objects.DateTime'>":
        date_part = ad_manager_value['date']
        naive_local = datetime.datetime(
            date_part['year'], date_part['month'], date_part['day'],
            ad_manager_value['hour'], ad_manager_value['minute'], ad_manager_value['second'],
        )
        # pytz zones must be attached with localize(); passing them as
        # tzinfo= to the constructor picks the zone's first (LMT) offset and
        # shifts the result by several minutes.
        aware_local = pytz.timezone(ad_manager_value['timeZoneId']).localize(naive_local)
        # Convert to UTC and drop tzinfo so isoformat() has no offset suffix.
        utc_naive = aware_local.astimezone(pytz.timezone('UTC')).replace(tzinfo=None)
        return "\"" + utc_naive.isoformat() + "\""

    # Fallback for every other type (e.g. numbers): emitted unquoted.
    return str(ad_manager_value).replace("\"", "\\\"")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement