Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from dcps import *
- from comedi import *
- import logging
- import urllib
- import socket
- import os
- import time
- import platform
- """rwi (real-world interface) device wrapper. For this to work, comedi and the python comedi bindings must be installed"""
class device( bt.device ):
    """The rwi device wrapper itself - provides various commands for switches on and off manipulations.

    An instance holds the ip it was created for and a list of board objects.
    Methods that take a `client` report their result through client.write()
    and return bt.SUCCESS or bt.FAILURE.
    """

    # Switch names indexed by absolute switch number (position in this list).
    # NOTE(review): "NA" presumably marks channels with nothing wired - confirm.
    SWITCHES=["Hall", "Study", "Porch", "Lounge", "Terrace", "Trap", "NA", "NA", "NA", "Bathroom", "Kitchen", "Laundry", "Shower", "NA", "Showerroom", "Apex"]

    def __init__( self, universe, ip, boards ):
        """Initialise the rwi device wrapper

        universe and ip are handed on to bt.device.__init__; boards is the
        list of board objects that the commands below operate on.
        """
        self.ip = ip
        self.boards = boards
        # Upper-cased copy of SWITCHES so name lookups are case-insensitive.
        self.switches_index = [switch.upper() for switch in self.SWITCHES]
        bt.device.__init__( self, universe, ip )

    def id( self ):
        """Return an id as a ( type, description ) tuple"""
        return ( 'rwi', 'rwi at %s' % self.ip )

    def keygen( self, key ):
        """Generate a unique key for this instance of the rwi device"""
        return 'rwi_%s_%s' % ( self.ip, key )

    def _reply_for_board( self, client, boardnum, query ):
        """(internal) Validate boardnum, then write query(board) to the client.

        query is a callable that receives the selected board object and
        returns the text to send. When boardnum is not valid, the validation
        message is written instead and bt.FAILURE is returned.
        """
        result = self.validate_arg(boardnum)
        if result != "OK":
            client.write(result)
            return bt.FAILURE
        client.write(query(self.boards[int(boardnum)]))
        return bt.SUCCESS

    def get_info( self, client, boardnum ):
        """Get some information about the given board"""
        return self._reply_for_board(client, boardnum, lambda b: b.get_info())

    def get_board_name( self, client, boardnum ):
        """Get the name of the given board"""
        return self._reply_for_board(client, boardnum, lambda b: b.get_board_name())

    def get_driver_name( self, client, boardnum ):
        """Get the name of the driver for the given board"""
        return self._reply_for_board(client, boardnum, lambda b: b.get_driver_name())

    def channel (self, key ):
        """Tries to make sense of a channel param, first looks to see if it's a number, then looks up in the list of switches

        Returns the absolute switch number, or -1 when key is neither a number
        nor a known switch name (the lookup ignores case and surrounding
        whitespace).
        """
        try:
            # channel number entered?
            return int(key)
        except (TypeError, ValueError):
            pass
        try:
            # not a number, so look up channel number in the switches array
            return self.switches_index.index(key.strip().upper())
        except (AttributeError, ValueError):
            # unknown name (or not a string at all): callers test for < 0
            return -1

    def _switch( self, arg, turn_on ):
        """(internal) Switch every entry of the comma separated list arg on or off.

        Entries are processed left to right; the first entry that cannot be
        mapped to a board/channel stops processing and yields bt.FAILURE
        (entries before it have already been switched).
        """
        for name in arg.split(","):
            board, channel = self.get_board_channel(self.channel(name))
            if board < 0:
                # Unknown name, negative number or a number beyond the last
                # board. Never fall through to self.boards[-1].
                return bt.FAILURE
            if turn_on:
                self.boards[board].on(channel)
            else:
                self.boards[board].off(channel)
        return bt.SUCCESS

    def off( self, client, arg ):
        """Turn off the given switch(es)"""
        return self._switch(arg, False)

    def on( self, client, arg ):
        """Turn on the given switch(es)"""
        return self._switch(arg, True)

    def get_board_channel( self, switch ):
        """Given an absolute switch number, works out which board the switch is on and which switch it is on that board
        If we have a switch number of 14, with 8 switches on the first board and 8 on the second
        we'll return 1 for the board (it's on the 2nd board) and 6 for the switch on that board

        Returns ( -1, -1 ) when the switch number is negative or lies beyond
        the channels of all boards.
        """
        if switch < 0:
            return -1, -1
        # go through the boards, accumulating the channels seen so far
        totchans = 0
        for idx, brd in enumerate(self.boards):
            dout = brd.find_dout_dev()
            if dout < 0:
                # no DOUT subdevice found on this board: contributes no channels
                continue
            numchans = brd.get_num_chans(dout)
            if numchans <= 0:
                # an error/empty count must not shift the running total
                continue
            if switch < totchans + numchans:
                return idx, switch - totchans
            totchans = totchans + numchans
        return -1, -1

    def get_switches( self, client ):
        """Return a list of all the defined switches, one 'index, name' line per switch"""
        for idx, switch in enumerate(self.SWITCHES):
            client.write('%d, %s' % (idx, switch))
        return bt.SUCCESS

    def find_din_device( self, client, boardnum ):
        """Find the DIN subdevice on the given board"""
        return self._reply_for_board(client, boardnum, lambda b: str(b.find_din_dev()))

    def find_dout_device( self, client, boardnum ):
        """Find the DOUT subdevice on the given board"""
        return self._reply_for_board(client, boardnum, lambda b: str(b.find_dout_dev()))

    def get_num_subdevices( self, client, boardnum ):
        """Get the number of subdevices on the given board"""
        return self._reply_for_board(client, boardnum, lambda b: str(b.get_num_subdevices()))

    def get_num_channels( self, client, boardnum, subdevice ):
        """Get the number of channels on the given subdevice of the given board"""
        result = self.validate_arg(boardnum)
        if result != "OK":
            client.write(result)
            return bt.FAILURE
        try:
            subdev = int(subdevice)
        except (TypeError, ValueError):
            # report a bad subdevice the same way a bad board number is reported
            client.write("Invalid argument, must be number")
            return bt.FAILURE
        client.write(str(self.boards[int(boardnum)].get_num_chans(subdev)))
        return bt.SUCCESS

    def get_dout_state( self, client ):
        """Get the binary state of the DOUT device on the given board(s) as a String"""
        # states of all boards are concatenated in board order
        client.write("".join(str(b.get_state(b.find_dout_dev())) for b in self.boards))
        return bt.SUCCESS

    def get_din_state( self, client ):
        """Get the binary state of the DIN device on the given board(s) as a String"""
        # states of all boards are concatenated in board order
        client.write("".join(str(b.get_state(b.find_din_dev())) for b in self.boards))
        return bt.SUCCESS

    def get_location( self, client, boardnum ):
        """Get the location of the given board (/dev/comedi%)"""
        return self._reply_for_board(client, boardnum, lambda b: b.get_location())

    def toggle( self, client, boardnum, arg ):
        """On the given board, toggle the given light(s)

        arg is a comma separated list of channel numbers on that board. The
        channels are flipped on the board's DOUT subdevice (the same one
        get_board_channel and get_dout_state use).
        """
        result = self.validate_arg(boardnum)
        if result != "OK":
            client.write(result)
            return bt.FAILURE
        brd = self.boards[int(boardnum)]
        dout = brd.find_dout_dev()
        for name in arg.split(","):
            try:
                chan = int(name)
            except (TypeError, ValueError):
                client.write("Invalid argument, must be number")
                return bt.FAILURE
            # board.flip needs both the subdevice and the channel
            brd.flip(dout, chan)
        return bt.SUCCESS

    def validate_arg( self, arg ):
        """Try to ensure that the given boardnum corresponds to an actual board

        Returns "OK" when arg converts to an index into self.boards, otherwise
        a message describing the problem.
        """
        try:
            idx = int(arg)
        except (TypeError, ValueError):
            return "Invalid argument, must be number"
        if idx < 0 or idx >= len(self.boards):
            return "Invalid argument, have %d boards" % (len(self.boards))
        return "OK"

    def map( self, client ):
        """Device ui map: a type line, the switch count, then the switch list"""
        client.write( 'type, switchmap' )
        client.write( 'item_count, %d' % len(self.SWITCHES) )
        self.get_switches(client)
        return bt.SUCCESS
class board(object):
    """Wrapper around one opened comedi device handle.

    dev is the handle passed to every comedi_* call; location is the path the
    handle was opened from and is only reported back by get_location().
    """

    # Subdevice type codes handed to comedi_find_subdevice_by_type.
    DOUT=4
    DIN=3

    def __init__( self, dev, location ):
        """Initialise the board wrapper"""
        logging.debug("Into __init__() on the board class")
        self.dev = dev
        self.subdevices = {}
        self.location = location

    def _read_channels( self, subdev, numchan ):
        """(internal) Read channels 0..numchan-1 of subdev into a string.

        Each channel contributes str(value) when comedi_dio_read reports 1 as
        its result, and "*" otherwise.
        """
        state = []
        for chan in range(0, numchan):
            res, value = comedi_dio_read(self.dev, subdev, chan)
            logging.debug("res: %d and value: %d" % (res, value))
            if res == 1:
                state.append(str(value))
            else:
                state.append("*")
        return "".join(state)

    def get_info( self ):
        """Get some info

        Returns a multi-line text with the board name, driver, location and
        one line per subdevice; DIN/DOUT typed subdevices also list the state
        of their channels.
        """
        numsubdev = self.get_num_subdevices()
        lines = [
            "Board: %s\n" % (self.get_board_name()),
            "Driver: %s\n" % (self.get_driver_name()),
            "Located at: %s\n" % (self.get_location()),
            "Number of sub-devices: %s\n" % (numsubdev),
        ]
        for subdevind in range(0, numsubdev):
            subdevtyp = comedi_get_subdevice_type(self.dev, subdevind)
            numchan = comedi_get_n_channels(self.dev, subdevind)
            state = ""
            if subdevtyp in (self.DIN, self.DOUT):
                state = "and has state: " + self._read_channels(subdevind, numchan)
            lines.append("Sub device %d has sub device type %d and %d channels %s\n" % (subdevind, subdevtyp, numchan, state))
        return "".join(lines)

    def get_board_name( self ):
        """Return the board name reported by comedi for this device"""
        return comedi_get_board_name(self.dev)

    def get_driver_name( self ):
        """Return the driver name reported by comedi for this device"""
        return comedi_get_driver_name(self.dev)

    def off( self, channel):
        """Write 0 to the given channel; returns the comedi_dio_write result"""
        # NOTE(review): the subdevice index is hard-coded to 3 here rather than
        # taken from find_dout_dev() - confirm this matches the installed boards.
        return comedi_dio_write( self.dev, 3, channel, 0)

    def on( self, channel):
        """Write 1 to the given channel; returns the comedi_dio_write result"""
        # NOTE(review): same hard-coded subdevice index 3 as off() - confirm.
        return comedi_dio_write( self.dev, 3, channel, 1)

    def find_din_dev( self ):
        """(internal) Find the DIN subdevice"""
        return comedi_find_subdevice_by_type(self.dev, self.DIN, 0)

    def find_dout_dev( self ):
        """(internal) Find the DOUT subdevice"""
        return comedi_find_subdevice_by_type(self.dev, self.DOUT, 0)

    def get_num_subdevices( self ):
        """(internal) Get the number of subdevices"""
        return comedi_get_n_subdevices(self.dev)

    def get_num_chans( self, arg ):
        """(internal) Get the number of channels on the given subdevice"""
        return comedi_get_n_channels(self.dev, int(arg))

    def get_state( self, arg ):
        """(internal) Get the binary state of the given subdevice as a String"""
        subdev = int(arg)
        numchan = self.get_num_chans(subdev)
        logging.debug("numchan: %d" % (numchan))
        return self._read_channels(subdev, numchan)

    def get_location(self):
        """Return the location this board was created with"""
        return self.location

    def flip(self, subdev, chan):
        """(internal) Flips the given channel on the given subdevice

        subdev and chan may be ints or numeric strings. Returns the result of
        the write, or the result of the read when the read did not report 1
        (in which case nothing is written).
        """
        subdev = int(subdev)
        chan = int(chan)
        res, value = comedi_dio_read(self.dev, subdev, chan)
        if res != 1:
            return res
        return comedi_dio_write(self.dev, subdev, chan, 0 if value == 1 else 1)
def instantiate( universe, ip ):
    """Attempt to instantiate a device wrapper for the specified ip address.

    A device is only created when ip is '127.0.0.1', the platform is not
    Windows and the `comedi_test` executable is found on the PATH.

    Returns True when the device object was created, False in every other
    case. Exceptions are logged and never propagate to the caller.
    """
    logging.debug( "rwi instantiate: started" )
    try:
        if ip != '127.0.0.1':
            return False
        logging.debug( "rwi instantiate: is localhost" )

        if platform.system( ) == 'Windows':
            logging.debug( "No comedi on Windoze yet..." )
            return False
        logging.debug( "rwi instantiate: is NOT Windoze!" )

        # Sanity check to see if comedi is installed
        check_comedi = os.system( 'which comedi_test >/dev/null 2>&1' )
        logging.debug( "rwi instantiate: check_comedi is: %s" % (check_comedi) )
        if check_comedi != 0:
            return False

        try:
            logging.debug( "rwi instantiate: check_comedi is 0 so continuing..." )
            boards = find_boards()
            logging.debug( "rwi instantiate: find_boards done, found %d" % (len(boards)) )
            # The created object is not kept here.
            # NOTE(review): presumably bt.device.__init__ registers it with
            # the universe - confirm.
            device( universe, ip, boards )
            logging.debug( "rwi instantiate: device done" )
            logging.debug( "rwi instantiate returning TRUE!!!" )
            return True
        except Exception as e:
            # Format with %s: concatenating str + exception raises TypeError
            # and would hide the real error.
            logging.error( "Got exception in instantiate of rwi device: %s" % e )
    except Exception as e:
        logging.error( 'rwi failure on %s: %s' % ( ip, e ) )
    return False
def find_boards(max_devices=16):
    """Probe /dev/comedi0 .. /dev/comedi<max_devices-1> and wrap each usable device.

    max_devices is the number of device nodes to try (default 16, the value
    that used to be hard-coded).

    Returns a list of board objects, in device-number order, for every node
    that could be opened and reports a subdevice count other than -1.
    """
    logging.debug("Into find_boards")
    boards = []
    for dev_idx in range(max_devices):
        location = "/dev/comedi" + str(dev_idx)
        dev = comedi_open(location)
        if dev is None:
            # nothing opened at this node; skip it without querying comedi
            continue
        if comedi_get_n_subdevices(dev) != -1:
            logging.debug("Creating board from device found at %s" % (location))
            boards.append(board(dev, location))
    return boards
Advertisement
Add Comment
Please, Sign In to add comment