Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import time
- import hashlib
- import hmac
- import json
- import requests
- import logging
- logger = logging.getLogger()
- class TuyaAPI:
- def __init__(self, api_region, access_id, api_secret, token_cache=None):
- self.api_region = api_region
- self.access_id = access_id
- self.api_secret = api_secret
- self.token = None
- self.token_cache = token_cache
- if self.token_cache and os.path.isfile(self.token_cache):
- with open(self.token_cache, 'r') as f:
- self.token = json.loads(f.read())
- logger.info(f"Tuya - loaded cached token from {self.token_cache}")
- api_region = api_region.lower()
- if (api_region == "ch"):
- self.urlhost = "openapi.tuyacn.com" # China Data Center
- elif (api_region == "us"):
- self.urlhost = "openapi.tuyaus.com" # Western America Data Center
- elif (api_region == "us-e"):
- self.urlhost = "openapi-ueaz.tuyaus.com" # Eastern America Data Center
- elif (api_region == "eu"):
- self.urlhost = "openapi.tuyaeu.com" # Central Europe Data Center
- elif (api_region == "eu-w"):
- self.urlhost = "openapi-weaz.tuyaeu.com" # Western Europe Data Center
- elif (api_region == "in"):
- self.urlhost = "openapi.tuyain.com" # India Datacenter
- else:
- raise NotImplementedError(f"Unknown region '{api_region}'")
- def request(self, method, uri, body=None, headers=None, no_token=False):
- # Build URL
- while uri.startswith("/"): uri = uri[1:]
- url = "https://%s/%s" % (self.urlhost, uri)
- # Check body
- if body and type(body) == dict:
- body = json.dumps(body)
- # Build Header
- now = int(time.time() * 1000)
- headers = dict(list(headers.items()) + [('Signature-Headers', ":".join(headers.keys()))]) if headers else {}
- if (no_token):
- payload = self.access_id + str(now)
- headers['secret'] = self.api_secret
- else:
- self.refresh_token()
- payload = self.access_id + self.token["access_token"] + str(now)
- payload += (method + '\n' + # HTTPMethod
- hashlib.sha256(bytes((body or "").encode('utf-8'))).hexdigest() + '\n' + # Content-SHA256
- ''.join(['%s:%s\n'%(key, headers[key]) # Headers
- for key in headers.get("Signature-Headers", "").split(":")
- if key in headers]) + '\n' +
- '/' + url.split('//', 1)[-1].split('/', 1)[-1])
- # Sign Payload
- signature = hmac.new(
- self.api_secret.encode('utf-8'),
- msg=payload.encode('utf-8'),
- digestmod=hashlib.sha256
- ).hexdigest().upper()
- # Create Header Data
- headers['client_id'] = self.access_id
- headers['sign'] = signature
- headers['t'] = str(now)
- headers['sign_method'] = 'HMAC-SHA256'
- if (not no_token):
- headers['access_token'] = self.token["access_token"]
- if method == "GET":
- response = requests.get(url, headers=headers)
- elif method == "POST":
- response = requests.post(url, headers=headers, data=body)
- elif method == "PUT":
- response = requests.put(url, headers=headers, data=body)
- elif method == "DELETE":
- response = requests.delete(url, headers=headers)
- else:
- raise NotImplementedError(f"Unknown method '{method}'")
- response_dict = response.json()
- success = response_dict["success"]
- if not success: raise TuyaError(response_dict["msg"])
- return response_dict["result"]
- def save_token(self):
- if self.token_cache:
- with open(self.token_cache, 'w') as f:
- f.write(json.dumps(self.token))
- def refresh_token(self):
- if self.token and time.time() < self.token["token_time"] + self.token["expire_time"]:
- return
- if not self.token:
- logger.info(f"Tuya - token is empty, requesting new token...")
- else:
- logger.info(f"Tuya - token expired, requesting new one...")
- uri = "v1.0/token?grant_type=1"
- self.token = self.request("GET", uri, no_token=True)
- self.token["token_time"] = time.time()
- self.save_token()
- logger.info(f"Tuya - new token received")
- def TuyaError(Exception):
- pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement