Advertisement
rfmonk

baxter_analog_io_copy.py

Jan 29th, 2014
116
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.91 KB | None | 0 0
  1. #!/usr/bin/env python
  2.  
  3. # copyright rethink robotics
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions are met
  6.  
  7. import errno
  8. import rospy
  9. import baxter_dataflow
  10. from baxter_core_msgs.msg import (
  11.     AnalogIOState,
  12.     AnalogOutputCommand,
  13. )
  14.  
  15.  
  16. class AnalogIO(object):
  17.     """
  18.    Interface class for a simple Analog Input and/or Output on the
  19.    Baxter robot.
  20.  
  21.    Input   -   read input state
  22.    Output  -   set new output state
  23.            -   read current output state
  24.    """
  25.     def __init__(self, component_id):
  26.         """
  27.        @param component_id: unique id of analog component
  28.        """
  29.         self._id = component_id
  30.         self._component_type = 'analog_io'
  31.         self._is_output = False
  32.  
  33.         self._state = dict()
  34.  
  35.         type_ns = '/robot/' + self._component_type
  36.         topic_base = type_ns + '/' + self._id
  37.  
  38.         self._sub_state = rospy.Subscriber(
  39.             topic_base + '/state',
  40.             AnalogIOState,
  41.             self._on_io_state)
  42.  
  43.         baxter_dataflow.wait_for(
  44.             lambda: len(self._state.keys()) != 0,
  45.             timeout=2.0,
  46.             timeout_msg="Failed to get current analog_io state from %s"
  47.             % (topic_base,),
  48.         )
  49.  
  50.         # check if output-capable before creating publisher
  51.         if self._is_output:
  52.             self._pub_output = rospy.Publisher(
  53.                 type_ns + '/command',
  54.                 AnalogOutputCommand)
  55.  
  56.     def _on_io_state(self, msg):
  57.         """
  58.        Updates the internally stored state of the Analog Input/Output.
  59.        """
  60.         self._is_output = not msg.isInputOnly
  61.         self._state['value'] = msg.value
  62.  
  63.     def state(self):
  64.         """
  65.        Return the latest value of the Analog Input/Output.
  66.        """
  67.         return self._state['value']
  68.  
  69.     def is_output(self):
  70.         """
  71.        Accessor to check if IO is capable of output.
  72.        """
  73.         return self._is_output
  74.  
  75.     def set_output(self, value, timeout=2.0):
  76.         """
  77.        Control the state of the Analog Output
  78.  
  79.        @type value: uint16
  80.        @param value: new state of the Output.
  81.        @type timeout: float
  82.        @param timeout: Seconds to wait for the io to reflect command.
  83.                        if 0, just command once and return. [0]
  84.        """
  85.         if not self._is_output:
  86.             raise IOError(errno.EACCES, "Component is not an output [%s: %s]" %
  87.                           (self._component_type, self._id))
  88.         cmd = AnalogOutputCommand()
  89.         cmd.name = self._id
  90.         cmd.value = value
  91.         self._pub_output.publish(cmd)
  92.  
  93.         if not timeout == 0:
  94.             baxter_dataflow.wait_for(
  95.                 test=lambda: self.state() == value,
  96.                 timeout=timeout,
  97.                 rate=100,
  98.                 timeout_msg=("Failed to command analog io to: %d" % (value,)),
  99.                 body=lambda: self._pub_output.publish(cmd)
  100.             )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement