Guest User

Untitled

a guest
Feb 2nd, 2020
864
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.38 KB | None | 0 0
  1. import usocket, network, time
  2. import lcd, image
  3. from Maix import GPIO
  4. from machine import UART
  5. from fpioa_manager import fm, board_info
  6.  
  7.  
  8. ssid = "SSID"
  9. password = "PASSWORD"
  10.  
  11.  
  12.  
  13. # IO map for ESP32 on Maixduino
  14. fm.register(25,fm.fpioa.GPIOHS10)#cs
  15. fm.register(8,fm.fpioa.GPIOHS11)#rst
  16. fm.register(9,fm.fpioa.GPIOHS12)#rdy
  17. fm.register(28,fm.fpioa.GPIOHS13)#mosi
  18. fm.register(26,fm.fpioa.GPIOHS14)#miso
  19. fm.register(27,fm.fpioa.GPIOHS15)#sclk
  20.  
  21. def wifi_enable(en):
  22. #    global wifi_en
  23. #    wifi_en.value(en)
  24.     print("")
  25.  
  26. def wifi_reset():
  27.     global uart
  28. #    wifi_enable(0)
  29. #    time.sleep_ms(200)
  30. #    wifi_enable(1)
  31. #    time.sleep(2)
  32. #    uart = UART(UART.UART2,115200,timeout=1000, read_buf_len=4096)
  33. #    tmp = uart.read()
  34. #    uart.write("AT+UART_CUR=921600,8,1,0,0\r\n")
  35. #    print(uart.read())
  36. #    uart = UART(UART.UART2,921600,timeout=1000, read_buf_len=10240) # important! baudrate too low or read_buf_len too small will loose data
  37. #    uart.write("AT\r\n")
  38. #    tmp = uart.read()
  39. #    print(tmp)
  40. #    if not tmp.endswith("OK\r\n"):
  41. #        print("reset fail")
  42. #        return None
  43. #    try:
  44. #        nic = network.ESP8285(uart)
  45. #    except Exception:
  46. #        return None
  47. #    return nic
  48.  
  49. #nic = wifi_reset()
  50. #if not nic:
  51. #    raise Exception("WiFi init fail")
  52.  
  53. nic = network.ESP32_SPI(cs=fm.fpioa.GPIOHS10,rst=fm.fpioa.GPIOHS11,rdy=fm.fpioa.GPIOHS12, mosi=fm.fpioa.GPIOHS13,miso=fm.fpioa.GPIOHS14,sclk=fm.fpioa.GPIOHS15)
  54. nic.connect(ssid,password)
  55. nic.ifconfig()
  56.  
  57. class Response:
  58.  
  59.     def __init__(self, f):
  60.         self.raw = f
  61.         self.encoding = "utf-8"
  62.         self._cached = None
  63.  
  64.     def close(self):
  65.         if self.raw:
  66.             self.raw.close()
  67.             self.raw = None
  68.         self._cached = None
  69.  
  70.     @property
  71.     def content(self):
  72.         if self._cached is None:
  73.             try:
  74.                 self._cached = self.raw.read()
  75.             finally:
  76.                 self.raw.close()
  77.                 self.raw = None
  78.         return self._cached
  79.  
  80.     @property
  81.     def text(self):
  82.         return str(self.content, self.encoding)
  83.  
  84.     def json(self):
  85.         import ujson
  86.         return ujson.loads(self.content)
  87.  
  88.  
  89. def request(method, url, data=None, json=None, headers={}, stream=None, parse_headers=True):
  90.     redir_cnt = 1
  91.     if json is not None:
  92.         assert data is None
  93.         import ujson
  94.         data = ujson.dumps(json)
  95.  
  96.     while True:
  97.         try:
  98.             proto, dummy, host, path = url.split("/", 3)
  99.         except ValueError:
  100.             proto, dummy, host = url.split("/", 2)
  101.             path = ""
  102.         if proto == "http:":
  103.             port = 80
  104.         elif proto == "https:":
  105.             import ussl
  106.             port = 443
  107.         else:
  108.             raise ValueError("Unsupported protocol: " + proto)
  109.  
  110.         if ":" in host:
  111.             host, port = host.split(":", 1)
  112.             port = int(port)
  113.  
  114.         ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM)
  115.         ai = ai[0]
  116.  
  117.         resp_d = None
  118.         if parse_headers is not False:
  119.             resp_d = {}
  120.  
  121.         s = usocket.socket(ai[0], ai[1], ai[2])
  122.         try:
  123.             s.connect(ai[-1])
  124.             if proto == "https:":
  125.                 s = ussl.wrap_socket(s, server_hostname=host)
  126.             s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
  127.             if not "Host" in headers:
  128.                 s.write(b"Host: %s\r\n" % host)
  129.             # Iterate over keys to avoid tuple alloc
  130.             for k in headers:
  131.                 s.write(k)
  132.                 s.write(b": ")
  133.                 s.write(headers[k])
  134.                 s.write(b"\r\n")
  135.             if json is not None:
  136.                 s.write(b"Content-Type: application/json\r\n")
  137.             if data:
  138.                 s.write(b"Content-Length: %d\r\n" % len(data))
  139.             s.write(b"Connection: close\r\n\r\n")
  140.             if data:
  141.                 s.write(data)
  142.  
  143.             l = s.readline()
  144.             #print(l)
  145.             l = l.split(None, 2)
  146.             status = int(l[1])
  147.             reason = ""
  148.             if len(l) > 2:
  149.                 reason = l[2].rstrip()
  150.             while True:
  151.                 l = s.readline()
  152.                 if not l or l == b"\r\n":
  153.                     break
  154.                 #print(l)
  155.  
  156.                 if l.startswith(b"Transfer-Encoding:"):
  157.                     if b"chunked" in l:
  158.                         raise ValueError("Unsupported " + l)
  159.                 elif l.startswith(b"Location:") and 300 <= status <= 399:
  160.                     if not redir_cnt:
  161.                         raise ValueError("Too many redirects")
  162.                     redir_cnt -= 1
  163.                     url = l[9:].decode().strip()
  164.                     #print("redir to:", url)
  165.                     status = 300
  166.                     break
  167.  
  168.                 if parse_headers is False:
  169.                     pass
  170.                 elif parse_headers is True:
  171.                     l = l.decode()
  172.                     k, v = l.split(":", 1)
  173.                     resp_d[k] = v.strip()
  174.                 else:
  175.                     parse_headers(l, resp_d)
  176.         except OSError:
  177.             s.close()
  178.             raise
  179.  
  180.         if status != 300:
  181.             break
  182.  
  183.     resp = Response(s)
  184.     resp.status_code = status
  185.     resp.reason = reason
  186.     if resp_d is not None:
  187.         resp.headers = resp_d
  188.     return resp
  189.  
  190.  
  191. def head(url, **kw):
  192.     return request("HEAD", url, **kw)
  193.  
  194. def get(url, **kw):
  195.     return request("GET", url, **kw)
  196.  
  197. def post(url, **kw):
  198.     return request("POST", url, **kw)
  199.  
  200. def put(url, **kw):
  201.     return request("PUT", url, **kw)
  202.  
  203. def patch(url, **kw):
  204.     return request("PATCH", url, **kw)
  205.  
  206. def delete(url, **kw):
  207.     return request("DELETE", url, **kw)
  208.  
  209. headers ={
  210.     "User-Agent": "MaixPy"
  211. }
  212.  
  213. res = get("http://dl.sipeed.com/MAIX/MaixPy/assets/Alice.jpg", headers=headers)
  214. print("response:", res.status_code)
  215. content = res.content
  216. print("get img, length:{}, should be:{}".format(len(content), int(res.headers['Content-Length'])))
  217.  
  218. if len(content)!= int(res.headers['Content-Length']):
  219.     print("download img fail, not complete, try again")
  220. else:
  221.     print("save to /flash/Alice.jpg")
  222.     f = open("/flash/Alice.jpg","wb")
  223.     f.write(content)
  224.     f.close()
  225.     del content
  226.     print("save ok")
  227.     print("display")
  228.     img = image.Image("/flash/Alice.jpg")
  229.     lcd.init()
  230.     lcd.display(img)
Advertisement
Add Comment
Please, Sign In to add comment