Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- import sys
- import serial
- import sqlite3
- import time
- from datetime import datetime
- import ntplib
- def get_ntp_time():
- try:
- client = ntplib.NTPClient()
- response = client.request('pool.ntp.org')
- return datetime.fromtimestamp(response.tx_time)
- except:
- return datetime.now()
- def uart_read(port, baudrate=9600, bytes_to_read=240):
- try:
- with serial.Serial(port, baudrate, timeout=1) as ser:
- data = ser.read(bytes_to_read)
- return data.decode('utf-8').strip()
- except Exception as e:
- print(f"UART error: {e}")
- return None
- def init_db(db_path):
- conn = sqlite3.connect(db_path)
- cursor = conn.cursor()
- cursor.execute('''
- CREATE TABLE IF NOT EXISTS uart_data (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- timestamp TEXT NOT NULL,
- line_num INTEGER NOT NULL,
- data TEXT NOT NULL
- )
- ''')
- conn.commit()
- return conn
- def save_to_db(conn, data_chunks):
- cursor = conn.cursor()
- timestamp = datetime.now().isoformat()
- for i, chunk in enumerate(data_chunks, 1):
- cursor.execute('''
- INSERT INTO uart_data (timestamp, line_num, data)
- VALUES (?, ?, ?)
- ''', (timestamp, i, chunk))
- conn.commit()
- def main():
- # Конфигурация
- UART_PORT = '/dev/ttyS0' # Измените на нужный порт
- DB_PATH = '/var/lib/uart_data.db' # Путь к базе данных
- # Инициализация
- conn = init_db(DB_PATH)
- # Основной цикл
- while True:
- try:
- # Получаем время
- current_time = get_ntp_time()
- print(f"Current time: {current_time}")
- # Читаем данные с UART
- data = uart_read(UART_PORT)
- if data and len(data) == 240:
- # Разбиваем на 4 строки по 60 байт
- chunks = [data[i:i+60] for i in range(0, 240, 60)]
- # Сохраняем в базу
- save_to_db(conn, chunks)
- print("Data saved to database")
- else:
- print("Invalid or no data received")
- # Ждем перед следующей итерацией
- time.sleep(60)
- except KeyboardInterrupt:
- print("Shutting down...")
- conn.close()
- sys.exit(0)
- except Exception as e:
- print(f"Error: {e}")
- time.sleep(10)
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement