Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import hassapi as hass
- # App to track the trend of a sensor
- # Args:
- # sensor: The sensor entity to track. Any entity that has a value in its state may be used.
- # trend: The entity to set to either "on" or "off". A state of "on" indicates an increasing trend and
- # a state of "off" indicates a decreasing trend. Any entity that can be turned on or off may be used.
- # samples: Number of samples to use. Must be a number and not an entity id.
- # The app only reads "live" samples and does not read samples from the HA history when the app is instantiated.
- # For fast changing sensors, increase the number of samples to get a more accurate indication of the trend. For a slow
- # changing system like the indoor temperature, 4 samples will be sufficient.
- # Example configuration to put in apps.yaml
- # trendsensor:
- # module: trend
- # class: trendsensor
- # sensor: sensor.temperature
- # samples: 4
- # trend: input_boolean.temperature_trend
- # Author: Tomas Jansson
- # Version 1.0
- # Version date: 2017-11-21
- # todo: Read samples from history when instantiated
class trendsensor(hass.Hass):
    """AppDaemon app that reports whether a sensor is trending up or down.

    Each numeric state change of the configured sensor is stored in a
    sliding window holding the most recent readings. The sign of the
    least-squares slope of those readings against their position in the
    window decides the trend: the trend entity is turned on for a positive
    slope and turned off otherwise.

    Keys read from ``self.args``:
        sensor: entity id whose state is sampled.
        samples: size of the sliding window (must be at least 2).
        trend: entity id that is turned on/off to reflect the trend.
    """

    SENSOR = "sensor"
    SAMPLES = "samples"
    TREND = "trend"
    STATE_ON = "on"
    STATE_OFF = "off"

    def initialize(self):
        """Validate the configuration and start listening to the sensor.

        Raises:
            ValueError: if ``samples`` is smaller than 2, because a slope
                cannot be computed from fewer than two readings.
        """
        window = int(self.args[self.SAMPLES])
        if window < 2:
            raise ValueError(
                "'{}' must be at least 2, got {}".format(self.SAMPLES, window)
            )
        self.window = window
        # Start empty rather than zero-filled: placeholder zeros would take
        # part in the regression and fake a rising trend during warm-up.
        self.samples = []
        # Register the listener last so the callback always finds the
        # attributes above in place.
        self.listen_state(self.sensor_change, self.args[self.SENSOR])

    def sensor_change(self, entity, attribute, old, new, kwargs):
        """Record the new sensor reading and update the trend entity.

        Non-numeric states are logged and skipped. The trend entity is left
        untouched until at least two readings have been collected.
        """
        try:
            value = float(new)
        except (TypeError, ValueError):
            self.log(
                "Ignoring non-numeric state {!r} from {}".format(new, entity),
                level="WARNING",
            )
            return

        # Append the newest reading and drop whatever falls out of the window.
        self.samples.append(value)
        del self.samples[:-self.window]
        self.log(self.samples)

        if len(self.samples) < 2:
            return

        positions = range(1, len(self.samples) + 1)
        if self.beta(positions, self.samples) == self.STATE_ON:
            self.turn_on(self.args[self.TREND])
        else:
            self.turn_off(self.args[self.TREND])

    def var(self, X):
        """Return the sample variance (n - 1 denominator) of ``X``.

        Raises:
            ZeroDivisionError: if ``X`` holds fewer than two values.
        """
        n = len(X)
        mean = sum(X) / float(n)
        # Two-pass form: avoids the cancellation of ``sum(x*x) - n*mean**2``.
        return sum((x - mean) ** 2 for x in X) / (n - 1.0)

    def cov(self, X, Y):
        """Return the sample covariance (n - 1 denominator) of ``X`` and ``Y``.

        Raises:
            ZeroDivisionError: if ``X`` holds fewer than two values.
        """
        n = len(X)
        xbar = sum(X) / n
        ybar = sum(Y) / n
        return sum((x - xbar) * (y - ybar) for x, y in zip(X, Y)) / (n - 1)

    def beta(self, x, y):
        """Classify the slope of ``y`` regressed on ``x``.

        Returns:
            ``STATE_ON`` when the slope ``cov(x, y) / var(x)`` is positive,
            ``STATE_OFF`` otherwise, including when no slope can be computed
            (fewer than two points or no spread in ``x``).
        """
        if len(x) < 2:
            return self.STATE_OFF
        spread = self.var(x)
        if spread == 0:
            return self.STATE_OFF
        if self.cov(x, y) / spread > 0:
            return self.STATE_ON
        return self.STATE_OFF

    def to_float(self, arg):
        """Return the state of the entity named by ``self.args[arg]`` as a float."""
        return float(self.get_state(self.args[arg]))

    def to_state(self, arg):
        """Return the state of the entity named by ``self.args[arg]``."""
        return self.get_state(self.args[arg])

    def rotate(self, l, n):
        """Return a copy of ``l`` rotated ``n`` steps to the left.

        ``n`` wraps around the length of the list, so rotating a
        four-element list by 5 is the same as rotating it by 1.
        """
        shift = n % len(l) if l else 0
        return l[shift:] + l[:shift]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement