Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import cv2
- import time
- import threading
- from collections import deque
- from datetime import datetime
- import socket
- import os
- import re
- import queue
# ================== CONFIGURATION ==================
# Camera name -> RTSP stream URL.
CAMERAS = {
    "KAMERA1": "rtsp://admin:[email protected]:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif",
}

# Upper bound on how many seconds from BEFORE the trigger are kept (in
# practice never reaching back past the end of the previous recording).
PRE_SECONDS = 10
# Upper bound on how many seconds are recorded AFTER the trigger.
POST_SECONDS = 15
# Frame rate assumed when the camera does not report a usable one.
FPS_DEFAULT = 25

# Directory that receives the recordings.
OUTPUT_DIR = r"D:\wideo"
# Short value for testing; raise it later (e.g. to 30).
RETENTION_DAYS = 1

# Address and port of the TCP trigger listener.
TCP_HOST = "0.0.0.0"
TCP_PORT = 5000
# ================== HELPERS ==================
def ensure_output_dir():
    """Create the recordings directory ``OUTPUT_DIR`` unless it already exists."""
    target_dir = OUTPUT_DIR
    os.makedirs(target_dir, exist_ok=True)
def cleanup_old_files(directory=None, retention_days=None):
    """Delete regular files whose modification time is past the retention period.

    Args:
        directory: Folder to clean. ``None`` (default) means the module-level
            ``OUTPUT_DIR``.
        retention_days: Maximum age in days. ``None`` (default) means the
            module-level ``RETENTION_DAYS``.

    Sub-directories are never touched. Filesystem errors (directory missing,
    file still open or already gone) are logged and skipped, so a single
    problematic entry cannot abort the pass or kill the calling thread.
    """
    if directory is None:
        directory = OUTPUT_DIR
    if retention_days is None:
        retention_days = RETENTION_DAYS

    cutoff = time.time() - retention_days * 24 * 3600

    try:
        filenames = os.listdir(directory)
    except OSError as exc:
        print(f"[CLEANUP] Nie można odczytać katalogu {directory}: {exc}")
        return

    for filename in filenames:
        filepath = os.path.join(directory, filename)
        try:
            if not os.path.isfile(filepath):
                continue
            if os.path.getmtime(filepath) >= cutoff:
                continue
            print(f"[CLEANUP] Usuwam stary plik: {filename}")
            os.remove(filepath)
        except OSError as exc:
            # e.g. the file is still being written, or vanished in between.
            print(f"[CLEANUP] Nie można usunąć pliku {filename}: {exc}")
def cleanup_worker():
    """Run the retention cleanup once every 24 hours, forever.

    Used as the target of a daemon thread. An error raised by one cleanup pass
    is logged and the loop carries on; otherwise a single failure would end
    the thread and retention would silently stop for the rest of the run.
    """
    interval_s = 24 * 3600
    while True:
        try:
            cleanup_old_files()
        except Exception as exc:  # thread boundary: log and keep going
            print(f"[CLEANUP] Błąd: {exc}")
        time.sleep(interval_s)
def sanitize_part_code(part_code: str, max_length: int = 100) -> str:
    """Turn a received part code into a string that is safe inside a file name.

    Args:
        part_code: Raw code as received; surrounding whitespace is ignored.
        max_length: Maximum length of the result. The code comes from the
            network (up to 1024 bytes), and an unbounded value would produce a
            file name longer than the filesystem accepts, so the recording
            could not be opened. A value of 0 or less disables the limit.

    Returns:
        The code with every character outside ``0-9A-Za-z_-`` replaced by
        ``_`` and cut to ``max_length`` characters, or ``"NO_CODE"`` when the
        input is empty or whitespace only.
    """
    code = part_code.strip()
    if not code:
        code = "NO_CODE"
    safe = re.sub(r"[^0-9A-Za-z_-]", "_", code)
    if max_length > 0:
        safe = safe[:max_length]
    return safe
def build_filename(part_code: str, cam_name: str) -> str:
    """Return the output path for a new recording.

    The file name is ``<YYYY_MM_DD_HH_MM_SS>_<sanitized code>_<camera>.mp4``
    (local time, one-second resolution) placed inside ``OUTPUT_DIR``.
    """
    stamp = datetime.fromtimestamp(time.time()).strftime("%Y_%m_%d_%H_%M_%S")
    code_part = sanitize_part_code(part_code)
    return os.path.join(OUTPUT_DIR, f"{stamp}_{code_part}_{cam_name}.mp4")
# ================== VIDEO WRITER THREAD ==================
class WriterWorker(threading.Thread):
    """Thread that ONLY writes video for one camera.

    Keeping the encoding and disk work here means the capture loop never waits
    for a file write or for the pre-buffer flush.

    Commands are taken from ``q``; each is a dict selected by its ``"cmd"`` key:

    * ``{"cmd": "open", "filepath": ...}`` - release the current file (if any)
      and open a new one.
    * ``{"cmd": "frame", "frame": ...}`` - append a frame; ignored when no file
      is open.
    * ``{"cmd": "close"}`` - release the current file.
    * ``{"cmd": "stop"}`` or ``None`` - release the current file and end the
      thread.

    Unknown commands are ignored.
    """

    def __init__(self, name, q, fps, width, height):
        """Store the command queue and the parameters used for every opened file."""
        super().__init__(daemon=True)
        self.name = name
        self.q = q
        self.fps = fps
        self.width = width
        self.height = height

    def _release(self, writer):
        """Release *writer* if there is one, logging a failure instead of hiding it."""
        if writer is None:
            return
        try:
            writer.release()
        except Exception as exc:
            print(f"[{self.name}][WRITER] Błąd przy zamykaniu pliku: {exc}")

    def _open(self, filepath):
        """Open a video writer for *filepath*; return it, or ``None`` on failure."""
        print(f"[{self.name}][WRITER] Otwieram plik: {os.path.basename(filepath)}")
        try:
            writer = cv2.VideoWriter(
                filepath,
                cv2.VideoWriter_fourcc(*"mp4v"),
                self.fps,
                (self.width, self.height),
            )
        except Exception as exc:
            print(f"[{self.name}][WRITER] ❌ Nie można otworzyć pliku do zapisu! ({exc})")
            return None
        if not writer.isOpened():
            print(f"[{self.name}][WRITER] ❌ Nie można otworzyć pliku do zapisu!")
            return None
        return writer

    def run(self):
        """Process queued commands until ``"stop"`` or the ``None`` sentinel arrives."""
        writer = None
        try:
            while True:
                cmd = self.q.get()
                if cmd is None:
                    break
                typ = cmd.get("cmd")
                if typ == "open":
                    self._release(writer)
                    writer = self._open(cmd["filepath"])
                elif typ == "frame":
                    if writer is not None:
                        try:
                            writer.write(cmd["frame"])
                        except Exception as exc:
                            # One bad frame must not end the thread and with
                            # it every later recording.
                            print(f"[{self.name}][WRITER] Błąd zapisu klatki: {exc}")
                elif typ == "close":
                    if writer is not None:
                        print(f"[{self.name}][WRITER] Zamykam plik.")
                        self._release(writer)
                        writer = None
                elif typ == "stop":
                    break
        finally:
            # Every exit path (stop, sentinel, unexpected error) finalizes the
            # file that is still open.
            self._release(writer)
# ================== CAMERA – BUFFER + TRIGGER (LOGIC AS YOU DESCRIBED) ==================
class CameraRecorder(threading.Thread):
    """Capture thread for one RTSP camera: pre-trigger buffer plus triggered recording.

    While idle, the frames of the last ``PRE_SECONDS`` are kept in memory. A
    trigger (see :meth:`trigger`) starts a recording that begins with those
    buffered frames and lasts ``POST_SECONDS``. A trigger that arrives during a
    recording closes the current file and starts a new one without pre-buffer,
    so consecutive files do not overlap. Writing is delegated to a
    ``WriterWorker`` through a bounded queue.
    """

    def __init__(self, name, url):
        """Open the stream, read its parameters and start the writer thread."""
        super().__init__(daemon=True)
        self.name = name
        self.url = url
        self.cap = None
        self._open_stream()

        fps = self.cap.get(cv2.CAP_PROP_FPS) if self.cap else 0
        if fps <= 0 or fps > 120:
            fps = FPS_DEFAULT
        self.fps = fps
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 640)
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 480)

        # Time buffer of (timestamp, frame) - filled ONLY while not recording.
        self.buffer = deque()

        # Recording state (time logic).
        self.recording = False
        self.record_end_time = 0.0
        self.last_record_end_time = 0.0  # when the previous recording ended

        # Reconnect bookkeeping.
        self.fail_count = 0
        self.max_fail_before_reopen = int(self.fps * 2) if self.fps > 0 else 50

        # Trigger handed over from another thread.
        self.lock = threading.Lock()
        self.pending_trigger_code = None  # str or None

        # Queue to the writer thread. Bounded so a stalled writer cannot eat
        # all RAM, but sized to take a complete pre-buffer flush at once: with
        # a fixed 300 slots a stream with fps * PRE_SECONDS above that lost
        # part of the pre-roll, each dropped frame stalling capture for 10 ms.
        queue_size = max(300, int(self.fps * PRE_SECONDS) + 50)
        self.writer_queue = queue.Queue(maxsize=queue_size)
        self.writer_thread = WriterWorker(
            self.name, self.writer_queue, self.fps, self.width, self.height
        )
        self.writer_thread.start()

        print(
            f"[{self.name}] Start. {self.width}x{self.height} @ {self.fps} fps, "
            f"PRE={PRE_SECONDS}s, POST={POST_SECONDS}s"
        )

    # ======== RTSP ========
    def _open_stream(self):
        """(Re)open the RTSP capture, releasing a previous one first."""
        if self.cap:
            self.cap.release()
        print(f"[{self.name}] Otwieram RTSP...")
        self.cap = cv2.VideoCapture(self.url)
        try:
            # Request a one-frame capture buffer; failure to set it is not fatal.
            self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        except Exception:
            pass
        if not self.cap.isOpened():
            print(f"[{self.name}] ❌ Nie można otworzyć strumienia RTSP!")

    # ======== communication with the writer ========
    def _writer_open(self, filepath):
        """Ask the writer thread to start a new file at *filepath*."""
        try:
            self.writer_queue.put({"cmd": "open", "filepath": filepath}, timeout=0.5)
        except queue.Full:
            print(f"[{self.name}][WRITER] Kolejka pełna przy open().")

    def _writer_frame(self, frame):
        """Queue *frame* for writing; the frame is dropped when the queue stays full."""
        try:
            self.writer_queue.put({"cmd": "frame", "frame": frame}, timeout=0.01)
        except queue.Full:
            # Better to lose a frame than to exhaust RAM and lag behind.
            pass

    def _writer_close(self):
        """Ask the writer thread to close the current file."""
        try:
            self.writer_queue.put({"cmd": "close"}, timeout=0.5)
        except queue.Full:
            print(f"[{self.name}][WRITER] Kolejka pełna przy close().")

    # ======== recording state ========
    def _close_recording(self, now_ts: float):
        """End the current recording, if any, and remember when it ended."""
        if self.recording:
            self._writer_close()
        self.recording = False
        self.record_end_time = 0.0
        # From this moment on we count a possible pre-roll for the next trigger.
        self.last_record_end_time = now_ts

    def _finish_if_expired(self, now_ts: float) -> bool:
        """Close the recording once ``record_end_time`` is reached.

        Returns:
            ``True`` when a recording was closed by this call, else ``False``.
        """
        if not self.recording or now_ts < self.record_end_time:
            return False
        print(f"[{self.name}] Koniec nagrania (minęło POST_SECONDS).")
        self._close_recording(now_ts)
        return True

    def _start_recording(self, part_code: str, use_prebuffer: bool, now_ts: float):
        """Open a new file and, when requested, flush the pre-buffer into it."""
        filepath = build_filename(part_code, self.name)
        print(
            f"[{self.name}] START nagrania: {os.path.basename(filepath)} | "
            f"prebuffer={'TAK' if use_prebuffer else 'NIE'}"
        )
        self._writer_open(filepath)

        if use_prebuffer:
            # Take the frames that are:
            # - not older than PRE_SECONDS
            # - not older than the end of the previous recording (no overlap)
            frames = [
                f for (t, f) in self.buffer
                if (now_ts - t <= PRE_SECONDS) and (t >= self.last_record_end_time)
            ]
            print(
                f"[{self.name}] Wysyłam pre-bufor do writer'a: {len(frames)} klatek "
                f"(realnie od {max(self.last_record_end_time, now_ts - PRE_SECONDS):.3f}s)."
            )
            for buffered_frame in frames:
                self._writer_frame(buffered_frame)
            # Once the pre-roll is used there is no point in keeping it.
            self.buffer.clear()

        self.recording = True
        self.record_end_time = now_ts + POST_SECONDS
        print(
            f"[{self.name}] Nagrywanie ustawione do {self.record_end_time:.3f} (POST={POST_SECONDS}s)."
        )

    # ======== external trigger ========
    def trigger(self, part_code: str):
        """Request a recording for *part_code*; safe to call from another thread.

        The request is picked up by :meth:`run` after its next successfully
        read frame. A request that is still pending is replaced by a newer one.
        """
        safe_code = sanitize_part_code(part_code)
        with self.lock:
            self.pending_trigger_code = safe_code
        print(f"[{self.name}] Otrzymano trigger z kodem: {safe_code}")

    # ======== main camera loop ========
    def run(self):
        """Read frames forever, maintaining the pre-buffer and the recordings."""
        while True:
            if not self.cap or not self.cap.isOpened():
                # The stream is down: still finalize a recording whose time is
                # up, instead of leaving the file open until frames return.
                self._finish_if_expired(time.time())
                self._open_stream()
                time.sleep(1)
                continue

            ret, frame = self.cap.read()
            if not ret:
                self.fail_count += 1
                if self.fail_count >= self.max_fail_before_reopen:
                    print(f"[{self.name}] Za dużo błędów z rzędu, ponowne otwarcie RTSP.")
                    self._open_stream()
                    self.fail_count = 0
                # Same reason as above: the deadline must not depend on
                # frames arriving.
                self._finish_if_expired(time.time())
                time.sleep(0.01)
                continue

            self.fail_count = 0
            now_ts = time.time()

            # ====== BUFFER ONLY WHILE NOT RECORDING ======
            if not self.recording:
                # Append the frame (no .copy(), to save RAM).
                self.buffer.append((now_ts, frame))
                # Drop what is older than PRE_SECONDS or older than the end of
                # the last recording.
                while self.buffer and (
                    (now_ts - self.buffer[0][0] > PRE_SECONDS) or
                    (self.buffer[0][0] < self.last_record_end_time)
                ):
                    self.buffer.popleft()

            # ====== CHECK FOR A TRIGGER ======
            with self.lock:
                trigger_code = self.pending_trigger_code
                self.pending_trigger_code = None

            if trigger_code is not None:
                if self.recording:
                    # Trigger during a recording -> finish it, new one WITHOUT pre-roll.
                    print(f"[{self.name}] Trigger w trakcie nagrania -> kończę poprzedni plik, start nowego bez pre.")
                    self._close_recording(now_ts)
                    use_prebuffer = False
                else:
                    # We were idle -> pre-roll allowed (at most back to the end
                    # of the previous recording).
                    use_prebuffer = True
                self._start_recording(trigger_code, use_prebuffer, now_ts)

            # ====== WRITE FRAMES WHILE RECORDING ======
            if self.recording:
                self._writer_frame(frame)
                self._finish_if_expired(now_ts)
# ================== TCP SERVER ==================
def tcp_server(cameras):
    """Listen for part codes over TCP and trigger every camera with each one.

    Connections are served one at a time. From each connection a single read
    of up to 1024 bytes is taken, decoded (undecodable bytes dropped) and
    stripped; a non-empty result is passed to ``trigger()`` of every camera.

    Args:
        cameras: Collection of objects with a ``trigger(code)`` method; it is
            iterated once per received code.

    Never returns. Errors while handling one client are logged and the server
    keeps accepting connections.
    """
    # A client that connects and then stays silent would otherwise block
    # recv() - and with it every later trigger - forever.
    client_timeout_s = 5.0

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((TCP_HOST, TCP_PORT))
        srv.listen(5)
        print(f"[TCP] Nasłuch na {TCP_HOST}:{TCP_PORT}")

        while True:
            try:
                conn, addr = srv.accept()
            except OSError as e:
                # A failed accept must not end the server thread.
                print("[TCP] Błąd:", e)
                time.sleep(0.1)  # do not spin if the error persists
                continue

            with conn:
                try:
                    conn.settimeout(client_timeout_s)
                    data = conn.recv(1024)
                    if not data:
                        continue
                    code = data.decode(errors="ignore").strip()
                    print(f"[TCP] Kod części: {code!r} od {addr}")
                    if not code:
                        continue
                    for cam in cameras:
                        cam.trigger(code)
                except Exception as e:
                    print("[TCP] Błąd:", e)
# ================== MAIN ==================
def main():
    """Start all camera recorders and the background threads, then idle forever.

    Every worker is a daemon thread, so this function keeps the process alive
    by sleeping in a loop.
    """
    ensure_output_dir()

    cameras = []
    for cam_name, cam_url in CAMERAS.items():
        recorder = CameraRecorder(cam_name, cam_url)
        recorder.start()
        cameras.append(recorder)

    cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
    cleanup_thread.start()
    tcp_thread = threading.Thread(target=tcp_server, args=(cameras,), daemon=True)
    tcp_thread.start()

    print("=== SYSTEM NAGRYWANIA (PRE TYLKO PO IDLE, BEZ ZAKŁADEK, WRITER THREAD) URUCHOMIONY ===")
    print("Katalog:", OUTPUT_DIR)
    print(f"Pre max: {PRE_SECONDS} s, Post max: {POST_SECONDS} s, Retencja: {RETENTION_DAYS} dni\n")

    while True:
        time.sleep(1)


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment