Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # -*- Python -*-
- """
- @file Ministick.py
- @brief Phidget ministick sensor component
- @date $Date$
- """
- import sys
- import time
- sys.path.append(".")
- # Import RTM module
- import RTC
- import OpenRTM_aist
- import math
- import spidev
- # Import Service implementation class
- # <rtc-template block="service_impl">
- # </rtc-template>
- # Import Service stub modules
- # <rtc-template block="consumer_import">
- # </rtc-template>
- # This module's spesification
- # <rtc-template block="module_spec">
- ministick_spec = ["implementation_id", "Ministick",
- "type_name", "Ministick",
- "description", "Phidget ministick sensor component",
- "version", "1.0.0",
- "vendor", "AIST",
- "category", "Input Devic",
- "activity_type", "STATIC",
- "max_instance", "1",
- "language", "Python",
- "lang_type", "SCRIPT",
- "conf.default.scaling", "1.0",
- "conf.default.tread", "0.2",
- "conf.default.print_xy", "NO",
- "conf.default.print_vel", "NO",
- "conf.default.print_wvel", "NO",
- "conf.__widget__.scaling", "slider.0.1",
- "conf.__widget__.tread", "slider.0.01",
- "conf.__widget__.print_xy", "radio",
- "conf.__widget__.print_vel", "radio",
- "conf.__widget__.print_wvel", "radio",
- "conf.__constraints__.scaling", "0.0<=x<=10.0",
- "conf.__constraints__.tread", "0.0<=x<=1.0",
- "conf.__constraints__.print_xy", "(YES,NO)",
- "conf.__constraints__.print_vel", "(YES,NO)",
- "conf.__constraints__.print_wvel", "(YES,NO)",
- ""]
- # </rtc-template>
- ##
- # @class Ministick
- # @brief Phidget ministick sensor component
- #
- #
- class Ministick(OpenRTM_aist.DataFlowComponentBase):
- ##
- # @brief constructor
- # @param manager Maneger Object
- #
- def __init__(self, manager):
- OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
- self._d_pos = RTC.TimedFloatSeq(RTC.Time(0,0),[])
- """
- ジョイスティックのX-Y位置データ
- - Semantics: data[0]:x位置, data[1]:y位置
- """
- self._posOut = OpenRTM_aist.OutPort("pos", self._d_pos)
- self._d_vel = RTC.TimedVelocity2D(RTC.Time(0,0),0)
- """
- 移動ロボットの速度ベクトル
- - Semantics: vx:並進速度, vy:0.0m va:角速度
- - Unit: vx[m/s], va[rad/s]
- """
- self._velOut = OpenRTM_aist.OutPort("vel", self._d_vel)
- self._d_whell_vel = RTC.TimedFloatSeq(RTC.Time(0,0),[])
- """
- 車輪速度
- - Semantics: data[0]:左車輪角速度, data[1]:右車輪角速度
- - Unit: [rad/s]
- """
- self._whell_velOut = OpenRTM_aist.OutPort("whell_vel", self._d_whell_vel)
- # initialize of configuration-data.
- # <rtc-template block="init_conf_param">
- """
- スケーリングファクタ
- - Name: scaling
- - DefaultValue: 1.0
- """
- self._scaling = [1.0]
- """
- 移動ロボットのトレッド値
- - Name: tread
- - DefaultValue: 0.2
- """
- self._tread = [0.2]
- """
- - Name: print_xy
- - DefaultValue: NO
- """
- self._print_xy = ['NO']
- """
- velデータのデバッグプリントフラグ
- - Name: print_vel
- - DefaultValue: NO
- """
- self._print_vel = ['NO']
- """
- wheel_velデータのデバッグプリントフラグ
- - Name: print_wvel
- - DefaultValue: NO
- """
- self._print_wvel = ['NO']
- # </rtc-template>
- self.x = 0.0
- self.y = 0.0
- self.spi = spidev.SpiDev()
- self.spi.open(0, 0)
- def get_adc(self, channel):
- sned_ch = [0x00, 0x08, 0x10, 0x18]
- if ((channel > 3) or (channel < 0)):
- return -1
- r = self.spi.xfer2([sned_ch[channel], 0, 0, 0])
- ret = ((r[2] << 6 ) & 0x300) | ((r[2] << 6) & 0xc0) | ((r[3] >> 2) & 0x3f)
- return ret
- def xy_to_wvel(self, x, y):
- th = math.atan2(y, x)
- v = math.hypot(x, y)
- vl = v * math.cos(th - (math.pi/4.0))
- vr = v * math.sin(th - (math.pi/4.0))
- return (vl, vr)
- def wvel_to_vel2d(self, vl, vr):
- v = (vr + vl) / 2.0
- if v < 0.0:
- w = - (vr - vl) / self._tread[0]
- else:
- w = (vr - vl) / self._tread[0]
- return RTC.Velocity2D(v, 0.0, w)
- ##
- #
- # The initialize action (on CREATED->ALIVE transition)
- # formaer rtc_init_entry()
- #
- # @return RTC::ReturnCode_t
- #
- #
- def onInitialize(self):
- # Bind variables and configuration variable
- self.bindParameter("scaling", self._scaling, "1.0")
- self.bindParameter("tread", self._tread, "0.2")
- self.bindParameter("print_xy", self._print_xy, "NO")
- self.bindParameter("print_vel", self._print_vel, "NO")
- self.bindParameter("print_wvel", self._print_wvel, "NO")
- # Set InPort buffers
- # Set OutPort buffers
- self.addOutPort("pos",self._posOut)
- self.addOutPort("vel",self._velOut)
- self.addOutPort("whell_vel",self._whell_velOut)
- # Set service provider to Ports
- # Set service consumers to Ports
- # Set CORBA Service Ports
- self.x_offset_v = 0.0
- self.y_offset_v = 0.0
- for i in range(1, 100):
- self.x_offset_v += self.get_adc(0)
- self.y_offset_v += self.get_adc(1)
- self.x_offset_v = self.x_offset_v / 100.0
- self.y_offset_v = self.y_offset_v / 100.0
- return RTC.RTC_OK
- ##
- #
- # The finalize action (on ALIVE->END transition)
- # formaer rtc_exiting_entry()
- #
- # @return RTC::ReturnCode_t
- #
- def onFinalize(self):
- return RTC.RTC_OK
- # ##
- # #
- # # The startup action when ExecutionContext startup
- # # former rtc_starting_entry()
- # #
- # # @param ec_id target ExecutionContext Id
- # #
- # # @return RTC::ReturnCode_t
- # #
- # #
- #def onStartup(self, ec_id):
- #
- # return RTC.RTC_OK
- # ##
- # #
- # # The shutdown action when ExecutionContext stop
- # # former rtc_stopping_entry()
- # #
- # # @param ec_id target ExecutionContext Id
- # #
- # # @return RTC::ReturnCode_t
- # #
- # #
- #def onShutdown(self, ec_id):
- #
- # return RTC.RTC_OK
- ##
- #
- # The activated action (Active state entry action)
- # former rtc_active_entry()
- #
- # @param ec_id target ExecutionContext Id
- #
- # @return RTC::ReturnCode_t
- #
- #
- #def onActivated(self, ec_id):
- # return RTC.RTC_OK
- ##
- #
- # The deactivated action (Active state exit action)
- # former rtc_active_exit()
- #
- # @param ec_id target ExecutionContext Id
- #
- # @return RTC::ReturnCode_t
- #
- #
- def onDeactivated(self, ec_id):
- return RTC.RTC_OK
- ##
- #
- # The execution action that is invoked periodically
- # former rtc_active_do()
- #
- # @param ec_id target ExecutionContext Id
- #
- # @return RTC::ReturnCode_t
- #
- #
- def onExecute(self, ec_id):
- self.x = - (self.get_adc(0) - self.x_offset_v) * self._scaling[0] / 1000.0
- self.y = (self.get_adc(1) - self.y_offset_v) * self._scaling[0] / 1000.0
- if self._print_xy[0] != "NO":
- print "(x, y) = ", self.x, self.y
- self._d_pos.data = [self.x, self.y]
- self._d_whell_vel.data = self.xy_to_wvel(self.x, self.y)
- if self._print_wvel[0] != "NO":
- print "(vl, vr) = ", self._d_whell_vel.data[0], self._d_whell_vel.data[1]
- self._d_vel.data = self.wvel_to_vel2d(self._d_whell_vel.data[0],
- self._d_whell_vel.data[1])
- if self._print_vel[0] != "NO":
- print "(vx, va) = ", self._d_vel.data.vx, self._d_vel.data.va
- self._posOut.write()
- self._velOut.write()
- self._whell_velOut.write()
- return RTC.RTC_OK
- # ##
- # #
- # # The aborting action when main logic error occurred.
- # # former rtc_aborting_entry()
- # #
- # # @param ec_id target ExecutionContext Id
- # #
- # # @return RTC::ReturnCode_t
- # #
- # #
- #def onAborting(self, ec_id):
- #
- # return RTC.RTC_OK
- # ##
- # #
- # # The error action in ERROR state
- # # former rtc_error_do()
- # #
- # # @param ec_id target ExecutionContext Id
- # #
- # # @return RTC::ReturnCode_t
- # #
- # #
- #def onError(self, ec_id):
- #
- # return RTC.RTC_OK
- # ##
- # #
- # # The reset action that is invoked resetting
- # # This is same but different the former rtc_init_entry()
- # #
- # # @param ec_id target ExecutionContext Id
- # #
- # # @return RTC::ReturnCode_t
- # #
- # #
- #def onReset(self, ec_id):
- #
- # return RTC.RTC_OK
- # ##
- # #
- # # The state update action that is invoked after onExecute() action
- # # no corresponding operation exists in OpenRTm-aist-0.2.0
- # #
- # # @param ec_id target ExecutionContext Id
- # #
- # # @return RTC::ReturnCode_t
- # #
- # #
- #def onStateUpdate(self, ec_id):
- #
- # return RTC.RTC_OK
- # ##
- # #
- # # The action that is invoked when execution context's rate is changed
- # # no corresponding operation exists in OpenRTm-aist-0.2.0
- # #
- # # @param ec_id target ExecutionContext Id
- # #
- # # @return RTC::ReturnCode_t
- # #
- # #
- #def onRateChanged(self, ec_id):
- #
- # return RTC.RTC_OK
- def MinistickInit(manager):
- profile = OpenRTM_aist.Properties(defaults_str=ministick_spec)
- manager.registerFactory(profile,
- Ministick,
- OpenRTM_aist.Delete)
- def MyModuleInit(manager):
- MinistickInit(manager)
- # Create a component
- comp = manager.createComponent("Ministick")
- def main():
- mgr = OpenRTM_aist.Manager.init(sys.argv)
- mgr.setModuleInitProc(MyModuleInit)
- mgr.activateManager()
- mgr.runManager()
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement