Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- # pip3 install goodwe==0.3.1
- import asyncio
- import goodwe
- import goodwe.sensor
- import logging
- import sys
- import time
- from optparse import OptionParser
- import aiohttp
- import datetime
- import json
- import importlib.metadata
- import time
- if False:
- import packaging.version
- if (packaging.version.parse(importlib.metadata.version('goodwe')) <= packaging.version.parse('0.3')):
- for i in range(1, 6):
- print("You should upgrade your goodwe library" + ("!" * i))
- time.sleep(1)
- #endif
- #endif
- #endif
- root = logging.getLogger()
- root.setLevel(logging.DEBUG)
- handler = logging.StreamHandler(sys.stdout)
- handler.setLevel(logging.INFO)
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- handler.setFormatter(formatter)
- root.addHandler(handler)
- parser = OptionParser(usage = '%prog [options]')
- parser.add_option("", "--inverter-host", dest="inverter_host", help="Goodwe Inverter Address")
- parser.add_option("", "--inverter-id", dest="inverter_id", help="Id of inverter")
- parser.add_option("", "--http-post-url", dest="http_post_url", help="HTTP post url")
- parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Verbose logging")
- (options, args) = parser.parse_args()
- if (not all([options.inverter_host, options.http_post_url])):
- print("Not all options given", file = sys.stderr)
- parser.print_help()
- sys.exit(-1)
- #endif
- if (options.verbose):
- handler.setLevel(logging.DEBUG)
- #endif
- def getDBColumnFromSensor(sensor):
- if (sensor.id_ == 'grid_mode'):
- sensor.kind = goodwe.inverter.SensorKind.GRID
- elif (sensor.id_ == 'grid_mode_label'):
- sensor.kind = goodwe.inverter.SensorKind.GRID
- elif (sensor.id_ == 'temperature_air'):
- sensor.kind = None
- elif (sensor.id_ == 'temperature_module'):
- sensor.kind = None
- elif (sensor.id_ == 'temperature'):
- sensor.kind = None
- elif (sensor.id_ == 'safety_country'):
- sensor.kind = None
- elif (sensor.id_ == 'safety_country_label'):
- sensor.kind = None
- elif (sensor.id_ == 'vgrid'):
- sensor.id_ = 'vgrid1'
- elif (sensor.id_ == 'igrid'):
- sensor.id_ = 'igrid1'
- elif (sensor.id_ == 'fgrid'):
- sensor.id_ = 'fgrid1'
- elif (sensor.id_ == 'pgrid'):
- sensor.id_ = 'pgrid1'
- #endif
- dbType = None
- if (isinstance(sensor, goodwe.sensor.Long)):
- dbType = 'int'
- elif (isinstance(sensor, goodwe.sensor.Integer)):
- dbType = 'int'
- elif (isinstance(sensor, goodwe.sensor.Timestamp)):
- dbType = 'datetime'
- elif (isinstance(sensor, goodwe.sensor.Voltage)):
- dbType = 'double'
- elif (isinstance(sensor, goodwe.sensor.Current)):
- dbType = 'double'
- elif (isinstance(sensor, goodwe.sensor.Power4)):
- dbType = 'double'
- elif (isinstance(sensor, goodwe.sensor.Power)):
- dbType = 'double'
- elif (isinstance(sensor, goodwe.sensor.Energy4)):
- dbType = 'double'
- elif (isinstance(sensor, goodwe.sensor.Energy)):
- dbType = 'double'
- elif (isinstance(sensor, goodwe.sensor.Frequency)):
- dbType = 'double'
- elif (isinstance(sensor, goodwe.sensor.Temp)):
- dbType = 'double'
- elif (isinstance(sensor, goodwe.sensor.Decimal)):
- dbType = 'double'
- elif (isinstance(sensor, goodwe.sensor.Float)):
- dbType = 'double'
- elif (isinstance(sensor, goodwe.sensor.Calculated)):
- if (sensor.id_ == 'ppv'):
- dbType = 'int'
- elif (sensor.id_ == 'grid_in_out'):
- dbType = 'int'
- elif (sensor.id_ == 'grid_in_out_label'):
- dbType = 'text'
- elif (sensor.id_ == 'pbattery1'):
- dbType = 'int'
- elif (sensor.id_ == 'errors'):
- dbType = 'text'
- elif (sensor.id_ == 'diagnose_result_label'):
- dbType = 'text'
- elif (sensor.id_ == 'house_consumption'):
- dbType = 'int'
- elif (sensor.id_ == 'battery_error'):
- dbType = 'text'
- elif (sensor.id_ == 'battery_warning'):
- dbType = 'text'
- #endif
- elif (isinstance(sensor, goodwe.sensor.Enum2)):
- dbType = 'text'
- elif (isinstance(sensor, goodwe.sensor.Enum)):
- dbType = 'text'
- elif (isinstance(sensor, goodwe.sensor.Byte)):
- dbType = 'int'
- elif (isinstance(sensor, goodwe.sensor.EnumCalculated)):
- dbType = 'text'
- elif (hasattr(goodwe.sensor, 'EnumL') and isinstance(sensor, goodwe.sensor.EnumL)):
- dbType = 'text'
- elif (hasattr(goodwe.sensor, 'EnumH') and isinstance(sensor, goodwe.sensor.EnumH)):
- dbType = 'text'
- elif (hasattr(goodwe.sensor, 'EnumBitmap4') and isinstance(sensor, goodwe.sensor.EnumBitmap4)):
- dbType = 'text'
- elif (hasattr(goodwe.sensor, 'EnumBitmap22') and isinstance(sensor, goodwe.sensor.EnumBitmap22)):
- dbType = 'text'
- elif (hasattr(goodwe.sensor, 'Reactive') and isinstance(sensor, goodwe.sensor.Reactive)):
- dbType = 'int'
- elif (hasattr(goodwe.sensor, 'Reactive4') and isinstance(sensor, goodwe.sensor.Reactive4)):
- dbType = 'int'
- elif (hasattr(goodwe.sensor, 'Apparent') and isinstance(sensor, goodwe.sensor.Apparent)):
- dbType = 'int'
- elif (hasattr(goodwe.sensor, 'Apparent4') and isinstance(sensor, goodwe.sensor.Apparent4)):
- dbType = 'int'
- elif (hasattr(goodwe.sensor, 'CellVoltage') and isinstance(sensor, goodwe.sensor.CellVoltage)):
- dbType = 'double'
- #endif
- if (dbType == None):
- print('Unknown sensor db type', sensor)
- return None
- #endif
- columnName = sensor.id_
- columnNamePrefix = None
- if (sensor.kind == goodwe.inverter.SensorKind.PV):
- columnNamePrefix = 'pv'
- elif (sensor.kind == goodwe.inverter.SensorKind.AC):
- columnNamePrefix = 'ac'
- elif (sensor.kind == goodwe.inverter.SensorKind.UPS):
- columnNamePrefix = 'backup'
- elif (sensor.kind == goodwe.inverter.SensorKind.BAT):
- columnNamePrefix = 'battery'
- elif (sensor.kind == goodwe.inverter.SensorKind.GRID):
- columnNamePrefix = 'grid'
- elif (sensor.kind == None):
- columnNamePrefix = ''
- #endif
- if (columnNamePrefix == None):
- print('Unknown sensor kind type', sensor)
- return None
- #endif
- if (not columnName.startswith(columnNamePrefix) and columnNamePrefix != ''):
- columnName = '%s_%s' % (columnNamePrefix, columnName)
- #endif
- return {
- 'name': columnName,
- 'type': dbType,
- 'comment': sensor.name + (' (%s)' % sensor.unit if sensor.unit else ''),
- 'kind': columnNamePrefix,
- }
- #enddef
- async def get_runtime_data(options):
- inverter = goodwe.ET(options.inverter_host, 0, timeout = 1, retries = 0)
- await inverter.read_device_info()
- print("Connected to inverter %s, S/N:%s." % (inverter.model_name, inverter.serial_number))
- debugColumns = False
- while True:
- try:
- runtime_data = await inverter.read_runtime_data()
- except Exception as e:
- logging.info('ERRRRRRRRRRRRRRRRRRR! %s' % e)
- print(e)
- # pockat 10s v pripade chyby, protoze treba komunikuje s menicem
- # neco jineho napr. SolarGo, tak abychom mu umoznili, taky se liznout
- await asyncio.sleep(10)
- continue
- #endtry
- #time.sleep(0.1)
- #continue
- logging.info('Got data')
- columnValues = {
- 'pv_inverter_id' : int(options.inverter_id),
- }
- for sensor in inverter.sensors():
- if sensor.id_ in runtime_data:
- value = runtime_data[sensor.id_]
- if ((sensor.id_.endswith('_h') or sensor.id_.endswith('_l')) and value < 0):
- value = 65536 + value
- #endif
- column = getDBColumnFromSensor(sensor)
- columnValues[column['name']] = value
- if (debugColumns):
- print(sensor.id_ + ':')
- print(' value:', value)
- print(' sensor:', sensor)
- print(' column:', column)
- #endif
- #endif
- if (debugColumns):
- print('---')
- #endif
- #endfor
- if (debugColumns):
- sys.exit()
- #endif
- for k, v in columnValues.items():
- if (isinstance(v, datetime.datetime)):
- columnValues[k] = v.strftime('%Y-%m-%d %H:%M:%S')
- #endif
- #endfor
- logging.info('Post data...')
- logging.debug(json.dumps(columnValues, ensure_ascii=False, indent=4, sort_keys=True))
- while True:
- try:
- async with aiohttp.ClientSession() as session:
- logging.info('Do post...')
- async with session.post(options.http_post_url, json = columnValues, timeout = 10) as response:
- logging.info('Got response %s', await response.text())
- res = await response.json()
- if (res['status'] == 200):
- break
- #endif
- #endwith
- #endwith
- except Exception as e:
- print('Error: %s' % repr(e))
- #endtry
- time.sleep(1)
- #endwhile
- time.sleep(1)
- #endwhile
- #enddef
- asyncio.run(get_runtime_data(options))
Advertisement
Add Comment
Please, Sign In to add comment