Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # copyright rethink robotics
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions are met
- import errno
- import rospy
- import baxter_dataflow
- from baxter_core_msgs.msg import (
- AnalogIOState,
- AnalogOutputCommand,
- )
- class AnalogIO(object):
- """
- Interface class for a simple Analog Input and/or Output on the
- Baxter robot.
- Input - read input state
- Output - set new output state
- - read current output state
- """
- def __init__(self, component_id):
- """
- @param component_id: unique id of analog component
- """
- self._id = component_id
- self._component_type = 'analog_io'
- self._is_output = False
- self._state = dict()
- type_ns = '/robot/' + self._component_type
- topic_base = type_ns + '/' + self._id
- self._sub_state = rospy.Subscriber(
- topic_base + '/state',
- AnalogIOState,
- self._on_io_state)
- baxter_dataflow.wait_for(
- lambda: len(self._state.keys()) != 0,
- timeout=2.0,
- timeout_msg="Failed to get current analog_io state from %s"
- % (topic_base,),
- )
- # check if output-capable before creating publisher
- if self._is_output:
- self._pub_output = rospy.Publisher(
- type_ns + '/command',
- AnalogOutputCommand)
- def _on_io_state(self, msg):
- """
- Updates the internally stored state of the Analog Input/Output.
- """
- self._is_output = not msg.isInputOnly
- self._state['value'] = msg.value
- def state(self):
- """
- Return the latest value of the Analog Input/Output.
- """
- return self._state['value']
- def is_output(self):
- """
- Accessor to check if IO is capable of output.
- """
- return self._is_output
- def set_output(self, value, timeout=2.0):
- """
- Control the state of the Analog Output
- @type value: uint16
- @param value: new state of the Output.
- @type timeout: float
- @param timeout: Seconds to wait for the io to reflect command.
- if 0, just command once and return. [0]
- """
- if not self._is_output:
- raise IOError(errno.EACCES, "Component is not an output [%s: %s]" %
- (self._component_type, self._id))
- cmd = AnalogOutputCommand()
- cmd.name = self._id
- cmd.value = value
- self._pub_output.publish(cmd)
- if not timeout == 0:
- baxter_dataflow.wait_for(
- test=lambda: self.state() == value,
- timeout=timeout,
- rate=100,
- timeout_msg=("Failed to command analog io to: %d" % (value,)),
- body=lambda: self._pub_output.publish(cmd)
- )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement