Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from FSEvents import *
- import objc
- import os, sys
class Autotester(object):
    """Re-run ``nosetests`` on a directory tree when its ``.py`` files change.

    Changes are detected by comparing file modification times against a
    snapshot kept between calls to :meth:`run`. The first scan only records
    the baseline; later scans report modified, newly created and deleted
    files.

    Attributes:
        snapshot: dict mapping each watched file path to the mtime seen on
            the most recent scan.
        target: directory that is scanned and passed to ``nosetests``.
            Defaults to the current working directory.
    """

    def __init__(self):
        self.snapshot = {}
        self.target = os.path.abspath(os.curdir)
        # False until the first scan has recorded a baseline; that scan must
        # not report every file as "changed".
        self._primed = False

    def run(self):
        """Scan the target and run the test suite if anything changed."""
        if self.__check_changes():
            self.__run_nose()

    def __check_changes(self):
        """Return the paths of ``.py`` files that changed since the last scan.

        Hidden directories (names starting with ``.``) are not descended
        into. The very first call only primes the snapshot and returns an
        empty list. Afterwards a file counts as changed when its mtime
        differs from the snapshot, when it is new, or when it disappeared.

        Returns:
            list of file paths; modified/new files in walk order followed by
            deleted files in sorted order.
        """
        first_scan = not self._primed
        seen = {}
        changes = []
        for root, dirs, files in os.walk(self.target):
            # Prune in place so os.walk skips hidden directories entirely.
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            for name in files:
                if not name.endswith('.py'):
                    continue
                path = os.path.join(root, name)
                try:
                    mtime = os.path.getmtime(path)
                except OSError:
                    # The file vanished (or is a dangling symlink) between
                    # the directory listing and the stat; treat as absent.
                    continue
                seen[path] = mtime
                # snapshot.get() is None for a new file, so it also differs.
                if not first_scan and self.snapshot.get(path) != mtime:
                    changes.append(path)
        if not first_scan:
            # Files known from the previous scan that no longer exist.
            changes.extend(sorted(p for p in self.snapshot if p not in seen))
        self.snapshot = seen
        self._primed = True
        return changes

    def __run_nose(self):
        """Run ``nosetests`` on the target and report the outcome via Growl.

        Both external tools are invoked without a shell so that a target
        path containing spaces or shell metacharacters is passed through
        verbatim. A missing ``nosetests`` executable is reported as a
        failure; a missing ``growlnotify`` is ignored because the
        notification is best-effort.
        """
        # Local import keeps this class self-contained; the module header
        # does not import subprocess.
        import subprocess

        try:
            failed = subprocess.call(['nosetests', self.target]) != 0
        except OSError:
            failed = True
        message = 'what the hell dude?' if failed else 'yeah, boi'
        try:
            subprocess.call(
                ['growlnotify', '-n', 'autotest', '-p', '0', '-m', message])
        except OSError:
            pass
# Single shared tester instance: main() configures its target and
# fsevents_callback() triggers it on every file-system event.
tester = Autotester()
def timer_callback(timer, stream):
    """Run-loop timer callout: request an asynchronous flush of ``stream``.

    Args:
        timer: the timer that fired (unused).
        stream: the FSEvents stream to flush.
    """
    FSEventStreamFlushAsync(stream)
def fsevents_callback(stream, clientInfo, numEvents, eventPaths, eventMasks, eventIDs):
    """FSEvents callback: let the shared tester re-scan its target.

    None of the event details are inspected; any notification simply
    triggers ``tester.run()``, which decides for itself whether a test run
    is needed.
    """
    tester.run()
def createStream(full_path, latency=1.0):
    """Create an FSEvents stream that watches ``full_path``.

    The stream reports events occurring from now on and delivers them to
    ``fsevents_callback``.

    Args:
        full_path: path of the directory to watch.
        latency: value passed as the stream's latency argument; defaults to
            the 1.0 this script has always used.

    Returns:
        The newly created (not yet scheduled or started) stream.

    Raises:
        AssertionError: if ``FSEventStreamCreate`` returns a false value.
    """
    stream = FSEventStreamCreate(
        kCFAllocatorDefault,
        fsevents_callback,
        full_path,    # context argument (unused by fsevents_callback)
        [full_path],  # paths to watch
        kFSEventStreamEventIdSinceNow,
        latency,
        0,            # flags
    )
    # Raised explicitly rather than via ``assert`` so the check still runs
    # under ``python -O``; the exception type is unchanged for callers.
    if not stream:
        raise AssertionError("ERROR: FSEVentStreamCreate() => NULL")
    return stream
def run_loop(stream):
    """Schedule ``stream`` on the current run loop and run until it exits.

    A repeating one-second timer is added to the run loop; its callout
    (``timer_callback``) flushes the stream. When the run loop returns or an
    exception propagates, the stream is stopped (if it was started),
    invalidated and released.

    Args:
        stream: an FSEvents stream as returned by ``createStream``.

    Raises:
        AssertionError: if the stream could not be started.
    """
    loop = CFRunLoopGetCurrent()
    FSEventStreamScheduleWithRunLoop(stream, loop, kCFRunLoopDefaultMode)
    started = False
    try:
        # Start the stream outside of an ``assert`` statement: under
        # ``python -O`` asserts are stripped and the stream would never be
        # started at all.
        started = bool(FSEventStreamStart(stream))
        if not started:
            raise AssertionError("Failed to start stream")
        timer = CFRunLoopTimerCreate(
            kCFAllocatorDefault,
            CFAbsoluteTimeGetCurrent() + 1.0,  # first fire date
            1.0,                               # repeat interval
            0,
            0,
            timer_callback,
            stream,
        )
        CFRunLoopAddTimer(loop, timer, kCFRunLoopDefaultMode)
        CFRunLoopRun()
    finally:
        # Only a started stream may be stopped; invalidate and release
        # always, so a failed start does not leak the scheduled stream.
        if started:
            FSEventStreamStop(stream)
        FSEventStreamInvalidate(stream)
        FSEventStreamRelease(stream)
def main():
    """Watch a directory and re-run its tests whenever it changes.

    The directory is taken from the first command-line argument. When no
    argument is given, the tester's default target (the current working
    directory) is kept instead of failing with an IndexError.
    """
    if len(sys.argv) > 1:
        tester.target = sys.argv[1]
    # Initial scan: records the mtime baseline before events start arriving.
    tester.run()
    run_loop(createStream(os.path.abspath(tester.target)))
# Start watching only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement