Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- import RPi.GPIO as GPIO
- from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
- import time
- import signal
- import sys
- from lib import send_mail
#################################
# Configuration and shared state #
#################################
# Address and port the XML-RPC server listens on
XMLRPC_IP = "0.0.0.0"
XMLRPC_PORT = 9000

# Pin numbers of the relay output and the reset button input
RELAY_PIN = 37
RESET_BTN = 35

# time.time() stamps of the most recent accepted reset-button press, the
# most recent relay switch caused by an alert, and the most recent alert.
# 0.0 means "has not happened yet".
last_reset_button_press_at = last_relay_flip_at = last_alert_at = 0.0

# Location named by the most recent alert ('' when there is none)
last_alert = ''
##############
# Setup Pins #
##############
# Pin numbers in this file use the GPIO.BOARD numbering scheme.
GPIO.setmode(GPIO.BOARD)
# Give the relay a defined low level at the moment the pin becomes an
# output, instead of leaving it at whatever level the pin last held until
# the first explicit GPIO.output() call.
GPIO.setup(RELAY_PIN, GPIO.OUT, initial=GPIO.LOW)
# The reset button input is configured with the pull-up resistor enabled.
GPIO.setup(RESET_BTN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
#######################
# Kill Signal Handler #
#######################
def signal_handler(*_):
    """Release the GPIO pins, print a farewell message and exit.

    Accepts and ignores whatever arguments the signal machinery passes in.
    """
    GPIO.cleanup()
    print("\nExiting...")
    sys.exit()


# Shut down cleanly on both Ctrl-C and a termination request
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
# Relay starts out low. Low is also the level the reset button handler
# restores "to allow water flow"; a detected leak drives the pin high.
GPIO.output(RELAY_PIN, GPIO.LOW)
- #############
- # Functions #
- #############
def ensure_press(func):
    """Decorate a GPIO edge callback so one button push yields one call.

    Due to a bug in the GPIO library, a GPIO.FALLING detection can behave
    like GPIO.BOTH, so the callback may be invoked on both edges of a single
    button push. To filter out the unwanted call the wrapper:

    1. waits 5 ms for the pin to settle,
    2. reads the pin and drops the call if it reads high,
    3. drops the call if the last accepted call was 1 second ago or less.

    NOTE(review): the reset button pin is set up with a pull-up, so a low
    reading presumably means the button is held down; an earlier comment
    described this check as confirming a *release*. Confirm against the
    wiring.

    The decorated callback must receive the channel number as its first
    positional argument. The wrapper returns the callback's result when the
    call is accepted and None when it is dropped.
    """
    # Imported here so the decorator does not depend on the file's import
    # block being changed.
    from functools import wraps

    @wraps(func)  # keep the callback's own name/docstring for debugging
    def wrapper(*args, **kwargs):
        global last_reset_button_press_at
        channel = args[0]
        # Wait before checking state of pin
        time.sleep(0.005)
        if GPIO.input(channel):
            return None
        if (time.time() - last_reset_button_press_at) > 1.0:
            last_reset_button_press_at = time.time()
            return func(*args, **kwargs)
        # Another accepted call happened within the last second: ignore
        return None

    return wrapper
def water_detected(location):
    """Handle a leak report for ``location`` (registered as an XML-RPC call).

    The report is ignored when any of the following holds:

    * the reset button was accepted less than 2 seconds ago,
    * the relay was switched by an alert less than 2 seconds ago,
    * the previous alert named the same location and is less than
      300 seconds old.

    Otherwise the relay pin is driven high, the alert bookkeeping globals
    are updated and a notification e-mail is sent.

    Always returns True, whether or not the report was acted upon.
    """
    global last_alert
    global last_alert_at
    global last_relay_flip_at

    # Sensors misfire right after the reset button is pressed or the relay
    # is switched, so reports arriving in that window are discarded.
    sensors_unsettled = (
        (time.time() - last_reset_button_press_at) < 2.0
        or (time.time() - last_relay_flip_at) < 2.0
    )
    if sensors_unsettled:
        return True

    # One incident should not produce dozens of alerts per minute: a repeat
    # of the last location within 5 minutes is dropped.
    same_as_last = last_alert == location
    if same_as_last and time.time() - last_alert_at < 300:
        return True

    print("Water detected at %s" % location)
    GPIO.output(RELAY_PIN, GPIO.HIGH)
    last_relay_flip_at = time.time()
    last_alert = location
    last_alert_at = time.time()

    # Send email
    send_mail(
        recipients='my@email.com, some_other@email.com',
        subject='Water Leak Detected',
        body='Water detected at %s' % location,
        credentials={"gmail_username": "some_account@gmail.com",
                     "gmail_password": ""},
    )
    return True
@ensure_press
def reset_btn_pressed(channel):
    """Reset-button callback: drive the relay low and forget the last alert.

    ``channel`` is the pin that triggered the callback; it is consumed by
    the ``ensure_press`` decorator and not used in the body.
    """
    global last_alert, last_alert_at

    print("Reset button pressed - resetting relay to allow water flow")
    GPIO.output(RELAY_PIN, GPIO.LOW)
    # Clearing the alert record lets the next report through immediately,
    # even if it names the same location as before.
    last_alert, last_alert_at = '', 0.0
########
# Main #
########
# Address the XML-RPC server listens on
_listen_address = (XMLRPC_IP, XMLRPC_PORT)
print("Running XMLRPC server on %s:%s..." % _listen_address)


class RequestHandler(SimpleXMLRPCRequestHandler):
    """Request handler that only accepts calls on the /RPC2 path."""
    rpc_paths = ('/RPC2',)


# Build the server and expose the leak-report entry point
server = SimpleXMLRPCServer(_listen_address, requestHandler=RequestHandler)
server.register_function(water_detected)

# Call reset_btn_pressed on falling edges of the reset button pin
GPIO.add_event_detect(
    RESET_BTN,
    GPIO.FALLING,
    callback=reset_btn_pressed,
    bouncetime=300,
)

# Serve requests until the process is told to stop
server.serve_forever()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement