# Not a member of Pastebin yet?
# Sign Up,
# it unlocks many cool features!
import os
import re
import time
import tkinter as tk
from collections import deque
from datetime import datetime
from tkinter import ttk

import cv2
import joblib
import numpy as np
import pytesseract
from PIL import Image, ImageTk
from sklearn.ensemble import RandomForestClassifier
# Constants
IMG_SIZE = (32, 32)  # (width, height) every sample is resized to before it reaches the model
COOLDOWN = 5  # seconds before the same number may trigger a new detection
MIN_OCR_CONF = 60  # minimum Tesseract confidence for an OCR result to be accepted
ACTIVE_LEARNING_BATCH = 10  # confirmed samples collected before the model is retrained
SCAN_INTERVAL = 2.0  # seconds between rescans of frozen frame
DATASET_DIR = "dataset"  # root folder; one sub-folder per label
MODEL_PATH = "model.pkl"  # default location of the persisted classifier
class DigitDetectorApp:
    """Tkinter application that reads numbers from a webcam feed.

    Each captured frame is thresholded and passed to Tesseract OCR and to a
    scikit-learn classifier. When a number is detected the frame is frozen
    and the user is asked to confirm or correct it. Confirmed and corrected
    samples are written to ``DATASET_DIR``; after ``ACTIVE_LEARNING_BATCH``
    confirmations the classifier is retrained from that directory.

    Constructing an instance opens the camera and runs the Tk main loop, so
    the constructor only returns once the window has been closed.
    """

    # Windows path the application points pytesseract at when it exists.
    TESSERACT_CMD = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
    # Delay between two iterations of the frame loop, in milliseconds.
    FRAME_DELAY_MS = 30
    # Frames with fewer white pixels than this after thresholding are skipped.
    MIN_FOREGROUND_PIXELS = 500
    # Maximum number of detections kept for "Undo"; every entry holds an image.
    HISTORY_LIMIT = 50

    def __init__(self, model_path=None):
        """Load the model, build the GUI, open the camera and run the app.

        Args:
            model_path: File the classifier is loaded from and saved to.
                Defaults to ``MODEL_PATH``.
        """
        if model_path is None:
            model_path = MODEL_PATH
        # Remembered so that retraining writes back to the file we loaded.
        self.model_path = model_path

        # Load or initialize model
        if os.path.exists(model_path):
            # NOTE: joblib.load unpickles the file; only load trusted models.
            self.model = joblib.load(model_path)
        else:
            self.model = RandomForestClassifier(n_estimators=100)
        self.new_data = []

        # Tesseract config: only override the executable when the expected
        # install exists, otherwise keep pytesseract's own default.
        if os.path.exists(self.TESSERACT_CMD):
            pytesseract.pytesseract.tesseract_cmd = self.TESSERACT_CMD
        self.ocr_config = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789'

        # Output dirs
        os.makedirs("detections", exist_ok=True)
        os.makedirs(DATASET_DIR, exist_ok=True)
        self.feedback_file = "feedback_log.csv"
        if not os.path.exists(self.feedback_file):
            with open(self.feedback_file, "w", encoding="utf-8") as f:
                f.write("number,confidence,label,timestamp\n")

        # State
        self.pending_feedback = False
        self.freeze_mode = False
        self.manual_correction_mode = False
        self.last_detected = None
        self.last_time = 0
        self.last_scan_time = 0
        self.history = deque(maxlen=self.HISTORY_LIMIT)
        self.current = None  # (number, confidence, processed image) or None
        self.last_frame = None  # most recent raw camera frame
        self._after_id = None  # id of the pending Tk "after" callback

        # GUI setup
        self.root = tk.Tk()
        self.root.title("Digit Detector App")
        self.root.geometry("1280x650")
        self.root.protocol("WM_DELETE_WINDOW", self._on_close)
        self._build_gui()

        # Video capture
        self.cap = cv2.VideoCapture(0)
        try:
            self.update_frames()
            self.root.mainloop()
        finally:
            # Always hand the camera back, even if the loop raised.
            self.cap.release()

    def _build_gui(self):
        """Create the two image panes, the status line and the controls."""
        frame = self.root
        frame.grid_columnconfigure(0, weight=1)
        frame.grid_columnconfigure(1, weight=1)
        frame.grid_rowconfigure(0, weight=1)

        self.label_cam = ttk.Label(frame)
        self.label_cam.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")
        self.label_proc = ttk.Label(frame)
        self.label_proc.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
        self.label_info = ttk.Label(frame, text="🧠 Waiting...", font=('Arial', 14))
        self.label_info.grid(row=1, column=0, columnspan=2)

        btn_frame = ttk.Frame(frame)
        btn_frame.grid(row=2, column=0, columnspan=2)
        buttons = (
            ("✅ Correct", lambda: self.save_feedback(True)),
            ("❌ Wrong", lambda: self.save_feedback(False)),
            ("⟲ Undo", self.undo),
            ("🛑 Cancel Freeze", self.cancel_freeze),
            ("🔁 Retry", self.force_retry),
        )
        for text, command in buttons:
            ttk.Button(btn_frame, text=text, command=command).pack(side="left", padx=5)

        # Manual correction entry: gridded once so the layout options are
        # remembered, then hidden until a correction is requested.
        self.entry = ttk.Entry(frame)
        self.btn_correct = ttk.Button(
            frame, text="Submit", command=lambda: self.manual_correction(self.entry.get())
        )
        self.entry.grid(row=3, column=0, padx=5, pady=5)
        self.btn_correct.grid(row=3, column=1, padx=5, pady=5)
        self.entry.grid_remove()
        self.btn_correct.grid_remove()

    @staticmethod
    def _is_numeric_label(text):
        """Return True if *text* is a non-empty string of ASCII digits.

        ``str.isdigit`` also accepts characters such as superscripts that
        ``int`` cannot parse, so the check is done with an explicit pattern.
        """
        return isinstance(text, str) and re.fullmatch(r"[0-9]+", text) is not None

    @staticmethod
    def _timestamp():
        """Return the current local time formatted for file names and logs."""
        return datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

    def _schedule_next(self):
        """Queue the next iteration of the frame loop."""
        self._after_id = self.root.after(self.FRAME_DELAY_MS, self.update_frames)

    def _on_close(self):
        """Stop the frame loop and close the window."""
        if self._after_id is not None:
            self.root.after_cancel(self._after_id)
            self._after_id = None
        self.root.destroy()

    def _unfreeze(self):
        """Leave freeze mode and hide the manual correction widgets."""
        self.freeze_mode = False
        self.pending_feedback = False
        self.manual_correction_mode = False
        self.entry.grid_remove()
        self.btn_correct.grid_remove()

    def _log_feedback(self, number, confidence, label, ts):
        """Append one row to the feedback CSV."""
        with open(self.feedback_file, 'a', encoding="utf-8") as f:
            f.write(f"{number},{confidence},{label},{ts}\n")

    def _store_sample(self, folder, ts, img):
        """Write *img* to ``DATASET_DIR/<folder>/<ts>.png``."""
        target_dir = os.path.join(DATASET_DIR, folder)
        os.makedirs(target_dir, exist_ok=True)
        cv2.imwrite(os.path.join(target_dir, f"{ts}.png"), img)

    def cancel_freeze(self):
        """Exit freeze mode without logging any feedback."""
        self._unfreeze()
        self.label_info.config(text="⏹️ Freeze canceled. Resuming live detection...")

    def force_retry(self):
        """Make the next frame-loop iteration rescan the frozen frame."""
        self.last_scan_time = 0
        self.label_info.config(text="🔁 Retry triggered")

    def preprocess(self, frame):
        """Return a binary image of *frame*: grayscale, 3x3 blur, threshold 170."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        blur = cv2.GaussianBlur(gray, (3, 3), 0)
        _, thresh = cv2.threshold(blur, 170, 255, cv2.THRESH_BINARY)
        return thresh

    def predict_ocr(self, img):
        """Run Tesseract on *img*.

        Returns:
            ``(text, confidence)`` for the first recognised token that is a
            1-3 digit number with confidence of at least ``MIN_OCR_CONF``,
            otherwise ``(None, 0)``.
        """
        data = pytesseract.image_to_data(
            img, config=self.ocr_config, output_type=pytesseract.Output.DICT
        )
        for txt, raw_conf in zip(data['text'], data['conf']):
            txt = txt.strip()
            try:
                # Going through float also accepts values such as "96.5".
                conf = int(float(raw_conf))
            except (TypeError, ValueError):
                conf = 0
            if re.fullmatch(r'\d{1,3}', txt) and conf >= MIN_OCR_CONF:
                return txt, conf
        return None, 0

    def predict_ml(self, img):
        """Classify *img* with the current model.

        Returns:
            ``(label, confidence)`` where *label* is the prediction as a
            string and *confidence* is the highest class probability as an
            integer percentage (0 if the model has no ``predict_proba``).
            ``(None, 0)`` is returned while the model has not been fitted,
            which is the case on a first run without a saved model file.
        """
        # Fitted scikit-learn classifiers expose ``classes_``; predicting
        # with an unfitted one would raise and stop the frame loop.
        if not hasattr(self.model, "classes_"):
            return None, 0
        resized = cv2.resize(img, IMG_SIZE).flatten().reshape(1, -1)
        pred = self.model.predict(resized)[0]
        if hasattr(self.model, 'predict_proba'):
            conf = int(max(self.model.predict_proba(resized)[0]) * 100)
        else:
            conf = 0
        return str(pred), conf

    def ensemble_predict(self, img):
        """Combine OCR and classifier predictions for *img*.

        The OCR result wins when it exists and is at least as confident as
        the classifier; otherwise the classifier result is returned, which
        may be ``(None, 0)`` when the model is not fitted yet.
        """
        num_o, conf_o = self.predict_ocr(img)
        num_m, conf_m = self.predict_ml(img)
        if num_o and conf_o >= conf_m:
            return num_o, conf_o
        return num_m, conf_m

    def save_feedback(self, correct):
        """Handle the "Correct" / "Wrong" buttons for the current detection.

        Args:
            correct: True to log and store the detection as a training
                sample; False to keep the frame frozen and ask for a manual
                correction.
        """
        if not self.current:
            return
        num, conf, img = self.current

        if not correct:
            # On wrong, freeze and request correction
            self.freeze_mode = True
            self.pending_feedback = True
            self.manual_correction_mode = True
            self.entry.grid()
            self.btn_correct.grid()
            self.label_info.config(text="⚠️ Wrong detection. Please correct.")
            return

        # After a manual "nothing" correction the current value is not a
        # number; there is no label to store, and int() would raise on it.
        if not self._is_numeric_label(num):
            self.label_info.config(text="⚠️ No numeric detection to confirm")
            return

        ts = self._timestamp()
        self._log_feedback(num, conf, "correct", ts)
        self._store_sample(num, ts, img)
        self.new_data.append((img, int(num)))
        if len(self.new_data) >= ACTIVE_LEARNING_BATCH:
            self.retrain_model()
        self._unfreeze()
        self.label_info.config(text=f"✅ Saved: {num} as correct")

    def manual_correction(self, val):
        """Store a user-supplied label for the current detection.

        Args:
            val: Text from the correction entry; either digits or the word
                ``nothing`` (any case). Digits are stored under a folder of
                that name, ``nothing`` under ``unlabeled``.
        """
        if not self.current:
            return
        val = val.strip()
        is_number = self._is_numeric_label(val)
        if not (is_number or val.lower() == 'nothing'):
            self.label_info.config(text="⚠️ Enter a digit or 'nothing'")
            return

        _, conf_old, img = self.current
        ts = self._timestamp()
        self._store_sample(val if is_number else 'unlabeled', ts, img)
        self._log_feedback(val, conf_old, "manual", ts)

        # Update current detection
        self.current = (val, conf_old, img)
        self._unfreeze()
        self.label_info.config(text=f"📤 Correction saved: {val}")

    def undo(self):
        """Drop the latest history entry and show its processed image.

        Only the in-memory history is changed; files and log rows that were
        already written are left untouched.
        """
        if not self.history:
            self.label_info.config(text="⟲ Nothing to undo")
            return
        num, ts, img = self.history.pop()
        self.label_info.config(text=f"⟲ Undone: {num} at {ts}")
        self.display_images(img)

    def retrain_model(self):
        """Fit a fresh classifier on the images stored in ``DATASET_DIR``.

        Only sub-folders whose name is a number are used. The pending
        samples in ``new_data`` were already written to the dataset folder
        by ``save_feedback``, so they are picked up from disk rather than
        added a second time. The fitted model replaces ``self.model`` and
        is saved to the path given at construction.
        """
        features, labels = [], []
        for label in os.listdir(DATASET_DIR):
            class_dir = os.path.join(DATASET_DIR, label)
            # Skips 'unlabeled', stray files and any other non-numeric entry.
            if not self._is_numeric_label(label) or not os.path.isdir(class_dir):
                continue
            for fname in os.listdir(class_dir):
                img = cv2.imread(os.path.join(class_dir, fname), cv2.IMREAD_GRAYSCALE)
                if img is None:
                    continue
                features.append(cv2.resize(img, IMG_SIZE).flatten())
                labels.append(int(label))
        if not features:
            return

        # Swap the model in only after fitting succeeded.
        model = RandomForestClassifier(n_estimators=100)
        model.fit(np.array(features), np.array(labels))
        self.model = model
        joblib.dump(model, self.model_path)
        self.new_data.clear()
        self.label_info.config(text="🤖 Model retrained")

    def display_images(self, proc):
        """Show the last camera frame (left) and *proc* (right)."""
        if self.last_frame is None:
            # Nothing has been captured yet, so there is nothing to draw.
            return
        frame = cv2.resize(self.last_frame, (600, 450))
        cam_p = ImageTk.PhotoImage(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
        pr_p = ImageTk.PhotoImage(Image.fromarray(proc))
        self.label_cam.config(image=cam_p)
        # Keep a reference on the widget so the image is not garbage-collected.
        self.label_cam.imgtk = cam_p
        self.label_proc.config(image=pr_p)
        self.label_proc.imgtk = pr_p

    def update_frames(self):
        """Run one iteration of the frame loop and schedule the next one.

        In freeze mode the frozen image is rescanned every ``SCAN_INTERVAL``
        seconds. Otherwise a new frame is captured, preprocessed and, if a
        number is detected, frozen for user feedback.
        """
        now = time.time()

        # Freeze mode: retry on last frame
        if self.freeze_mode and self.current:
            if now - self.last_scan_time > SCAN_INTERVAL:
                _, _, img = self.current
                num_new, conf_new = self.ensemble_predict(img)
                # Keep the previous value when the rescan finds nothing.
                if num_new:
                    self.current = (num_new, conf_new, img)
                    self.label_info.config(text=f"🔁 Retry: Was {num_new} correct?")
                self.last_scan_time = now
                if self.manual_correction_mode:
                    self.entry.grid()
                    self.btn_correct.grid()
            self.display_images(self.current[2])
            self._schedule_next()
            return

        # Live capture mode
        ret, frame = self.cap.read()
        if not ret:
            self._schedule_next()
            return
        self.last_frame = frame.copy()
        proc = self.preprocess(frame)
        now = time.time()

        # Skip low-content frames
        if cv2.countNonZero(proc) < self.MIN_FOREGROUND_PIXELS and not self.pending_feedback:
            self.label_info.config(text="📷 Weak image, no digit detected.")
            self.display_images(proc)
            self._schedule_next()
            return

        # Start new detection
        if not self.pending_feedback:
            num, conf = self.ensemble_predict(proc)
            if num and (num != self.last_detected or now - self.last_time > COOLDOWN):
                ts = self._timestamp()
                cv2.imwrite(f"detections/detected_{ts}.png", frame)
                self.history.append((num, ts, proc))
                self.current = (num, conf, proc)
                self.last_detected, self.last_time = num, now
                # The detection itself counts as a scan, so the first rescan
                # of the frozen frame waits a full SCAN_INTERVAL.
                self.last_scan_time = now
                self.pending_feedback = True
                self.freeze_mode = True
                self.manual_correction_mode = False
                self.label_info.config(text=f"❓ Was {num} correct?")
                self.entry.delete(0, tk.END)
                self.entry.grid()
                self.btn_correct.grid()

        self.display_images(proc)
        self._schedule_next()
# Start the application only when the file is run as a script; constructing
# the app opens the window and returns once it has been closed.
if __name__ == "__main__":
    DigitDetectorApp()
# Advertisement
# Add Comment
# Please, Sign In to add comment