Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Sends api requests in batches of 10 to update the etp site
- import requests
- import json
- import logging
- import time
- import config
def send_requests(json_fname, api_url, api_email, api_password, store_name,
                  timeout=60):
    """Post the records stored in a JSON file to the API, 10 at a time.

    The file at ``json_fname`` is loaded, split into chunks of at most 10
    items with ``divide_chunks`` and each chunk is POSTed to ``api_url`` as a
    JSON body using HTTP basic auth. The JSON list returned by every
    successful (HTTP 200) call is accumulated and finally written to
    ``./json/<store_name>/<store_name>_etp_update_results_<YYYY>_<MM>_<DD>.json``.

    A failure of a single chunk (network error, non-200 status, response body
    that is not a JSON list) is reported and that chunk is skipped; the
    results already collected from other chunks are kept.

    Args:
        json_fname: Path of the JSON file holding the items to send.
        api_url: URL the chunks are POSTed to.
        api_email: User name for HTTP basic auth.
        api_password: Password for HTTP basic auth.
        store_name: Used to build the directory and name of the results file.
        timeout: Seconds passed to ``requests.post`` as its ``timeout``
            argument, so a stalled server cannot block the run forever.

    Returns:
        The path of the results file when at least one result was saved,
        ``False`` when no chunk produced any result, and ``None`` when the
        input could not be read, the results could not be saved, or an
        unexpected error occurred.
    """
    results_json = []
    try:
        with open(json_fname, encoding="utf-8") as json_file:
            data = json.load(json_file)

        # Split requests into chunks of 10
        request_chunks = list(divide_chunks(data, 10))
        last_index = len(request_chunks) - 1

        # Send out requests in chunks of 10 or less
        for i, payload in enumerate(request_chunks):
            # Progress report every 100 chunks
            if i % 100 == 0:
                print(f"Sent {i} requests so far")
            print(f"Sending request chunk {i} of {last_index}")

            try:
                r = requests.post(
                    api_url,
                    auth=(api_email, api_password),
                    json=payload,
                    timeout=timeout,
                )
            except requests.exceptions.HTTPError as httpErr:
                print("Http Error:", httpErr)
                continue
            except requests.exceptions.ConnectionError as connErr:
                print("Error Connecting:", connErr)
                continue
            except requests.exceptions.Timeout as timeOutErr:
                print("Timeout Error:", timeOutErr)
                continue
            except requests.exceptions.RequestException as reqErr:
                print("Something Else:", reqErr)
                continue

            if r.status_code != 200:
                print(r.status_code)
                continue

            print(f"Updated Chunk {i} Successfully")
            try:
                current_results = json.loads(r.content.decode('utf8'))
            except ValueError as parse_err:
                # UnicodeDecodeError and json.JSONDecodeError are both
                # ValueError subclasses. Skip only this chunk instead of
                # throwing away everything gathered so far.
                print(f"Could not parse response for chunk {i}:", parse_err)
                current_results = None

            if isinstance(current_results, list):
                results_json.extend(current_results)
            elif current_results is not None:
                print(f"Unexpected response format for chunk {i}; skipping")

            # Throttle: wait a second after every successful call
            time.sleep(1)

        if not results_json:
            print("No product changes have been detected")
            return False

        # NOTE(review): assumes config.today is a date/datetime object - confirm
        date_stamp = config.today.strftime("%Y_%m_%d")
        results_fname = (
            f"./json/{store_name}/"
            f"{store_name}_etp_update_results_{date_stamp}.json"
        )
        try:
            with open(results_fname, 'w', encoding="utf-8") as fp:
                json.dump(results_json, fp)
        except OSError as save_err:
            # e.g. the target directory does not exist or is not writable
            print("Error saving json file.")
            logging.error(save_err)
            return None
        return results_fname
    except Exception:
        # Top-level boundary: report, log with traceback, and signal failure
        print("An error occured")
        logging.exception("send_requests failed")
        return None
# Chunking helper (Credit to https://auth.geeksforgeeks.org/user/Shantanu%20Sharma./)
def divide_chunks(l, n):
    """Lazily yield consecutive slices of ``l``, each holding up to ``n`` items.

    The last slice is shorter when ``len(l)`` is not a multiple of ``n``.
    """
    # Each start offset is n further along than the previous one
    yield from (l[start:start + n] for start in range(0, len(l), n))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement