Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from time import sleep
- import requests
- from requests.auth import HTTPBasicAuth
- import json
- from dateutil.tz import tzlocal
- import time
- from datetime import datetime
- import io
- import math
- import smbus
- import subprocess
- import threading
- import random
- # Import SPI library (for hardware SPI) and MCP3008 library.
- import Adafruit_GPIO.SPI as SPI
- import Adafruit_MCP3008
- import RPi.GPIO as GPIO
# Pin numbers below use the Broadcom (BCM) numbering scheme.
GPIO.setmode(GPIO.BCM)
GPIO.setup(18, GPIO.IN)   # input whose rising edges are counted by increaserev()
GPIO.setup(17, GPIO.OUT)  # output switched by the light-level check in the main loop

# Rising-edge counter. It is incremented by increaserev() and read by the
# main loop. No ``global`` statement is needed here: names assigned at
# module scope are already global.
revcount = 0

# Measurement server and device identity.
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from configuration instead.
url = 'https://tarktartu.iot.cs.ut.ee/'
tenant = "tarktartu"
deviceID = "10220"
serialNr = "0000000017b769d5"
username = "testing"
password = "garage48"
# Basic-auth user name has the form "<tenant>/<username>".
auth = HTTPBasicAuth(tenant + "/" + username, password)

# MCP3008 ADC on hardware SPI port 0, device 0.
SPI_PORT = 0
SPI_DEVICE = 0
mcp = Adafruit_MCP3008.MCP3008(spi=SPI.SpiDev(SPI_PORT, SPI_DEVICE))
def SendCustomTempMeasurement(targetID=10000, tempValue=25):
    """Post one temperature measurement to the measurement endpoint.

    Args:
        targetID: Identifier placed in the payload's ``source.id`` field.
        tempValue: Temperature reading, sent with unit ``"C"``.

    The request is sent through the module-level session ``s`` when it
    exists, otherwise through ``requests`` directly. The response object
    and its body are printed; nothing is returned.
    """
    fragment = "com_stagnationlab_c8y_driver_measurements_TemperatureMeasurement"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/vnd.com.nsn.cumulocity.measurement+json",
        "Connection": "close",
    }
    payload = {
        fragment: {
            "temperature": {"value": tempValue, "unit": "C"},
        },
        # Naive UTC timestamp: the ISO string carries no offset suffix.
        # NOTE(review): confirm the server treats offset-less times as UTC.
        "time": datetime.utcnow().isoformat(),
        "source": {"id": targetID},
        "type": fragment,
    }

    # ``s`` is only created when the file runs as a script; fall back to a
    # one-off request so the function also works when imported.
    try:
        client = s
    except NameError:
        client = requests

    response = client.post(
        url + 'measurement/measurements',
        auth=auth,
        data=json.dumps(payload),
        headers=headers,
    )
    print(response)
    print(response.content)
def SendCustomSoundMeasurement(targetID=10000, soundValue=40):
    """Post one sound-level measurement to the measurement endpoint.

    Args:
        targetID: Identifier placed in the payload's ``source.id`` field.
        soundValue: Sound level reading, sent with unit ``"dB"``.

    The request is sent through the module-level session ``s`` when it
    exists, otherwise through ``requests`` directly. The response object
    and its body are printed; nothing is returned.
    """
    fragment = "com_stagnationlab_c8y_driver_measurements_SoundMeasurement"
    # NOTE(review): unlike the other senders in this file, no
    # "Connection: close" header is sent here — confirm this is intended.
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/vnd.com.nsn.cumulocity.measurement+json",
    }
    payload = {
        fragment: {
            "soundLevel": {"value": soundValue, "unit": "dB"},
        },
        # Naive UTC timestamp: the ISO string carries no offset suffix.
        # NOTE(review): confirm the server treats offset-less times as UTC.
        "time": datetime.utcnow().isoformat(),
        "source": {"id": targetID},
        "type": fragment,
    }

    # ``s`` is only created when the file runs as a script; fall back to a
    # one-off request so the function also works when imported.
    try:
        client = s
    except NameError:
        client = requests

    response = client.post(
        url + 'measurement/measurements',
        auth=auth,
        data=json.dumps(payload),
        headers=headers,
    )
    print(response)
    print(response.content)
def SendCustomPressureMeasurement(targetID=10000, pressureValue=40):
    """Post one pressure measurement to the measurement endpoint.

    Args:
        targetID: Identifier placed in the payload's ``source.id`` field.
        pressureValue: Pressure reading, sent with unit ``"kPa"``.

    The request is sent through the module-level session ``s`` when it
    exists, otherwise through ``requests`` directly. The response object
    and its body are printed; nothing is returned.
    """
    fragment = "com_stagnationlab_c8y_driver_measurements_PressureMeasurement"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/vnd.com.nsn.cumulocity.measurement+json",
        "Connection": "close",
    }
    payload = {
        fragment: {
            "pressure": {"value": pressureValue, "unit": "kPa"},
        },
        # Timezone-aware local time, so the ISO string includes the offset.
        "time": datetime.now(tzlocal()).isoformat(),
        "source": {"id": targetID},
        "type": fragment,
    }

    # ``s`` is only created when the file runs as a script; fall back to a
    # one-off request so the function also works when imported.
    try:
        client = s
    except NameError:
        client = requests

    response = client.post(
        url + 'measurement/measurements',
        auth=auth,
        data=json.dumps(payload),
        headers=headers,
    )
    print(response)
    print(response.content)
def SendCustomHumidityMeasurement(targetID=10000, humValue=40):
    """Post one humidity measurement to the measurement endpoint.

    Args:
        targetID: Identifier placed in the payload's ``source.id`` field.
        humValue: Humidity reading, sent with unit ``"%"``.

    The request is sent through the module-level session ``s`` when it
    exists (matching the other senders in this file), otherwise through
    ``requests`` directly. The response object and its body are printed;
    nothing is returned.
    """
    fragment = "com_stagnationlab_c8y_driver_measurements_HumidityMeasurement"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/vnd.com.nsn.cumulocity.measurement+json",
        "Connection": "close",
    }
    payload = {
        fragment: {
            "humidity": {"value": humValue, "unit": "%"},
        },
        # Timezone-aware local time, so the ISO string includes the offset.
        "time": datetime.now(tzlocal()).isoformat(),
        "source": {"id": targetID},
        "type": fragment,
    }

    # ``s`` is only created when the file runs as a script; fall back to a
    # one-off request so the function also works when imported.
    try:
        client = s
    except NameError:
        client = requests

    response = client.post(
        url + 'measurement/measurements',
        auth=auth,
        data=json.dumps(payload),
        headers=headers,
    )
    print(response)
    print(response.content)
def SendCustomLightMeasurement(targetID=10000, lightValue=40):
    """Post one light-level measurement to the measurement endpoint.

    Args:
        targetID: Identifier placed in the payload's ``source.id`` field.
        lightValue: Light level reading, sent with unit ``"lux"``.

    The request is sent through the module-level session ``s`` when it
    exists, otherwise through ``requests`` directly. The response object
    and its body are printed; nothing is returned.
    """
    fragment = "com_stagnationlab_c8y_driver_measurements_LightMeasurement"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/vnd.com.nsn.cumulocity.measurement+json",
        "Connection": "close",
    }
    payload = {
        fragment: {
            "lightLevel": {"value": lightValue, "unit": "lux"},
        },
        # Timezone-aware local time, so the ISO string includes the offset.
        "time": datetime.now(tzlocal()).isoformat(),
        "source": {"id": targetID},
        "type": fragment,
    }

    # ``s`` is only created when the file runs as a script; fall back to a
    # one-off request so the function also works when imported.
    try:
        client = s
    except NameError:
        client = requests

    response = client.post(
        url + 'measurement/measurements',
        auth=auth,
        data=json.dumps(payload),
        headers=headers,
    )
    print(response)
    print(response.content)
