Advertisement
ClusterM

TuyaAPI Client on Python

Dec 7th, 2021
433
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import os
  2. import time
  3. import hashlib
  4. import hmac
  5. import json
  6. import requests
  7. import logging
  8.  
  9. logger = logging.getLogger()
  10.  
  11. class TuyaAPI:
  12.     def __init__(self, api_region, access_id, api_secret, token_cache=None):
  13.         self.api_region = api_region
  14.         self.access_id = access_id
  15.         self.api_secret = api_secret
  16.         self.token = None
  17.         self.token_cache = token_cache
  18.         if self.token_cache and os.path.isfile(self.token_cache):
  19.             with open(self.token_cache, 'r') as f:
  20.                 self.token = json.loads(f.read())
  21.                 logger.info(f"Tuya - loaded cached token from {self.token_cache}")
  22.  
  23.         api_region = api_region.lower()
  24.         if (api_region == "ch"):
  25.             self.urlhost = "openapi.tuyacn.com"      # China Data Center
  26.         elif (api_region == "us"):
  27.             self.urlhost = "openapi.tuyaus.com"      # Western America Data Center
  28.         elif (api_region == "us-e"):
  29.             self.urlhost = "openapi-ueaz.tuyaus.com" # Eastern America Data Center
  30.         elif (api_region == "eu"):
  31.             self.urlhost = "openapi.tuyaeu.com"      # Central Europe Data Center
  32.         elif (api_region == "eu-w"):
  33.             self.urlhost = "openapi-weaz.tuyaeu.com" # Western Europe Data Center
  34.         elif (api_region == "in"):
  35.             self.urlhost = "openapi.tuyain.com"      # India Datacenter
  36.         else:
  37.             raise NotImplementedError(f"Unknown region '{api_region}'")
  38.  
  39.     def request(self, method, uri, body=None, headers=None, no_token=False):
  40.         # Build URL
  41.         while uri.startswith("/"): uri = uri[1:]
  42.         url = "https://%s/%s" % (self.urlhost, uri)
  43.  
  44.         # Check body
  45.         if body and type(body) == dict:
  46.             body = json.dumps(body)
  47.  
  48.         # Build Header
  49.         now = int(time.time() * 1000)
  50.         headers = dict(list(headers.items()) + [('Signature-Headers', ":".join(headers.keys()))]) if headers else {}
  51.         if (no_token):
  52.             payload = self.access_id + str(now)
  53.             headers['secret'] = self.api_secret
  54.         else:
  55.             self.refresh_token()
  56.             payload = self.access_id + self.token["access_token"] + str(now)
  57.  
  58.         payload += (method + '\n' +                                                  # HTTPMethod
  59.             hashlib.sha256(bytes((body or "").encode('utf-8'))).hexdigest() + '\n' + # Content-SHA256
  60.             ''.join(['%s:%s\n'%(key, headers[key])                                   # Headers
  61.                      for key in headers.get("Signature-Headers", "").split(":")
  62.                      if key in headers]) + '\n' +
  63.             '/' + url.split('//', 1)[-1].split('/', 1)[-1])
  64.  
  65.         # Sign Payload
  66.         signature = hmac.new(
  67.             self.api_secret.encode('utf-8'),
  68.             msg=payload.encode('utf-8'),
  69.             digestmod=hashlib.sha256
  70.         ).hexdigest().upper()
  71.  
  72.         # Create Header Data
  73.         headers['client_id'] = self.access_id
  74.         headers['sign'] = signature
  75.         headers['t'] = str(now)
  76.         headers['sign_method'] = 'HMAC-SHA256'
  77.  
  78.         if (not no_token):
  79.             headers['access_token'] = self.token["access_token"]
  80.         if method == "GET":
  81.             response = requests.get(url, headers=headers)
  82.         elif method == "POST":
  83.             response = requests.post(url, headers=headers, data=body)
  84.         elif method == "PUT":
  85.             response = requests.put(url, headers=headers, data=body)
  86.         elif method == "DELETE":
  87.             response = requests.delete(url, headers=headers)
  88.         else:
  89.             raise NotImplementedError(f"Unknown method '{method}'")
  90.  
  91.         response_dict = response.json()
  92.         success = response_dict["success"]
  93.         if not success: raise TuyaError(response_dict["msg"])
  94.         return response_dict["result"]
  95.  
  96.     def save_token(self):
  97.         if self.token_cache:
  98.             with open(self.token_cache, 'w') as f:
  99.                 f.write(json.dumps(self.token))
  100.  
  101.     def refresh_token(self):
  102.         if self.token and time.time() < self.token["token_time"] + self.token["expire_time"]:
  103.             return
  104.         if not self.token:
  105.             logger.info(f"Tuya - token is empty, requesting new token...")
  106.         else:
  107.             logger.info(f"Tuya - token expired, requesting new one...")
  108.         uri = "v1.0/token?grant_type=1"
  109.         self.token = self.request("GET", uri, no_token=True)
  110.         self.token["token_time"] = time.time()
  111.         self.save_token()
  112.         logger.info(f"Tuya - new token received")
  113.  
  114. def TuyaError(Exception):
  115.     pass
  116.  
Advertisement
Advertisement
Advertisement
RAW Paste Data Copied
Advertisement