Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: UTF-8 -*-
- """Reads input from GPIO with a reduced number of pins."""
- import time
- import RPi.GPIO as GPIO
- from configparser import ConfigParser
- class Matrix():
- def __init__(self):
- """Sets up constants and pin numbers."""
- # BCM (Broadcom) numbers are used
- GPIO.setmode(GPIO.BCM)
- parser = ConfigParser()
- filename = 'config.ini'
- parser.read(filename)
- gpio = parser['gpio']
- selection = parser['selection']
- self.rows = list(map(int, gpio['button_rows'].split(',')))
- self.columns = list(map(int, gpio['button_columns'].split(',')))
- topics = selection['topics'].replace(" ","").split(',')
- actions = selection['actions'].replace(" ","").split(',')
- while len(actions) <= len(topics):
- actions.append('unassigned')
- self.options_array = [
- topics,
- actions
- ]
- def get_button(self):
- """Findet gedrueckte Knoepfe und returnt Koordinaten falls gefunden."""
- # To search for the row of a pressed button:
- # - set all rows to input
- # - set all columns to output low
- # - save index in row_position
- row_position = -1
- for i in range(len(self.columns)):
- GPIO.setup(self.columns[i], GPIO.OUT)
- GPIO.output(self.columns[i], GPIO.LOW)
- for i in range(len(self.rows)):
- GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
- for i in range(len(self.rows)):
- temp_read = GPIO.input(self.rows[i])
- if temp_read == 0:
- row_position = i
- # cancel if no input was found
- if row_position < 0 or row_position >= len(self.rows):
- self.refresh()
- return
- # To search for the column of a pressed button in the row:
- # - set all columns to input
- # - rows[row_position] to output
- column_position = -1
- for j in range(len(self.columns)):
- GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
- GPIO.setup(self.rows[row_position], GPIO.OUT)
- GPIO.output(self.rows[row_position], GPIO.HIGH)
- for j in range(len(self.columns)):
- temp_read = GPIO.input(self.columns[j])
- if temp_read == 1:
- column_position = j
- # cancel if no input was found
- if column_position < 0 or column_position >= len(self.columns):
- self.refresh()
- return
- # returns coordinates of pressed button
- self.refresh()
- return self.options_array[row_position][column_position]
- def refresh(self):
- """Resets all pin states."""
- for i in range(len(self.rows)):
- GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
- for j in range(len(self.columns)):
- GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_UP)
- if __name__ == '__main__':
- # For testing matrix functionality
- while True:
- user_input = None
- while user_input is None:
- user_input = Matrix().get_button()
- print(user_input)
- time.sleep(.2)
- #!/usr/bin/python3
- """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
- import time
- import signal
- import sys
- import threading
- import RPi.GPIO as GPIO
- from threading import Timer
- from configparser import ConfigParser
- from matrix import Matrix
- from database_commit import DatabaseCommit as database
- class Controller():
- """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
- def __init__(self):
- """Sets up constants."""
- # Use Broadcom-Numbering
- GPIO.setmode(GPIO.BCM)
- # Get constants from config.ini
- parser = ConfigParser()
- filename = 'config.ini'
- parser.read(filename)
- # Pin adresses
- gpio = parser['gpio']
- self.TOPICS_LED = list(map(int, gpio['topics_led'].split(',')))
- self.ACTIONS_LED = list(map(int, gpio['actions_led'].split(',')))
- self.CANCEL_LED = self.ACTIONS_LED[0]
- self.CONFIRM_LED = self.ACTIONS_LED[1]
- self.POST_LED = self.ACTIONS_LED[2]
- self.NAGIOS_LED = self.ACTIONS_LED[3]
- # names, defaults, timers
- selection = parser['selection']
- self.TOPICS = selection['topics'].replace(" ", "").split(',')
- self.ACTIONS = selection['actions'].replace(" ", "").split(',')
- self.CANCEL = self.ACTIONS[0]
- self.CONFIRM = self.ACTIONS[1]
- self.POST = self.ACTIONS[2]
- self.NAGIOS = self.ACTIONS[3]
- self.DEFAULT_SELECTION = selection['default_selection'].replace(" ", "").split(',')
- self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
- self.SLEEP = float(selection['sleeptime'])
- self.BLINK = float(selection['blinktime'])
- self.SEND_DEFAULT_ON_EMPTY = bool(selection['send_default_on_empty'])
- # reminder schedule
- schedule = parser['schedule']
- self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
- self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')
- self.current_selection = self.DEFAULT_SELECTION[:]
- self.led_state = self.DEFAULT_LED_STATE[:]
- # Setup the LED pins
- for i in range(len(self.TOPICS_LED)):
- GPIO.setup(self.TOPICS_LED[i], GPIO.OUT)
- GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
- for i in range(len(self.ACTIONS_LED)):
- GPIO.setup(self.ACTIONS_LED[i], GPIO.OUT)
- GPIO.output(self.ACTIONS_LED[i], GPIO.LOW)
- # Switch on default selection LEDs
- for i in range(len(self.DEFAULT_SELECTION)):
- for j in range(len(self.TOPICS)):
- if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
- GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
- self.led_state[j] = True
- # Notification status
- self.post_notification = False
- self.nagios_notification = False
- def led_reset_topics(self):
- """Disable all LEDs and enable default selection LEDs"""
- # All off
- for i in range(len(self.TOPICS_LED)):
- GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
- # Default an
- for i in range(len(self.DEFAULT_SELECTION)):
- for j in range(len(self.TOPICS)):
- if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
- GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
- self.led_state[j] = True
- return
- def continous_led_blink(blink_event, blink_thread, led):
- """Blink action LED continuously"""
- while not blink_event.isSet():
- GPIO.output(led, GPIO.HIGH)
- time.sleep(self.BLINK)
- event_is_set = blink_event.wait(blink_thread)
- if event_is_set:
- GPIO.output(led, GPIO.LOW)
- else:
- GPIO.output(led, GPIO.LOW)
- time.sleep(self.BLINK)
- def led_blink(self, led_pin):
- """Blink action LED
- """
- t = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
- GPIO.output(led_pin, GPIO.HIGH)
- #time.sleep(.2)
- #GPIO.output(led_pin, GPIO.LOW)
- t.start()
- return
- def dismiss_notification(self, led_pin):
- """Disables notification LED
- :param led_pin: GPIO pin number
- """
- GPIO.output(led_pin, GPIO.LOW)
- def led_toggle_topic(self, led_index):
- """Toggles LED state of led_index
- :param led_index: Index in TOPICS_LED; contains GPIO pin number"""
- self.led_state[led_index] = not self.led_state[led_index]
- if self.led_state[led_index]:
- GPIO.output(self.TOPICS_LED[led_index], GPIO.HIGH)
- else:
- GPIO.output(self.TOPICS_LED[led_index], GPIO.LOW)
- return
- def handle_selection(self, user_input):
- """
- Handles de/selection of topics
- Returns False if selection is altered
- Returns True if CANCEL or COMMIT is chosen
- :param user_input: String, name of pressed button
- """
- # If cancelled, reset to default selection
- if user_input == self.CANCEL:
- self.led_blink(self.CANCEL_LED)
- self.current_selection = self.DEFAULT_SELECTION[:]
- self.led_reset_topics()
- return True
- # If selection is non-empty, commit selection
- # - selection array is always sorted
- # If selection is empty, reset to default selection
- # and send default selection if SEND_DEFAULT_ON_EMPTY is True
- elif user_input == self.CONFIRM:
- self.led_blink(self.CONFIRM_LED)
- if self.current_selection != []:
- #self.current_selection.sort()
- database().make_commit(self.current_selection)
- self.current_selection = self.DEFAULT_SELECTION[:]
- else:
- self.current_selection = self.DEFAULT_SELECTION[:]
- if self.SEND_DEFAULT_ON_EMPTY:
- if len(self.DEFAULT_SELECTION) > 0:
- database().make_commit(self.current_selection)
- return True
- # TODO
- # Disable "Post" notification
- elif user_input == self.POST:
- """self.post_notification = False
- self.blink_thread.start()
- self.dismiss_notification(self.POST_LED)
- return False"""
- # TODO
- # Disable "Nagios" notification
- elif user_input == self.NAGIOS:
- """self.nagios_notification = False
- self.dismiss_notification(self.NAGIOS_LED)
- return False"""
- else:
- # Compare input with all TOPICS
- for i in range(len(self.TOPICS)):
- if user_input == self.TOPICS[i]:
- self.led_toggle_topic(i)
- already_exists = False
- # Delete if exists already
- for j in range(len(self.current_selection)):
- if user_input == self.current_selection[j]:
- already_exists = True
- self.current_selection.pop(j)
- print(user_input + " -")
- break
- # Add if does not exits yet
- if not already_exists:
- self.current_selection.append(user_input)
- print(user_input + " +")
- # Sort array
- # Same order as in TOPICS
- self.temp_selection = []
- for l in range(len(self.TOPICS)):
- self.temp_selection.append("")
- for i in range(len(self.TOPICS)):
- for j in range(len(self.current_selection)):
- if self.TOPICS[i] == self.current_selection[j]:
- self.temp_selection[i]=self.TOPICS[i]
- self.temp_selection = list(filter(None, self.temp_selection))
- self.current_selection = self.temp_selection[:]
- return False
- def wait_for_input(self):
- """Wartet auf Input und gibt an handle_selection weiter."""
- done_with_input = False
- while not done_with_input:
- user_input = None
- while user_input is None:
- user_input = Matrix().get_button()
- done_with_input = self.handle_selection(user_input)
- time.sleep(self.SLEEP)
- return
- if __name__ == '__main__':
- GPIO.setwarnings(False)
- database().external_prepare_table()
- while True:
- Controller().wait_for_input()
- GPIO.cleanup()
- sys.exit(0)
- #!/usr/bin/python
- # -*- coding: UTF-8 -*-
- import datetime
- import time
- from configparser import ConfigParser
- from mysql.connector import MySQLConnection, Error
- class DatabaseCommit():
- def read_config(section, filename='config.ini'):
- """ Reads config.ini and returns dictionary with config from section
- :param filename: name of config file
- :param section: name of section in config file
- :return: dictionary with database settings
- """
- parser = ConfigParser()
- parser.read(filename)
- config = {}
- if parser.has_section(section):
- items = parser.items(section)
- for item in items:
- config[item[0]] = item[1].replace(" ", "")
- else:
- raise Exception('{0} not found in {1} '.format(section, filename))
- return config
- def establish_connection(self):
- db_config = DatabaseCommit.read_config('mysql')
- conn = MySQLConnection(**db_config)
- if conn.is_connected():
- return conn
- else:
- print("Connection failed")
- def prepare_table(self, cursor):
- # TODO check for existing tables, add any if neccessary, fill with 0
- database_name = DatabaseCommit.read_config('mysql')['database'].replace(" ", "")
- table_name = DatabaseCommit.read_config('table')['table_name'].replace(" ", "")
- counter_column = DatabaseCommit.read_config('table')['counter_column'].replace(" ", "")
- date_column = DatabaseCommit.read_config('table')['date_column'].replace(" ", "")
- sql = 'CREATE TABLE IF NOT EXISTS '+ table_name +' ('
- sql += counter_column +' INT PRIMARY KEY AUTO_INCREMENT, '
- sql += date_column +' DATETIME'
- for topic in DatabaseCommit.read_config('selection')['topics'].replace(" ", "").split(','):
- topic = topic.lower()
- sql += ', '+ topic +' BOOLEAN NOT NULL DEFAULT 0'
- sql += ');'
- print("n"+sql+"n")
- cursor.execute(sql)
- sql = ''
- return cursor
- def external_prepare_table(self):
- conn = self.establish_connection()
- cursor = conn.cursor()
- self.prepare_table(cursor)
- def insert_user_input(self, user_input, cursor, table_name):
- date_column = DatabaseCommit.read_config('table')['date_column']
- sql = 'INSERT INTO ' + table_name + '(' + date_column
- for topic in user_input:
- sql += ','+ topic
- sql += ')'
- sql += 'VALUES(NOW()'
- for topic in user_input:
- sql += ','+'TRUE'
- sql += ')'
- cursor.execute(sql)
- def make_commit(self, user_input):
- conn = self.establish_connection()
- cursor = conn.cursor()
- #self.prepare_table(cursor)
- table_name = DatabaseCommit.read_config('table')['table_name']
- self.insert_user_input(user_input, cursor, table_name)
- cursor.execute('COMMIT;')
- print("made commit :")
- print(user_input)
- cursor.close()
- conn.close()
- [gpio]
- button_rows = 4, 17
- button_columns = 18, 23, 24, 25, 12, 16
- topics_led = 2, 3, 27, 22, 10, 9
- actions_led = 11, 5, 6, 13, 19, 26
- [selection]
- topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
- actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
- default_selection = topic_2
- sleeptime = .3
- blinktime = .2
- send_default_on_empty = True
- [schedule]
- post = 11:30
- nagios = 09:30
- [mysql]
- host = 127.0.0.1
- database = mydatabase
- user = rpi
- password = password
- [table]
- table_name = test
- counter_column = number
- date_column = date
Add Comment
Please, Sign In to add comment