Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime, optparse, socket
- import sys
- import logging
- logger = logging.getLogger('CstaXmlNoSoap_SnomOne')
- logger_handler = logging.StreamHandler() # logging.FileHandler('/var/tmp/myapp.log')
- logger_formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
- logger_handler.setFormatter ( logger_formatter )
- logger.addHandler ( logger_handler )
- logger.setLevel ( logging.DEBUG ) # DEBUG / INFO / WARN / ERROR / CRITICAL
- def parse_args():
- usage = """usage: %prog hostname[:port] user[@domain] pass
- default port is 80
- default domain is the hostname
- """
- hostport = sys.argv[1]
- username = sys.argv[2]
- password = sys.argv[3]
- if not hostport or not username or not password:
- print usage
- logger.shutdown()
- sys.exit(-1)
- if ':' not in hostport:
- host = hostport
- port = '80'
- else:
- host, port = hostport.split(':', 1)
- if '@' not in username:
- domain = host
- else:
- username, domain = username.split('@', 1)
- logger.debug ( 'host=%s, port=%s, user=%s, pass=%s, domain=%s' % ( host, port, username, password, domain ) )
- if not port.isdigit():
- logger.critical ( 'Ports must be integers.' )
- sys.exit(-1)
- return host, int(port), username, password, domain
- class XmlGen:
- def __init__(me):
- me.prevTags = []
- me.inTag = False
- me.buf = ''
- def tag ( me, pTag ):
- if me.inTag:
- me.buf += '>'
- me.inTag = False
- me.prevTags.append ( pTag )
- me.buf += '<' + pTag
- me.inTag = True
- def attr ( me, pName, pValue ):
- if not me.inTag:
- raise Exception ( "Cannot create an attribute when not in a tag" )
- me.buf += ' ' + pName + '="' + pValue + '"'
- def text ( me, pText ):
- if me.inTag:
- me.buf += '>'
- me.inTag = False
- me.buf += str(pText)
- def endTag ( me, allowSelfTerminate = None ):
- if allowSelfTerminate == None:
- allowSelfTerminate = True
- if me.inTag and allowSelfTerminate:
- me.buf += '/>'
- else:
- if me.inTag:
- me.buf += '>'
- me.buf += '</' + me.prevTags[-1] + '>'
- me.inTag = False
- #print "endTag.prevTags.pre:", me.prevTags
- me.prevTags = me.prevTags[:-1]
- #print "endTag.prevTags.aft:", me.prevTags
- def tagText ( me, pTag, pText ):
- me.tag ( pTag )
- me.text ( pText )
- me.endTag()
- def toString ( me ):
- return me.buf
- # class XmlGen
- def TypeName ( x ):
- if type(x).__name__ == 'instance':
- return x.__class__.__name__
- else:
- return type(x).__name__
- class Timer:
- def __init__ ( me ):
- me.reset()
- def reset ( me ):
- import time
- me.mStart = time.time()
- def elapsed ( me ):
- import time
- return time.time() - me.mStart
- # class Timer
- def binStrN2Short ( buf ):
- return ord(buf[0])<<8 + ord(buf[1])
- def binStrH2Short ( buf ):
- return ord(buf[1])<<8 + ord(buf[0])
- def short2binStrN ( d ):
- return chr((d>>8) & 255) + chr(d & 255)
- def short2binStrH ( d ):
- return chr(d & 255) + chr((d>>8) & 255)
- def xmlDrill ( x, path ):
- import xml.dom.minidom
- ar = path.split('/')
- e = x
- for i in range(len(ar)):
- if i > 0 or ar[i] != '?xml':
- #logger.debug ( "xmlDrill looking for: %s" % ar[i] )
- if i == len(ar)-1 and ar[i] == "*":
- e = e.childNodes # TODO FIXME - process/return arrays
- else:
- e = e.getElementsByTagName(ar[i]) # TODO FIXME - process/return arrays
- #logger.debug ( "found type: %s" % TypeName(e) )
- if TypeName(e) == "NodeList":
- e = e[0]
- #logger.debug ( "NodeList -> %s" % TypeName(e) )
- if e == None: return e
- return e
- def xmlDrillValue ( x, path ):
- ar = []
- e = xmlDrill ( x, path )
- e.nodeValue = xmlGetText ( e )
- #logger.debug ( "xmlDrillValue name=%s value=%s" % ( e.nodeName, e.nodeValue ) )
- return e.nodeValue
- def xmlGetText(x):
- from xml.dom import Node
- import xml.dom.minidom
- rc = []
- if x.nodeType == Node.ELEMENT_NODE:
- x = x.childNodes
- for e in x:
- if e.nodeType == Node.TEXT_NODE:
- rc.append(e.data)
- return ''.join(rc)
- class CstaXmlNoSoap_SnomOne:
- #' this class implements the SnomONE variant of CSTA3/XML Annex J2
- #' see Ecma-323, ECMA-354 and specifically TR-087
- #' The final word on the CTI api's is the source code, but hopefully I'll keep this up to date:
- #'
- #' CTI INTERFACE API...
- #' Function Connect(ByVal pHost$, ByVal pPort%, ByVal pUser%, ByVal pPass$, ByVal pDomain$, ByRef pCallBack) As Boolean
- #' Function MakeCall(...)
- #' Sub KeepAlive() ' reminds cti to keep session going, if necessary
- #' Function Answer() As Boolean ' answer the ringing call
- #'
- #' CTI PROPERTIES
- #' Property User As Short
- #'
- #' CTI CALLBACK API
- #' Sub ctiError(ByVal info$) ' for reporting errors
- #' Sub ctiCallEstablished(ByVal ani$, ByVal dnis$, ByVal cid$) ' ani = caller-id, dnis = dialed-number, cid = unique call id
- #' Sub ctiCallEnded(ByVal byWhom$, ByVal cid$)
- # info from user...
- #Private mHost$, mPort%, mUser%, mDomain$
- # info from server...
- #Private mProtocolVersion$, mSessionId$, mSessionDuration&, mMonitorCrossRefId$, mAni$, mDnis$, mCid$
- #' internal objects...
- #Private WithEvents mTcp As dhRichClient3.cTCPClient
- #Private mSocket&, mCallBack, mBuf$, mUseShortTags As Boolean
- def __init__ ( me ):
- me.mProtocolVersion = "http://www.ecma-international.org/standards/ecma-323/csta/ed3"
- me.mUseShortTags = False
- me.mAllowTrace = True
- me.mBuf = ''
- def connect ( me, pHost, pPort, pUser, pPass, pDomain, pCallBack ):
- logger.debug ( TypeName(me) + ".Connect called for " + pUser + "@" + pDomain )
- me.mTcp = socket.socket ( socket.AF_INET, socket.SOCK_STREAM )
- me.mHost = pHost
- me.mPort = pPort
- me.mUser = pUser
- me.mPass = pPass
- me.mDomain = pDomain
- me.mCallBack = pCallBack
- logger.info ( 'connecting to %s:%d' % ( me.mHost, me.mPort ) )
- try:
- me.mTcp.connect ( ( me.mHost, me.mPort ) )
- except:
- print 'Error connecting to %s:%d' % ( me.mHost, me.mPort )
- raise
- me.mSessionId = ""
- # now send StartApplicationSession...
- me._cstaStartApplicationSession ( TypeName(me), me.mUser, me.mPass, me.mDomain )
- # wait for reply...
- t = Timer()
- while t.elapsed() < 0.5 and 0 == len(me.mSessionId):
- me.doRecv()
- if me.mSessionId == "":
- # TODO FIXME: detect error response specifically to this request...
- me.mCallBack.ctiError ( "Timeout waiting for Application Session initialization" )
- return False
- logger.info ( "got StartApplicationSession.sessionId: " + me.mSessionId )
- me.mMonitorCrossRefId = ""
- # now start monitor...
- me._cstaMonitorStart ( pUser, True, True )
- # wait for reply
- t.reset()
- while t.elapsed() < 0.5 and 0 == len(me.mMonitorCrossRefId):
- me.doRecv()
- if 0 == len(me.mMonitorCrossRefId):
- # TODO FIXME: detect error response specifically to this request...
- me.mCallBack.ctiError ( "Timeout waiting for MonitorStart confirmation" )
- return False
- logger.info ( "got MonitorStart.MonitorCrossRefId: " + me.mMonitorCrossRefId )
- # all ready!
- return True
- def KeepAlive ( me ):
- me._cstaResetApplicationSessionTimer()
- def Answer ( me ):
- cstaAnswerCall ( me.mCid )
- def User ( me ):
- return me.mUser
- def mTopTag ( me, xgen, pTag ):
- xgen.tag ( "?xml" )
- xgen. attr ( "version", "1.0" )
- xgen. attr ( "encoding", "UTF-8" )
- xgen.tag ( pTag )
- xgen. attr ( "xmlns", me.mProtocolVersion )
- xgen. attr ( "xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance" )
- def _mSend ( me, pnInvokeId, pData ):
- if me.mAllowTrace:
- logger.debug ( TypeName(me) + " Raw Send: " + str(pData) )
- if False: # prepend a Csta/XML No-Soap header?
- # SnomONE just won't take these packets!!!!
- data = Short2BinStrN(0) + Short2BinStrN(Len(pData) + 8) + ("000" + nInvokeId)[-4:] + pData
- if True:
- for i in range(8):
- logger.debug ( "mSend[%d]=0x%x '%s'" % ( i, ord(pData[i]), pData[i] ) )
- me.mTcp.send ( pData )
- def _cstaStartApplicationSession ( me, pApplicationId, pUserId, pPass, pDomain ):
- if 0 == len(pApplicationId):
- pApplicationId = TypeName(me)
- me.mDomain = pDomain
- x = XmlGen()
- me.mTopTag ( x, "StartApplicationSession" )
- x. tag ( "applicationInfo" )
- x. tagText ( "applicationID", pApplicationId )
- x. tag ( "applicationSpecificInfo" )
- x. tag ( "tns:SessionLoginInfo" )
- x. attr ( "xmlns:tns", "http://www.pbxnsip.com/schemas/csta/login" )
- x. tagText ( "tns:userName", pUserId )
- x. tagText ( "tns:password", pPass )
- x. tagText ( "tns:domain", pDomain )
- x. tagText ( "tns:sessionCleanupDelay", 60 )
- x. endTag() # tns:SessionLoginInfo
- x. endTag() # applicationSpecificInfo
- x. endTag() # applicationInfo
- x. tag ( "requestedProtocolVersions" )
- x. tagText ( "protocolVersion", "http://www.ecma-international.org/standards/ecma-323/csta/ed3" )
- x. tagText ( "protocolVersion", "http://www.ecma-international.org/standards/ecma-323/csta/ed4" )
- x. endTag() # requestedProtocolVersions
- x. tagText ( "requestedSessionDuration", 180 )
- x.endTag() # StartApplicationSession
- me._mSend ( 1, x.toString() )
- def _cstaResetApplicationSessionTimer ( me ):
- #<?xml version="1.0" encoding="UTF-8"?>
- #<ResetApplicationSessionTimer
- #xmlns = "http://www.ecma-international.org/standards/ecma-354/appl_session"
- #xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- # <sessionID>AEF12111212</sessionID>
- # <requestedSessionDuration>500</requestedSessionDuration>
- #</ResetApplicationSessionTimer>
- logger.debug ( "sending keep-alive" )
- x = XmlGen()
- me.mTopTag ( x, "ResetApplicationSessionTimer" )
- x. tagText ( "sessionID", me.mSessionId )
- x. tagText ( "requestedSessionDuration", 180 )
- x.endTag() # ResetApplicationSessionTimer
- me.mAllowTrace = False
- me._mSend ( 0, x.toString() )
- def _cstaMonitorStart ( me, pnDevice, pbVoice, pbIm ):
- x = XmlGen()
- me.mTopTag ( x, "MonitorStart" )
- x. tag ( "monitorObject")
- # formatting of deviceObject is correct for SnomONE, but seems to conflict w/ spec (which wants sip:)
- x. tagText ( "deviceObject", pnDevice + "@" + me.mDomain ) # or mHost?
- x. endTag() # monitorObject
- x. tagText ( "monitorType", "device" ) # call | device
- x. tag ( "requestedMonitorMediaClass" )
- x. tagText ( "voice", "true" if pbVoice else "false" )
- x. tagText ( "im", "true" if pbIm else "false" )
- x. endTag() # requestedMonitorMediaClass
- x.endTag() # MonitorStart
- me._mSend ( pnDevice, x.toString() )
- def _cstaMonitorStop ( me, pnCrossRefId ):
- #<?xml version="1.0" encoding="UTF-8"?>
- #<MonitorStop xmlns="http://www.ecma-international.org/standards/ecma-323/csta/ed3"
- #xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- # <monitorCrossRefID>5665621</monitorCrossRefID>
- #</MonitorStop>
- x = XmlGen()
- me.mTopTag ( x, "MonitorStop" )
- x. tagText ( "monitorCrossRefID", pnCrossRefId )
- x.endTag() # MonitorStop
- me._mSend ( pnCrossRefId, x.ToString() )
- #AlternateCall
- def _cstaAnswerCall ( me, pCid ):
- #<?xml version="1.0" encoding="UTF-8"?>
- #<AnswerCall xmlns="http://www.ecma-international.org/standards/ecma-323/csta/ed3">
- # <callToBeAnswered>
- # <callID>call-id:123456789, remote-tag=1222, local-tag=7777</callID>
- # <deviceID>sip:tom@domain.com</deviceID>
- # </callToBeAnswered>
- #</AnswerCall>
- x = XmlGen()
- me.mTopTag ( x, "AnswerCall" )
- x. tag ( "callToBeAnswered" )
- x. tagText ( "callID", "call-id:" + pCid )
- #x. TagText ( "deviceId", mDnis )
- x. endTag() # callToBeAnswered
- x.endTag() # AnswerCall
- me._mSend ( 0, x.ToString() )
- #ClearConnection
- #ConsultationCall
- #DeflectCall
- #GenerateDigits
- #HoldCall
- def _cstaMakeCall ( me, pnFrom, psTo ):
- #<?xml version="1.0" encoding="UTF-8"?>
- #<MakeCall xmlns="http://www.ecma-international.org/standards/ecma-323/csta/ed3">
- # <callingDevice>sip:ua1@123.123.123.123</callingDevice>
- # <calledDirectoryNumber>sip:alice@domain.com</calledDirectoryNumber>
- #</MakeCall>
- x = XmlGen()
- me.mTopTag ( x, "MakeCall" )
- x. tagText ( "callingDevice", nFrom + "@" + mHost )
- x. tagText ( "calledDirectoryNumber", sTo )
- x.endTag # MakeCall
- me._mSend ( 0, x.ToString() )
- #ReconnectCall
- #RetrieveCall
- #SingleStepTransferCall
- #TransferCall
- #DirectedPickupCall
- def doRecv ( me ):
- me.mTcp.settimeout ( 0.05 )
- try:
- me.mBuf += me.mTcp.recv(1024)
- except socket.timeout:
- #logger.debug ( 'len(me.mBuf) = %d' % len(me.mBuf) )
- pass
- while len(me.mBuf) > 8:
- if False:
- for i in range(8):
- logger.debug ( "mRecv[%d]=0x%x '%s'" % ( i, ord(me.mBuf[i]), me.mBuf[i] ) )
- hdr = binStrN2Short ( me.mBuf[0:2] )
- nPkt = binStrN2Short ( me.mBuf[2:4] )
- nInvokeId = 0
- if hdr == 0: # CSTA/XML over TCP w/o SOAP
- data = me.mBuf[8:nPkt]
- #nInvokeId = binStrN2Long(me.mBuf[5:9])
- #Debug.Print "got invokeId " & nInvokeId + "(" + me.mBuf[5:8] + ")"
- elif hdr == 1: # CSTA/XML over TCP w/ SOAP
- # TODO FIXME - unimplemented
- data = ""
- else:
- me.mCallBack.ctiError ( "bad header, flushing buffer" )
- data = ""
- nPkt = len(me.mBuf) # flush entire contents of buffer I guess...
- me.mBuf = me.mBuf[nPkt:]
- if data <> "":
- from xml.dom.minidom import parseString
- me.mAllowTrace = True
- try:
- x = parseString ( data )
- except:
- me.mCallBack.ctiError ( "xml parsing error: " + data )
- else:
- # get first ( i.e. top-level ) element
- e = xmlDrill(x,"?xml/*")
- f = getattr(me, "handle_%s" % e.nodeName, None)
- if f is None:
- me.mCallBack.ctiError ( "CSTA Unrecognized response object: " + e.nodeName )
- else:
- f ( e )
- x.unlink() # encourage memory cleanup
- if me.mAllowTrace:
- logger.debug ( TypeName(me) + " Raw Recv: " + data )
- def handle_CSTAErrorCode ( me, e ):
- e = drillXml(e,"operation")
- if e is None:
- me.mCallBack.ctiError ( "CSTAErrorCode should have <operation>: " + data )
- else:
- me.mCallBack.ctiError ( "CSTA Error Code: " + e.Value )
- def handle_StartApplicationSessionPosResponse ( me, e ):
- #mProtocolVersion = e.DrillValue("actualProtocolVersion")
- me.mSessionId = xmlDrillValue(e,"sessionID") # FYI not necessarily numeric
- me.mSessionDuration = long(xmlDrillValue(e,"actualSessionDuration"))
- logger.debug ( "StartApplicationSessionPosResponse(%s,%s)" % ( me.mSessionId, me.mSessionDuration ) )
- def handle_ResetApplicationSessionTimerPosResponse ( me, e ):
- me.mAllowTrace = False
- logger.warn ( "ignoring ResetApplicationSessionTimerPosResponse" )
- def handle_MonitorStartResponse ( me, e ):
- me.mMonitorCrossRefId = xmlDrillValue ( e, "monitorCrossRefID" )
- if 0 == len(me.mMonitorCrossRefId):
- me.mCallBack.ctiError ( "<MonitorStartResponse> should have a <monitorCrossRefID>: " + data )
- def handle_MakeCallResponse ( me, e ):
- e = xmlDrill(e,"callingDevice")
- me.mCallBack.ctiMakingCall ( xmlDrillValue(e,"callID"), xmlDrillValue(e,"deviceID") )
- def handle_ServiceInitiatedEvent ( me, e ):
- # a call has been requested to be established... just ignore for now...
- logger.warn ( "ignoring ServiceInitiatedEvent" )
- def handle_OriginatedEvent ( me, e ):
- # caller has gone off-hook to make the call they requested... just ignore for now
- logger.warn ( "ignoring OriginatedEvent" )
- def handle_DeliveredEvent ( me, e ):
- # call is ringing
- me.mAni = me.ParseUser(x.DrillValue("callingDevice/deviceIdentifier"))
- me.mDnis = me.ParseUser(x.DrillValue("calledDevice/deviceIdentifier"))
- me.mCid = x.DrillValue("connection/callID")
- me.mCallBack.ctiCallRinging ( me.mAni, me.mDnis, me.mCid )
- def handle_EstablishedEvent ( me, e ):
- me.mAni = me.ParseUser(x.DrillValue("callingDevice/deviceIdentifier"))
- me.mDnis = me.ParseUser(x.DrillValue("calledDevice/deviceIdentifier"))
- me.mCid = x.DrillValue("establishedConnection/callID")
- me.mCallBack.ctiCallEstablished ( me.mAni, me.mDnis, me.mCid )
- def handle_ConnectionClearedEvent ( me, e ):
- by = me.ParseUser(xmlDrillValue(e,"releasingDevice/deviceIdentifier"))
- me.mCid = xmlDrillValue(e,"droppedConnection/callID")
- me.mCallBack.ctiCallEnded ( by, me.mCid )
- me.mAni = ""
- me.mDnis = ""
- me.mCid = ""
- def handle_BackInServiceEvent ( me, e ):
- logger.warn ( "ignoring BackInServiceEvent (all is good)" )
- def mTcp_SockError ( me, hSocket, ErrString ):
- me.mCallBack.ctiError ( "Socket Error: " + ErrString )
- def mTcp_TCPDisConnect ( me, hSocket ):
- me.mCallBack.ctiError ( "Socked Closed" )
- def ParseUser ( me, userHost ):
- if userHost.find("@"):
- return userHost.split("@",1)[0]
- return userHost
- # class CstaXmlNoSoap_SnomOne
- def format_address(address):
- host, port = address
- return '%s:%s' % (host or '127.0.0.1', port)
- class CtiCallBack:
- def ctiError ( me, s ):
- logger.error ( s )
- def ctiCallRinging ( me, ani, dnis, cid ):
- logger.info ( "call ringing: ani=%s, dnis=%s, cid=%s" % ( ani, dnis, cid ) )
- def ctiCallEstablished ( me, ani, dnis, cid ):
- logger.info ( "call ans'd: ani=%s, dnis=%s, cid=%s" % ( ani, dnis, cid ) )
- def ctiCallEnded ( me, byWhom, cid ):
- logger.info ( "call ended: by=%s, cid=%s" % ( byWhom, cid ) )
- def main():
- host, port, username, password, domain = parse_args()
- cb = CtiCallBack()
- cti = CstaXmlNoSoap_SnomOne()
- if not cti.connect ( host, port, username, password, domain, cb ):
- logger.critical ( "Connect failed, quitting" )
- sys.exit(-1)
- t = Timer()
- while True:
- while t.elapsed() < 60:
- cti.doRecv()
- cti.KeepAlive()
- t.reset()
- if __name__ == '__main__':
- try:
- main()
- except KeyboardInterrupt:
- pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement