Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- DEVVVVVVVV Support for UPB lights
- Improvements:
- - ditch upb-cli and do everything natively
- - listen for status changes in real time on the network
- - optionally poll all the devices in the network every x minutes for their
- status
- - instead of recreating the light config in configuration.yaml, read the config
- from the upstart export file
- """
- import asyncio
- import logging
- import time
- from subprocess import check_output, CalledProcessError, STDOUT
- import voluptuous as vol
- from homeassistant.components.light import (
- ATTR_BRIGHTNESS, ATTR_BRIGHTNESS_PCT, SUPPORT_BRIGHTNESS, Light,
- PLATFORM_SCHEMA)
- from homeassistant.const import (CONF_NAME, CONF_ID, CONF_DEVICES)
- import homeassistant.helpers.config_validation as cv
- serial_port = ''
- network_id = ''
- _LOGGER = logging.getLogger(__name__)
- CONF_SERIAL_PORT = 'serial_port'
- CONF_NETWORK_ID = 'network_id'
- UPB_LIGHTS = 'upb_lights'
- PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
- vol.Required(CONF_DEVICES): vol.All(cv.ensure_list, [
- {
- vol.Required(CONF_ID): cv.string,
- vol.Required(CONF_NAME): cv.string,
- }
- ]),
- vol.Required(CONF_SERIAL_PORT): cv.string,
- vol.Required(CONF_NETWORK_ID): cv.string,
- })
- # Initialize a list / array
- upb_dev_bright_time = [None]*100
- upb_dev_bright_lvl = [None]*100
- upb_dev_bright_lvl[33] = 88
- upb_dev_bright_lvl[4] = 4
- upb_dev_bright_lvl[83] = 83
- def dump(obj):
- for attr in dir(obj):
- if hasattr( obj, attr ):
- print( "obj.%s = %s" % (attr, getattr(obj, attr)))
- def setup_platform(hass, config, add_devices, discovery_info=None):
- """Set up the UPB Light platform"""
- global serial_port, network_id
- serial_port = config.get(CONF_SERIAL_PORT)
- network_id = config.get(CONF_NETWORK_ID)
- try:
- upb_command('-V')
- except CalledProcessError as err:
- _LOGGER.error(err.output)
- return False
- for l in config[CONF_DEVICES]:
- print(l) # OrderedDict([('id', '83'), ('name', 'Office Lamp')])
- #add_devices(l)
- # hass.data[UPB_LIGHTS][light] = light
- add_devices(UPBLight(light) for light in config[CONF_DEVICES]) # add_devices returns None
- # devs = []
- # for (area_name, device) in hass.data[LUTRON_DEVICES]['light']:
- # dev = LutronLight(area_name, device, hass.data[LUTRON_CONTROLLER])
- # devs.append(dev)
- # light_devices = bridge.get_devices_by_domain(DOMAIN)
- # for light_device in light_devices:
- # print(light_device)
- """Handle when an entity is about to be added to Home Assistant."""
- #self._serial_loop_task = self.hass.loop.create_task(
- #self.serial_read(self._port, self._baudrate))
- myloop = hass.loop.create_task(serial_read(hass, '/dev/ttyS1', 4800))
- class UPBLight(Light):
- """Representation of an UPB Light"""
- def __init__(self, light):
- """Initialize an UPB Light"""
- self._light = light
- self._name = 'upb_' + light['name']
- self._id = light['id']
- self._brightness = None
- self._state = None
- @property
- def name(self):
- """Return the display name of this light"""
- return self._name
- @property
- def supported_features(self):
- """Flag supported features"""
- return SUPPORT_BRIGHTNESS
- @property
- def brightness(self):
- """Return the brightness of the light"""
- global upb_dev_bright_lvl
- #return self._brightness
- return upb_dev_bright_lvl[int(self._id)]
- @property
- def is_on(self):
- """Return true if light is on"""
- return self._state
- def turn_on(self, **kwargs):
- """Instruct the light to turn on"""
- global upb_dev_bright_lvl, upb_dev_bright_time
- bright_pct = int((kwargs.get(ATTR_BRIGHTNESS,255)/255)*100)
- #self._brightness = bright_pct
- upb_dev_bright_lvl[int(self._id)] = bright_pct
- #print(str(self._id)) ## This is the id value from the configutaion.yaml
- #upb_command('-n 99 -i 83 -t device -c goto -l 100 --send -p /dev/ttyS1' )
- #upb_command(' -i ' + self._id + ' -t device -c goto -l ' + str(bright_pct) + ' --send' )
- print("turn_on: " + "id: " + self._id + " brightness: " + str(bright_pct))
- #print(kwargs)
- self._state = True
- self._brightness = bright_pct
- def turn_off(self, **kwargs):
- """Instruct the light to turn off"""
- # upb_command(' -i ' + self._id + ' -t device -c goto -l 0 --send' )
- # time.sleep(1)
- # upb_command(' -i ' + self._id + ' -t device -c goto -l 0 --send' )
- self._state = False
- def update(self, **kwargs):
- """Update light"""
- global upb_dev_bright_lvl
- print('update kwargs: '+ str(kwargs))
- print("update for id: " + str(self._id) + " brightness: " + str(upb_dev_bright_lvl[int(self._id)]))
- self._brightness = upb_dev_bright_lvl[int(self._id)]
- #self._brightness = upb_dev_bright_lvl[self._id]
- #print(upb_dev_bright_lvl)
- #self._state = bool(get_unit_status(self._id))
- #self._light.update()
- #self._state = self._light.is_on()
- #self._brightness = self._light.brightness
- #self._brightness = self._light.brightness
- @asyncio.coroutine
- def serial_read(hass, device, rate, **kwargs):
- """Read the stream of light statuses from the serial port"""
- import serial_asyncio
- global upb_dev_bright_lvl
- print('serial_read kwargs: ' + str(kwargs))
- print('serial read method')
- reader, _ = yield from serial_asyncio.open_serial_connection(
- url=device, baudrate=rate, **kwargs)
- print('after reader 3')
- #hass.core.set('light.upb_office_lamp', 'on')
- #hass.states.async_set('light.upb_office_lamp', 'on')
- #self.hass.states.async_set(entity_id, STATE_CONFIGURE, data)
- while True:
- print('before readuntil')
- line = yield from reader.readuntil(separator=b'\r')
- line = line.decode('utf-8').strip()
- print('my line: '+ line)
- #id, brightness = decode_line(line)
- brightness = 77
- id = 4
- #hass.states.async_set('light.upb_office_lamp', 'on')
- upb_dev_bright_lvl[4] = 77
- print(str(upb_dev_bright_lvl))
- # create light object here currentlight = hass.data[UPB_LIGHTS][id]
- # lightobject.brightness(lightobject)
- # try:
- # data = json.loads(line)
- # if isinstance(data, dict):
- # self._attributes = data
- # except ValueError:
- # pass
- _LOGGER.debug("Received: %s", line)
- #self._state = line
- #self.async_schedule_update_ha_state()
- def upb_command(command):
- """Execute UPB command and check output"""
- #print("config: " + config[CONF_SERIAL_PORT])
- # print("serial: "+ serial_port)
- # print("upbnet: "+ network_id)
- return check_output(['upb-cli','-p',serial_port,'-n',network_id] + command.split(' '), stderr=STDOUT)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement