Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/python3
import time

# Announce start-up and pause for a few seconds before doing anything else;
# the delay makes it easier to connect in Thonny.
print("Hello World")
time.sleep(5)
print("Loading...")

# These imports intentionally run after the pause above.
from json import dumps as json_encode
from GPIO import GPIO  # https://pastebin.com/YX7jDw0E
import _thread
#import uasync_requests as urequest
import urequest  # https://github.com/micropython/micropython-lib/blob/master/python-ecosys/urequests/urequests.py
import uasyncio

# Short module-level aliases for the uasyncio sleep helpers.
sleep = uasyncio.sleep
sleep_ms = uasyncio.sleep_ms
def http(page, data=False, err=False):
    """Send one request to the home server and report whether it replied "OK".

    page -- path appended to "http://10.0.0.69:8080/" when ``err`` is false;
            used unchanged (already a full URL) when ``err`` is true.
    data -- falsy for a GET; otherwise it is JSON-encoded and sent as a POST.
    err  -- true marks the retry attempt: the URL is not prefixed again and
            a further failure is not retried.

    Returns True when the reply is status 200 with body b"OK", or when the
    request failed with a "BadStatusLine" error (deliberately ignored).
    Returns False when there is no network, on any other reply, or when the
    request has failed twice.
    """
    #return print("Simulation mode: ",page)
    if not err:
        page = "http://10.0.0.69:8080/" + page
    print("URL =", page, "DATA =", data)
    if not GPIO.nic.isconnected():
        print("----------NO NETWORK----------")
        return False
    try:
        if not data:
            r = urequest.get(page)
        else:
            r = urequest.post(
                page,
                data=json_encode(data),
                headers={"Content-type": "application/json"},
            )
    except Exception as e:
        # No BadStatusLine class is imported in this file, so it is detected
        # by class name or message instead of by an except clause (which
        # would raise NameError). No response object exists at this point,
        # so there is nothing to close.
        # NOTE(review): micropython-lib's urequests appears to report this as
        # a ValueError whose message contains "BadStatusLine" - confirm
        # against the installed version.
        if type(e).__name__ == "BadStatusLine" or "BadStatusLine" in str(e):
            # The server does this sometimes (status line comes back as []).
            print("ERROR: BadStatusLine; Ignore")
            return True
        print("ERROR:", e)
        if err:
            return False
        time.sleep_ms(500)
        # only try twice
        return http(page, data, True)
    try:
        ok = r.status_code == 200 and r.content == b"OK"
        if not ok:
            print(page, ':', r.status_code, "-", r.content)
    finally:
        # Always release the response, even if reading it fails.
        r.close()
    return ok
def polling():
    """Poll every pin in GPIO.inputs forever and queue settled changes.

    Inputs whose name ends in "door" are flagged as doors; their initial
    states are reported once through doorNotify() before the loop starts.
    Each settled change is appended to GPIO.polling.work as a dict with the
    keys "name", "is_door", "value" and "time". The same loop also pulses
    the buzzer while GPIO.buzzer.active is set. This function never returns.
    """
    def settled(entry, level):
        # A level change restarts the count and records when it happened.
        if level != entry["state"]:
            entry["count"] = 1
            entry["state"] = level
            entry["last_time"] = entry["time"]
            entry["time"] = time.ticks_ms()
            return False
        # Still counting unchanged passes up to GPIO.polling.m_sec.
        if entry["count"] < GPIO.polling.m_sec:
            entry["count"] += 1
            return False
        # True only while the count sits exactly on the threshold; the
        # caller bumps it past the threshold so the event fires once.
        return entry["count"] == GPIO.polling.m_sec

    switches = []
    initial_doors = []
    for name in GPIO.inputs:
        pin = GPIO.inputs[name]
        is_door = name.endswith("door")
        entry = {
            "pin": pin,
            "name": name,
            "state": pin.value(),
            # Start past the threshold so nothing is queued until a change.
            "count": GPIO.polling.m_sec + 1,
            "is_door": is_door,
            "time": 0,
        }
        switches.append(entry)
        if is_door:
            initial_doors.append({
                "name": entry["name"],
                "value": entry["state"],
            })
    doorNotify(initial_doors)
    initial_doors = None
    buzz_ct = 0
    while True:
        #START_TIME=time.ticks_us()
        for switch in switches:
            level = switch["pin"].value()
            if not settled(switch, level):
                continue
            # (unused idea: anti-button-spam check on
            #  time.ticks_diff(switch["time"], switch["last_time"]) < 1000)
            GPIO.polling.work.append({
                "name": switch["name"],
                "is_door": switch["is_door"],
                "value": level,
                "time": switch["time"],
            })
            switch["count"] += 1
        if GPIO.buzzer.active:  # 30 loops = ~50 ms
            buzz_ct += 1
            if buzz_ct == 1:
                GPIO.buzzer.pin.duty_u16(32767)
            elif buzz_ct == 30:
                GPIO.buzzer.pin.duty_u16(0)
            elif buzz_ct == 60:
                buzz_ct = 0
        elif buzz_ct:
            # Buzzer was switched off mid-cycle: reset and silence it.
            buzz_ct = 0
            GPIO.buzzer.active = 0
            GPIO.buzzer.pin.duty_u16(0)
        time.sleep_ms(1)
        #END_TIME=time.ticks_us()
        #print(time.ticks_diff(END_TIME,START_TIME))
async def getWork():
    """Handle at most one queued job from GPIO.polling.work.

    Returns 0 when the queue is empty and 1 after a job has been taken.
    """
    queue = GPIO.polling.work
    if not queue:
        return 0
    job = queue.pop(0)
    #print(GPIO.polling.work,job)
    name = job["name"]
    value = job["value"]
    if job["is_door"]:
        if name != "living_door" and not value:
            # Start the light timer and yield once so it can begin running
            # before the door notification is sent.
            uasyncio.create_task(lightTimer())
            await sleep(0)
        doorNotify([job])
    elif name == "garage_button" and value:
        # Pass along the milliseconds elapsed since the press was recorded.
        garageButtons(time.ticks_diff(time.ticks_ms(), job["time"]))
    elif name == "living_button" and not value:
        http("music/outputs.php?set=2&pico")
    return 1
def readSensor(pin):
    """Take one reading from a sensor object.

    pin -- object providing measure(), temperature() and humidity().

    Returns {"temp": ..., "water": ...} on success, or False when the read
    failed or produced the 150.0 power-up placeholder temperature, so the
    caller can retry.
    """
    try:
        pin.measure()
    except Exception as e:
        # A failed read must not escape and stop the caller's loop; report
        # it through the same "False means retry" path as a bad value.
        # NOTE(review): MicroPython's dht driver is understood to raise
        # OSError on timeouts and a plain Exception on checksum errors -
        # confirm for the driver behind GPIO.sensors.
        print("ERROR:", e)
        return False
    data = {
        "temp": pin.temperature(),
        "water": pin.humidity()
    }
    print(data)
    if data["temp"] == 150.0:
        # temp==150.0 and humidity == 100 if read too soon at power up
        return False
    return data
async def lightTimer():
    """Switch the bulb on for up to 30 one-second steps, then off again.

    Does nothing if the bulb is already on or the light reading is above
    the threshold. Stops early (bulb switched off) when it becomes bright,
    or (bulb left alone) when the onboard light is on or the bulb has been
    turned off elsewhere. The buzzer is activated when 9 steps remain and
    during the last three steps.
    """
    threshold = 10000  # see notes in GPIO.py

    def too_bright():
        #print("light level:",lighting, ";threshold:",bright)
        return GPIO.light.value() > threshold

    if GPIO.light.bulb.value() or too_bright():
        return
    GPIO.light.bulb.value(1)
    for remaining in range(29, -1, -1):
        if too_bright():
            GPIO.light.bulb.value(0)
            GPIO.buzzer.active = 0
            return
        if GPIO.light.onboard.value() or not GPIO.light.bulb.value():
            # light is set manually
            GPIO.buzzer.active = 0
            return
        #print("light count down:",sec)
        GPIO.buzzer.active = remaining == 9 or remaining < 3
        await sleep(1)
    GPIO.buzzer.active = 0
    print("Timer up, lights out")
    GPIO.light.bulb.value(0)
def garageButtons(held):
    """Act on a garage-button press, telling a long hold from a short press.

    held -- milliseconds already elapsed since the press was recorded.

    While the button stays pressed the count is advanced about once per
    millisecond, up to 750. Reaching exactly 750 toggles the onboard light
    and copies its state to the bulb; anything else sends the music request.
    """
    pin = GPIO.inputs["garage_button"]
    while pin.value() and held < 750:
        # Use the blocking time.sleep_ms here: the module-level sleep_ms is
        # uasyncio's coroutine, which does not pause unless awaited, and
        # this function is synchronous.
        time.sleep_ms(1)
        held += 1
    if held == 750:
        # if held was over supplied at over 750 something is VERY wrong,
        # under 100 is expected
        GPIO.light.onboard.toggle()
        GPIO.light.bulb.value(GPIO.light.onboard.value())
    else:
        # http://10.0.0.69:8080/music/outputs.php?set=5
        http("music/outputs.php?set=5&pico")
def doorNotify(doors):
    """Report door states and the current light status to the server.

    doors -- iterable of dicts with "name" and "value" keys.

    The value for "living_door" is sent inverted (as 0 or 1); all other
    values are sent as given. The caller's dicts are not modified.
    """
    url = "smart_home.php?update=garage"
    states = {}
    for door in doors:
        value = door["value"]
        if door["name"] == "living_door":
            value = int(not value)
        states[door["name"]] = value
    #uasyncio.create_task(http(url,json))
    # Read the status once, after the door states have been collected.
    http(url, {"doors": states, "status": getReadings()})
def getReadings():
    """Return the current bulb, onboard-light and light-sensor values."""
    readings = {}
    readings["bulb"] = GPIO.light.bulb.value()
    readings["onboard"] = GPIO.light.onboard.value()
    readings["light"] = GPIO.light.value()
    return readings
async def main():
    """Process queued work forever and periodically read and report sensors.

    Each pass handles at most one job via getWork(), then either sends a
    finished set of sensor readings, or (once the loop counter reaches
    360000 and the bulb is off) reads a single sensor, then sleeps 1 ms.
    A failed read is retried once after a short delay; if the retry also
    fails the previous value for that sensor is kept.
    """
    names = []
    pins = []
    for name in GPIO.sensors:
        names.append(name)
        pins.append(GPIO.sensors[name])
    # One slot per sensor, so any number of sensors is supported.
    values = [{} for _ in pins]
    last_index = len(pins) - 1
    index = -1
    loops = 350000  # sensor pass starts at 360000 (original note: ~15 minutes)
    send_data = 0
    sensor_error = 0
    #debug_msg=0
    while True:
        if await getWork() and loops >= 345000:
            # Activity found, defer sensor reading for faster response
            loops = 345000
            print("Delay sensor reading")
        #if GPIO.light.bulb.value() and not GPIO.light.onboard.value():# Timer running
        if send_data:
            send_data = 0
            http("smart_home.php?update=garage", {
                "DHT22": dict(zip(names, values)),
                "status": getReadings()
            })
        elif pins and loops >= 360000 and not GPIO.light.bulb.value():
            # light is off thus no timer
            # Only read one sensor between checks for high priority work
            index += 1
            data = readSensor(pins[index])
            if data:
                values[index] = data
                sensor_error = 0
            elif not sensor_error:
                # Only try twice: step back and retry this sensor shortly.
                sensor_error = 1
                index -= 1
                loops = 358000
            else:
                # Retry failed too: keep the old value and clear the flag so
                # the next sensor still gets its own retry.
                sensor_error = 0
            if index == last_index:
                # Every sensor visited: restart the cycle and send the data.
                loops = 0
                send_data = 1
                index = -1
        await sleep_ms(1)
        loops += 1
        #if debug_msg==10000:
        #    print("alive")
        #    debug_msg=0
        #debug_msg+=1
def _main():
    """Thread entry point: run the main() coroutine under uasyncio."""
    coroutine = main()
    uasyncio.run(coroutine)


print("Starting Work!")
#_thread.start_new_thread(uasyncio.run,(main()))
# The asyncio side runs on a second thread; this thread does the polling.
_thread.start_new_thread(_main, ())
polling()
print("End of script")
Advertisement
Add Comment
Please, Sign In to add comment