Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import requests
- import functools
- import time
- import json
- import os
- from datetime import datetime, timedelta
- import logging
- # Constants
- API_TOKEN = # TODO
- HEADERS = {
- "Authorization": f"Bearer {API_TOKEN}",
- "Content-Type": "application/json",
- }
- GRAPHQL_ENDPOINT = "https://api.stratz.com/graphql"
- # Configure logging
- logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
- # Constants for API rate limits
- SECOND_LIMIT = 20 - 2 # timers aren't perfect
- LIMIT_BUFFER = 20 # larger buffer in case you used some up on the website "manually"
- MINUTE_LIMIT = 250 - LIMIT_BUFFER
- HOUR_LIMIT = 2000 - LIMIT_BUFFER
- DAY_LIMIT = 10000 - LIMIT_BUFFER
- # File path for storing request counts
- REQUEST_COUNT_FILE = os.path.join(os.path.dirname(__file__), "api_call_counts.json")
- def update_call_counts():
- """Update call counts for the current time unit, resetting counts where necessary."""
- now = datetime.now()
- time_keys = {
- "second": now.strftime("%Y-%m-%d %H:%M:%S"),
- "minute": now.strftime("%Y-%m-%d %H:%M"),
- "hour": now.strftime("%Y-%m-%d %H"),
- "day": now.strftime("%Y-%m-%d"),
- }
- # Load or initialize request counts
- if os.path.exists(REQUEST_COUNT_FILE):
- with open(REQUEST_COUNT_FILE, "r") as f:
- counts = json.load(f)
- else:
- counts = {k: {"time": "", "count": 0} for k in time_keys}
- # Update counts or reset if the time unit has changed
- for unit, key in time_keys.items():
- if counts[unit]["time"] == key:
- counts[unit]["count"] += 1
- else:
- counts[unit] = {"time": key, "count": 1}
- # Save updated counts
- with open(REQUEST_COUNT_FILE, "w") as f:
- json.dump(counts, f)
- def check_rate_limits():
- """Check if the call is within rate limits. If not, calculate wait time and pause."""
- if not os.path.exists(REQUEST_COUNT_FILE): # No need to check if file doesn't exist
- return
- with open(REQUEST_COUNT_FILE, "r") as f:
- counts = json.load(f)
- limits = {
- "second": SECOND_LIMIT,
- "minute": MINUTE_LIMIT,
- "hour": HOUR_LIMIT,
- "day": DAY_LIMIT,
- }
- for unit, limit in limits.items():
- if counts[unit]["count"] >= limit:
- now = datetime.now()
- next_unit_start = now + timedelta(**{unit + "s": 1})
- next_unit_start = next_unit_start.replace(microsecond=0)
- if unit == "minute":
- next_unit_start = next_unit_start.replace(second=0)
- elif unit == "hour":
- next_unit_start = next_unit_start.replace(minute=0, second=0)
- elif unit == "day":
- next_unit_start = next_unit_start.replace(hour=0, minute=0, second=0)
- wait_seconds = (next_unit_start - now).total_seconds()
- logging.info(
- f"Rate limit for {unit} exceeded. Waiting for {wait_seconds} seconds."
- )
- time.sleep(wait_seconds)
- break # Once the longest wait is identified, no need to check smaller units
- def rate_limited(func):
- @functools.wraps(func)
- def wrapper_rate_limited(*args, **kwargs):
- check_rate_limits()
- update_call_counts()
- return func(*args, **kwargs)
- return wrapper_rate_limited
- @rate_limited
- def post_query(query):
- response = requests.post(GRAPHQL_ENDPOINT, headers=HEADERS, json={"query": query})
- if response.status_code == 200:
- return response.json()
- else:
- raise Exception(f"Failed to fetch data: {response.status_code} {response.text}")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement