Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import cv2
- import time
- import threading
- from collections import deque
- from datetime import datetime
- import socket
- import os
- import re
- # ================== KONFIGURACJA ==================
# Camera name -> RTSP URL.  The name is used in log prefixes and file names.
# NOTE(review): credentials are hard-coded in the URL, and the
# "[email protected]" part looks like an e-mail-obfuscation artifact of the
# paste site (presumably "<password>@<host>") - confirm before deploying.
CAMERAS = {
    "KAMERA1": "rtsp://admin:[email protected]:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif",
}

# Clip timing.
PRE_SECONDS = 10   # seconds of footage BEFORE the trigger kept in the clip
POST_SECONDS = 15  # seconds of footage AFTER the trigger kept in the clip
FPS_DEFAULT = 25   # fixed frame rate written into the output file

# Storage.
OUTPUT_DIR = r"D:\wideo"
RETENTION_DAYS = 1  # 1 day for testing; can be raised (e.g. to 30) later

# Trigger listener.
TCP_HOST = "0.0.0.0"
TCP_PORT = 5000
- # ================== POMOCNICZE ==================
def ensure_output_dir():
    """Create ``OUTPUT_DIR`` (and missing parents); a no-op if it exists."""
    os.makedirs(name=OUTPUT_DIR, exist_ok=True)
def cleanup_old_files(directory=None, retention_days=None):
    """Delete regular files whose modification time is past the retention window.

    Only the top level of the directory is scanned; sub-directories are left
    untouched.  A file that cannot be inspected or removed (for example one
    still open in another process, or one deleted concurrently) is reported
    and skipped so that the rest of the sweep still runs.

    Args:
        directory: Folder to sweep.  Defaults to ``OUTPUT_DIR``.
        retention_days: Maximum file age in days.  Defaults to
            ``RETENTION_DAYS``.

    Raises:
        OSError: If the directory itself cannot be listed.
    """
    if directory is None:
        directory = OUTPUT_DIR
    if retention_days is None:
        retention_days = RETENTION_DAYS

    cutoff = time.time() - retention_days * 24 * 3600
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        try:
            if not os.path.isfile(filepath):
                continue
            if os.path.getmtime(filepath) >= cutoff:
                continue
            print(f"[CLEANUP] Usuwam stary plik: {filename}")
            os.remove(filepath)
        except OSError as exc:
            # One stubborn file must not abort the whole sweep.
            print(f"[CLEANUP] Nie można usunąć pliku {filename}: {exc}")
def cleanup_worker():
    """Run ``cleanup_old_files()`` once every 24 hours, forever.

    Never returns; it is meant to be the target of a daemon thread.  Errors
    from a single sweep are reported and the loop carries on, because an
    uncaught exception here would end the thread and silently stop all
    future clean-ups.
    """
    while True:
        try:
            cleanup_old_files()
        except Exception as exc:  # thread top-level boundary: report, keep going
            print("[CLEANUP] Błąd:", exc)
        time.sleep(24 * 3600)
def sanitize_part_code(part_code: str) -> str:
    """Turn a raw part code into a string that is safe inside a file name.

    Surrounding whitespace is stripped; a code that is empty after stripping
    becomes ``"NO_CODE"``.  Every character other than an ASCII letter, a
    digit, ``_`` or ``-`` is replaced with ``_``.
    """
    cleaned = part_code.strip() or "NO_CODE"
    return re.sub(r"[^0-9A-Za-z_-]", "_", cleaned)
- # ================== KAMERA ==================
class CameraRecorder(threading.Thread):
    """Capture one RTSP camera and save a clip around every trigger.

    The thread keeps reading frames into a rolling in-memory buffer that
    spans ``PRE_SECONDS + 2`` seconds.  ``trigger()`` opens a new MP4 file,
    optionally dumps the buffered frames into it, and ``run()`` then appends
    live frames until ``record_end_time`` is reached.

    ``lock`` is held by ``run()`` and ``trigger()`` whenever they touch
    ``buffer``, ``writer``, ``recording``, ``record_end_time`` or the frame
    size, because ``trigger()`` is called from a different thread than the
    capture loop.
    """

    def __init__(self, name, url):
        """Open the stream and initialise recording state.

        Args:
            name: Camera label used in log prefixes and output file names
                (also becomes the thread name).
            url: Stream URL passed to ``cv2.VideoCapture``.
        """
        super().__init__(daemon=True)
        self.name = name
        self.url = url
        self.lock = threading.Lock()
        self.cap = None
        self._open_stream()
        self.fps = FPS_DEFAULT
        # Fallback size if the capture reports 0 (e.g. stream not open yet);
        # run() corrects it from the first real frame.
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 640)
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 480)
        # Rolling buffer of (timestamp, frame) tuples, oldest first.
        self.buffer = deque()
        self.buffer_max_seconds = PRE_SECONDS + 2  # small safety margin
        self.recording = False
        self.record_end_time = 0.0
        self.writer = None
        print(f"[{self.name}] Start. W={self.width} H={self.height}, fps={self.fps}")

    def _open_stream(self):
        """(Re)open the capture, releasing any previous one first."""
        if self.cap:
            self.cap.release()
        print(f"[{self.name}] Otwieram RTSP...")
        self.cap = cv2.VideoCapture(self.url)
        if not self.cap.isOpened():
            print(f"[{self.name}] ❌ Nie można otworzyć strumienia RTSP!")

    def _trim_buffer(self, now: float):
        """Drop buffered frames older than ``buffer_max_seconds`` before *now*."""
        cutoff = now - self.buffer_max_seconds
        while self.buffer and self.buffer[0][0] < cutoff:
            self.buffer.popleft()

    def _sync_frame_size(self, frame):
        """Adopt the size of *frame* if it differs from the stored one.

        The caller must hold ``self.lock``.  Writers are created with
        ``(self.width, self.height)`` and OpenCV requires written frames to
        have exactly that size, so a stale size would make later recordings
        unusable.  A writer that is already open keeps the size it was
        created with.
        """
        height, width = frame.shape[:2]
        if (width, height) != (self.width, self.height):
            print(
                f"[{self.name}] Rozmiar klatki {width}x{height} "
                f"(było {self.width}x{self.height})"
            )
            self.width, self.height = width, height

    def _stop_writer(self):
        """Release the current writer (if any) and clear the recording state.

        The caller must hold ``self.lock``.  A failure while releasing is
        reported instead of being silently swallowed; the state is reset
        either way.
        """
        writer, self.writer = self.writer, None
        self.recording = False
        if writer is None:
            return
        try:
            writer.release()
        except Exception as exc:
            print(f"[{self.name}] ❌ Błąd zamykania pliku: {exc}")

    def _start_new_writer(self, part_code: str, use_prebuffer: bool):
        """Create a new output file and arm the post-trigger timer.

        The caller must hold ``self.lock``.

        Args:
            part_code: Raw part code; sanitised before use in the file name.
            use_prebuffer: When true, buffered frames no older than
                ``PRE_SECONDS`` before the trigger are written first.

        Returns:
            The opened ``cv2.VideoWriter``, or ``None`` if the file could not
            be opened (``recording`` is left unchanged in that case).
        """
        # Trigger time: used for the file name and the pre-buffer cut-off.
        trigger_time_for_name = time.time()
        ts_name = datetime.fromtimestamp(trigger_time_for_name).strftime("%Y_%m_%d_%H_%M_%S")
        code = sanitize_part_code(part_code)
        filename = f"{ts_name}_{code}_{self.name}.mp4"
        filepath = os.path.join(OUTPUT_DIR, filename)
        print(f"[{self.name}] Start nagrania: {filename} | pre={use_prebuffer}")
        writer = cv2.VideoWriter(
            filepath,
            cv2.VideoWriter_fourcc(*"mp4v"),
            self.fps,
            (self.width, self.height),
        )
        if not writer.isOpened():
            print(f"[{self.name}] ❌ Błąd otwierania pliku!")
            writer.release()
            return None

        if use_prebuffer:
            # Cut-off is relative to the trigger moment captured above.
            cutoff = trigger_time_for_name - PRE_SECONDS
            frames_to_write = [frame for (t, frame) in self.buffer if t >= cutoff]
            print(f"[{self.name}] Zapisuję pre-buffer: {len(frames_to_write)} klatek (~{PRE_SECONDS}s)")
            for frame in frames_to_write:
                writer.write(frame)

        # The end time is measured from NOW, i.e. after the pre-buffer has
        # been flushed, so the time spent writing it does not shorten the
        # post-trigger part of the clip.
        self.recording = True
        self.record_end_time = time.time() + POST_SECONDS
        return writer

    def trigger(self, part_code: str):
        """Start a new clip for *part_code*.

        If a clip is already being recorded it is closed and the new one
        starts without pre-buffer; otherwise the new clip starts with the
        buffered pre-trigger frames.
        """
        with self.lock:
            if self.recording and self.writer is not None:
                print(f"[{self.name}] Kończę poprzednie nagranie (nowy trigger).")
                self._stop_writer()
                # New clip WITHOUT pre-buffer.
                self.writer = self._start_new_writer(part_code, use_prebuffer=False)
            else:
                # First clip, or first after an idle period: with pre-buffer.
                self.writer = self._start_new_writer(part_code, use_prebuffer=True)

            if self.writer is not None:
                print(f"[{self.name}] Nagrywam do: {time.ctime(self.record_end_time)}")
            else:
                print(f"[{self.name}] ❌ Nie udało się rozpocząć nagrania.")

    def run(self):
        """Capture loop: buffer every frame and feed the active writer."""
        while True:
            if not self.cap.isOpened():
                time.sleep(1)
                self._open_stream()
                continue

            ret, frame = self.cap.read()
            if not ret:
                time.sleep(0.5)
                self._open_stream()
                continue

            now = time.time()
            with self.lock:
                self._sync_frame_size(frame)

                # Update the rolling buffer.
                self.buffer.append((now, frame.copy()))
                self._trim_buffer(now)

                # Post-trigger recording.
                if self.recording and self.writer is not None:
                    self.writer.write(frame)
                    if now >= self.record_end_time:
                        print(f"[{self.name}] Koniec nagrania (post {POST_SECONDS}s).")
                        self._stop_writer()
- # ================== TCP SERVER ==================
def tcp_server(cameras, recv_timeout=5.0):
    """Accept TCP connections and trigger every camera with the received code.

    Each connection is expected to deliver one part code in its first
    ``recv`` (at most 1024 bytes); the connection is closed afterwards.
    Connections are served one at a time and the function never returns.

    Args:
        cameras: Iterable of objects exposing ``trigger(code)``.
        recv_timeout: Seconds to wait for data from a connected client.
            Without a limit, a client that connects and stays silent would
            block the accept loop and every later trigger.  ``None`` waits
            forever.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((TCP_HOST, TCP_PORT))
        srv.listen(5)
        print(f"[TCP] Nasłuch na {TCP_HOST}:{TCP_PORT}")

        while True:
            conn, _addr = srv.accept()
            with conn:
                try:
                    conn.settimeout(recv_timeout)
                    data = conn.recv(1024)
                except OSError as e:  # includes the receive timeout
                    print("[TCP] Błąd:", e)
                    continue

                if not data:
                    continue
                code = data.decode(errors="ignore").strip()
                print(f"[TCP] Kod części: {code!r}")
                if not code:
                    continue

                for cam in cameras:
                    # Isolate failures so one camera cannot stop the others
                    # from being triggered.
                    try:
                        cam.trigger(code)
                    except Exception as e:
                        print("[TCP] Błąd:", e)
- # ================== MAIN ==================
def main():
    """Start all recorders and background workers, then idle until Ctrl+C.

    Creates the output directory, starts one ``CameraRecorder`` per entry in
    ``CAMERAS``, and launches the retention clean-up and the TCP trigger
    server as daemon threads.  On ``KeyboardInterrupt`` any open video
    writer is released before returning.
    """
    ensure_output_dir()

    cameras = []
    for name, url in CAMERAS.items():
        cam = CameraRecorder(name, url)
        cam.start()
        cameras.append(cam)

    threading.Thread(target=cleanup_worker, daemon=True).start()
    threading.Thread(target=tcp_server, args=(cameras,), daemon=True).start()

    print("=== SYSTEM NAGRYWANIA URUCHOMIONY ===")
    print("Katalog:", OUTPUT_DIR)
    print(f"Pre: {PRE_SECONDS} s, Post: {POST_SECONDS} s")
    print(f"Retencja: {RETENTION_DAYS} dni\n")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("=== ZATRZYMYWANIE ===")
        # All workers are daemon threads and are stopped abruptly when the
        # interpreter exits, so release any clip still being written here
        # rather than relying on object teardown to finalize the file.
        for cam in cameras:
            with cam.lock:
                if cam.writer is not None:
                    cam.writer.release()
                    cam.writer = None
                cam.recording = False


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment