Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- """Watches the RC car for movement."""
- from collections import deque
- import argparse
- import datetime
- import math
- import os
- import socket
- import subprocess
- import sys
- import threading
- import time
- from common import format_command
- from common import server_up
- # pylint: disable=superfluous-parens
- # pylint: disable=global-statement
- CHANNELS_49_MHZ = (49.830, 49.845, 49.860, 49.875, 49.890)
- CHANNELS_27_MHZ = (26.995, 27.045, 27.095, 27.145, 27.195, 27.255)
- def normalize(img, bit_depth=None):
- """Linear normalization and conversion to grayscale of an image."""
- from PIL import ImageOps
- img = ImageOps.grayscale(img)
- img = ImageOps.autocontrast(img)
- if bit_depth is not None:
- img = ImageOps.posterize(img, bit_depth)
- return img
- def mean(values):
- """Calculate mean of the values."""
- return sum(values) / len(values)
- def standard_deviation(values, mean_=None):
- """Calculate standard deviation."""
- if mean_ is None:
- mean_ = mean(values)
- size = len(values)
- sum_ = 0.0
- for value in values:
- sum_ += math.sqrt((value - mean_) ** 2)
- return math.sqrt((1.0 / (size - 1)) * (sum_ / size))
- class PictureWrapper(threading.Thread):
- """Threaded wrapper to access images. Running image capture commands
- can take multiple seconds, so rather than try a command code and wait
- for an image, this will run in another thread and capture images
- continually. This might not give us the exact command code that
- produced a result, but that shoud be fine.
- """
- def __init__(self, command_tuple):
- super(PictureWrapper, self).__init__()
- self._command_tuple = command_tuple
- self._image = None
- self._run = True
- self._lock = threading.Lock()
- def run(self):
- """Runs in a thread, continually captures images."""
- while self._run:
- start = time.time()
- new_image = self._get_picture_from_command()
- with self._lock:
- self._image = new_image
- # Limit the capture to once per second, in case it's really fast
- while time.time() < start + 1.0:
- time.sleep(0.1)
- def stop(self):
- """Stop capturing images."""
- self._run = False
- def get_picture(self):
- """Returns the most recent picture."""
- while self._image is None:
- time.sleep(0.1)
- with self._lock:
- return self._image.copy()
- def _get_picture_from_command(self):
- """Saves a picture from stdout generated by running the command."""
- from PIL import Image
- import StringIO
- # Use pipes to avoid writing to the disk
- pipe = subprocess.Popen(self._command_tuple, stdout=subprocess.PIPE)
- file_buffer = StringIO.StringIO(pipe.stdout.read())
- pipe.stdout.close()
- image = Image.open(file_buffer)
- return image
- def percent_difference(image_1, image_2):
- """Returns the percent difference between two images."""
- assert image_1.mode == image_2.mode, 'Different kinds of images.'
- assert image_1.size == image_2.size, 'Different sizes.'
- # TODO: Stop doing this in Python. It's incredibly slow, and on the
- # Rapberry Pi, I had to reduce the image size a lot to get it to process
- # with under a 1 second interval per command code. Maybe see
- # http://help.simplecv.org/question/2192/absolute-difference-between-images/
- pairs = zip(image_1.getdata(), image_2.getdata())
- if len(image_1.getbands()) == 1:
- # for gray-scale jpegs
- diff = sum(abs(p1 - p2) for p1, p2 in pairs)
- else:
- diff = sum(abs(c1 - c2) for p1, p2 in pairs for c1, c2 in zip(p1, p2))
- ncomponents = image_1.size[0] * image_1.size[1] * 3
- return (diff / 255.0 * 100.0) / ncomponents
- def command_iterator(frequency):
- """Iterates through the frequencies and commands."""
- for useconds in range(100, 1201, 100):
- for sync_multiplier in range(2, 7):
- for sync_repeats in range(2, 7):
- for signal_repeats in range(5, 50):
- yield (
- frequency,
- useconds,
- sync_multiplier,
- sync_repeats,
- signal_repeats,
- )
- def search_for_command_codes(
- host,
- port,
- frequencies,
- get_picture_function=None,
- bit_depth=None
- ):
- """Iterates through commands and looks for changes in the webcam."""
- if get_picture_function is not None:
- diffs = deque()
- pictures = deque()
- base = normalize(get_picture_function(), bit_depth=bit_depth)
- try:
- base.save('normalized-test.png')
- except Exception:
- pass
- time.sleep(1)
- print('Filling base photos for difference analysis')
- for _ in range(20):
- recent = normalize(get_picture_function(), bit_depth=bit_depth)
- diff = percent_difference(base, recent)
- time.sleep(1)
- diffs.append(diff)
- pictures.append(recent)
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- print(
- 'Searching for command codes on {frequencies}'.format(
- frequencies=', '.join((str(f) for f in frequencies))
- )
- )
- command_tuple_description = ('freq', 'useconds', 'multiplier', 'sync_repeats', 'signal_repeats')
- start = time.time()
- previous_image = None
- if get_picture_function is not None:
- previous_image = get_picture_function()
- for frequency in frequencies:
- for command_tuple in command_iterator(frequency):
- # pylint: disable=star-args
- command = format_command(*command_tuple)
- if sys.version_info.major == 3:
- command = bytes(command, 'utf-8')
- sock.sendto(command, (host, port))
- if get_picture_function is not None:
- # Normalizing and processing the image takes a long time, so
- # we'll do the normalization while we're waiting for the
- # command to broadcast for a full second
- recent = normalize(previous_image, bit_depth=bit_depth)
- # Let's compare the most recent photo to the oldest one,
- # in case a cloud passes over and the brightness changes
- diff = percent_difference(pictures[0], recent)
- std_dev = standard_deviation(diffs)
- mean_ = mean(diffs)
- while time.time() < start + 1.0:
- time.sleep(0.1)
- start = time.time()
- if get_picture_function is None:
- print(
- ' '.join((
- (desc + ':' + str(value)) for desc, value in zip(
- command_tuple_description,
- command_tuple
- )
- ))
- )
- else:
- previous_image = get_picture_function()
- # I should be doing a z-test or something here... eh
- if abs(diff - mean_) > (std_dev * 3.0) and diff > 2.0:
- print('Found substantially different photo, saving...')
- print(
- 'diff={diff}, mean={mean}, std dev={std_dev}'
- ' at {time}'.format(
- diff=diff,
- mean=mean_,
- std_dev=std_dev,
- time=str(datetime.datetime.now())
- )
- )
- file_name = '-'.join(
- (desc + '-' + str(value)) for desc, value in zip(
- command_tuple_description,
- command_tuple
- )
- )
- try:
- image = get_picture_function()
- image.save(file_name + '.png')
- except Exception as exc:
- print(file_name)
- print('Unable to save photo: ' + str(exc))
- time.sleep(2)
- diffs.popleft()
- diffs.append(diff)
- pictures.popleft()
- pictures.append(recent)
- def make_parser():
- """Builds and returns an argument parser."""
- parser = argparse.ArgumentParser(
- description='Iterates through and broadcasts command codes and'
- ' monitors the webcam to watch for movement from the RC car.'
- )
- parser.add_argument(
- '-p',
- '--port',
- dest='port',
- help='The port to send control commands to.',
- default=12345,
- type=int
- )
- parser.add_argument(
- '-s',
- '--server',
- dest='server',
- help='The server to send control commands to.',
- default='127.1'
- )
- parser.add_argument(
- '-f',
- '--frequency',
- dest='frequency',
- help='The frequency to broadcast commands on.',
- default=49,
- type=float
- )
- def bit_depth_checker(bit_depth):
- """Checks that the bit depth argument is valid."""
- try:
- bit_depth = int(bit_depth)
- except:
- raise argparse.ArgumentTypeError('Bit depth must be an int')
- if not 1 <= bit_depth <= 8:
- raise argparse.ArgumentTypeError(
- 'Bit depth must be between 1 and 8 inclusive'
- )
- return bit_depth
- parser.add_argument(
- '-b',
- '--bit-depth',
- dest='bit_depth',
- help='The bit depth to reduce images to.',
- type=bit_depth_checker,
- default=1
- )
- parser.add_argument(
- '--no-camera',
- dest='no_camera',
- help='Disable the camera and image recognition. You will need to watch'
- ' the RC car manually.',
- action='store_true',
- default=False
- )
- parser.add_argument(
- '--raspberry-pi-camera',
- dest='raspi_camera',
- help='Force the use of the Raspberry Pi camera.',
- action='store_true',
- default=False
- )
- parser.add_argument(
- '--webcam',
- dest='webcam',
- help='Force the use of a webcam.',
- action='store_true',
- default=False
- )
- return parser
- def main():
- """Parses command line arguments and runs the interactive controller."""
- parser = make_parser()
- args = parser.parse_args()
- # Remove the default image to make sure that we're not processing images
- # from a previous run
- try:
- os.remove('photo.png')
- except OSError:
- pass
- if not server_up(args.server, args.port, args.frequency):
- print(
- '''Server does not appear to be listening for messages, aborting.
- Did you run pi_pcm?'''
- )
- return
- webcam = args.webcam
- raspi_camera = args.raspi_camera
- if not args.no_camera:
- try:
- import PIL as _
- except ImportError:
- sys.stderr.write(
- '''Using the camera to detect movement requires the Python PIL libraries. You
- can install them by running:
- apt-get install python-imaging
- Or, you can use the `--no-camera` option and just watch the RC car for
- movement. When it does, hit <Ctrl> + C and use the last printed values.
- '''
- )
- sys.exit(1)
- if webcam and raspi_camera:
- sys.stderr.write(
- 'You can only specify one of --webcam and --rasberry-pi-camera.'
- )
- sys.exit(1)
- if not webcam and not raspi_camera:
- # Find which to use
- raspi_exists = subprocess.call(
- ('/usr/bin/which', 'raspistill'),
- stdout=open('/dev/null', 'w')
- )
- if raspi_exists == 0:
- raspi_camera = True
- else:
- raspi_camera = False
- webcam = not raspi_camera
- # RC cars in the 27 and 49 MHz spectrum typically operate on one of a
- # several channels in that frequency, but most toy RC cars that I've
- # seen only list the major frequency on the car itself. If someone
- # enters a major frequency, search each channel.
- if args.frequency == 49:
- frequencies = CHANNELS_49_MHZ
- elif args.frequency == 27:
- frequencies = CHANNELS_27_MHZ
- else:
- frequencies = [args.frequency]
- print('Sending commands to ' + args.server + ':' + str(args.port))
- picture_thread = None
- try:
- if args.no_camera:
- picture_function = None
- elif webcam:
- print('Using images from webcam')
- picture_thread = PictureWrapper(
- ('streamer', '-f', 'jpeg', '-s', '640x480', '-o', '/dev/stdout')
- )
- picture_thread.start()
- picture_function = picture_thread.get_picture
- elif raspi_camera:
- print('Using images from Raspberry Pi camera module')
- # Use a smaller size here, because computing the % difference
- # on the Pi is incredibly slow
- picture_thread = PictureWrapper(
- ('raspistill', '-w', '320', '-h', '240', '-t', '100', '-o', '-')
- )
- picture_thread.start()
- picture_function = picture_thread.get_picture
- else:
- print('Not using camera')
- picture_function = None
- search_for_command_codes(
- args.server,
- args.port,
- frequencies,
- get_picture_function=picture_function,
- bit_depth=args.bit_depth,
- )
- # pylint: disable=broad-except
- except (KeyboardInterrupt, Exception) as exc:
- print('Caught exception, exiting')
- print(str(exc))
- if picture_thread is not None:
- picture_thread.stop()
- picture_thread.join()
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement