Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import os
import re
import time
import tkinter as tk
from datetime import datetime
from tkinter import ttk

import cv2
import joblib
import numpy as np
import pytesseract
from PIL import Image, ImageTk
from sklearn.ensemble import RandomForestClassifier
from sklearn.exceptions import NotFittedError
# Constants
IMG_SIZE = (32, 32)  # size passed to cv2.resize before an image is flattened into features
COOLDOWN = 5  # seconds
MIN_OCR_CONF = 60  # minimum Tesseract confidence for an OCR result to count
MIN_ML_CONF = 50  # minimum classifier confidence (percent) for an ML result to count
ACTIVE_LEARNING_BATCH = 10  # confirmed samples collected before the model is retrained
SCAN_INTERVAL = 2.0  # seconds between retries
MIN_WHITE_PIXELS = 500  # skip low-content frames
DATASET_DIR = "dataset"  # one sub-folder per label, holding the saved frames
MODEL_PATH = "model.pkl"  # default location of the persisted classifier
class DigitDetectorApp:
    """Tkinter application that reads numbers from a webcam feed.

    Every captured frame is binarised and handed to two recognisers,
    Tesseract OCR and a scikit-learn classifier. The more confident valid
    answer is shown to the user, who confirms it, rejects it or types the
    right value. Confirmed frames are stored under ``DATASET_DIR`` and, once
    ``ACTIVE_LEARNING_BATCH`` of them have been collected, the classifier is
    retrained and saved.

    Constructing an instance builds the window, opens camera 0 and blocks
    inside the Tk main loop until the window is closed.
    """

    # Tesseract executable to use when it exists at this path; otherwise the
    # command already configured in pytesseract is left untouched.
    TESSERACT_CMD = r"C:\Program Files\Tesseract-OCR\tesseract.exe"

    # Maximum number of past detections kept for ``undo``; each entry holds a
    # full-size image, so the list must not grow without bound.
    HISTORY_LIMIT = 50

    def __init__(self, model_path=None):
        """Load the model, build the GUI and run the application.

        Args:
            model_path: File the classifier is loaded from and written back
                to after retraining. ``None`` (the default) means
                ``MODEL_PATH``.
        """
        if model_path is None:
            model_path = MODEL_PATH
        self.model_path = model_path

        # Load or initialize model
        if os.path.exists(model_path):
            # NOTE: joblib.load unpickles the file, which can execute
            # arbitrary code -- only point this at model files you trust.
            self.model = joblib.load(model_path)
        else:
            self.model = RandomForestClassifier(n_estimators=100)
        self.new_data = []

        # Tesseract config
        if os.path.exists(self.TESSERACT_CMD):
            pytesseract.pytesseract.tesseract_cmd = self.TESSERACT_CMD
        self.ocr_config = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789'

        # Output dirs
        os.makedirs("detections", exist_ok=True)
        os.makedirs(DATASET_DIR, exist_ok=True)
        self.feedback_file = "feedback_log.csv"
        if not os.path.exists(self.feedback_file):
            with open(self.feedback_file, "w", encoding="utf-8") as f:
                f.write("number,confidence,label,timestamp\n")

        # State
        self.pending_feedback = False
        self.freeze_mode = False
        self.manual_mode = False
        self.last_detected = None
        self.last_time = 0
        self.last_scan_time = 0
        self.history = []
        self.current = None  # (number, confidence, thresholded image)
        self.current_raw = None  # raw frame that belongs to self.current
        self.current_frame_raw = None
        self.current_thresh = None
        self.last_frame = None

        # Setup GUI
        self.root = tk.Tk()
        self.root.title("Digit Detector App")
        self.root.geometry("1280x650")
        self._build_gui()

        # Video capture
        self.cap = cv2.VideoCapture(0)
        try:
            self.update_frames()
            self.root.mainloop()
        finally:
            # Give the camera back once the window is closed or startup fails.
            self.cap.release()

    def _build_gui(self):
        """Create the two image panes, the status line and the controls."""
        frame = self.root
        frame.grid_columnconfigure(0, weight=1)
        frame.grid_columnconfigure(1, weight=1)
        frame.grid_rowconfigure(0, weight=1)

        self.label_cam = ttk.Label(frame)
        self.label_cam.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")
        self.label_proc = ttk.Label(frame)
        self.label_proc.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")

        self.label_info = ttk.Label(frame, text="Waiting for valid frame...", font=('Arial', 14))
        self.label_info.grid(row=1, column=0, columnspan=2)

        btn_frame = ttk.Frame(frame)
        btn_frame.grid(row=2, column=0, columnspan=2)
        buttons = (
            ("Correct", lambda: self.save_feedback(True)),
            ("Wrong", lambda: self.save_feedback(False)),
            ("Cancel Freeze", self.cancel_freeze),
            ("Retry", self.force_retry),
            ("Undo", self.undo),
        )
        for caption, command in buttons:
            ttk.Button(btn_frame, text=caption, command=command).pack(side="left", padx=5)

        # Manual-entry widgets stay hidden until a detection needs feedback.
        self.entry = ttk.Entry(frame)
        self.btn_submit = ttk.Button(
            frame, text="Submit", command=lambda: self.manual_correction(self.entry.get())
        )
        self.entry.grid(row=3, column=0, padx=5, pady=5)
        self.btn_submit.grid(row=3, column=1, padx=5, pady=5)
        self.entry.grid_remove()
        self.btn_submit.grid_remove()

    def _resume_live(self):
        """Clear the feedback state and hide the manual-entry widgets."""
        self.freeze_mode = False
        self.pending_feedback = False
        self.manual_mode = False
        self.entry.grid_remove()
        self.btn_submit.grid_remove()

    def _set_current(self, num, conf, thresh, raw):
        """Record the detection awaiting feedback together with its raw frame."""
        self.current = (num, conf, thresh)
        self.current_raw = raw

    @staticmethod
    def _is_number(label):
        """Return True when *label* is a non-empty string of decimal digits."""
        return isinstance(label, str) and label.isdecimal()

    @staticmethod
    def _parse_conf(raw):
        """Convert a Tesseract confidence value to ``int``; 0 if unparsable.

        Going through ``float`` accepts both integer and float-formatted
        values such as ``"96.5"``.
        """
        try:
            return int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _pick_best(ocr, ml, min_ocr, min_ml):
        """Choose between an OCR and an ML ``(number, confidence)`` pair.

        A pair only competes when it has a number and reaches its own
        threshold. Among the valid pairs the more confident wins, OCR on a
        tie. Returns ``(None, 0)`` when neither pair is valid.
        """
        num_o, conf_o = ocr
        num_m, conf_m = ml
        ocr_ok = bool(num_o) and conf_o >= min_ocr
        ml_ok = bool(num_m) and conf_m >= min_ml
        if ocr_ok and (not ml_ok or conf_o >= conf_m):
            return num_o, conf_o
        if ml_ok:
            return num_m, conf_m
        return None, 0

    @staticmethod
    def _binarize(gray):
        """Blur and threshold a single-channel image into a black/white mask."""
        blur = cv2.GaussianBlur(gray, (3, 3), 0)
        _, thresh = cv2.threshold(blur, 170, 255, cv2.THRESH_BINARY)
        return thresh

    @staticmethod
    def _features(img):
        """Resize *img* to ``IMG_SIZE`` and flatten it into a 1-D feature row."""
        return cv2.resize(img, IMG_SIZE).flatten()

    def cancel_freeze(self):
        """Abandon the pending feedback request and return to live detection."""
        self._resume_live()
        self.label_info.config(text="Freeze canceled. Resuming live detection.")

    def force_retry(self):
        """Reset the retry timer so the next frozen tick re-runs detection."""
        self.last_scan_time = 0
        self.label_info.config(text="Retry triggered.")

    def preprocess(self, frame):
        """Return the thresholded version of a BGR camera frame."""
        return self._binarize(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))

    def predict_ocr(self, img):
        """Run Tesseract on *img*.

        Returns:
            ``(text, confidence)`` for the first recognised token that is one
            to three digits long with confidence of at least
            ``MIN_OCR_CONF``, otherwise ``(None, 0)``.
        """
        data = pytesseract.image_to_data(
            img, config=self.ocr_config, output_type=pytesseract.Output.DICT
        )
        for raw_text, raw_conf in zip(data['text'], data['conf']):
            text = raw_text.strip()
            conf = self._parse_conf(raw_conf)
            if conf >= MIN_OCR_CONF and re.fullmatch(r'\d{1,3}', text):
                return text, conf
        return None, 0

    def predict_ml(self, img):
        """Classify *img* with the current model.

        Returns:
            ``(label, confidence_percent)``. The confidence is 0 when the
            model has no ``predict_proba``; the result is ``(None, 0)`` while
            the model has not been fitted yet.
        """
        features = self._features(img).reshape(1, -1)
        try:
            pred = self.model.predict(features)[0]
        except NotFittedError:
            # A freshly created classifier cannot predict until retrained.
            return None, 0
        conf = 0
        if hasattr(self.model, 'predict_proba'):
            conf = int(max(self.model.predict_proba(features)[0]) * 100)
        return str(pred), conf

    def ensemble_predict(self, img):
        """Combine the OCR and ML predictions for *img*.

        Returns:
            The winning ``(number, confidence)`` pair, or ``(None, 0)`` when
            neither recogniser produced a valid result.
        """
        return self._pick_best(
            self.predict_ocr(img), self.predict_ml(img), MIN_OCR_CONF, MIN_ML_CONF
        )

    def save_feedback(self, correct):
        """Handle the Correct / Wrong buttons for the current detection.

        Args:
            correct: ``True`` logs the detection, stores its frame in the
                dataset and resumes live detection (retraining once enough
                samples are queued). ``False`` freezes the app so the
                detection is retried or corrected manually.
        """
        if not self.current:
            return
        num, conf, thresh = self.current

        if not correct:
            # freeze and enter manual mode
            self.freeze_mode = True
            self.pending_feedback = True
            self.manual_mode = False
            self.label_info.config(text="Wrong. Will retry or await correction.")
            return

        if not self._is_number(num):
            # A retry can yield None and a manual entry can be 'nothing';
            # neither can be filed as a numeric training label.
            self.label_info.config(text="Nothing to confirm. Enter the number manually.")
            return

        # log and unfreeze
        ts = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        with open(self.feedback_file, 'a', encoding="utf-8") as f:
            f.write(f"{num},{conf},correct,{ts}\n")
        folder = os.path.join(DATASET_DIR, num)
        os.makedirs(folder, exist_ok=True)
        # Save the frame the prediction was made on; the live frame may have
        # moved on while the question was on screen.
        raw = self.current_raw if self.current_raw is not None else self.current_frame_raw
        cv2.imwrite(os.path.join(folder, f"{ts}.png"), raw)
        self.new_data.append((thresh, int(num)))
        if len(self.new_data) >= ACTIVE_LEARNING_BATCH:
            self.retrain_model()
        self._resume_live()
        self.label_info.config(text=f"Saved correct: {num}")

    def manual_correction(self, val):
        """Store the user-typed label for the frame currently on screen.

        Args:
            val: Digits, or ``'nothing'`` (any case) to file the frame under
                the ``unlabeled`` folder. Anything else is rejected with a
                hint in the status line.
        """
        if not self.current:
            return
        val = val.strip()
        is_number = self._is_number(val)
        if not (is_number or val.lower() == 'nothing'):
            self.label_info.config(text="Enter digit or 'nothing'")
            return

        conf_old = self.current[1]
        ts = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        path = os.path.join(DATASET_DIR, val if is_number else 'unlabeled')
        os.makedirs(path, exist_ok=True)
        cv2.imwrite(os.path.join(path, f"{ts}.png"), self.current_frame_raw)
        with open(self.feedback_file, 'a', encoding="utf-8") as f:
            f.write(f"{val},{conf_old},manual,{ts}\n")

        # update
        self._set_current(val, conf_old, self.current_thresh, self.current_frame_raw)
        self._resume_live()
        self.label_info.config(text=f"Correction saved: {val}")

    def undo(self):
        """Drop the most recent history entry and show its processed image."""
        if not self.history:
            return
        num, ts, thresh = self.history.pop()
        self.label_info.config(text=f"Undone: {num} at {ts}")
        self.display_images(thresh)

    def retrain_model(self):
        """Fit a fresh classifier on the dataset folder plus queued samples.

        Images read from disk go through the same blur/threshold step as
        live frames so training and prediction see the same kind of input.
        Only numerically named sub-folders of ``DATASET_DIR`` are used. The
        fitted model is written to the path given to the constructor.
        """
        samples, labels = [], []
        for label in os.listdir(DATASET_DIR):
            folder = os.path.join(DATASET_DIR, label)
            # Skips 'unlabeled' as well as any stray file or folder.
            if not (self._is_number(label) and os.path.isdir(folder)):
                continue
            for fn in os.listdir(folder):
                gray = cv2.imread(os.path.join(folder, fn), cv2.IMREAD_GRAYSCALE)
                if gray is None:
                    continue
                samples.append(self._features(self._binarize(gray)))
                labels.append(int(label))

        # Queued images are full-size; resize them like every other sample so
        # all feature rows have the same length.
        for thresh, lbl in self.new_data:
            samples.append(self._features(thresh))
            labels.append(lbl)

        if not samples:
            return
        model = RandomForestClassifier(n_estimators=100)
        model.fit(np.array(samples), np.array(labels))
        self.model = model
        joblib.dump(model, self.model_path)
        self.new_data.clear()
        self.label_info.config(text="Model retrained.")

    def display_images(self, proc):
        """Show the last camera frame and the processed image *proc*."""
        if self.last_frame is None:
            return
        cam_view = cv2.cvtColor(cv2.resize(self.last_frame, (600, 450)), cv2.COLOR_BGR2RGB)
        # Nearest-neighbour keeps the black/white image free of grey edges.
        proc_view = cv2.resize(proc, (600, 450), interpolation=cv2.INTER_NEAREST)
        cam = ImageTk.PhotoImage(Image.fromarray(cam_view))
        pr = ImageTk.PhotoImage(Image.fromarray(proc_view))
        # Keep a reference on the widget so the images are not garbage-collected.
        self.label_cam.config(image=cam)
        self.label_cam.imgtk = cam
        self.label_proc.config(image=pr)
        self.label_proc.imgtk = pr

    def update_frames(self):
        """Process one tick and schedule the next one 30 ms later."""
        self._tick(time.time())
        self.root.after(30, self.update_frames)

    def _tick(self, now):
        """Run one step: a frozen-state retry, or capture and detect live."""
        # Freeze & retry
        if self.freeze_mode and self.current and not self.manual_mode:
            if now - self.last_scan_time > SCAN_INTERVAL:
                self._retry_frozen(now)
            self.display_images(self.current[2])
            return

        # live capture
        ret, frame = self.cap.read()
        if not ret:
            return
        self.last_frame = frame
        proc = self.preprocess(frame)
        self.current_frame_raw = frame.copy()
        self.current_thresh = proc

        # skip low-content
        if cv2.countNonZero(proc) < MIN_WHITE_PIXELS:
            self.label_info.config(text="Weak image, skipping.")
            self.display_images(proc)
            return

        # initial detection
        if not self.pending_feedback:
            num, conf = self.ensemble_predict(proc)
            if num is not None:
                self._begin_feedback(num, conf, frame, proc, now)
        # Always refresh the panes, also when nothing was detected.
        self.display_images(proc)

    def _retry_frozen(self, now):
        """Re-run detection on the stored image and enable manual entry."""
        num, conf = self.ensemble_predict(self.current_thresh)
        self._set_current(num, conf, self.current_thresh, self.current_frame_raw)
        if num is None:
            self.label_info.config(text="Retry found no number. Enter it manually.")
        else:
            self.label_info.config(text=f"Retry: Was {num} correct?")
        self.last_scan_time = now
        # after first retry allow manual
        self.manual_mode = True
        self.entry.grid()
        self.btn_submit.grid()

    def _begin_feedback(self, num, conf, frame, proc, now):
        """Record a new detection and ask the user to confirm it."""
        ts = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        cv2.imwrite(f"detections/detected_{ts}.png", frame)
        self.history.append((num, ts, proc))
        if len(self.history) > self.HISTORY_LIMIT:
            del self.history[0]
        self._set_current(num, conf, proc, self.current_frame_raw)
        self.last_detected, self.last_time = num, now
        # freeze and prompt
        self.pending_feedback = True
        self.freeze_mode = True
        self.manual_mode = False
        self.label_info.config(text=f"Was {num} correct?")
        self.entry.delete(0, tk.END)
        self.entry.grid()
        self.btn_submit.grid()
if __name__ == "__main__":
    # The constructor builds the window and runs the Tk main loop itself,
    # so this call returns only after the window has been closed.
    DigitDetectorApp()
Advertisement
Add Comment
Please, Sign In to add comment