Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import pprint
import sys
from time import sleep, time

from sense_energy import Senseable

# The async client is only imported on interpreters new enough for the
# ``async def`` code further down in this script (guarded by the same check).
if sys.version_info >= (3, 6):
    import asyncio

    from sense_energy import ASyncSenseable

# Placeholder credentials passed to ``authenticate``; replace both values
# with a real account before running the script.
username = '<email>@site.com'
password = '<password>'
- ##last_call = 0
- ##while True:
- ## if time() - last_call > 5:
- ## last_call = time()
- ## sense.get_realtime()
- ## print ("Active Devices:",", ".join(sense.active_devices))
- ## sleep(1)
- ##devices = sense.get_discovered_device_data()
- ##print "get_discovered_device_names:"
- ##pprint.pprint(sense.get_discovered_device_names())
- ##print "get_discovered_device_data:"
- ##pprint.pprint(sense.get_discovered_device_data())
- ##print "always_on_info:"
- ##pprint.pprint(sense.always_on_info())
- ##print "get_monitor_info:"
- ##pprint.pprint(sense.get_monitor_info())
- ##print "get_notification_preferences:"
- ##pprint.pprint(sense.get_notification_preferences())
- ##print "get_daily_kWh:"
- ##pprint.pprint(sense.get_daily_kWh())
- #device probabilities
- #pprint.pprint(sense.get_realtime())
def sync_test():
    """Exercise the blocking ``Senseable`` client and print what it reports.

    Steps, in order:

    1. Create a client, authenticate with the module-level ``username`` and
       ``password``.
    2. Run one realtime update and one trend update, then print the active
       power, active solar power, daily usage, daily production and the
       list of active devices.
    3. Call ``update_realtime`` three times, one second apart, printing the
       active power after each call.
    4. Iterate ``get_realtime_stream`` and print the active power for the
       first five items it yields, then stop.

    Returns nothing; all results go to stdout.
    """
    print("Initialize")
    sense = Senseable(wss_timeout=10)
    # NOTE(review): presumably the minimum number of seconds between real
    # API hits -- confirm against the sense_energy documentation.
    sense.rate_limit = 2

    print("Authenticate")
    sense.authenticate(username, password)

    print("\nInitial update realtime and trend:")
    sense.update_realtime()
    sense.update_trend_data()
    print("Active:", sense.active_power, "W")
    print("Active Solar:", sense.active_solar_power, "W")
    print("Daily:", sense.daily_usage, "KW")
    print("Daily Solar:", sense.daily_production, "KW")
    print("Active Devices:", ", ".join(sense.active_devices))

    print("\nRate Limit Test:")
    # Poll faster (every 1 s) than the rate_limit value set above.
    for _ in range(3):
        sense.update_realtime()
        print("Active:", sense.active_power, "W")
        sleep(1)

    print("\nStream Test:")
    # The yielded item itself is not used; readings are taken from the
    # client's attributes. Stop after the fifth item (index 4).
    for index, _ in enumerate(sense.get_realtime_stream()):
        print("Active:", sense.active_power, "W")
        if index > 3:
            break
- #sync_test()
if sys.version_info >= (3, 6):

    async def async_test():
        """Exercise the ``ASyncSenseable`` client and print what it reports.

        Creates the client, stores it in the module-level ``sense`` global
        (so the code after this coroutine can reuse it), authenticates with
        the module-level ``username``/``password``, performs one realtime
        and one trend update, prints the readings, and then calls
        ``update_realtime`` three times, one second apart.

        The final "Stream Test" banner is printed here; the streaming part
        itself is driven by the module-level code that follows.
        """
        global sense
        print("\n\nASYNC Initialize:")
        sense = ASyncSenseable(wss_timeout=2)
        # NOTE(review): presumably the minimum number of seconds between
        # real API hits -- confirm against the sense_energy documentation.
        sense.rate_limit = 2

        print("Authenticate")
        await sense.authenticate(username, password)

        print("\nInitial update realtime and trend:")
        await sense.update_realtime()
        await sense.update_trend_data()
        print("Active:", sense.active_power, "W")
        print("Active Solar:", sense.active_solar_power, "W")
        print("Daily:", sense.daily_usage, "KW")
        print("Daily Solar:", sense.daily_production, "KW")
        print("Active Devices:", ", ".join(sense.active_devices))

        print("\nRate Limit Test:")
        # Poll faster (every 1 s) than the rate_limit value set above.
        for _ in range(3):
            await sense.update_realtime()
            print("Active:", sense.active_power, "W")
            await asyncio.sleep(1)

        print("\nStream Test:")

    # One-element list used as a mutable counter shared between ``cb``
    # (which increments it) and ``monitor_it`` (which polls it).
    obj = [0]

    async def monitor_it(task):
        """Cancel *task* once ``cb`` has been invoked at least three times.

        Wakes up once per second, prints the current callback count, and
        calls ``task.cancel()`` after the count reaches 3.
        """
        while obj[0] < 3:
            await asyncio.sleep(1)
            print("Tick", obj[0])
        print("Done")
        task.cancel()

    # Placeholder so the global exists; ``async_test`` rebinds it to the
    # authenticated client.
    sense = 0
    asyncio.get_event_loop().run_until_complete(async_test())

    def cb(x):
        """Realtime callback: print the active power and count the call.

        The argument passed by the client is not used.
        """
        print("Active:", sense.active_power, "W")
        obj[0] += 1

    fut = sense.get_realtime_future(cb)
    # Hand the monitor the object that is actually scheduled on the loop.
    # ``ensure_future`` returns ``fut`` itself when it is already a Future,
    # and a wrapping Task otherwise; only the latter/returned object is
    # guaranteed to have ``cancel()``.
    stream_task = asyncio.ensure_future(fut)
    tasks = [
        stream_task,
        asyncio.ensure_future(monitor_it(stream_task)),
    ]
    asyncio.get_event_loop().run_until_complete(asyncio.wait(tasks))
Advertisement
Add Comment
Please, Sign In to add comment