Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import cv2
- import time
- import threading
- from collections import deque
- from datetime import datetime
- import socket
- import os
- import re
- # ================== KONFIGURACJA ==================
- CAMERAS = {
- # nazwa_kamery : URL RTSP
- "KAMERA1": "rtsp://admin:[email protected]:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif",
- # w przyszłości możesz dodać np.:
- # "KAMERA2": "rtsp://admin:[email protected]:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif",
- }
- PRE_SECONDS = 20 # ile sekund wstecz ma być w nagraniu (dla pierwszego nagrania po przerwie)
- POST_SECONDS = 60 # ile sekund po triggerze nagrywać
- FPS_DEFAULT = 25 # używane gdy kamera nie zwraca sensownego FPS
- # ŚCIEŻKA NA DYSK USB – ZMIEŃ LITERĘ DYSKU JEŚLI POTRZEBA:
- OUTPUT_DIR = r"E:\Nagrania"
- RETENTION_DAYS = 30 # ile dni trzymać nagrania, starsze są kasowane
- TCP_HOST = "0.0.0.0" # nasłuch na wszystkich interfejsach
- TCP_PORT = 5000 # port, na który HMI wysyła kod części
- # ================== POMOCNICZE ==================
- def ensure_output_dir():
- os.makedirs(OUTPUT_DIR, exist_ok=True)
- def cleanup_old_files():
- """
- Usuwa pliki starsze niż RETENTION_DAYS w OUTPUT_DIR.
- """
- now = time.time()
- cutoff = now - RETENTION_DAYS * 24 * 3600
- try:
- for filename in os.listdir(OUTPUT_DIR):
- filepath = os.path.join(OUTPUT_DIR, filename)
- if os.path.isfile(filepath):
- mtime = os.path.getmtime(filepath)
- if mtime < cutoff:
- print(f"[CLEANUP] Usuwam stary plik: {filename}")
- os.remove(filepath)
- except Exception as e:
- print(f"[CLEANUP] Błąd podczas czyszczenia: {e}")
- def cleanup_worker():
- """
- Wątek, który co 24h uruchamia cleanup_old_files().
- """
- while True:
- cleanup_old_files()
- time.sleep(24 * 3600) # 24 godziny
- def sanitize_part_code(part_code: str) -> str:
- """
- Czyści kod części tak, by nadawał się do nazwy pliku.
- Zostawiamy tylko cyfry, litery, _, -.
- """
- part_code = part_code.strip()
- if not part_code:
- part_code = "NO_CODE"
- safe = re.sub(r"[^0-9A-Za-z_-]", "_", part_code)
- return safe
- # ================== KLASA DO OBSŁUGI KAMERY ==================
- class CameraRecorder(threading.Thread):
- def __init__(self, name, url):
- super().__init__(daemon=True)
- self.name = name
- self.url = url
- self.cap = None
- self._open_stream()
- fps = self.cap.get(cv2.CAP_PROP_FPS) if self.cap else 0
- if fps <= 0 or fps > 120:
- fps = FPS_DEFAULT
- self.fps = fps
- self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 640)
- self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 480)
- maxlen = int(self.fps * PRE_SECONDS)
- self.buffer = deque(maxlen=maxlen)
- self.recording = False # czy aktualnie nagrywamy post-trigger
- self.record_end_time = 0.0
- self.writer = None
- self.lock = threading.Lock()
- print(f"[{self.name}] Start. FPS={self.fps}, {self.width}x{self.height}, bufor={maxlen} klatek")
- def _open_stream(self):
- if self.cap is not None:
- self.cap.release()
- print(f"[{self.name}] Otwieram strumień RTSP: {self.url}")
- self.cap = cv2.VideoCapture(self.url)
- if not self.cap.isOpened():
- print(f"[{self.name}] ❌ Nie można otworzyć strumienia!")
- def _start_new_writer(self, part_code: str, use_prebuffer: bool):
- """
- Tworzy nowy plik wideo:
- - use_prebuffer = True -> zapisuje bufor (-20 s)
- - use_prebuffer = False -> nagrywa tylko od triggera (bez cofania)
- Nazwa pliku: RRRR_MM_DD_GG_MM_SS_KODCZESCI_KAMERA.mp4
- """
- now = time.time()
- timestamp_str = datetime.fromtimestamp(now).strftime("%Y_%m_%d_%H_%M_%S")
- safe_code = sanitize_part_code(part_code)
- filename = f"{timestamp_str}_{safe_code}_{self.name}.mp4"
- filepath = os.path.join(OUTPUT_DIR, filename)
- print(f"[{self.name}] Nowe nagranie: część={part_code}, plik={filename}, prebuffer={use_prebuffer}")
- writer = cv2.VideoWriter(
- filepath,
- cv2.VideoWriter_fourcc(*"mp4v"),
- self.fps,
- (self.width, self.height)
- )
- if not writer.isOpened():
- print(f"[{self.name}] ❌ Nie można otworzyć pliku do zapisu!")
- return None
- # jeśli mamy użyć bufora wstecznego (-20 s)
- if use_prebuffer:
- print(f"[{self.name}] Zapis bufora (~{PRE_SECONDS} s wstecz, {len(self.buffer)} klatek)...")
- for (_t_frame, frame) in self.buffer:
- writer.write(frame)
- # ustaw parametry nagrywania post-trigger
- self.recording = True
- self.record_end_time = now + POST_SECONDS
- return writer
- def trigger(self, part_code: str):
- """
- Wywoływane, gdy przychodzi trigger z kodem części.
- Logika:
- - jeżeli NIE nagrywamy -> nagranie z buforem (-20 s) + 60 s w przód
- - jeżeli JUŻ nagrywamy -> kończymy poprzednie, nowe nagranie bez bufora,
- 60 s w przód od TEGO momentu.
- Dzięki temu pliki czasowo NIE nachodzą na siebie.
- """
- with self.lock:
- if self.recording and self.writer is not None:
- # trwa nagranie poprzedniej części -> przerywamy je w tym momencie
- print(f"[{self.name}] Przerywam poprzednie nagranie i zamykam plik.")
- try:
- self.writer.release()
- except Exception:
- pass
- self.writer = None
- self.recording = False
- # nowe nagranie -> BEZ bufora (-20), tylko od TEGO momentu
- self.writer = self._start_new_writer(part_code, use_prebuffer=False)
- else:
- # nie nagrywaliśmy nic -> pierwsze nagranie po przerwie
- # nagranie z buforem (-20 s)
- self.writer = self._start_new_writer(part_code, use_prebuffer=True)
- if self.writer is None:
- # nie udało się stworzyć pliku
- self.recording = False
- self.record_end_time = 0.0
- else:
- print(f"[{self.name}] Nagrywam do: {time.ctime(self.record_end_time)}")
- def run(self):
- while True:
- if self.cap is None or not self.cap.isOpened():
- print(f"[{self.name}] Próba ponownego połączenia za 1 s...")
- time.sleep(1)
- self._open_stream()
- continue
- ret, frame = self.cap.read()
- if not ret:
- print(f"[{self.name}] Brak klatki, reconnect...")
- time.sleep(1)
- self._open_stream()
- continue
- now = time.time()
- # zapis do bufora (dla potencjalnego pre-triggera)
- self.buffer.append((now, frame.copy()))
- # jeśli trwa nagrywanie post-trigger -> zapisuj bieżące klatki
- with self.lock:
- if self.recording and self.writer is not None:
- self.writer.write(frame)
- if now >= self.record_end_time:
- print(f"[{self.name}] Koniec nagrania post-trigger, zamykam plik.")
- self.recording = False
- try:
- self.writer.release()
- except Exception:
- pass
- self.writer = None
- # ================== SERWER TCP DO ODBIORU KODU CZĘŚCI ==================
- def tcp_server(camera_objects):
- """
- Prosty serwer TCP:
- - HMI łączy się z IP mini-komputera:TCP_PORT
- - wysyła tekst z kodem części + np. \n
- - serwer wywołuje trigger() dla wszystkich kamer
- """
- srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- srv.bind((TCP_HOST, TCP_PORT))
- srv.listen(5)
- print(f"[TCP] Nasłuch na {TCP_HOST}:{TCP_PORT}")
- while True:
- conn, addr = srv.accept()
- print(f"[TCP] Połączenie od: {addr}")
- with conn:
- try:
- data = conn.recv(1024)
- if not data:
- continue
- part_code = data.decode("utf-8", errors="ignore").strip()
- print(f"[TCP] Otrzymano kod części: '{part_code}'")
- if part_code:
- for cam in camera_objects:
- cam.trigger(part_code)
- except Exception as e:
- print(f"[TCP] Błąd przy obsłudze połączenia: {e}")
- # ================== MAIN ==================
- def main():
- ensure_output_dir()
- # uruchom wątki kamer
- cameras = []
- for name, url in CAMERAS.items():
- cam = CameraRecorder(name, url)
- cam.start()
- cameras.append(cam)
- # wątek czyszczenia starych plików (30 dni)
- cleaner = threading.Thread(target=cleanup_worker, daemon=True)
- cleaner.start()
- # wątek serwera TCP do odbioru kodów części
- server_thread = threading.Thread(target=tcp_server, args=(cameras,), daemon=True)
- server_thread.start()
- print("=== SYSTEM NAGRYWANIA URUCHOMIONY ===")
- print(f"Katalog nagrań: {OUTPUT_DIR}")
- print(f"Retencja: {RETENTION_DAYS} dni")
- print(f"Pre-trigger: {PRE_SECONDS} s, Post-trigger: {POST_SECONDS} s")
- print(f"Nasłuch TCP na porcie: {TCP_PORT}\n")
- # główna pętla utrzymuje proces przy życiu
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- print("Zamykanie...")
- if __name__ == "__main__":
- main()
Advertisement
Comments
-
Comment was deleted
-
Comment was deleted
-
- import socket
- IP = "192.168.1.100" # IP komputera z recorder.py
- PORT = 5000
- code = input("Podaj kod części: ")
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
- s.connect((IP, PORT))
- s.sendall((code + "\n").encode("utf-8"))
- print("Wysłano.")
Add Comment
Please, Sign In to add comment