Advertisement
ClusterM

TuyaAPI Client on Python

Dec 7th, 2021
480
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.55 KB | None | 0 0
  1. import os
  2. import time
  3. import hashlib
  4. import hmac
  5. import json
  6. import requests
  7. import logging
  8.  
# Logger used by TuyaAPI for token life-cycle messages.  getLogger() is called
# without a name, so this is the root logger rather than a module-specific one.
logger = logging.getLogger()
  10.  
  11. class TuyaAPI:
  12.     def __init__(self, api_region, access_id, api_secret, token_cache=None):
  13.         self.api_region = api_region
  14.         self.access_id = access_id
  15.         self.api_secret = api_secret
  16.         self.token = None
  17.         self.token_cache = token_cache
  18.         if self.token_cache and os.path.isfile(self.token_cache):
  19.             with open(self.token_cache, 'r') as f:
  20.                 self.token = json.loads(f.read())
  21.                 logger.info(f"Tuya - loaded cached token from {self.token_cache}")
  22.  
  23.         api_region = api_region.lower()
  24.         if (api_region == "ch"):
  25.             self.urlhost = "openapi.tuyacn.com"      # China Data Center
  26.         elif (api_region == "us"):
  27.             self.urlhost = "openapi.tuyaus.com"      # Western America Data Center
  28.         elif (api_region == "us-e"):
  29.             self.urlhost = "openapi-ueaz.tuyaus.com" # Eastern America Data Center
  30.         elif (api_region == "eu"):
  31.             self.urlhost = "openapi.tuyaeu.com"      # Central Europe Data Center
  32.         elif (api_region == "eu-w"):
  33.             self.urlhost = "openapi-weaz.tuyaeu.com" # Western Europe Data Center
  34.         elif (api_region == "in"):
  35.             self.urlhost = "openapi.tuyain.com"      # India Datacenter
  36.         else:
  37.             raise NotImplementedError(f"Unknown region '{api_region}'")
  38.  
  39.     def request(self, method, uri, body=None, headers=None, no_token=False):
  40.         # Build URL
  41.         while uri.startswith("/"): uri = uri[1:]
  42.         url = "https://%s/%s" % (self.urlhost, uri)
  43.  
  44.         # Check body
  45.         if body and type(body) == dict:
  46.             body = json.dumps(body)
  47.  
  48.         # Build Header
  49.         now = int(time.time() * 1000)
  50.         headers = dict(list(headers.items()) + [('Signature-Headers', ":".join(headers.keys()))]) if headers else {}
  51.         if (no_token):
  52.             payload = self.access_id + str(now)
  53.             headers['secret'] = self.api_secret
  54.         else:
  55.             self.refresh_token()
  56.             payload = self.access_id + self.token["access_token"] + str(now)
  57.  
  58.         payload += (method + '\n' +                                                  # HTTPMethod
  59.             hashlib.sha256(bytes((body or "").encode('utf-8'))).hexdigest() + '\n' + # Content-SHA256
  60.             ''.join(['%s:%s\n'%(key, headers[key])                                   # Headers
  61.                      for key in headers.get("Signature-Headers", "").split(":")
  62.                      if key in headers]) + '\n' +
  63.             '/' + url.split('//', 1)[-1].split('/', 1)[-1])
  64.  
  65.         # Sign Payload
  66.         signature = hmac.new(
  67.             self.api_secret.encode('utf-8'),
  68.             msg=payload.encode('utf-8'),
  69.             digestmod=hashlib.sha256
  70.         ).hexdigest().upper()
  71.  
  72.         # Create Header Data
  73.         headers['client_id'] = self.access_id
  74.         headers['sign'] = signature
  75.         headers['t'] = str(now)
  76.         headers['sign_method'] = 'HMAC-SHA256'
  77.  
  78.         if (not no_token):
  79.             headers['access_token'] = self.token["access_token"]
  80.         if method == "GET":
  81.             response = requests.get(url, headers=headers)
  82.         elif method == "POST":
  83.             response = requests.post(url, headers=headers, data=body)
  84.         elif method == "PUT":
  85.             response = requests.put(url, headers=headers, data=body)
  86.         elif method == "DELETE":
  87.             response = requests.delete(url, headers=headers)
  88.         else:
  89.             raise NotImplementedError(f"Unknown method '{method}'")
  90.  
  91.         response_dict = response.json()
  92.         success = response_dict["success"]
  93.         if not success: raise TuyaError(response_dict["msg"])
  94.         return response_dict["result"]
  95.  
  96.     def save_token(self):
  97.         if self.token_cache:
  98.             with open(self.token_cache, 'w') as f:
  99.                 f.write(json.dumps(self.token))
  100.  
  101.     def refresh_token(self):
  102.         if self.token and time.time() < self.token["token_time"] + self.token["expire_time"]:
  103.             return
  104.         if not self.token:
  105.             logger.info(f"Tuya - token is empty, requesting new token...")
  106.         else:
  107.             logger.info(f"Tuya - token expired, requesting new one...")
  108.         uri = "v1.0/token?grant_type=1"
  109.         self.token = self.request("GET", uri, no_token=True)
  110.         self.token["token_time"] = time.time()
  111.         self.save_token()
  112.         logger.info(f"Tuya - new token received")
  113.  
  114. def TuyaError(Exception):
  115.     pass
  116.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement