Advertisement
Guest User

Untitled

a guest
May 12th, 2019
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.71 KB | None | 0 0
  1. import datetime
  2. from googleads import ad_manager
  3. from googleads import oauth2
  4. from google.oauth2 import service_account
  5. from google.cloud import bigquery
  6. from os import getenv
  7. import pymysql
  8. import pytz
  9.  
  10. def handle_request(data, context):
  11. """Background Cloud Function to be triggered by Pub/Sub topic <fetch_line_items>."""
  12.  
  13. # Step 1: Authenticate and load LineItemService
  14. oauth2_client = oauth2.GoogleServiceAccountClient(getenv('KEY_FILE'), oauth2.GetAPIScope("ad_manager"))
  15. ad_manager_client = ad_manager.AdManagerClient(oauth2_client, getenv('APPLICATION_NAME'), getenv('NETWORK_CODE'))
  16. line_item_service = ad_manager_client.GetService("LineItemService")
  17.  
  18. # Step 2: Specify SQL config and connect
  19. CONNECTION_NAME = getenv('CLOUDSQL_INSTANCE_CONNECTION')
  20. DB_USER = getenv('CLOUDSQL_USER')
  21. DB_PASSWORD = getenv('CLOUDSQL_PASSWORD')
  22. DB_NAME = getenv('CLOUDSQL_DATABASE')
  23.  
  24. mysql_config = {
  25. 'unix_socket': f'/cloudsql/{CONNECTION_NAME}',
  26. 'user': DB_USER,
  27. 'password': DB_PASSWORD,
  28. 'db': DB_NAME,
  29. 'charset': 'utf8mb4',
  30. 'cursorclass': pymysql.cursors.DictCursor,
  31. 'autocommit': True
  32. }
  33.  
  34. mysql_conn = pymysql.connect(**mysql_config)
  35.  
  36. with mysql_conn.cursor() as cursor:
  37. # Step 3: Query database for the latest recorded modified date time
  38. cursor.execute("SELECT lastModifiedDateTime FROM LineItems " +
  39. "ORDER BY lastModifiedDateTime DESC LIMIT 1;")
  40. result = cursor.fetchone()
  41. if result is not None:
  42. latestModifiedDateTime = pytz.timezone('UTC').localize(result["lastModifiedDateTime"]) + datetime.timedelta(days=-1)
  43. statement = (ad_manager.StatementBuilder()
  44. .Where('lastModifiedDateTime > :lastModifiedDateTime')
  45. .WithBindVariable('lastModifiedDateTime', latestModifiedDateTime))
  46. else:
  47. statement = ad_manager.StatementBuilder()
  48.  
  49. # Step 4: Query database for the columns
  50. cursor.execute("SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='LineItems';")
  51. columns = [row["COLUMN_NAME"] for row in cursor.fetchall()]
  52.  
  53. # Step 5: Fetch recently updated LineItems and update database
  54. while True:
  55. response = line_item_service.getLineItemsByStatement(statement.ToStatement())
  56. print("Response: " + str(response))
  57. if 'results' in response and len(response['results']):
  58. line_items = response['results']
  59. cursor.execute("INSERT INTO LineItems "
  60. "(" + ",".join(columns) + ") "
  61. "VALUES " + generate_sql_values(line_items, columns) + " "
  62. "ON DUPLICATE KEY UPDATE"
  63. " " + generate_sql_columns(columns) + ";"
  64. )
  65. statement.offset += statement.limit
  66. else:
  67. break
  68.  
  69. return {'message': 'Number of LineItems updated: ' + str(response['totalResultSetSize'])}
  70.  
  71. def generate_sql_columns(columns):
  72. return ", ".join([column + " = VALUES(" + column + ")" for column in columns])
  73.  
  74. def generate_sql_values(line_items, columns):
  75. sql_values = ("(" +
  76. "),(".join([
  77. ", ".join([convert_value_for_sql_query(line_item[column]) for column in columns])
  78. for line_item in line_items]
  79. ) + ")")
  80. return sql_values
  81.  
  82. def convert_value_for_sql_query(ad_manager_value):
  83. value_type = type(ad_manager_value)
  84. date_keys = ['year', 'month', 'day']
  85. datetime_keys = ['hour', 'minute', 'second']
  86. if ad_manager_value is None:
  87. return "NULL"
  88. elif value_type == str:
  89. return "\"" + str(ad_manager_value).replace("\"", "\\\"") + "\""
  90. elif str(value_type) == "<class 'zeep.objects.Date'>":
  91. return "\"" + datetime.date(*[ad_manager_value[key] for key in date_keys]).isoformat() + "\""
  92. elif str(value_type) == "<class 'zeep.objects.DateTime'>":
  93. datetime_value = datetime.datetime(
  94. ad_manager_value['date']['year'], ad_manager_value['date']['month'], ad_manager_value['date']['day'],
  95. ad_manager_value['hour'], ad_manager_value['minute'], ad_manager_value['second'], tzinfo=pytz.timezone(ad_manager_value['timeZoneId'])
  96. )
  97. utc_datetime_value = datetime_value.astimezone(pytz.timezone('UTC'))
  98. utc_datetime_value = datetime.datetime(utc_datetime_value.year, utc_datetime_value.month, utc_datetime_value.day, utc_datetime_value.hour, utc_datetime_value.minute, utc_datetime_value.second)
  99. return "\"" + utc_datetime_value.isoformat() + "\""
  100. else:
  101. return str(ad_manager_value).replace("\"", "\\\"")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement