Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- SAM yaml:
- AWSTemplateFormatVersion: '2010-09-09'
- Transform: 'AWS::Serverless-2016-10-31'
- Description: An AWS Serverless Specification template describing your function.
- Resources:
- teslaordercheck:
- Type: 'AWS::Serverless::Function'
- Properties:
- Handler: lambda_function.lambda_handler
- Runtime: python3.9
- CodeUri: .
- Description: ''
- MemorySize: 128
- Timeout: 15
- Role: >-
- arn:aws:iam::<redacted>:role/service-role/tesla-order-check-role
- Events:
- Schedule1:
- Type: Schedule
- Properties:
- Schedule: cron(0/15 * ? * * *)
- Layers:
- - >-
- arn:aws:lambda:ap-southeast-2:<redacted>:layer:AWSLambda-Python-AWS-SDK:4
- python source:
- import json
- import boto3
- import re
- from botocore.vendored import requests
- def stock_check():
- found = False
- notification = ""
- url_template = "https://www.tesla.com/inventory/api/v1/inventory-results?query=%7B%22query%22%3A%7B%22model%22%3A%22{}%22%2C%22condition%22%3A%22{}%22%2C%22options%22%3A%7B%22FleetSalesRegions%22%3A%5B%22Victoria%22%5D%7D%2C%22arrangeby%22%3A%22Price%22%2C%22order%22%3A%22asc%22%2C%22market%22%3A%22AU%22%2C%22language%22%3A%22en%22%2C%22super_region%22%3A%22north%20america%22%7D%7D"
- for model in ["m3", "ms", "mx"]:
- for condition in ["new", "used"]:
- response = requests.get( url_template.format(model, condition))
- json_response = response.json()
- cars_count = json_response['total_matches_found']
- if (cars_count != 0):
- found = True
- notification += "{} {} {} cars: {}\n".format(cars_count, model, condition, url_template.format(model, condition))
- if found:
- client = boto3.client('sns')
- response = client.publish (
- TargetArn = "arn:aws:sns:ap-southeast-2:<REDACTED>:tesla-update",
- Message = json.dumps({'default': notification}),
- MessageStructure = 'json'
- )
- def account_check():
- changes_found = False
- data_found = False
- notification = ""
- reservation = "<PASTE YOUR RNxxxx HERE>"
- url_template = "https://www.tesla.com/en_AU/teslaaccount/product-finalize?rn={}"
- headers = {'Referer': 'https://www.tesla.com/en_au/teslaaccount',
- 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:95.0) Gecko/20100101 Firefox/95.0',
- 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
- 'Accept-Language': 'en-US,en;q=0.5',
- 'Accept-Encoding': 'gzip, deflate, br',
- 'DNT': '1',
- 'Connection': 'keep-alive',
- 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'same-origin', 'Sec-Fetch-User': '?1','Sec-GPC': '1','Cache-Control': 'max-age=0',
- 'Cookie': '<PASTE COOKIES FROM YOUR BROWSER HERE>'
- }
- response = requests.get(url_template.format(reservation), headers = headers)
- text_response = response.text
- for line in text_response.split("\n"):
- if "Tesla.ProductF" in line:
- re.search("Tesla.ProductF = ({.*})\)\(\)\s*$",line.strip()).group(1)
- json_response = json.loads(re.search("Tesla.ProductF = ({.*)}\)\(\)\s*$",line.strip()).group(1))
- del json_response['Data']['Insurance']['form']
- data_found = True
- dynamo = boto3.resource('dynamodb').Table('tesla_checks')
- last_values = dynamo.get_item( Key = {'id': "1"})
- if ("Item" in last_values):
- if (json.loads(last_values['Item']['jsonvalue']) != json_response):
- changes_found = True
- notification += 'Changes found.'
- print ("Changes found")
- dynamo.update_item(Key = {'id': "1"},
- UpdateExpression="set jsonvalue=:r",
- ExpressionAttributeValues = {':r': json.dumps(json_response)},
- ReturnValues="UPDATED_NEW"
- )
- else:
- print ("New record")
- dynamo.put_item(Item = {'id': "1", 'jsonvalue': json.dumps(json_response)})
- if data_found == False:
- client = boto3.client('sns')
- response = client.publish (
- TargetArn = "arn:aws:sns:ap-southeast-2:<redacted>:tesla-update",
- Message = json.dumps({'default': 'Tesla check - no data found'}),
- MessageStructure = 'json'
- )
- if changes_found:
- client = boto3.client('sns')
- response = client.publish (
- TargetArn = "arn:aws:sns:ap-southeast-2:<redacted>:tesla-update",
- Message = json.dumps({'default': notification}),
- MessageStructure = 'json'
- )
- def lambda_handler(event, context):
- stock_check()
- account_check()
- return {
- 'statusCode': 200,
- 'body': "{}"
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement