wiktortokumpel

wesioa 17.11.12

Nov 17th, 2025 (edited)
29
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import cv2
  2. import time
  3. import threading
  4. from collections import deque
  5. from datetime import datetime
  6. import socket
  7. import os
  8. import re
  9. import queue
  10.  
  11. # ================== KONFIGURACJA ==================
  12.  
  13. CAMERAS = {
  14. "KAMERA1": "rtsp://admin:[email protected]:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif",
  15. }
  16.  
  17. PRE_SECONDS = 10 # ile sekund PRZED triggerem trzymamy W TEORII (realnie: max tyle, ale nie wcześniej niż koniec poprzedniego nagrania)
  18. POST_SECONDS = 15 # ile sekund PO triggerze nagrywamy (max)
  19. FPS_DEFAULT = 25 # używane tylko do VideoWriter
  20.  
  21. OUTPUT_DIR = r"D:\wideo"
  22. RETENTION_DAYS = 1 # na testy, potem np. 30
  23.  
  24. TCP_HOST = "0.0.0.0"
  25. TCP_PORT = 5000
  26.  
  27.  
  28. # ================== POMOCNICZE ==================
  29.  
  30. def ensure_output_dir():
  31. os.makedirs(OUTPUT_DIR, exist_ok=True)
  32.  
  33.  
  34. def cleanup_old_files():
  35. now = time.time()
  36. cutoff = now - RETENTION_DAYS * 24 * 3600
  37. for filename in os.listdir(OUTPUT_DIR):
  38. filepath = os.path.join(OUTPUT_DIR, filename)
  39. if os.path.isfile(filepath):
  40. if os.path.getmtime(filepath) < cutoff:
  41. print(f"[CLEANUP] Usuwam stary plik: {filename}")
  42. os.remove(filepath)
  43.  
  44.  
  45. def cleanup_worker():
  46. while True:
  47. cleanup_old_files()
  48. time.sleep(24 * 3600)
  49.  
  50.  
  51. def sanitize_part_code(part_code: str) -> str:
  52. part_code = part_code.strip()
  53. if not part_code:
  54. part_code = "NO_CODE"
  55. return re.sub(r"[^0-9A-Za-z_-]", "_", part_code)
  56.  
  57.  
  58. def build_filename(part_code: str, cam_name: str) -> str:
  59. now = time.time()
  60. ts_name = datetime.fromtimestamp(now).strftime("%Y_%m_%d_%H_%M_%S")
  61. safe_code = sanitize_part_code(part_code)
  62. filename = f"{ts_name}_{safe_code}_{cam_name}.mp4"
  63. return os.path.join(OUTPUT_DIR, filename)
  64.  
  65.  
  66. # ================== WĄTEK ZAPISU WIDEO ==================
  67.  
  68. class WriterWorker(threading.Thread):
  69. """
  70. Jeden wątek, który TYLKO zapisuje wideo dla jednej kamery.
  71. Capture nigdy nie stoi na czasie zapisu/pre-bufora.
  72. """
  73. def __init__(self, name, q, fps, width, height):
  74. super().__init__(daemon=True)
  75. self.name = name
  76. self.q = q
  77. self.fps = fps
  78. self.width = width
  79. self.height = height
  80.  
  81. def run(self):
  82. writer = None
  83. fourcc = cv2.VideoWriter_fourcc(*"mp4v")
  84.  
  85. while True:
  86. cmd = self.q.get()
  87. if cmd is None:
  88. break
  89.  
  90. typ = cmd.get("cmd")
  91.  
  92. if typ == "open":
  93. filepath = cmd["filepath"]
  94. if writer is not None:
  95. try:
  96. writer.release()
  97. except Exception:
  98. pass
  99. writer = None
  100.  
  101. print(f"[{self.name}][WRITER] Otwieram plik: {os.path.basename(filepath)}")
  102. writer = cv2.VideoWriter(
  103. filepath,
  104. fourcc,
  105. self.fps,
  106. (self.width, self.height)
  107. )
  108. if not writer.isOpened():
  109. print(f"[{self.name}][WRITER] ❌ Nie można otworzyć pliku do zapisu!")
  110. writer = None
  111.  
  112. elif typ == "frame":
  113. frame = cmd["frame"]
  114. if writer is not None:
  115. writer.write(frame)
  116.  
  117. elif typ == "close":
  118. if writer is not None:
  119. print(f"[{self.name}][WRITER] Zamykam plik.")
  120. try:
  121. writer.release()
  122. except Exception:
  123. pass
  124. writer = None
  125.  
  126. elif typ == "stop":
  127. if writer is not None:
  128. try:
  129. writer.release()
  130. except Exception:
  131. pass
  132. break
  133.  
  134.  
  135. # ================== KAMERA – BUFOR + TRIGGER (LOGIKA JAK OPISAŁEŚ) ==================
  136.  
  137. class CameraRecorder(threading.Thread):
  138. def __init__(self, name, url):
  139. super().__init__(daemon=True)
  140. self.name = name
  141. self.url = url
  142.  
  143. self.cap = None
  144. self._open_stream()
  145.  
  146. fps = self.cap.get(cv2.CAP_PROP_FPS) if self.cap else 0
  147. if fps <= 0 or fps > 120:
  148. fps = FPS_DEFAULT
  149. self.fps = fps
  150.  
  151. self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 640)
  152. self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 480)
  153.  
  154. # bufor czasowy (timestamp, frame) – UŻYWANY TYLKO gdy NIE nagrywamy
  155. self.buffer = deque()
  156.  
  157. # stan nagrywania (logika czasu)
  158. self.recording = False
  159. self.record_end_time = 0.0
  160. self.last_record_end_time = 0.0 # kiedy zakończyło się ostatnie nagranie
  161.  
  162. # reconnect
  163. self.fail_count = 0
  164. self.max_fail_before_reopen = int(self.fps * 2) if self.fps > 0 else 50
  165.  
  166. # trigger z innego wątku
  167. self.lock = threading.Lock()
  168. self.pending_trigger_code = None # string lub None
  169.  
  170. # kolejka do wątku zapisującego (limit żeby nie zjechać RAM-em)
  171. self.writer_queue = queue.Queue(maxsize=300)
  172. self.writer_thread = WriterWorker(self.name, self.writer_queue, self.fps, self.width, self.height)
  173. self.writer_thread.start()
  174.  
  175. print(
  176. f"[{self.name}] Start. {self.width}x{self.height} @ {self.fps} fps, "
  177. f"PRE={PRE_SECONDS}s, POST={POST_SECONDS}s"
  178. )
  179.  
  180. # ======== RTSP ========
  181.  
  182. def _open_stream(self):
  183. if self.cap:
  184. self.cap.release()
  185. print(f"[{self.name}] Otwieram RTSP...")
  186. self.cap = cv2.VideoCapture(self.url)
  187. try:
  188. self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
  189. except Exception:
  190. pass
  191.  
  192. if not self.cap.isOpened():
  193. print(f"[{self.name}] ❌ Nie można otworzyć strumienia RTSP!")
  194.  
  195. # ======== komunikacja z writerem ========
  196.  
  197. def _writer_open(self, filepath):
  198. try:
  199. self.writer_queue.put({"cmd": "open", "filepath": filepath}, timeout=0.5)
  200. except queue.Full:
  201. print(f"[{self.name}][WRITER] Kolejka pełna przy open().")
  202.  
  203. def _writer_frame(self, frame):
  204. try:
  205. self.writer_queue.put({"cmd": "frame", "frame": frame}, timeout=0.01)
  206. except queue.Full:
  207. # lepiej zgubić klatkę niż zabić RAM i mieć lagi
  208. pass
  209.  
  210. def _writer_close(self):
  211. try:
  212. self.writer_queue.put({"cmd": "close"}, timeout=0.5)
  213. except queue.Full:
  214. print(f"[{self.name}][WRITER] Kolejka pełna przy close().")
  215.  
  216. # ======== stan nagrywania ========
  217.  
  218. def _close_recording(self, now_ts: float):
  219. if self.recording:
  220. self._writer_close()
  221. self.recording = False
  222. self.record_end_time = 0.0
  223. self.last_record_end_time = now_ts # od tego momentu liczymy ewentualny pre przy następnym triggerze
  224.  
  225. def _start_recording(self, part_code: str, use_prebuffer: bool, now_ts: float):
  226. filepath = build_filename(part_code, self.name)
  227. print(
  228. f"[{self.name}] START nagrania: {os.path.basename(filepath)} | "
  229. f"prebuffer={'TAK' if use_prebuffer else 'NIE'}"
  230. )
  231. self._writer_open(filepath)
  232.  
  233. if use_prebuffer:
  234. # Bierzemy KLATKI:
  235. # - nie starsze niż PRE_SECONDS
  236. # - nie starsze niż koniec poprzedniego nagrania (żeby nie było zakładek)
  237. frames = [
  238. f for (t, f) in self.buffer
  239. if (now_ts - t <= PRE_SECONDS) and (t >= self.last_record_end_time)
  240. ]
  241. print(
  242. f"[{self.name}] Wysyłam pre-bufor do writer'a: {len(frames)} klatek "
  243. f"(realnie od {max(self.last_record_end_time, now_ts - PRE_SECONDS):.3f}s)."
  244. )
  245. for f in frames:
  246. self._writer_frame(f)
  247.  
  248. # Po użyciu pre nie ma sensu trzymać starego bufora – czyścimy
  249. self.buffer.clear()
  250.  
  251. self.recording = True
  252. self.record_end_time = now_ts + POST_SECONDS
  253. print(
  254. f"[{self.name}] Nagrywanie ustawione do {self.record_end_time:.3f} (POST={POST_SECONDS}s)."
  255. )
  256.  
  257. # ======== trigger z zewnątrz ========
  258.  
  259. def trigger(self, part_code: str):
  260. safe_code = sanitize_part_code(part_code)
  261. with self.lock:
  262. self.pending_trigger_code = safe_code
  263. print(f"[{self.name}] Otrzymano trigger z kodem: {safe_code}")
  264.  
  265. # ======== główna pętla kamery ========
  266.  
  267. def run(self):
  268. while True:
  269. if not self.cap or not self.cap.isOpened():
  270. self._open_stream()
  271. time.sleep(1)
  272. continue
  273.  
  274. ret, frame = self.cap.read()
  275. if not ret:
  276. self.fail_count += 1
  277. if self.fail_count >= self.max_fail_before_reopen:
  278. print(f"[{self.name}] Za dużo błędów z rzędu, ponowne otwarcie RTSP.")
  279. self._open_stream()
  280. self.fail_count = 0
  281. time.sleep(0.01)
  282. continue
  283.  
  284. self.fail_count = 0
  285. now_ts = time.time()
  286.  
  287. # ====== BUFOR TYLKO GDY NIE NAGRYWAMY ======
  288. if not self.recording:
  289. # dodajemy klatkę do bufora (bez .copy(), oszczędzamy RAM)
  290. self.buffer.append((now_ts, frame))
  291. # usuwamy to, co starsze niż PRE_SECONDS lub starsze niż koniec ostatniego nagrania
  292. while self.buffer and (
  293. (now_ts - self.buffer[0][0] > PRE_SECONDS) or
  294. (self.buffer[0][0] < self.last_record_end_time)
  295. ):
  296. self.buffer.popleft()
  297.  
  298. # ====== SPRAWDZAMY TRIGGER ======
  299. with self.lock:
  300. trigger_code = self.pending_trigger_code
  301. self.pending_trigger_code = None
  302.  
  303. if trigger_code is not None:
  304. if self.recording:
  305. # TRIGGER W TRAKCIE NAGRANIA -> KOŃCZYMY TO NAGRANIE, NOWE BEZ PRE
  306. print(f"[{self.name}] Trigger w trakcie nagrania -> kończę poprzedni plik, start nowego bez pre.")
  307. self._close_recording(now_ts)
  308. use_prebuffer = False
  309. else:
  310. # BYLIŚMY IDLE -> można użyć PRE (ale max do końca poprzedniego nagrania)
  311. use_prebuffer = True
  312.  
  313. self._start_recording(trigger_code, use_prebuffer, now_ts)
  314.  
  315. # ====== ZAPIS KLATEK JEŚLI NAGRYWAMY ======
  316. if self.recording:
  317. self._writer_frame(frame)
  318.  
  319. if now_ts >= self.record_end_time:
  320. print(f"[{self.name}] Koniec nagrania (minęło POST_SECONDS).")
  321. self._close_recording(now_ts)
  322.  
  323.  
  324. # ================== TCP SERVER ==================
  325.  
  326. def tcp_server(cameras):
  327. srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  328. srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  329. srv.bind((TCP_HOST, TCP_PORT))
  330. srv.listen(5)
  331. print(f"[TCP] Nasłuch na {TCP_HOST}:{TCP_PORT}")
  332.  
  333. while True:
  334. conn, addr = srv.accept()
  335. with conn:
  336. try:
  337. data = conn.recv(1024)
  338. if not data:
  339. continue
  340. code = data.decode(errors="ignore").strip()
  341. print(f"[TCP] Kod części: {code!r} od {addr}")
  342. if not code:
  343. continue
  344. for cam in cameras:
  345. cam.trigger(code)
  346. except Exception as e:
  347. print("[TCP] Błąd:", e)
  348.  
  349.  
  350. # ================== MAIN ==================
  351.  
  352. def main():
  353. ensure_output_dir()
  354.  
  355. cameras = []
  356. for name, url in CAMERAS.items():
  357. cam = CameraRecorder(name, url)
  358. cam.start()
  359. cameras.append(cam)
  360.  
  361. threading.Thread(target=cleanup_worker, daemon=True).start()
  362. threading.Thread(target=tcp_server, args=(cameras,), daemon=True).start()
  363.  
  364. print("=== SYSTEM NAGRYWANIA (PRE TYLKO PO IDLE, BEZ ZAKŁADEK, WRITER THREAD) URUCHOMIONY ===")
  365. print("Katalog:", OUTPUT_DIR)
  366. print(f"Pre max: {PRE_SECONDS} s, Post max: {POST_SECONDS} s, Retencja: {RETENTION_DAYS} dni\n")
  367.  
  368. while True:
  369. time.sleep(1)
  370.  
  371.  
  372. if __name__ == "__main__":
  373. main()
  374.  
Advertisement
Add Comment
Please, Sign In to add comment