Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import cv2
- import time
- import threading
- from collections import deque
- from datetime import datetime
- import socket
- import os
- import re
- # ================== KONFIGURACJA ==================
- CAMERAS = {
- "KAMERA1": "rtsp://admin:[email protected]:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif",
- }
- PRE_SECONDS = 10 # ile sekund PRZED triggerem w buforze
- POST_SECONDS = 15 # ile sekund PO triggerze nagrywać
- FPS_DEFAULT = 25 # jeśli kamera nie poda sensownego FPS
- OUTPUT_DIR = r"D:\wideo"
- RETENTION_DAYS = 1 # na testy, potem np. 30
- TCP_HOST = "0.0.0.0"
- TCP_PORT = 5000
- # ================== POMOCNICZE ==================
- def ensure_output_dir():
- os.makedirs(OUTPUT_DIR, exist_ok=True)
- def cleanup_old_files():
- now = time.time()
- cutoff = now - RETENTION_DAYS * 24 * 3600
- for filename in os.listdir(OUTPUT_DIR):
- filepath = os.path.join(OUTPUT_DIR, filename)
- if os.path.isfile(filepath):
- if os.path.getmtime(filepath) < cutoff:
- print(f"[CLEANUP] Usuwam stary plik: {filename}")
- os.remove(filepath)
- def cleanup_worker():
- while True:
- cleanup_old_files()
- time.sleep(24 * 3600)
- def sanitize_part_code(part_code: str) -> str:
- part_code = part_code.strip()
- if not part_code:
- part_code = "NO_CODE"
- return re.sub(r"[^0-9A-Za-z_-]", "_", part_code)
- # ================== KAMERA – BUFOR + TRIGGER (LICZONY NA KLATKACH) ==================
- class CameraRecorder(threading.Thread):
- def __init__(self, name, url):
- super().__init__(daemon=True)
- self.name = name
- self.url = url
- self.cap = None
- self._open_stream()
- fps = self.cap.get(cv2.CAP_PROP_FPS) if self.cap else 0
- if fps <= 0 or fps > 120:
- fps = FPS_DEFAULT
- self.fps = fps
- self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 640)
- self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 480)
- # ile klatek trzymamy w buforze
- self.pre_frames = int(self.fps * PRE_SECONDS)
- # ile klatek nagrywamy po triggerze
- self.post_frames_target = int(self.fps * POST_SECONDS)
- # bufor z ostatnimi klatkami
- self.buffer = deque(maxlen=self.pre_frames)
- self.lock = threading.Lock()
- # stan nagrywania
- self.recording = False
- self.writer = None
- self.post_frames_left = 0 # ile klatek jeszcze zapisać po triggerze
- # do reconnectów
- self.fail_count = 0
- self.max_fail_before_reopen = int(self.fps * 2) # ok. 2 sekundy błędów z rzędu
- print(
- f"[{self.name}] Start. {self.width}x{self.height} @ {self.fps} fps, "
- f"bufor={self.pre_frames} klatek, post={self.post_frames_target} klatek"
- )
- def _open_stream(self):
- if self.cap:
- self.cap.release()
- print(f"[{self.name}] Otwieram RTSP...")
- # ograniczamy buforowanie po stronie OpenCV (jeśli jest obsługiwane)
- self.cap = cv2.VideoCapture(self.url)
- try:
- self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
- except Exception:
- pass
- if not self.cap.isOpened():
- print(f"[{self.name}] ❌ Nie można otworzyć strumienia RTSP!")
- def _start_new_writer(self, part_code: str, use_prebuffer: bool):
- """
- Tworzy nowy plik:
- - use_prebuffer=True -> przed bieżącymi klatkami dopisuje cały bufor (ostatnie PRE_SECONDS)
- - use_prebuffer=False -> tylko od TERAZ, bez cofania
- Nazwa: RRRR_MM_DD_GG_MM_SS_KOD_KAMERA.mp4
- """
- now = time.time()
- ts_name = datetime.fromtimestamp(now).strftime("%Y_%m_%d_%H_%M_%S")
- safe_code = sanitize_part_code(part_code)
- filename = f"{ts_name}_{safe_code}_{self.name}.mp4"
- filepath = os.path.join(OUTPUT_DIR, filename)
- print(f"[{self.name}] Start nagrania: {filename} | prebuffer={use_prebuffer}")
- writer = cv2.VideoWriter(
- filepath,
- cv2.VideoWriter_fourcc(*"mp4v"),
- self.fps,
- (self.width, self.height)
- )
- if not writer.isOpened():
- print(f"[{self.name}] ❌ Nie można otworzyć pliku do zapisu!")
- return None
- if use_prebuffer:
- frames = list(self.buffer) # snapshot, żeby nie iterować po żywym deque
- print(f"[{self.name}] Zapis pre-buffer: {len(frames)} klatek")
- for frame in frames:
- writer.write(frame)
- self.recording = True
- self.writer = writer
- self.post_frames_left = self.post_frames_target # koniec = post_frames_left == 0
- return writer
- def trigger(self, part_code: str):
- with self.lock:
- if self.recording and self.writer:
- # przerywamy stare nagranie natychmiast
- print(f"[{self.name}] Kończę poprzednie nagranie (nowy trigger).")
- try:
- self.writer.release()
- except Exception:
- pass
- self.writer = None
- self.recording = False
- self.post_frames_left = 0
- # nowe nagranie bez pre-buffer
- self._start_new_writer(part_code, use_prebuffer=False)
- else:
- # pierwsze nagranie po przerwie – z PRE_SECONDS wstecz
- self._start_new_writer(part_code, use_prebuffer=True)
- if self.writer:
- print(
- f"[{self.name}] Nagrywanie: pre={self.pre_frames} klatek, "
- f"post={self.post_frames_target} klatek"
- )
- else:
- print(f"[{self.name}] ❌ Nie udało się rozpocząć nagrania.")
- def run(self):
- while True:
- if not self.cap or not self.cap.isOpened():
- self._open_stream()
- time.sleep(1)
- continue
- ret, frame = self.cap.read()
- if not ret:
- # nie panikujemy od razu, dajemy trochę błędów pod rząd
- self.fail_count += 1
- if self.fail_count >= self.max_fail_before_reopen:
- print(f"[{self.name}] Za dużo błędów z rzędu, ponowne otwarcie RTSP.")
- self._open_stream()
- self.fail_count = 0
- # krótka pauza, ale bardzo mała
- time.sleep(0.01)
- continue
- self.fail_count = 0 # mamy poprawną klatkę
- with self.lock:
- # zawsze aktualizujemy bufor PRE_SECONDS
- self.buffer.append(frame.copy())
- if self.recording and self.writer:
- # zapisujemy bieżącą klatkę
- self.writer.write(frame)
- self.post_frames_left -= 1
- if self.post_frames_left <= 0:
- print(f"[{self.name}] Koniec nagrania (wyczerpane post-klatki).")
- try:
- self.writer.release()
- except Exception:
- pass
- self.writer = None
- self.recording = False
- self.post_frames_left = 0
- # ================== TCP SERVER ==================
- def tcp_server(cameras):
- srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- srv.bind((TCP_HOST, TCP_PORT))
- srv.listen(5)
- print(f"[TCP] Nasłuch na {TCP_HOST}:{TCP_PORT}")
- while True:
- conn, addr = srv.accept()
- with conn:
- try:
- data = conn.recv(1024)
- if not data:
- continue
- code = data.decode(errors="ignore").strip()
- print(f"[TCP] Kod części: {code!r} od {addr}")
- if not code:
- continue
- for cam in cameras:
- cam.trigger(code)
- except Exception as e:
- print("[TCP] Błąd:", e)
- # ================== MAIN ==================
- def main():
- ensure_output_dir()
- cameras = []
- for name, url in CAMERAS.items():
- cam = CameraRecorder(name, url)
- cam.start()
- cameras.append(cam)
- threading.Thread(target=cleanup_worker, daemon=True).start()
- threading.Thread(target=tcp_server, args=(cameras,), daemon=True).start()
- print("=== SYSTEM NAGRYWANIA (KLATKOWY, ZMINIMALIZOWANE SKIPY) URUCHOMIONY ===")
- print("Katalog:", OUTPUT_DIR)
- print(f"Pre: {PRE_SECONDS} s, Post: {POST_SECONDS} s, Retencja: {RETENTION_DAYS} dni\n")
- while True:
- time.sleep(1)
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment