Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import dbus
- import dbus.decorators
- import dbus.glib
- import cv2.cv as cv
- import time
- import sys
class MotionDetectorInstantaneous(object):
    """Detect motion by differencing consecutive greyscale camera frames.

    Every call to iteration() grabs one frame from camera 0, compares it with
    the previous frame and invokes ``callback`` when the percentage of changed
    pixels is above ``threshold``.
    """

    def onChange(self, val):
        """Trackbar callback: store the detection threshold chosen by the user."""
        self.threshold = val

    def __init__(self, threshold=8, showWindows=True, callback=None):
        """Open the camera and allocate the working buffers.

        threshold   -- percentage of changed pixels above which motion is
                       reported.
        showWindows -- when true, create the "MoveDetection" window with a
                       trackbar bound to the threshold.
        callback    -- optional zero-argument callable invoked by iteration()
                       each time motion is detected.
        """
        self.callback = callback
        self.writer = None
        self.font = None
        self.show = showWindows  # whether the preview window is displayed
        self.frame = None
        self.verbose = True

        self.capture = cv.CaptureFromCAM(0)
        # Grab one frame up front: its size dictates the size of every buffer.
        self.frame = cv.QueryFrame(self.capture)
        self.width = self.frame.width
        self.height = self.frame.height
        self.nb_pixels = self.width * self.height

        # Grey frame at t-1, seeded with the frame that was just grabbed.
        # NOTE(review): camera frames are normally BGR, so CV_RGB2GRAY swaps
        # the red/blue weights; presumably harmless for differencing - confirm.
        self.frame1gray = cv.CreateMat(self.height, self.width, cv.CV_8U)
        cv.CvtColor(self.frame, self.frame1gray, cv.CV_RGB2GRAY)

        # Thresholded difference image, and the grey frame at t.
        self.res = cv.CreateMat(self.height, self.width, cv.CV_8U)
        self.frame2gray = cv.CreateMat(self.height, self.width, cv.CV_8U)

        self.threshold = threshold
        self.isRecording = False
        self.trigger_time = 0  # timestamp of the last detection

        if showWindows:
            cv.NamedWindow("MoveDetection")
            cv.CreateTrackbar("Detection treshold: ", "MoveDetection",
                              self.threshold, 100, self.onChange)

    def processImage(self, frame):
        """Fill self.res with the thresholded difference between two frames.

        ``frame`` is converted to grey into self.frame2gray and compared with
        self.frame1gray. Because of CV_THRESH_BINARY_INV, pixels whose
        smoothed difference is above 10 end up at 0 (black) and all others
        at 255.
        """
        cv.CvtColor(frame, self.frame2gray, cv.CV_RGB2GRAY)

        # Absolute difference between the previous and the current frame.
        cv.AbsDiff(self.frame1gray, self.frame2gray, self.res)

        # Remove the noise, then threshold.
        cv.Smooth(self.res, self.res, cv.CV_BLUR, 5, 5)
        cv.MorphologyEx(self.res, self.res, None, None, cv.CV_MOP_OPEN)
        cv.MorphologyEx(self.res, self.res, None, None, cv.CV_MOP_CLOSE)
        cv.Threshold(self.res, self.res, 10, 255, cv.CV_THRESH_BINARY_INV)

    def somethingHasMoved(self):
        """Return True when the share of black pixels exceeds the threshold.

        Black pixels of self.res are counted as "everything that is not
        non-zero", which lets OpenCV do the counting natively instead of
        visiting every pixel from Python on each frame.
        """
        nb_black = self.nb_pixels - cv.CountNonZero(self.res)
        percent_black = (nb_black * 100.0) / self.nb_pixels
        return percent_black > self.threshold

    def iteration(self):
        """Process one camera frame.

        Returns True when the user pressed Esc (27) or Enter (10), False when
        no frame could be grabbed, and None otherwise.
        """
        curframe = cv.QueryFrame(self.capture)
        if not curframe:
            # A failed grab would otherwise crash inside CvtColor.
            return False

        self.processImage(curframe)
        if self.somethingHasMoved() and self.callback:
            self.callback()

        if self.show:
            cv.ShowImage("MoveDetection", curframe)
            #cv.ShowImage("Res", self.res)

        # The current frame becomes the reference for the next call.
        cv.Copy(self.frame2gray, self.frame1gray)

        key = cv.WaitKey(1) % 0x100
        if key == 27 or key == 10:
            return True
class FaceActivityMonitor(object):
    """Report the appearance and disappearance of a face in the camera image.

    Faces are looked for with a Haar cascade on every call to iteration();
    the two callbacks are fired through inform_workrave().
    """

    def __init__(self, appear_callback, disappear_callback, capture=None):
        """Set the defaults, load the cascade and remember the callbacks.

        appear_callback    -- zero-argument callable, called while a face is
                              steadily detected.
        disappear_callback -- zero-argument callable, called once when the
                              face has been missing for noface_threshold
                              consecutive frames.
        capture            -- existing capture to reuse; when not given, a
                              new one is created on self.device.
        """
        self.init_variables()
        self.init_face_detect(capture)
        self.appear_callback = appear_callback
        self.disappear_callback = disappear_callback

    def cleanup(self):
        """Destroy the preview window if one was created."""
        if self.show:
            cv.DestroyWindow("FaceDetection")

    def init_variables(self):
        """Initialise every tunable and every piece of state to its default."""
        self.cascade_name = "/usr/share/opencv/haarcascades/haarcascade_frontalface_alt.xml"
        self.device = 0
        self.show = True
        self.verbose = False
        # Consecutive frames with / without a face needed before reporting.
        self.face_threshold = 2
        self.noface_threshold = 5

        # Parameters for haar detection
        # From the API:
        # The default parameters (scale_factor=1.1, min_neighbors=3, flags=0)
        # are tuned for accurate yet slow object detection. For a faster
        # operation on real video images the settings are:
        # scale_factor=1.2, min_neighbors=2, flags=CV_HAAR_DO_CANNY_PRUNING,
        # min_size=<minimum possible face size>
        self.min_size = (50, 50)
        self.image_scale = 1.2
        self.haar_scale = 1.2
        self.min_neighbors = 2
        self.haar_flags = cv.CV_HAAR_DO_CANNY_PRUNING

        self.cascade = None  # was misspelled "cascase", leaving this unset
        self.capture = None
        self.storage = None
        self.last_time = 0  # time.time() of the last appear_callback call
        self.frame_copy = None
        self.count_face = 0
        self.count_noface = 0

    def init_face_detect(self, capture):
        """Load the cascade, pick the capture and allocate the detector storage.

        Prints an error and exits the process with status -1 when the cascade
        loads as a false value.
        """
        # the OpenCV API says this function is obsolete, but we can't
        # cast the output of cvLoad to a HaarClassifierCascade, so use this
        # anyways; the size parameter is ignored
        #self.cascade = cv.LoadHaarClassifierCascade(self.cascade_name, (1, 1))
        self.cascade = cv.Load(self.cascade_name)
        if not self.cascade:
            print("ERROR: Could not load classifier cascade")
            sys.exit(-1)

        if capture:
            self.capture = capture
        else:
            self.capture = cv.CreateCameraCapture(int(self.device))

        self.storage = cv.CreateMemStorage(0)

        if self.show:
            cv.NamedWindow("FaceDetection", 1)

    def detect_and_draw(self, image):
        """Look for faces in ``image`` and return True when any is found.

        Detection runs on a grey, histogram-equalised copy shrunk by
        image_scale. When self.show is set, a red rectangle is drawn on
        ``image`` around each face and the image is displayed.
        """
        gray = cv.CreateImage((image.width, image.height), 8, 1)
        small_size = (cv.Round(image.width / self.image_scale),
                      cv.Round(image.height / self.image_scale))
        small_image = cv.CreateImage(small_size, 8, 1)

        cv.CvtColor(image, gray, cv.CV_BGR2GRAY)
        cv.Resize(gray, small_image, cv.CV_INTER_LINEAR)
        cv.EqualizeHist(small_image, small_image)

        # NOTE(review): self.storage is reused on every call and never
        # cleared (the call below is disabled) - confirm it does not grow.
        #cv.ClearMemStorage(self.storage)
        faces = cv.HaarDetectObjects(small_image,
                                     self.cascade,
                                     self.storage,
                                     self.haar_scale,
                                     self.min_neighbors,
                                     self.haar_flags,
                                     self.min_size)

        if self.show:
            scale = self.image_scale
            for ((x, y, w, h), n) in faces or ():
                # Detection ran on the shrunk image: map back to full size.
                top_left = (int(x * scale), int(y * scale))
                bottom_right = (int((x + w) * scale), int((y + h) * scale))
                cv.Rectangle(image, top_left, bottom_right,
                             cv.CV_RGB(255, 0, 0), 3, 8, 0)
            cv.ShowImage("FaceDetection", image)

        return bool(faces)

    def iteration(self):
        """Process one camera frame.

        Returns True when the user pressed Esc (27) or Enter (10), False when
        no frame could be grabbed, and None otherwise.
        """
        frame = cv.QueryFrame(self.capture)
        if not frame:
            return False

        if frame.origin == cv.IPL_ORIGIN_TL:
            user_detected = self.detect_and_draw(frame)
        else:
            # Origin is not top-left: work on a vertically flipped copy.
            if not self.frame_copy:
                self.frame_copy = cv.CreateImage((frame.width, frame.height),
                                                 cv.IPL_DEPTH_8U,
                                                 frame.nChannels)
            cv.Flip(frame, self.frame_copy, 0)
            user_detected = self.detect_and_draw(self.frame_copy)

        self.inform_workrave(user_detected)

        key = cv.WaitKey(1) % 0x100
        if key == 27 or key == 10:
            return True

    def inform_workrave(self, user_detected):
        """Update the streak counters and fire the callbacks when due.

        appear_callback is called once count_face has reached face_threshold,
        at most once every 5 seconds. disappear_callback is called exactly
        when count_noface reaches noface_threshold.
        """
        if user_detected:
            self.count_face += 1
            self.count_noface = 0
        else:
            self.count_noface += 1
            self.count_face = 0

        if self.count_face >= self.face_threshold:
            if self.verbose and self.count_face == self.face_threshold:
                print("Reporting user presence")
            now = time.time()
            if now > self.last_time + 5:
                self.appear_callback()
                self.last_time = now

        if self.count_noface == self.noface_threshold:
            if self.verbose:
                print("Reporting user absence")
            self.disappear_callback()
class RingWorkrave(object):
    """Feed camera-based presence information to Workrave over D-Bus.

    Motion and face detection both report activity under the name
    "facedetect"; reporting is suspended while Workrave signals that a break
    is in its prelude or running.
    """

    def __init__(self):
        """Create both detectors (sharing one capture) and connect to D-Bus."""
        self.verbose = True
        self.ignore = False
        # Ids of the breaks whose last reported progress was not "none".
        self._active_breaks = set()
        self.motion_detector = MotionDetectorInstantaneous(callback=self.move)
        self.face_detection = FaceActivityMonitor(self.face_appeared,
                                                  self.disappear,
                                                  self.motion_detector.capture)
        self.init_dbus()

    def init_dbus(self):
        """Bind the Workrave core interface and subscribe to its break signals."""
        bus = dbus.SessionBus()
        obj = bus.get_object("org.workrave.Workrave",
                             "/org/workrave/Workrave/Core")
        self.workrave = dbus.Interface(obj, "org.workrave.CoreInterface")

        handlers = (("MicrobreakChanged", self.microbreak_signal),
                    ("RestbreakChanged", self.restbreak_signal),
                    ("DailylimitChanged", self.dailylimit_signal))
        for signal_name, handler in handlers:
            self.workrave.connect_to_signal(signal_name, handler,
                                            sender_keyword='sender')

    def move(self):
        """Motion callback: report activity unless a break is in progress."""
        if self.ignore:
            return
        if self.verbose:
            print("Motion detected")
        self.workrave.ReportActivity("facedetect", True)

    def face_appeared(self):
        """Face callback: report activity unless a break is in progress."""
        if self.ignore:
            return
        if self.verbose:
            print("Face detected")
        self.workrave.ReportActivity("facedetect", True)

    def disappear(self):
        """Absence callback: report inactivity unless a break is in progress."""
        if self.ignore:
            return
        if self.verbose:
            print("Face disappeared")
        self.workrave.ReportActivity("facedetect", False)

    def microbreak_signal(self, progress, sender=None):
        """Handler for the MicrobreakChanged signal."""
        self.break_signal("microbreak", progress)

    def restbreak_signal(self, progress, sender=None):
        """Handler for the RestbreakChanged signal."""
        self.break_signal("restbreak", progress)

    def dailylimit_signal(self, progress, sender=None):
        """Handler for the DailylimitChanged signal."""
        self.break_signal("dailylimit", progress)

    def break_signal(self, breakid, progress, sender=None):
        """Suspend or resume presence reporting after a break state change.

        breakid  -- "microbreak", "restbreak" or "dailylimit".
        progress -- progress string sent with the signal; "none" means the
                    break is idle.

        The three break types signal independently, so each one is tracked
        separately: reporting resumes only once every break is back to
        "none". A single shared flag would let an idle break re-enable
        reporting while another break is still running.
        """
        if progress != "none":
            self._active_breaks.add(breakid)
            self.workrave.ReportActivity("facedetect", False)
        else:
            self._active_breaks.discard(breakid)
        self.ignore = bool(self._active_breaks)

        if self.verbose:
            if progress == "prelude":
                print("Ignoring user presence: %s warning" % breakid)
            elif progress == "break":
                print("Ignoring user presence: %s started" % breakid)
            elif progress == "none":
                print("Resuming presence monitoring: Break %s idle" % breakid)
            else:
                print("Unknown progress for %s: %s" % (breakid, progress))

    def run(self):
        """Alternate between both detectors until one of them asks to stop.

        NOTE(review): nothing here iterates a GLib main loop explicitly, yet
        D-Bus signal delivery needs one; presumably cv.WaitKey drives it with
        the GTK HighGUI backend - confirm.
        """
        try:
            while True:
                if self.motion_detector and self.motion_detector.iteration():
                    break
                if self.face_detection and self.face_detection.iteration():
                    break
        finally:
            # Close the face-detection window on every way out of the loop.
            if self.face_detection:
                self.face_detection.cleanup()
# Script entry point: build the Workrave bridge and run its detection loop
# until one of the detectors asks to stop.
if __name__ == "__main__":
    RingWorkrave().run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement