Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- from __future__ import print_function
- import sys
- import time
- import re
- # import pyuarm
- import yaml
- from pprint import pformat
- from uf.utils.log import logger_init, logging
- from uf.wrapper.swift_api import SwiftAPI
- from uf.wrapper.uarm_api import UarmAPI
# Fallback for the 'speed' config key; forwarded as the `speed` argument of
# arm.set_position().
SPEED_DEFAULT = 200

# Fallback for the 'z' config key; used as the z coordinate of every movement.
Z_DEFAULT = 150

# Patterns exposing a named group `response`, the shape expected by the
# `response_regex` argument of send_cmd_sync_ok().
# NOTE(review): presumably these match version ("ok V...") and temperature
# ("ok T:...") replies from the firmware -- confirm against the protocol docs.
SIMPLE_RESPONSE_REGEX = r'ok V(?P<response>\S+)'
TEMP_RESPONSE_REGEX = r'ok T:(?P<response>\S+)'
def main():
    """
    Load ``config.yaml``, connect to the arm and run the high-five routine.

    Sequence:
      1. Read the YAML config (an empty file is treated as an empty mapping).
      2. Enable debug logging when the ``debug`` key is truthy.
      3. Build the API object with `get_api`.
      4. Log the device info and the firmware version derived from it.
      5. Send the ``M2400 S1`` command and log its response.
      6. Run `high5`, then call ``arm.set_servo_detach(wait=True)``.

    Exits with status 1 when the API object cannot be created (non-debug
    mode) and with status 2 when `high5` reports a failure. In debug mode a
    connection error opens ``pudb`` and is then re-raised.
    """
    with open("config.yaml", 'r') as ymlfile:
        # safe_load instead of yaml.load: calling yaml.load without a Loader
        # is unsafe and rejected by recent PyYAML releases. The config is only
        # ever used as a plain mapping, so the safe loader is sufficient.
        # `or {}` covers an empty file, for which the loader returns None.
        config = yaml.safe_load(ymlfile) or {}

    debug = config.get('debug')
    if debug:
        logger_init(logging.DEBUG)

    try:
        arm = get_api(config)
    except Exception:
        if debug:
            # Inspect the failure interactively, then let it propagate with
            # its original traceback.
            import pudb
            pudb.set_trace()
            raise
        print("Error connecting to the arm", file=sys.stderr)
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. python -S or frozen builds).
        sys.exit(1)

    # Fixed pause before the first query to the device.
    time.sleep(2)

    device_info = arm.get_device_info()
    logging.info('device info:\n%s' % pformat(device_info))

    # assumes device_info[2] is a dotted numeric version string such as
    # "3.2.0" -- TODO confirm against the API's get_device_info() contract
    fw_version = tuple(int(number) for number in device_info[2].split('.'))
    logging.info('firmware version:\n%s' % (fw_version, ))

    # NOTE(review): M2400 S1 looks like a mode-selection command -- confirm
    # its meaning in the firmware protocol reference.
    mode_response = arm.send_cmd_sync("M2400 S1")
    logging.info('mode response:\n%s' % mode_response)

    if not high5(arm, config):
        print("Error excuting one of the operations.", file=sys.stderr)
        sys.exit(2)

    arm.set_servo_detach(wait=True)
def get_api(config):
    """
    Create the arm API object selected by the ``device`` config key.

    :param config: mapping with an optional ``device`` entry; ``'swift'``
        (the default) yields a `SwiftAPI`, ``'metal'`` yields a `UarmAPI`.
    :returns: a freshly constructed API instance.
    :raises ValueError: when ``device`` names neither supported device.
        (Previously this fell through and failed with an UnboundLocalError.)
    """
    device = config.get('device', 'swift')
    if device == 'swift':
        logging.info('setup swift ...')
        return SwiftAPI()
    if device == 'metal':
        logging.info('setup metal ...')
        return UarmAPI()
    raise ValueError(
        "unsupported device %r in config; expected 'swift' or 'metal'"
        % (device, )
    )
def high5(arm, config):
    """
    Run the high-five routine on the uarm through the API instance `arm`.

    The arm is homed, raised, swung back, swung in and homed again. The
    ``direction`` config key picks the swing side: ``'cw'`` (default) swings
    from y=+70 to y=-70, any other value mirrors that. ``wait``, ``z`` and
    ``speed`` are read from `config` with module-level fallbacks.

    Returns True when every step succeeded, None as soon as one step fails.
    """
    wait = config.get('wait', False)
    z = config.get('z', Z_DEFAULT)
    speed = config.get('speed', SPEED_DEFAULT)

    # Fail fast: nothing else is attempted if the start pose is unreachable.
    if not home_arm(arm, config):
        return None

    backswing_y = 70 if config.get('direction', 'cw') == 'cw' else -70
    # y targets in order: raise arm, backswing, swing in.
    y_targets = (0, backswing_y, -backswing_y)

    for y in y_targets:
        step = {'x': 150, 'y': y, 'z': z, 'speed': speed, 'wait': wait}
        if not perform_movement(arm, **step):
            return None
        # Extra settle-and-poll after each step, on top of the wait that
        # perform_movement already does.
        time.sleep(0.1)
        while arm.get_is_moving():
            time.sleep(0.1)

    # Short hold at the end of the swing before returning home.
    time.sleep(0.3)

    return True if home_arm(arm, config) else None
def home_arm(arm, config=None):
    """
    Move the arm to its home pose (x=150, y=0, z from config).

    :param arm: API instance forwarded to `perform_movement`.
    :param config: optional mapping supplying ``wait`` (default False),
        ``speed`` (default SPEED_DEFAULT) and ``z`` (default Z_DEFAULT).
        Omitting it, or passing None, uses all defaults.
    :returns: True when the movement succeeded, None otherwise.
    """
    # None sentinel instead of a shared mutable `{}` default.
    if config is None:
        config = {}
    wait = config.get('wait', False)
    speed = config.get('speed', SPEED_DEFAULT)
    z = config.get('z', Z_DEFAULT)
    if not perform_movement(arm, x=150, y=0, z=z, speed=speed, wait=wait):
        return None
    return True
def perform_movement(arm, **movement):
    """
    Issue one `set_position` call and block until the arm stops moving.

    All keyword arguments are logged as ``KEY:value`` pairs and forwarded
    unchanged to ``arm.set_position``.

    Returns True once ``arm.get_is_moving()`` reports falsy, or None when
    ``set_position`` itself returns a falsy result.
    """
    described = [
        "%s:%s" % (name.upper(), target) for name, target in movement.items()
    ]
    logging.info('set ' + ' '.join(described))

    accepted = arm.set_position(**movement)
    if not accepted:
        return None

    # Always pause once before the first poll, then keep polling at the same
    # interval for as long as the arm reports motion.
    while True:
        time.sleep(0.1)
        if not arm.get_is_moving():
            break
    return True
def send_cmd_sync_ok(arm, command, response_regex=None):
    """
    Send `command` synchronously and require an "ok" reply.

    When `response_regex` is given, it is searched in the reply and the
    value of its named group ``response`` is returned instead of the raw
    reply.

    :raises RuntimeError: when the reply does not start with "ok".
    :raises ValueError: when `response_regex` does not match the reply or
        defines no ``response`` group.
    """
    logging.debug('sending command "%s"' % command)
    reply = arm.send_cmd_sync(command)
    logging.debug('command response "%s"' % reply)

    if not reply.startswith("ok"):
        raise RuntimeError('command "%s" failed: %s' % (command, reply))

    # No pattern requested: hand back the raw reply.
    if not response_regex:
        return reply

    found = re.search(response_regex, reply)
    groups = found.groupdict() if found else {}
    if 'response' not in groups:
        raise ValueError(
            'response "%s" for command "%s" did not match regex "%s"' %
            (reply, command, response_regex)
        )
    return groups['response']
# Run the routine only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment