Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import cv2
- import pytesseract
- import os
- import re
- import time
- import joblib
- import numpy as np
- from datetime import datetime
- import tkinter as tk
- from tkinter import ttk
- from PIL import Image, ImageTk
# Load the trained digit classifier from disk.
# NOTE(review): joblib.load unpickles arbitrary Python objects; only load a
# model.pkl that comes from a trusted source.
from sklearn.ensemble import RandomForestClassifier
model = joblib.load("model.pkl")
# Size every image is resized to (via cv2.resize) before prediction/training.
IMG_SIZE = (32, 32)

# Tesseract configuration.
# This is a raw string, so backslashes must NOT be doubled; the previous
# value put literal double backslashes into the executable path.
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
# Restrict recognition to the characters 0-9.
custom_config = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789'
# Where full camera frames of each detection are written, and the CSV file
# that records the user's feedback on every detection.
output_dir = "detections"
feedback_file = "feedback_log.csv"

os.makedirs(output_dir, exist_ok=True)

# First run only: create the feedback log and write its header row.
if not os.path.exists(feedback_file):
    with open(feedback_file, "w") as log_handle:
        log_handle.write("number,confidence,label,timestamp\n")
# ---------------------------------------------------------------------------
# Mutable application state. The callbacks below rebind these via `global`.
# ---------------------------------------------------------------------------

# Feedback workflow flags.
freeze_mode = False
pending_feedback = False

# Latest detection: OCR text, its confidence, and the image it was read from.
current_number = current_confidence = last_saved_image = None

# Bookkeeping used to avoid re-reporting the same number within the cooldown.
cooldown_seconds = 5
last_detected_number = None
last_detection_time = 0

# Active learning: confirmed (image, label) samples are queued here and the
# model is retrained once ACTIVE_BATCH of them have accumulated.
ACTIVE_BATCH = 10
new_data = []
# ---------------------------------------------------------------------------
# Main window: two equally weighted columns (camera | processed image) in a
# stretchable first row, with status text and controls in the rows below.
# ---------------------------------------------------------------------------
root = tk.Tk()
root.title("Digit Detector (Tesseract + Model)")
root.geometry("1280x650")
for column_index in (0, 1):
    root.grid_columnconfigure(column_index, weight=1)
root.grid_rowconfigure(0, weight=1)

# Row 0: live camera preview (left) and thresholded image (right).
label_camera = ttk.Label(root)
label_camera.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")
label_processed = ttk.Label(root)
label_processed.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")

# Row 1: status line spanning both columns.
label_info = ttk.Label(root, text="🧠 Waiting for number...", font=('Arial', 14))
label_info.grid(row=1, column=0, columnspan=2)

# Row 2: manual-correction entry and submit button. Both are gridded once so
# their layout options are remembered, then hidden until a detection is
# marked wrong.
entry_corrected = ttk.Entry(root)
btn_submit_correction = ttk.Button(
    root,
    text="📤 Submit Correction",
    command=lambda: submit_manual_correction(entry_corrected.get()),
)
entry_corrected.grid(row=2, column=0, padx=5, pady=5)
btn_submit_correction.grid(row=2, column=1, padx=5, pady=5)
entry_corrected.grid_remove()
btn_submit_correction.grid_remove()

# Rows 3-4: feedback buttons, then cancel / retry. The lambdas defer name
# lookup because the handler functions are defined further down the file.
btn_correct = ttk.Button(root, text="✅ Correct", command=lambda: save_feedback(True))
btn_correct.grid(row=3, column=0, pady=10)
btn_wrong = ttk.Button(root, text="❌ Wrong", command=lambda: save_feedback(False))
btn_wrong.grid(row=3, column=1, pady=10)
btn_cancel = ttk.Button(root, text="🛑 Cancel Freeze", command=lambda: cancel_freeze())
btn_cancel.grid(row=4, column=0, pady=5)
btn_retry = ttk.Button(root, text="🔁 Retry", command=lambda: force_retry())
btn_retry.grid(row=4, column=1, pady=5)

# Video source: capture device index 0.
cap = cv2.VideoCapture(0)
def predict_with_model(image):
    """Classify *image* with the loaded model and return the label as a string.

    The image is resized to IMG_SIZE and flattened into a single feature row
    before being handed to ``model.predict``.
    """
    feature_row = cv2.resize(image, IMG_SIZE).flatten().reshape(1, -1)
    predicted_labels = model.predict(feature_row)
    return str(predicted_labels[0])
def show_feedback_prompt():
    """Ask the user, via the status label, whether the current number is right."""
    prompt = f"❓ Was {current_number} correct? Use ✅ / ❌"
    label_info.configure(text=prompt)
def cancel_freeze():
    """Leave freeze mode, drop any pending feedback and hide the correction UI."""
    global freeze_mode, pending_feedback
    freeze_mode = pending_feedback = False
    for widget in (entry_corrected, btn_submit_correction):
        widget.grid_remove()
    label_info.config(text="⏹️ Freeze canceled. Resuming live...")
def force_retry():
    """Clear the cooldown so the next frame update may retry immediately."""
    global last_detection_time
    label_info.config(text="🔁 Retry triggered.")
    # Zeroing the timestamp makes the cooldown check pass on the next tick.
    last_detection_time = 0
def save_feedback(correct):
    """Record the user's verdict on the current detection.

    Appends a row to the feedback CSV and writes the last saved (thresholded)
    image to ``dataset/<number>/`` when the detection was confirmed, or to
    ``dataset/unlabeled/`` when it was rejected.

    A confirmed sample is queued for active learning and triggers a retrain
    once ACTIVE_BATCH samples are queued; the app then returns to live mode.
    A rejected sample puts the app into freeze mode and reveals the
    manual-correction widgets.

    Does nothing when there is no current number or no saved image.

    Args:
        correct: True when the user confirmed the detected number.
    """
    global current_number, current_confidence, last_saved_image, pending_feedback, freeze_mode, new_data
    if not current_number or last_saved_image is None:
        return

    ts = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    label = "correct" if correct else "wrong"
    with open(feedback_file, "a") as f:
        f.write(f"{current_number},{current_confidence},{label},{ts}\n")

    # The image is saved either way: under its digit label when confirmed,
    # under 'unlabeled' when rejected.
    folder = current_number if correct else 'unlabeled'
    target_dir = os.path.join("dataset", folder)
    os.makedirs(target_dir, exist_ok=True)
    cv2.imwrite(os.path.join(target_dir, f"{ts}.png"), last_saved_image)

    if correct:
        # Active learning: queue the sample and retrain once the batch is full.
        new_data.append((last_saved_image, int(current_number)))
        if len(new_data) >= ACTIVE_BATCH:
            retrain_model()
        pending_feedback = False
        freeze_mode = False
        # The correction widgets may still be showing from an earlier
        # "wrong" verdict followed by a retry; hide them like the other
        # exits from freeze mode do.
        entry_corrected.grid_remove()
        btn_submit_correction.grid_remove()
        label_info.config(text=f"✅ Saved: {current_number} as {label}")
    else:
        # Enter freeze+retry mode and offer manual correction.
        freeze_mode = True
        pending_feedback = True
        label_info.config(text="❌ Wrong detection. Awaiting retry or correction.")
        entry_corrected.delete(0, tk.END)
        entry_corrected.grid()
        btn_submit_correction.grid()
def submit_manual_correction(val):
    """Store a user-typed correction for the last saved image.

    Accepts either a string of ASCII digits (the true number) or the word
    'nothing' (case-insensitive). The image is written to ``dataset/<val>/``
    for a number, or to ``dataset/unlabeled/`` otherwise, and a 'manual' row
    is appended to the feedback CSV. Numeric corrections are queued for
    active learning and may trigger a retrain. Afterwards freeze mode ends
    and the correction widgets are hidden.

    Args:
        val: Raw text from the correction entry.
    """
    global current_number, last_saved_image, pending_feedback, freeze_mode, new_data
    val = val.strip()
    # str.isdigit() also accepts characters such as '²' that int() rejects,
    # so check for plain ASCII digits explicitly.
    is_number = re.fullmatch(r'[0-9]+', val) is not None
    if not (is_number or val.lower() == 'nothing'):
        label_info.config(text="⚠️ Enter a valid number or 'nothing'")
        return
    if last_saved_image is None:
        # Nothing to file the correction against.
        label_info.config(text="⚠️ No saved image to correct")
        return

    ts = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    folder = val if is_number else 'unlabeled'
    target_dir = os.path.join("dataset", folder)
    os.makedirs(target_dir, exist_ok=True)
    cv2.imwrite(os.path.join(target_dir, f"{ts}.png"), last_saved_image)
    with open(feedback_file, "a") as f:
        f.write(f"{val},{current_confidence},manual,{ts}\n")

    # Active learning: only numeric corrections carry a usable label.
    if is_number:
        new_data.append((last_saved_image, int(val)))
        if len(new_data) >= ACTIVE_BATCH:
            retrain_model()

    pending_feedback = False
    freeze_mode = False
    entry_corrected.grid_remove()
    btn_submit_correction.grid_remove()
    label_info.config(text=f"📤 Manual correction saved: {val}")
def retrain_model():
    """Retrain the classifier from ``dataset/`` plus the queued samples.

    Every readable image under ``dataset/<int label>/`` is loaded in
    grayscale, resized to IMG_SIZE and flattened; the in-memory ``new_data``
    samples are appended the same way. If any samples exist, a fresh
    RandomForestClassifier is fitted, written to ``model.pkl``, installed as
    the live ``model``, and the queue is cleared.
    """
    global new_data, model
    X, y = [], []

    labels = os.listdir("dataset") if os.path.isdir("dataset") else []
    for lbl in labels:
        p = os.path.join("dataset", lbl)
        # Skip the 'unlabeled' bucket and anything that is not a label folder.
        if lbl == 'unlabeled' or not os.path.isdir(p):
            continue
        try:
            label_value = int(lbl)
        except ValueError:
            # Folder name is not a number, so it cannot be a training label.
            continue
        for fn in os.listdir(p):
            img = cv2.imread(os.path.join(p, fn), cv2.IMREAD_GRAYSCALE)
            if img is None:
                continue
            X.append(cv2.resize(img, IMG_SIZE).flatten())
            y.append(label_value)

    # NOTE(review): the feedback handlers in this file also write each queued
    # image into dataset/, so these samples are probably counted twice.
    # Confirm whether that extra weighting is intended.
    for img_arr, lbl in new_data:
        X.append(cv2.resize(img_arr, IMG_SIZE).flatten())
        y.append(lbl)

    if X:
        clf = RandomForestClassifier(n_estimators=100)
        clf.fit(np.array(X), np.array(y))
        joblib.dump(clf, "model.pkl")
        # Swap in the new classifier so the running app uses it, not only
        # the copy on disk.
        model = clf
        new_data = []
        label_info.config(text="🤖 Model retrained.")
# Minimum Tesseract confidence for a token to count as a detection.
_MIN_OCR_CONFIDENCE = 60
# A detection is a token made of one to three digits.
_NUMBER_PATTERN = re.compile(r'\d{1,3}')


def _extract_number(data):
    """Return ``(text, confidence)`` for the first confident 1-3 digit token.

    Args:
        data: Dict with parallel ``'text'`` and ``'conf'`` sequences, as
            produced by ``pytesseract.image_to_data(..., Output.DICT)``.

    Returns:
        The stripped token and its integer confidence, or ``(None, 0)`` when
        no token qualifies.
    """
    for i, txt in enumerate(data['text']):
        txt = txt.strip()
        try:
            # Confidence may arrive as an int, a float, or a string such as
            # '96.5'; go through float() so none of those is discarded.
            c = int(float(data['conf'][i]))
        except (ValueError, TypeError, OverflowError, IndexError, KeyError):
            # Best effort: an unreadable confidence counts as zero.
            c = 0
        if _NUMBER_PATTERN.fullmatch(txt) and c >= _MIN_OCR_CONFIDENCE:
            return txt, c
    return None, 0


def update_frames():
    """Run one tick of the capture/OCR loop and reschedule itself.

    In freeze mode with feedback pending, the last saved image is re-read
    with OCR (at most once per cooldown period) and the tick ends without
    touching the camera. Otherwise a frame is grabbed, thresholded and
    OCR'd; a new detection is saved, becomes the current number and prompts
    the user for feedback. Both preview labels are then refreshed.
    """
    global last_detected_number, last_detection_time
    global current_number, current_confidence, last_saved_image, pending_feedback, freeze_mode

    # Freeze+Retry loop
    if freeze_mode and pending_feedback and last_saved_image is not None:
        # Only retry once the cooldown has passed.
        if time.time() - last_detection_time > cooldown_seconds:
            data = pytesseract.image_to_data(last_saved_image, config=custom_config,
                                             output_type=pytesseract.Output.DICT)
            num, conf = _extract_number(data)
            if num:
                current_number, current_confidence = num, conf
                label_info.config(text=f"🔁 Retry: Was {num} correct?")
            # Stamp the attempt so OCR is not re-run on every 100 ms tick.
            last_detection_time = time.time()
        root.after(100, update_frames)
        return

    # Live capture
    ret, frame = cap.read()
    if not ret:
        root.after(30, update_frames)
        return

    frame = cv2.resize(frame, (640, 480))
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (3, 3), 0)
    _, thresh = cv2.threshold(blur, 170, 255, cv2.THRESH_BINARY)

    # Skip OCR when the thresholded frame has almost no white pixels.
    if cv2.countNonZero(thresh) < 500:
        label_info.config(text="🧠 No number detected")
        root.after(30, update_frames)
        return

    # OCR detection
    data = pytesseract.image_to_data(thresh, config=custom_config,
                                     output_type=pytesseract.Output.DICT)
    num, conf = _extract_number(data)

    # Report a number when it differs from the last one, or when the same
    # number is still present after the cooldown.
    if num and (num != last_detected_number or time.time() - last_detection_time > cooldown_seconds):
        timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        cv2.imwrite(os.path.join(output_dir, f"detected_{timestamp}.png"), frame)
        last_saved_image = thresh.copy()
        last_detected_number = num
        last_detection_time = time.time()
        current_number, current_confidence = num, conf
        pending_feedback = True
        show_feedback_prompt()

    # Update UI
    cam_img = cv2.resize(frame, (600, 450))
    proc_img = cv2.resize(thresh, (600, 450))
    imgtk1 = ImageTk.PhotoImage(Image.fromarray(cv2.cvtColor(cam_img, cv2.COLOR_BGR2RGB)))
    imgtk2 = ImageTk.PhotoImage(Image.fromarray(proc_img))
    # Keep a reference on each label so the PhotoImage is not garbage collected.
    label_camera.config(image=imgtk1)
    label_camera.imgtk = imgtk1
    label_processed.config(image=imgtk2)
    label_processed.imgtk = imgtk2
    root.after(30, update_frames)
# Start the periodic frame update, then hand control to Tk until the window
# is closed.
try:
    update_frames()
    root.mainloop()
finally:
    # Release the camera and any OpenCV windows even if the loop exits
    # through an exception (e.g. KeyboardInterrupt).
    cap.release()
    cv2.destroyAllWindows()
Advertisement
Add Comment
Please, Sign In to add comment