def SendCustomAnemoMeasurement(targetID=10000, anemoValue=40):
    """Post one anemometer measurement to the measurement endpoint.

    Args:
        targetID: Identifier placed in the payload's ``source.id`` field.
        anemoValue: Anemometer reading, sent with unit ``"m/s"``.

    The request is sent through the module-level session ``s`` when it
    exists, otherwise through ``requests`` directly. The response object
    and its body are printed; nothing is returned.
    """
    fragment = "com_stagnationlab_c8y_driver_measurements_AnemoMeasurement"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/vnd.com.nsn.cumulocity.measurement+json",
        "Connection": "close",
    }
    payload = {
        fragment: {
            "anemo": {"value": anemoValue, "unit": "m/s"},
        },
        # Timezone-aware local time, so the ISO string includes the offset.
        "time": datetime.now(tzlocal()).isoformat(),
        "source": {"id": targetID},
        "type": fragment,
    }

    # ``s`` is only created when the file runs as a script; fall back to a
    # one-off request so the function also works when imported.
    try:
        client = s
    except NameError:
        client = requests

    response = client.post(
        url + 'measurement/measurements',
        auth=auth,
        data=json.dumps(payload),
        headers=headers,
    )
    print(response)
    print(response.content)
def increaserev(channel):
    """Edge-detect callback: add one to the module-level ``revcount``.

    ``channel`` is accepted because the callback is invoked with it; the
    value itself is not used.
    """
    global revcount
    revcount = revcount + 1
# Call increaserev() on every rising edge seen on BCM pin 18.
GPIO.add_event_detect(18, GPIO.RISING, callback=increaserev)

if __name__ == '__main__':
    # Shared HTTP session picked up by the Send* functions.
    s = requests.session()

    # No ``global revcount`` here: this code already runs at module scope,
    # and declaring a name global after it has been assigned in the same
    # scope is rejected by the compiler.
    while True:
        # Temperature: the second line of the 1-Wire file contains
        # "t=<number>"; dividing that number by 1000 gives the value sent
        # as degrees Celsius.
        with open("/sys/bus/w1/devices/28-0516863925ff/w1_slave") as termo_file:
            lines = termo_file.readlines()
        t_pos = lines[1].find('t=')
        temp_c = float(lines[1][t_pos + 2:]) / 1000.0
        SendCustomTempMeasurement(deviceID, temp_c)

        # Sound: ADC channel 1 mapped onto a logarithmic scale; the +1
        # keeps log10 defined when the ADC reads 0.
        dB = round(20.0 * math.log10(mcp.read_adc(1) + 1) + 10)
        SendCustomSoundMeasurement(deviceID, dB)

        # Light: ADC channel 0 scaled by 1123/691.
        lux = round(mcp.read_adc(0) * 1123 / 691)
        print(lux)
        # Drive pin 17 high only when it is dark AND the hour lies strictly
        # between 10 and 20. Both bounds must hold; joining them with
        # ``or`` would be true for every hour of the day.
        # NOTE(review): confirm this is the intended on-window.
        lamp_on = lux < 50 and 10 < datetime.now().hour < 20
        GPIO.output(17, lamp_on)
        SendCustomLightMeasurement(deviceID, lux)

        # Anemometer: count pulses over the one-second sleep only, so the
        # x60 scaling to revolutions per minute matches the window length.
        revcount = 0
        time.sleep(1)
        rc = revcount * 60
        print("RPM is " + str(rc))
        print("M/S is " + str(rc / 100))
        # Compare by value; an identity test against a float literal does
        # not detect zero.
        if rc != 0:
            SendCustomAnemoMeasurement(deviceID, rc / 100)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement