Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- import json
- import logging
- import os
- import string
- import time
- import uuid
- from copy import deepcopy
- from decimal import Decimal
- import Geohash
- import boto3
- import pymysql
- from aws_requests_auth.aws_auth import AWSRequestsAuth
- from boto3.dynamodb.conditions import Key
- from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
- from elasticsearch import Elasticsearch, RequestsHttpConnection
- logger = logging.getLogger()
- logger.setLevel(logging.INFO)
- rds_host = os.environ['rds_host']
- user_name = os.environ['rds_user_name']
- password = os.environ['rds_password']
- db_name = os.environ['rds_db_name']
- dynamodb = boto3.resource('dynamodb')
- restaurant_table = dynamodb.Table('CF-restaurant_' + os.environ['environment'])
- menuItems_table = dynamodb.Table('CF-menuItems_' + os.environ['environment'])
- restaurant_mapper_table = dynamodb.Table('CF-restaurantMapper_' +
- os.environ['environment'])
- menu_mapper_table = dynamodb.Table('CF-menuItemMapper_' +
- os.environ['environment'])
- meals_table = dynamodb.Table('CF-meals_' + os.environ['environment'])
- meals_reported_table = dynamodb.Table(
- 'CF-mealReported_' + os.environ['environment'])
- images_reported_table = dynamodb.Table(
- 'CF-imagesReported_' + os.environ['environment'])
- images_table = dynamodb.Table('CF-images_' + os.environ['environment'])
- unpublished_image_table = dynamodb.Table('CF-imagesUnpublished_'
- + os.environ['environment'])
- tags_table = dynamodb.Table('CF-tags_' + os.environ['environment'])
- users_table = dynamodb.Table('CF-users_' + os.environ['environment'])
- # Scan tags table to create a tag_id -> tag_name lookup map.
- tag_lookup = {}
- tags = tags_table.scan()
- if tags.get('ResponseMetadata', {}).get('HTTPStatusCode') == 200:
- for item in tags.get('Items', []):
- tag_lookup.update({item.get('tag_id'): item})
- serializer = TypeSerializer()
- deserializer = TypeDeserializer()
- lambda_client = boto3.client('lambda', region_name='us-east-1')
- awsauth = AWSRequestsAuth(aws_access_key=os.environ['aws_access_key'],
- aws_secret_access_key=os.environ[
- 'aws_secret_access_key'],
- aws_host=os.environ['host'],
- aws_region='us-east-1',
- aws_service='es')
- esClient = Elasticsearch(
- hosts=[{'host': os.environ['host'], 'port': 443}],
- http_auth=awsauth,
- use_ssl=True,
- verify_certs=True,
- connection_class=RequestsHttpConnection)
- def create_menu_item(menu_detail):
- """
- Create menu item with given menu detail
- :param menu_detail: dictionary of menu item detail
- :return: created menu item id
- """
- try:
- menu_response = menuItems_table.put_item(
- Item=menu_detail)
- if menu_response.get('ResponseMetadata', {}).get(
- 'HTTPStatusCode') == 200:
- return True
- else:
- return False
- except Exception as err:
- logger.info("error in create menu item %s ", str(err))
- return False
- def create_restaurant(restaurant_detail):
- """
- Create restaurant item with given restaurant detail
- :param restaurant_detail: dictionary of restaurant detail
- :return: created rentaurant id
- """
- try:
- logger.info('restaurant_detail: {}'.format(restaurant_detail))
- restaurant_response = restaurant_table.put_item(
- Item=restaurant_detail)
- if restaurant_response.get('ResponseMetadata', {}).get(
- 'HTTPStatusCode') == 200:
- return True
- else:
- return False
- except Exception as err:
- logger.info("error in create restaurant entry %s ", str(err))
- return False
- def update_count_of_tag(tags):
- '''
- :param tags: tags ids associated with menu_item
- :return: success/Failure
- '''
- logger.info("tag ids to be updated : %s", str(tags))
- try:
- tag_ids = [tag for tag in tags]
- with tags_table.batch_writer() as batch:
- for tag_id in tag_ids:
- if not tag_lookup.get(tag_id, {}).get('occurrence_count'):
- occurrence_count = 0
- else:
- occurrence_count = tag_lookup.get(
- tag_id, {}).get('occurrence_count')
- tag_item = tag_lookup.get(tag_id).update(
- {'occurrence_count': int(occurrence_count) + 1})
- batch.put_item(
- Item=tag_item
- )
- logger.error("successfully updated count of tags")
- return True
- except Exception as exc:
- logger.error("Exception in batch write: err %s", str(exc))
- return False
- def map_provider_menu_id(event, restaurant_id, image_url, timestamp,
- validation_response):
- """
- Map between provider restaurant id with menu item
- :param event: user input
- :param restaurant_id: restaurant id
- :return: Mapped menu item id
- """
- TWOPLACES = Decimal(10) ** -2
- try:
- # Query database to check if provider menu item id already exists.
- response = menu_mapper_table.query(
- KeyConditionExpression=Key('provider_menu_item_id').eq(
- event.get('provider_restaurant_id') + '_' +
- string.capwords(str(event.get('menu_item_name')).strip())),
- ScanIndexForward=False,
- Limit=1
- )
- new_image_url = None
- tag_ids = [tag.get('tag_id') for tag in event.get('tags', {})]
- # Return unique id of existing menu item.
- if response.get('Items', []):
- item = response['Items'][0]
- menu_item_id = item.get('menu_item_id')
- logger.info("menu item id = %s", str(menu_item_id))
- response_menuitem = menuItems_table.query(
- KeyConditionExpression=Key('menu_item_id').eq(menu_item_id),
- ScanIndexForward=False,
- Limit=1
- )
- # increment count of each tag
- update_count_of_tag(tag_ids)
- logger.info(response_menuitem)
- logger.info("validation response = %s", str(validation_response))
- if response_menuitem.get('ResponseMetadata', {}).get(
- 'HTTPStatusCode') == 200:
- if response_menuitem.get('Items', []):
- menu_item = response_menuitem.get('Items', [])[0]
- new_image_url = menu_item.get('image_url')
- logger.info("old image url = %s", new_image_url)
- if validation_response:
- logger.info("Got validation response true")
- if not new_image_url or event.get('update_menu'):
- logger.info("image url doesnot exist for menu item")
- updated_list = []
- updated_value = {}
- elastic_input_menu_item = {
- 'restaurant_id': restaurant_id,
- 'menu_item_name': string.capwords(
- str(event.get('menu_item_name')).strip()),
- 'provider_restaurant_id': event.get(
- 'provider_restaurant_id'),
- 'timestamp': timestamp,
- 'created_by': event.get('identity_id'),
- 'created_date': str(datetime.date.today())
- }
- # Save geohash values.
- if event.get('longitude') and event.get('latitude'):
- geohash = Geohash.encode(
- float(event.get('latitude')),
- float(event.get('longitude')))
- elastic_input_menu_item.update(
- {'geohash': geohash, 'geohash_5': geohash[:5],
- 'geohash_6': geohash[:6]})
- updated_list.append("geohash = :geohash")
- updated_value.update({':geohash': geohash})
- updated_list.append("geohash_5 = :geohash_5")
- updated_value.update({':geohash_5': geohash[:5]})
- updated_list.append("geohash_6 = :geohash_6")
- updated_value.update({':geohash_6': geohash[:6]})
- if image_url:
- updated_list.append("image_url = :updated_image")
- updated_value.update({':updated_image': image_url})
- elastic_input_menu_item.update(
- {'image_url': image_url})
- if event.get('tags'):
- updated_list.append("tags = :updated_tags")
- updated_value.update(
- {':updated_tags': [tag.get('tag_id')
- for tag in
- event.get('tags')]})
- elastic_input_menu_item.update(
- {'tags': [tag.get('tag_id')
- for tag in event.get('tags')]})
- if event.get('price'):
- price = event.get('price')
- try:
- price = str(Decimal(price).quantize(TWOPLACES))
- except Exception as err:
- logger.info("invalid price value %s ",
- str(err))
- price = event.get('price')
- updated_list.append("price = :updated_price")
- updated_value.update({':updated_price': price})
- if event.get('currency'):
- updated_list.append("currency = :updated_currency")
- updated_value.update({
- ':updated_currency': event.get('currency')})
- elastic_input_menu_item.update(
- {'currency': event.get('currency')})
- if event.get('symbol'):
- updated_list.append("symbol = :updated_symbol")
- updated_value.update({
- ':updated_symbol': event.get('symbol')})
- elastic_input_menu_item.update(
- {'symbol': event.get('symbol')})
- if event.get('description'):
- updated_list.append(
- "description = :updated_description")
- updated_value.update(
- {':updated_description': event.get(
- 'description')})
- elastic_input_menu_item.update(
- {'description': event.get('description')})
- updated_query = "SET " + ", ".join(updated_list)
- logger.info(updated_query)
- logger.info(updated_value)
- menu_item_update = menuItems_table.update_item(
- Key={
- 'menu_item_id': menu_item_id
- },
- UpdateExpression=updated_query,
- ExpressionAttributeValues=updated_value
- )
- if menu_item_update.get('ResponseMetadata', {}).get(
- 'HTTPStatusCode') == 200:
- elastic_input_menu_item.update({
- 'menu_item_id': menu_item_id})
- print('Updating menu item in elastic search table')
- print(elastic_input_menu_item)
- response_image_save_activity = lambda_client.invoke(
- FunctionName='CF-updateElastic:' + os.environ[
- 'alias'],
- InvocationType='Event',
- Payload=json.dumps(
- {'menu_item': elastic_input_menu_item}))
- return menu_item_id
- else:
- logger.info('got error in update menu item')
- return None
- else:
- logger.info("image url exist for menu item")
- return menu_item_id
- else:
- logger.info("got validation response as false")
- return menu_item_id
- else:
- menu_item_id = str(uuid.uuid4())
- # Create menu item in database.
- response = restaurant_table.query(
- KeyConditionExpression=Key('restaurant_id').eq(restaurant_id)
- )
- geohash = None
- restaurant_name = None
- if response.get('Items', []):
- item = response['Items'][0]
- restaurant_name = item.get('restaurant_name')
- menu_detail = {
- 'menu_item_id': menu_item_id,
- 'restaurant_id': restaurant_id,
- 'menu_item_name': string.capwords(
- str(event.get('menu_item_name')).strip()),
- 'timestamp': timestamp,
- 'created_by': event.get('identity_id'),
- 'created_date': str(datetime.date.today())
- }
- # Save geohash values.
- if event.get('longitude') and event.get('latitude'):
- geohash = Geohash.encode(
- float(event.get('latitude')),
- float(event.get('longitude')))
- menu_detail.update({
- 'geohash': geohash,
- 'geohash_5': geohash[:5],
- 'geohash_6': geohash[:6]
- })
- if restaurant_name:
- menu_detail.update({
- 'restaurant_name': restaurant_name
- })
- if image_url and validation_response:
- menu_detail.update({
- 'image_url': image_url
- })
- if event.get('tags'):
- menu_detail.update(
- {
- 'tags': [tag.get('tag_id')
- for tag in event.get('tags')]
- })
- if event.get('city'):
- menu_detail.update({
- 'city': event.get('city')
- })
- if event.get('price'):
- price = event.get('price')
- try:
- price = str(Decimal(price).quantize(TWOPLACES))
- except Exception as err:
- logger.info("invalid price value %s ", str(err))
- price = event.get('price')
- menu_detail.update({
- 'price': price
- })
- if event.get('currency'):
- menu_detail.update({
- 'currency': event.get('currency')
- })
- if event.get('symbol'):
- menu_detail.update({
- 'symbol': event.get('symbol')
- })
- if event.get('description'):
- menu_detail.update({
- 'description': event.get('description')
- })
- if event.get('identity_id'):
- menu_detail.update({
- 'created_by': event.get('identity_id')
- })
- # Map provider menu item id in database if it does not exist.
- if create_menu_item(menu_detail):
- # Add menu item to elastic search.
- elastic_input_menu_item = deepcopy(menu_detail)
- elastic_input_menu_item.update({
- 'provider_restaurant_id': event.get(
- 'provider_restaurant_id')})
- print('Adding menu item to elastic search table: ')
- print(elastic_input_menu_item)
- response_image_save_activity = lambda_client.invoke(
- FunctionName='CF-updateElastic:' + os.environ['alias'],
- InvocationType='Event',
- Payload=json.dumps({'menu_item': elastic_input_menu_item}))
- menuitem_mapper_response = menu_mapper_table.put_item(
- Item={'menu_item_id': menu_item_id,
- 'provider_menu_item_id': event.get(
- 'provider_restaurant_id') + '_' + string.capwords(
- str(event.get('menu_item_name')).strip())
- })
- if menuitem_mapper_response.get('ResponseMetadata', {}).get(
- 'HTTPStatusCode') == 200:
- # increment count of each tag
- if tag_ids:
- update_count_of_tag(tag_ids)
- return menu_item_id
- else:
- return None
- else:
- return None
- except Exception as err:
- logger.info("error in map and get menu_id %s ", str(err))
- return None
- def map_provider_restaurant_id(event, timestamp):
- """
- Map restaurant id and provider restaurant id.
- :param event: user input
- :param timestamp: time of restaurant creation
- :return: Mapped restaurant id
- """
- try:
- # Query database to check if provider restaurant id already exists.
- response = restaurant_mapper_table.query(
- KeyConditionExpression=Key('provider_restaurant_id').eq(
- event.get('provider_restaurant_id')),
- ScanIndexForward=False,
- Limit=1
- )
- # return corresponding restaurant id for give provider id.
- if response.get('Items', []):
- item = response['Items'][0]
- restaurant_id = item.get('restaurant_id')
- return restaurant_id
- # Create restaurant entry in database.
- else:
- restaurant_id = str(uuid.uuid4())
- restaurant_detail = {
- 'restaurant_id': restaurant_id,
- 'timestamp': timestamp,
- 'created_by': event.get('identity_id'),
- 'created_date': str(datetime.date.today())
- }
- try:
- geohash = Geohash.encode(
- float(event.get('latitude')),
- float(event.get('longitude')))
- restaurant_detail.update(
- {'geohash': geohash, 'geohash_5': geohash[:5],
- 'geohash_6': geohash[:6]})
- except Exception as err:
- logger.info("error in calculating restaurant geohash: %s ",
- str(err))
- # Map provider restaurant id in database if it does not exist.
- if create_restaurant(restaurant_detail):
- restaurant_mapper_detail = {'restaurant_id': restaurant_id,
- 'provider_restaurant_id': event.get(
- 'provider_restaurant_id'),
- }
- restaurant_mapper_response = restaurant_mapper_table.put_item(
- Item=restaurant_mapper_detail)
- if restaurant_mapper_response.get('ResponseMetadata', {}).get(
- 'HTTPStatusCode') == 200:
- return restaurant_id
- else:
- return None
- else:
- return None
- except Exception as err:
- logger.info("error in mapping and get restaurant id %s ", str(err))
- return None
- def update_elastic_search_db(menu_item_id, average_rating, reviews_count=None):
- try:
- update_data = {'average_rating': average_rating}
- if reviews_count:
- update_data.update({'reviews_count': reviews_count})
- get_response = esClient.get(
- index='menu_items_' + os.environ['environment'],
- doc_type='menu_item',
- id=menu_item_id)
- print('Old Data {0}'.format(get_response.get('_source', {})))
- response = esClient.update(
- index='menu_items_' + os.environ['environment'],
- doc_type='menu_item',
- id=menu_item_id,
- body={"doc": update_data},
- refresh='true')
- logger.info('Elastic Search Update Response {0}'.format(response))
- get_response = esClient.get(
- index='menu_items_' + os.environ['environment'],
- doc_type='menu_item',
- id=menu_item_id)
- print('New Data {0}'.format(get_response.get('_source', {})))
- except Exception as exc:
- logger.error(
- 'Exception in "update_elastic_search_db": {0}'.format(exc))
- def update_news_feed(meal_id):
- try:
- conn = pymysql.connect(rds_host, user=user_name, passwd=password,
- db=db_name, connect_timeout=5)
- logger.info('RDS connection Info {0}'.format(conn))
- current_time = int(time.time())
- with conn.cursor() as cur:
- cur.execute("SELECT * FROM newsFeed_" + os.environ['environment'] +
- " WHERE meal_id = %s", args=[meal_id])
- print('Data Before {0}'.format(cur.fetchall()))
- cur.execute("UPDATE newsFeed_" + os.environ['environment'] +
- " SET last_updated = %s" + " WHERE meal_id = %s",
- args=[str(current_time), meal_id])
- conn.commit()
- cur.execute("SELECT * FROM newsFeed_" + os.environ['environment'] +
- " WHERE meal_id = %s", args=[meal_id])
- print('Data After {0}'.format(cur.fetchall()))
- except Exception as exc:
- logger.error('Exception in "update_news_feed": {0}'.format(exc))
- def editPost_handler(event, context):
- try:
- logger.info("Input Data for %s environment is %s",
- str(os.environ['environment']), str(event))
- image_url = event.get('image_url')
- timestamp = int(time.time())
- image_id = str(uuid.uuid4())
- image_response = {}
- restaurant_id = ""
- menu_item_id = ""
- validation_response = False
- detailed_response = {}
- if image_url:
- if event.get('google_vision_status') != 'failure':
- validation_response = True
- if event.get('provider_restaurant_id') or event.get(
- 'restaurant_id'):
- if event.get('restaurant_id'):
- restaurant_id = event.get('restaurant_id')
- if event.get('menu_item_id'):
- menu_item_id = event.get('menu_item_id')
- if event.get('provider_restaurant_id'):
- restaurant_id = map_provider_restaurant_id(event, timestamp)
- if restaurant_id:
- menu_item_id = map_provider_menu_id(event,
- restaurant_id,
- image_url,
- timestamp,
- validation_response)
- if not restaurant_id:
- return {'status': 'failure',
- 'status_code': 500,
- 'message': 'Internal Server Error'}
- if not menu_item_id:
- return {'status': 'failure',
- 'status_code': 500,
- 'message': 'Internal Server Error'}
- images = [image_id]
- user_id = event.get('identity_id')
- return {'status': 'success',
- 'message': 'Successfully updated post'}
- except Exception as exc:
- logger.error('Exception in editPost_handler {0}'.format(exc))
- return {'status': 'failure', 'message': str(exc)}
Add Comment
Please, Sign In to add comment