# Advertisement
# Not a member of Pastebin yet?
# Sign Up,
# it unlocks many cool features!
import io
import os
import threading
import time
from contextlib import closing
from datetime import datetime, timedelta
from io import BytesIO

import cv2
import mysql.connector
import numpy as np
import pandas as pd
from flask import (
    Flask,
    render_template,
    request,
    redirect,
    url_for,
    session,
    Response,
    send_from_directory,
    jsonify,
    send_file,
)
from openpyxl import Workbook
from openpyxl.styles import Font
from ultralytics import YOLO

from db_config import get_db_connection
from tracker import CentroidTracker
# Video source: 0 selects a local webcam; an RTSP URL string can be used instead.
CAMERA_SOURCE = 0  # or "rtsp://user:pass@ip:554/..."
SNAPSHOT_DIR = 'static/snapshots'
os.makedirs(SNAPSHOT_DIR, exist_ok=True)

app = Flask(__name__)
# Session-signing key. Read from the SECRET_KEY environment variable when it is
# set; the original placeholder stays as the fallback so existing setups still
# start. The placeholder must not be relied on outside local development.
app.secret_key = os.environ.get('SECRET_KEY', 'your_secret_key')

# Double-counting guard settings.
recent_faces = []  # NOTE(review): not referenced by the code visible here
TIMEOUT_SEC = 30  # seconds before the same track ID may be logged again
DIST_THRESHOLD = 50  # handed to the tracker as dist_threshold

# Load models
age_net = cv2.dnn.readNet('models/age_googlenet.onnx')
gender_net = cv2.dnn.readNet('models/gender_googlenet.onnx')
face_model = YOLO("models/best2.pt")
# NOTE(review): the Haar cascade is loaded but not used by the code visible here.
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")

# Output labels, indexed by the argmax of the corresponding network's output.
AGE_LIST = ['(0-2)', '(4-6)', '(8-12)', '(15-20)', '(25-32)', '(38-43)', '(48-53)', '(60-100)']
GENDER_LIST = ['Male', 'Female']

# --- Tracker ---
# dist_threshold reuses DIST_THRESHOLD (same value as the previous literal 50)
# so the two settings cannot drift apart.
tracker = CentroidTracker(max_disappeared=30, dist_threshold=DIST_THRESHOLD)
recent_ids = {}
RECENT_ID_TIMEOUT = 30  # seconds; ID cache lifetime so an ID is not inserted twice
def insert_detection_async(*args):
    """Run ``insert_detection(*args)`` on a new daemon thread.

    The call returns as soon as the thread has been started, so the caller
    does not wait for the database insert.
    """
    worker = threading.Thread(
        target=insert_detection,
        args=args,
        daemon=True,
    )
    worker.start()
def insert_detection(name, gender, age_predict, timestamp, snapshot_path):
    """Insert one row into the ``detections`` table and commit it.

    Args:
        name: Value for the ``name`` column.
        gender: Value for the ``gender`` column.
        age_predict: Value for the ``age_predict`` column.
        timestamp: Value for the ``timestamp`` column.
        snapshot_path: Value for the ``snapshots`` column.

    The cursor and the connection are closed even when the insert or the
    commit raises, so a failing insert no longer leaks a connection.
    """
    sql = """
        INSERT INTO detections (name, gender, age_predict, timestamp, snapshots)
        VALUES (%s, %s, %s, %s, %s)
    """
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor()) as cursor:
            cursor.execute(sql, (name, gender, age_predict, timestamp, snapshot_path))
            conn.commit()
def _nearest_box(centroid, boxes):
    """Return the box in *boxes* whose center is closest to *centroid*, or None if *boxes* is empty."""
    if not boxes:
        return None
    cx0, cy0 = centroid[0], centroid[1]

    def _squared_distance(box):
        x1, y1, x2, y2 = box
        # Squared distance orders boxes the same way as the Euclidean norm.
        return ((x1 + x2) // 2 - cx0) ** 2 + ((y1 + y2) // 2 - cy0) ** 2

    # min() returns the first minimum, matching a strict "<" scan.
    return min(boxes, key=_squared_distance)


def gen_frames():
    """Yield annotated camera frames as multipart JPEG chunks.

    Opens ``CAMERA_SOURCE``, runs the face model on every third frame, feeds
    the accepted boxes to the tracker and, for each tracked ID that has not
    been logged within ``TIMEOUT_SEC`` seconds, predicts gender and age, saves
    a face snapshot and queues a database insert. Each processed frame is
    yielded with boxes and track IDs drawn on it.

    The generator ends when the camera cannot be opened or a frame cannot be
    read. The capture device is released when the generator finishes or is
    closed by the consumer.
    """
    FRAME_SKIP = 3
    cap = cv2.VideoCapture(CAMERA_SOURCE)
    try:
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        if not cap.isOpened():
            print("Gagal membuka kamera/video.")
            return

        frame_count = 0
        # object_id -> time.time() of the last saved detection. This local
        # dict shadows the module-level ``recent_ids``; each generator keeps
        # its own cache.
        recent_ids = {}

        while True:
            ret, frame = cap.read()
            if not ret:
                print("Frame tidak terbaca!")
                break

            frame_count += 1
            if frame_count % FRAME_SKIP != 0:
                continue

            now = datetime.now()
            ts = now.strftime('%Y-%m-%d %H:%M:%S')
            now_ts = time.time()

            # YOLO detection: keep class 0 boxes with confidence >= 0.45.
            results = face_model(frame, verbose=False)
            boxes = []
            for r in results:
                for box, conf, cls in zip(r.boxes.xyxy.cpu().numpy(),
                                          r.boxes.conf.cpu().numpy(),
                                          r.boxes.cls.cpu().numpy()):
                    if int(cls) == 0 and conf >= 0.45:
                        x1, y1, x2, y2 = map(int, box)
                        boxes.append((x1, y1, x2, y2))

            # Tracking
            objects = tracker.update(boxes)

            # Boxes are drawn only after every crop has been taken, so that a
            # neighbour's rectangle never ends up inside a saved snapshot or
            # in the image fed to the age/gender networks.
            overlays = []
            for object_id, centroid in objects.items():
                # NOTE(review): an ID is always paired with the nearest box of
                # this frame, however far away; if the tracker still reports
                # IDs that had no detection this frame they may be paired with
                # another face's box — confirm against CentroidTracker.update.
                bbox = _nearest_box(centroid, boxes)
                if bbox is None:
                    continue
                x1, y1, x2, y2 = bbox
                face_img = frame[y1:y2, x1:x2]
                # Skip empty crops and faces smaller than 40 px on either side.
                if face_img.size == 0 or (x2 - x1) < 40 or (y2 - y1) < 40:
                    continue

                # Log each track ID at most once per TIMEOUT_SEC seconds.
                if now_ts - recent_ids.get(object_id, 0) > TIMEOUT_SEC:
                    # Predict gender/age from the same input blob.
                    blob = cv2.dnn.blobFromImage(face_img, 1.0, (224, 224), (104, 117, 123), swapRB=True)
                    gender_net.setInput(blob)
                    gender_preds = gender_net.forward()
                    gender = GENDER_LIST[gender_preds[0].argmax()]
                    age_net.setInput(blob)
                    age_preds = age_net.forward()
                    age_predict = AGE_LIST[age_preds[0].argmax()]

                    # Save snapshot. The track ID is part of the file name
                    # because the timestamp is taken once per frame: without
                    # it, two faces logged from the same frame would overwrite
                    # each other's file.
                    snap_name = f'snap_{now.strftime("%Y%m%d%H%M%S%f")}_{object_id}.jpg'
                    snap_path = os.path.join(SNAPSHOT_DIR, snap_name)
                    cv2.imwrite(snap_path, face_img)
                    insert_detection_async("Unknown", gender, age_predict, ts, snap_name)
                    recent_ids[object_id] = now_ts

                overlays.append((object_id, bbox))

            # Draw box & id
            for object_id, (x1, y1, x2, y2) in overlays:
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f'ID:{object_id}', (x1, y1 - 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

            # Remove expired IDs that the tracker no longer reports.
            for oid in list(recent_ids):
                if oid not in objects and now_ts - recent_ids[oid] > TIMEOUT_SEC:
                    del recent_ids[oid]

            # Streaming to browser
            encoded, buffer = cv2.imencode('.jpg', frame)
            if not encoded:
                # Skip a frame that failed to encode instead of sending junk.
                continue
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
    finally:
        # Runs on normal exit, on errors, and when the consumer closes the
        # generator, so the camera handle is always given back.
        cap.release()
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or check submitted credentials (POST).

    On a matching ``users`` row the session is marked as logged in and the
    user is redirected to the dashboard; otherwise the form is rendered again
    with an "Invalid credentials" error.
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']

    # NOTE(review): the password is compared against the stored column as-is,
    # i.e. passwords appear to be stored in plaintext. Switching to hashed
    # passwords needs a schema/data migration and is not done here.
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute(
                "SELECT * FROM users WHERE username=%s AND password=%s",
                (username, password),
            )
            user = cursor.fetchone()

    if not user:
        return render_template('login.html', error="Invalid credentials")

    session['logged_in'] = True
    session['username'] = username
    return redirect(url_for('dashboard'))
@app.route('/logout')
def logout():
    """Wipe the session and redirect to the login page."""
    session.clear()
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/export-hourly-gender')
def export_hourly_gender():
    """Download today's per-hour Male/Female counts as an XLSX file.

    The sheet has one row per hour (00 to 23, 12-hour labels) even when an
    hour has no detections. Returns a ``send_file`` attachment response.
    """
    # Uses the shared connection helper, like the code that writes the
    # detections table, instead of a second set of hard-coded credentials.
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("""
                SELECT
                    HOUR(timestamp) AS hour,
                    gender,
                    COUNT(*) AS total
                FROM detections
                WHERE DATE(timestamp) = CURDATE()
                GROUP BY hour, gender
            """)
            results = cursor.fetchall()

    # Every hour 0-23 is present, defaulting to zero.
    hours = {h: {'Male': 0, 'Female': 0} for h in range(24)}
    for row in results:
        if row['gender'] in ('Male', 'Female'):
            hours[row['hour']][row['gender']] = row['total']

    table = [
        {
            'hour': datetime.strptime(f"{h:02d}:00", "%H:%M").strftime("%I:00 %p"),
            'male': hours[h]['Male'],
            'female': hours[h]['Female'],
        }
        for h in range(24)
    ]

    # --- Build the XLSX file with openpyxl ---
    wb = Workbook()
    ws = wb.active
    ws.title = "People Counting per Hour"
    ws.append(["Date/Time", "Male", "Female"])
    for row in table:
        ws.append([row['hour'], row['male'], row['female']])

    # Column width: longest rendered cell value plus a little padding.
    for col in ws.columns:
        max_length = max((len(str(cell.value)) for cell in col), default=0)
        ws.column_dimensions[col[0].column_letter].width = max_length + 2

    # Bold header row. A fresh Font is assigned because the style object's
    # copy(**kwargs) method is deprecated in openpyxl.
    for cell in ws[1]:
        cell.font = Font(bold=True)

    output = io.BytesIO()
    wb.save(output)
    output.seek(0)

    today = datetime.now().strftime("%Y-%m-%d")
    return send_file(
        output,
        download_name=f"people_counting_hourly_{today}.xlsx",
        as_attachment=True,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    )
@app.route('/hourly-gender')
def hourly_gender():
    """Render today's per-hour Male/Female counts, newest hour first.

    The template receives ``table``: 24 dicts with keys ``hour`` ("HH:00"),
    ``male`` and ``female``, ordered from 23:00 down to 00:00.
    """
    # Uses the shared connection helper instead of hard-coded credentials.
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("""
                SELECT
                    HOUR(timestamp) AS hour,
                    gender,
                    COUNT(*) AS total
                FROM detections
                WHERE DATE(timestamp) = CURDATE()
                GROUP BY hour, gender
                ORDER BY hour DESC
            """)
            results = cursor.fetchall()

    # {hour: {'Male': n, 'Female': m}} for the hours that have data.
    hours = {}
    for row in results:
        per_hour = hours.setdefault(row['hour'], {'Male': 0, 'Female': 0})
        per_hour[row['gender']] = row['total']

    # One table row per hour from 23 down to 0; hours without data show zeros.
    table = [
        {
            'hour': f"{h:02d}:00",
            'male': hours.get(h, {}).get('Male', 0),
            'female': hours.get(h, {}).get('Female', 0),
        }
        for h in reversed(range(24))
    ]
    return render_template('hourly_gender.html', table=table)
@app.route('/chartdata')
def chartdata():
    """Return the chart datasets as JSON.

    Keys of the JSON object:
        donut: today's count per gender.
        hourly_male / hourly_female: today's count per hour for each gender.
        weekly: per-date Male/Female totals since six days before today.
        monthly: per-day-of-month Male/Female totals for the current month.

    The cursor and connection are closed even if one of the queries fails.
    """
    # NOTE(review): the SUM() columns probably arrive as decimal.Decimal and
    # the dates as datetime.date; how jsonify renders those depends on the
    # installed Flask version — confirm against what the frontend expects.
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            # Donut: gender split for today
            cursor.execute("""
                SELECT gender, COUNT(*) as count
                FROM detections
                WHERE DATE(timestamp) = CURDATE()
                GROUP BY gender
            """)
            donut = cursor.fetchall()

            # Bar: per hour today (Male)
            cursor.execute("""
                SELECT HOUR(timestamp) AS hour, COUNT(*) as count
                FROM detections
                WHERE DATE(timestamp) = CURDATE() AND gender='Male'
                GROUP BY hour
            """)
            hourly_male = cursor.fetchall()

            # Bar: per hour today (Female)
            cursor.execute("""
                SELECT HOUR(timestamp) AS hour, COUNT(*) as count
                FROM detections
                WHERE DATE(timestamp) = CURDATE() AND gender='Female'
                GROUP BY hour
            """)
            hourly_female = cursor.fetchall()

            # Bar: per day (weekly, last 7 days including today)
            cursor.execute("""
                SELECT DATE(timestamp) AS date,
                       SUM(gender='Male') as male,
                       SUM(gender='Female') as female
                FROM detections
                WHERE timestamp >= CURDATE() - INTERVAL 6 DAY
                GROUP BY date
                ORDER BY date
            """)
            weekly = cursor.fetchall()

            # Bar: per day within the current month
            cursor.execute("""
                SELECT DAY(timestamp) AS day,
                       SUM(gender='Male') as male,
                       SUM(gender='Female') as female
                FROM detections
                WHERE MONTH(timestamp) = MONTH(CURDATE()) AND YEAR(timestamp) = YEAR(CURDATE())
                GROUP BY day
                ORDER BY day
            """)
            monthly = cursor.fetchall()

    return jsonify({
        "donut": donut,
        "hourly_male": hourly_male,
        "hourly_female": hourly_female,
        "weekly": weekly,
        "monthly": monthly
    })
@app.route('/')
def dashboard():
    """Render the dashboard with all-time detection totals.

    Redirects to the login page when the session is not logged in. The
    template receives ``total`` (row count of ``detections``),
    ``gender_stats`` (count per gender) and ``age_stats`` (count per
    predicted age range).
    """
    if not session.get('logged_in'):
        return redirect(url_for('login'))

    # closing() guarantees cursor and connection are released on query errors.
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT COUNT(*) as total FROM detections")
            total = cursor.fetchone()['total']
            cursor.execute("SELECT gender, COUNT(*) as count FROM detections GROUP BY gender")
            gender_stats = cursor.fetchall()
            cursor.execute("SELECT age_predict, COUNT(*) as count FROM detections GROUP BY age_predict")
            age_stats = cursor.fetchall()

    return render_template('dashboard.html', total=total, gender_stats=gender_stats, age_stats=age_stats)
@app.route('/stats')
def stats():
    """Return all-time detection totals as JSON.

    The JSON object has ``total`` (row count of ``detections``), ``gender``
    (count per gender) and ``age`` (count per predicted age range).
    """
    # closing() guarantees cursor and connection are released on query errors.
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT COUNT(*) as total FROM detections")
            total = cursor.fetchone()['total']
            cursor.execute("SELECT gender, COUNT(*) as count FROM detections GROUP BY gender")
            gender_stats = cursor.fetchall()
            cursor.execute("SELECT age_predict, COUNT(*) as count FROM detections GROUP BY age_predict")
            age_stats = cursor.fetchall()

    return jsonify({
        "total": total,
        "gender": gender_stats,
        "age": age_stats
    })
@app.route('/logs')
def logs():
    """Render every detection row, newest first.

    Redirects to the login page when the session is not logged in.
    """
    if not session.get('logged_in'):
        return redirect(url_for('login'))

    # closing() guarantees cursor and connection are released on query errors.
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT * FROM detections ORDER BY timestamp DESC")
            detections = cursor.fetchall()

    return render_template('logs.html', detections=detections)
@app.route('/video_feed')
def video_feed():
    """Serve the frames produced by ``gen_frames`` as a multipart stream."""
    frame_stream = gen_frames()
    return Response(
        frame_stream,
        mimetype='multipart/x-mixed-replace; boundary=frame',
    )
@app.route('/static/snapshots/<filename>')
def snapshot(filename):
    """Serve one saved snapshot file from ``SNAPSHOT_DIR``."""
    file_response = send_from_directory(SNAPSHOT_DIR, filename)
    return file_response
@app.route("/export/<string:mode>")
def export_xlsx(mode):
    """Download detection counts as an XLSX file.

    Args:
        mode: One of ``"today"`` (count per gender today), ``"hourly"``
            (Male/Female per hour today, all 24 hours), ``"weekly"``
            (Male/Female per date since six days before today) or
            ``"monthly"`` (Male/Female per week number in the current month).

    Returns:
        A ``send_file`` attachment response, or ``("Invalid mode", 400)`` for
        any other mode.
    """
    # Reject unknown modes before a database connection is opened.
    if mode not in ("today", "hourly", "weekly", "monthly"):
        return "Invalid mode", 400

    today = datetime.now().strftime('%Y-%m-%d')

    # Column names are passed to DataFrame explicitly so the sheet keeps its
    # header row even when the query returns no rows.
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            if mode == "today":
                cursor.execute("""
                    SELECT gender, COUNT(*) as count
                    FROM detections
                    WHERE DATE(timestamp) = CURDATE()
                    GROUP BY gender
                """)
                df = pd.DataFrame(cursor.fetchall(), columns=["gender", "count"])
                df = df.rename(columns={"gender": "Gender", "count": "Total"})
            elif mode == "hourly":
                cursor.execute("""
                    SELECT HOUR(timestamp) as hour, gender, COUNT(*) as count
                    FROM detections
                    WHERE DATE(timestamp) = CURDATE()
                    GROUP BY hour, gender
                    ORDER BY hour
                """)
                # Table layout: hour, Male, Female. GROUP BY makes each
                # (hour, gender) pair unique, so a dict lookup replaces the
                # repeated scans of the result list.
                counts = {(r["hour"], r["gender"]): r["count"] for r in cursor.fetchall()}
                df = pd.DataFrame([
                    {
                        "Hour": f"{h:02d}:00",
                        "Male": counts.get((h, "Male"), 0),
                        "Female": counts.get((h, "Female"), 0),
                    }
                    for h in range(24)
                ])
            elif mode == "weekly":
                cursor.execute("""
                    SELECT DATE(timestamp) AS date,
                           SUM(gender='Male') as Male,
                           SUM(gender='Female') as Female
                    FROM detections
                    WHERE timestamp >= CURDATE() - INTERVAL 6 DAY
                    GROUP BY date
                    ORDER BY date
                """)
                df = pd.DataFrame(cursor.fetchall(), columns=["date", "Male", "Female"])
                df = df.rename(columns={"date": "Date"})
            else:
                # mode == "monthly"
                # NOTE(review): this file also registers a literal
                # "/export/monthly" route, which presumably takes precedence
                # over this converter rule for that URL — confirm whether this
                # branch is still reachable over HTTP.
                cursor.execute("""
                    SELECT WEEK(timestamp, 1) AS week,
                           SUM(gender='Male') as Male,
                           SUM(gender='Female') as Female
                    FROM detections
                    WHERE YEAR(timestamp) = YEAR(CURDATE())
                      AND MONTH(timestamp) = MONTH(CURDATE())
                    GROUP BY week
                    ORDER BY week
                """)
                df = pd.DataFrame(cursor.fetchall(), columns=["week", "Male", "Female"])
                df = df.rename(columns={"week": "Week"})

    # Convert the DataFrame to XLSX in memory.
    output = BytesIO()
    df.to_excel(output, index=False)
    output.seek(0)
    return send_file(
        output,
        download_name=f"people_counting_{mode}_{today}.xlsx",
        as_attachment=True,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    )
@app.route("/export/monthly")
def export_xlsx_monthly():
    """Download the current month's Male/Female totals as a one-row XLSX file.

    The sheet has the columns ``Month`` (e.g. "January 2024"), ``Male`` and
    ``Female``. Returns a ``send_file`` attachment response.
    """
    with closing(get_db_connection()) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            # The month label comes from CURDATE() rather than from a row's
            # timestamp: selecting a non-aggregated column next to SUM()
            # without GROUP BY is rejected when ONLY_FULL_GROUP_BY is enabled,
            # and would be NULL when the month has no detections. The WHERE
            # clause already limits rows to the current month, so the label is
            # the same. COALESCE turns the empty-month NULL totals into 0.
            cursor.execute("""
                SELECT
                    DATE_FORMAT(CURDATE(), '%M %Y') AS Month,
                    COALESCE(SUM(gender='Male'), 0) as Male,
                    COALESCE(SUM(gender='Female'), 0) as Female
                FROM detections
                WHERE YEAR(timestamp) = YEAR(CURDATE())
                  AND MONTH(timestamp) = MONTH(CURDATE())
            """)
            row = cursor.fetchone()  # exactly one row: this month's totals

    df = pd.DataFrame([row])
    output = BytesIO()
    df.to_excel(output, index=False)
    output.seek(0)

    today = datetime.now().strftime('%Y-%m-%d')
    return send_file(
        output,
        download_name=f"people_counting_monthly_{today}.xlsx",
        as_attachment=True,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    )
if __name__ == '__main__':
    # Script entry point: start Flask's built-in server with debug mode and
    # threaded request handling enabled.
    run_options = {'debug': True, 'threaded': True}
    app.run(**run_options)
# Advertisement
# Add Comment
# Please, Sign In to add comment
# Advertisement