Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/local/bin/python
- # -*- coding: utf-8 -*-
- from asterisk import manager
- from twisted.internet import reactor
- from twisted.internet.protocol import DatagramProtocol
- import time, logging
- from threading import Thread, Lock
- logging.basicConfig()
class TAPI(DatagramProtocol):
    """UDP endpoint that dispatches pipe-delimited command packets.

    Every datagram is split on '|'. The first field names the command and
    selects a callback registered through RegisterCallback; the remaining
    fields are handed to that callback as the message body.
    """

    def __init__(self, logfile):
        """Create empty registries and announce start-up.

        logfile -- path stored on the instance. File logging is not
                   implemented here (log() only prints), so nothing is
                   written to it.
        """
        self.commandCallbacks = {}
        # Not used by any code visible in this file; kept for compatibility.
        self.defferedCallbacks = {}
        # Registered clients, filled in by the REG handler.
        self.users = {}
        self.lock = Lock()
        self.logfile = logfile
        self.log('TAPI socket started.')

    def log(self, message):
        """Print a message; a failure while printing is reported, not raised."""
        try:
            print(message)
        except Exception as error:
            print("Exception on logger caught // message %s" % str(error))

    def RegisterCallback(self, command, function):
        """Bind function to command unless the command is already bound.

        The callback is later invoked as function(tapi_instance, message).
        """
        if command in self.commandCallbacks:
            self.log('Callback for %s already registered.' % command)
            return
        self.commandCallbacks[command] = function
        self.log('Registering callback for "%s".' % command)

    def datagramReceived(self, data, addr):
        """Parse one datagram and dispatch it.

        data -- raw payload of the datagram.
        addr -- (host, port) pair of the sender.
        """
        # Tuple parameters in the signature only exist in Python 2, so the
        # address is unpacked here instead.
        host, port = addr
        options = self.getData(data)
        if not options:
            # getData has already logged why the payload was unusable.
            return
        message = {
            'event': options[0],
            'host': host,
            'port': port,
            'body': options[1:],
        }
        self.dispatch(message)

    def dispatch(self, message):
        """Run the callback registered for message['event'], if any.

        A message without an 'event' key is ignored. An exception raised by
        the callback is logged and not propagated.
        """
        if 'event' not in message:
            return
        callback = self.commandCallbacks.get(message['event'])
        if not callback:
            self.log('Unknown packet received. Dump data: %s' % message)
            return
        try:
            callback(self, message)
        except Exception as error:
            self.log('Could\'t dispatch command, data dump: %s, callback address: %s, error: %s' % (message, callback, str(error)))

    def sendMessage(self, host, port, message):
        """Write message to (host, port) through the protocol's transport."""
        # On Python 3 the transport needs bytes; on Python 2 str already is
        # bytes, so this branch is a no-op there.
        if not isinstance(message, bytes):
            message = message.encode('utf-8')
        self.transport.write(message, (host, port))

    def getData(self, data):
        """Split a payload on '|' and return the list of fields.

        Returns an empty list (after logging) when the payload cannot be
        decoded or split, instead of failing on an unbound local.
        """
        try:
            # Only true for Python 3 bytes; Python 2 str passes through.
            if isinstance(data, bytes) and not isinstance(data, str):
                data = data.decode('utf-8')
            return data.split('|')
        except (AttributeError, TypeError, UnicodeError) as error:
            self.log("Some error occured, while processing a message: %s" % error)
            return []
class t_node(Thread):
    """Per-node worker that tracks channels reported by one AMI connection.

    The instance keeps a dict of channel records keyed by channel name and
    updates it from the manager events registered in onLogin. Whenever a
    record is complete enough, send_stat forwards it to the TAPI client
    registered for the extension involved.
    """

    def __init__(self, node):
        """node -- the node's configuration dict (the same object stored in
        the module-level nodes table)."""
        Thread.__init__(self)
        self.node = node
        self.id = node['id']
        self.channels = {}

    def run(self):
        """Start an AMI login for this node.

        Called by Thread.start() and again, through reactor.callLater, from
        AMIConnectionFail after a failed connection.
        """
        # NOTE(review): start() runs this off the reactor thread; confirm
        # whether the login should be scheduled with reactor.callFromThread.
        factory = manager.AMIFactory(self.node['user'], self.node['password'])
        factory.onFailConnection(self.AMIConnectionFail)
        deferred = factory.login(ip=self.node['host'], port=self.node['port'])
        deferred.addCallback(self.onLogin)

    def AMIConnectionFail(self, connector, reason):
        """Mark the node offline and schedule another login in 5 seconds."""
        with n_mtx:
            entry = nodes[self.node['id']]
            entry['status'] = 'offline'
            entry['protocol'] = None
        try:
            reactor.callLater(5, self.run)
        except Exception as error:
            print(error)

    def onLogin(self, p):
        """Mark the node online and subscribe to the channel events.

        p -- value delivered by the login deferred; stored as the node's
             'protocol' and used to register the event handlers.
        """
        # Channel state from a previous connection is discarded.
        self.channels = {}
        with n_mtx:
            entry = nodes[self.node['id']]
            entry['status'] = 'online'
            entry['protocol'] = p
        p.registerEvent('Newchannel', self.newchannel)
        p.registerEvent('Hangup', self.hangup)
        p.registerEvent('Dial', self.dial)
        p.registerEvent('Newstate', self.newstate)
        p.registerEvent('NewAccountCode', self.newacct)

    def newchannel(self, p, m):
        """Create a record for a channel that is not tracked yet."""
        name = m['channel']
        if name in self.channels:
            return
        # The placeholder extension "s" is stored as "no extension".
        exten = m['exten'] if m['exten'] and m['exten'] != "s" else None
        self.channels[name] = {
            'name': name,
            'exten': exten,
            'cid_name': m['calleridname'],
            'cid_num': m['calleridnum'],
            'direction': None,
            'state': None,
        }

    def dial(self, p, m):
        """On 'Begin', copy the caller's identity onto the dialled channel.

        The destination record is marked "INPUT" and inherits caller ID (and
        the extension, if it has none) from the originating channel. Events
        for an untracked destination or untracked originating channel are
        ignored; the latter used to raise KeyError.
        """
        if m['subevent'] != "Begin":
            return
        chan = self.channels.get(m['destination'])
        if chan is None:
            return
        parent = self.channels.get(m['channel'])
        if parent is None:
            # Originating channel was never seen, so there is nothing to
            # copy from.
            return
        chan['cid_name'] = parent['cid_name']
        chan['cid_num'] = parent['cid_num']
        chan['direction'] = "INPUT"
        # For an incoming call the dialled record takes the caller's exten.
        if not chan['exten']:
            chan['exten'] = parent['exten']
        self.send_stat(p, chan)
        # Transfer: no caller ID could be inherited, use the event's own.
        if not chan['cid_name'] and not chan['cid_num']:
            chan['cid_num'] = m['calleridnum']
            chan['cid_name'] = m['calleridname']
            self.send_stat(p, chan)

    def hangup(self, p, m):
        """Report the hangup of a tracked channel and drop its record."""
        # Debug dump. The original fetched this same dict through the global
        # nodes table, whose 'thread' entry for this node is this instance.
        print(self.channels)
        suffix = '<ZOMBIE>'
        if m['channel'].endswith(suffix):
            # The event dict is updated in place, as before.
            m['channel'] = m['channel'][:-len(suffix)]
        name = m['channel']
        record = self.channels.get(name)
        if record is None:
            return
        record['state'] = "Hangup"
        self.send_stat(p, record)
        del self.channels[name]

    def newstate(self, p, m):
        """Update a tracked channel's state and report it once it has a direction.

        A record with no usable extension, no caller number and no direction
        is treated as incoming when the event carries caller ID: it takes the
        event's caller ID and, if the channel name contains a numeric SIP
        peer, that number as its extension.
        """
        name = m['channel']
        chan = self.channels.get(name)
        if chan is None:
            return
        chan['state'] = m['channelstatedesc']
        lacks_exten = not chan['exten'] or chan['exten'] == "s"
        has_caller = m['calleridnum'] and m['calleridname']
        if lacks_exten and not chan['cid_num'] and has_caller and not chan['direction']:
            chan['cid_name'] = m['calleridname']
            chan['cid_num'] = m['calleridnum']
            chan['direction'] = "INPUT"
            # Text from the first "SIP/" up to the first "-" in the name.
            member = name[name.find('SIP/'):name.find('-')]
            peer = member.replace('SIP/', '')
            if peer.isdigit():
                chan['exten'] = peer
        if chan['direction']:
            self.send_stat(p, chan)

    def newacct(self, p, m):
        """Derive a call direction from the account code and report it.

        Only acts when the channel is tracked and the account code is a key
        of the module-level directions table; an already known direction is
        left untouched.
        """
        name = m['channel']
        if name not in self.channels or m['accountcode'] not in directions:
            return
        chan = self.channels[name]
        if not chan['direction']:
            chan['direction'] = directions[m['accountcode']]
        self.send_stat(p, chan)

    def send_stat(self, p, m):
        """Send a channel record to the TAPI client of the party involved.

        p -- unused; kept so every call site keeps its signature.
        m -- channel record. Nothing is sent unless 'exten', 'cid_num',
             'direction' and 'state' are all set and the target extension
             has a registered client.
        """
        print(m)
        if not (m['exten'] and m['cid_num'] and m['direction'] and m['state']):
            return
        if m['direction'] == "OUTPUT":
            sendto, phone = m['cid_num'], m['exten']
        else:
            sendto, phone = m['exten'], m['cid_num']
        # Read everything needed from the registration while holding the
        # lock, so an unregistration in between cannot cause a KeyError.
        with tapi.lock:
            user = tapi.users.get(sendto)
            if user is not None:
                host = user['host']
                port = user['port']
                interface = user['interface']
        if user is None:
            return
        state = m['state'].upper()
        packet = "%s|%s|%s|%s|%s" % (m['direction'], state, m['name'], phone, m['cid_name'])
        tapi.sendMessage(host, port, packet)
        tapi.log('Presence: sendto: %s, call direction: %s, status: %s' % (interface, m['direction'], state))
        tapi.log(' channel: %s, Phone: %s, callerID: %s' % (m['name'], phone, m['cid_name']))
def tapi_reg(self, msg):
    """REG handler: record where presence for a phone should be sent.

    self -- the TAPI instance that dispatched the packet.
    msg  -- dispatched message; msg['body'] must hold, in order, the
            password, the interface and the operator name.

    The phone key is the part of the interface after its first '/'. A packet
    with too few fields is logged and dropped; previously execution carried
    on and failed on unbound locals.
    """
    options = msg['body']
    try:
        password = options[0]
        interface = options[1]
        opername = options[2]
    except IndexError as error:
        self.log("Unknown packet received, data dump: %s" % error)
        return
    if password != tapi_password:
        self.log('Unauthorized access detected for host: %s' % msg['host'])
        return
    phone = interface[interface.find('/') + 1:]
    with self.lock:
        self.users[phone] = {
            'host': msg['host'],
            'port': msg['port'],
            'opername': opername,
            'interface': interface,
            'uptime': int(time.time()),
        }
    self.log("Received new registration from: %s with interface: %s, port: %s" % (msg['host'], interface, msg['port']))
def tapi_qadd(self, msg):
    """QADD handler: add an interface to a queue on one manager node.

    self -- the TAPI instance that dispatched the packet.
    msg  -- dispatched message; msg['body'] must hold, in order, the
            password, the interface, the queue and the penalty.

    Malformed packets, a wrong password, the absence of a usable node and
    failures of the manager call are all logged; the original swallowed
    every error silently.
    """
    try:
        password, interface, queue, penalty = msg['body'][:4]
    except ValueError:
        self.log('Malformed QADD packet from host: %s' % msg['host'])
        return
    if password != manager_password:
        self.log('Unauthorized access detected for host: %s' % msg['host'])
        return
    node = select_node()
    if node is None:
        self.log('No manager node available for QADD request from host: %s' % msg['host'])
        return
    try:
        with node['lock']:
            self.log('Request for adding to queue: %s interface: %s with penalty: %s // selected node: %s' % (queue, interface, penalty, node['host']))
            node['protocol'].queueAdd(queue, interface, penalty=penalty, paused=False)
    except Exception as error:
        # Best effort, as before, but the failure is no longer invisible.
        self.log('QADD failed for interface %s in queue %s: %s' % (interface, queue, error))
def tapi_qpause(self, msg):
    """QPAUSE handler: pause or unpause an interface in a queue.

    self -- the TAPI instance that dispatched the packet.
    msg  -- dispatched message; msg['body'] must hold, in order, the
            password, the action ('0' = unpause, '1' = pause), the interface
            and the queue.

    Any other action is rejected and logged. Previously it was forwarded
    unchanged to the manager and the following log line then failed on an
    unbound name, which the bare except hid.
    """
    try:
        password, action, interface, queue = msg['body'][:4]
    except ValueError:
        self.log('Malformed QPAUSE packet from host: %s' % msg['host'])
        return
    if password != manager_password:
        self.log('Unauthorized access detected for host: %s' % msg['host'])
        return
    modes = {'0': (False, 'Unpausing'), '1': (True, 'Pausing')}
    if action not in modes:
        self.log('Unknown QPAUSE action %r from host: %s' % (action, msg['host']))
        return
    paused, verb = modes[action]
    node = select_node()
    if node is None:
        self.log('No manager node available for QPAUSE request from host: %s' % msg['host'])
        return
    try:
        with node['lock']:
            node['protocol'].queuePause(queue, interface, paused=paused)
            self.log('%s interface %s in queue %s // selected node: %s' % (verb, interface, queue, node['host']))
    except Exception as error:
        # Best effort, as before, but the failure is no longer invisible.
        self.log('QPAUSE failed for interface %s in queue %s: %s' % (interface, queue, error))
def tapi_qdel(self, msg):
    """QDEL handler: remove an interface from a queue on one manager node.

    self -- the TAPI instance that dispatched the packet.
    msg  -- dispatched message; msg['body'] must hold, in order, the
            password, the interface and the queue.

    Malformed packets, a wrong password, the absence of a usable node and
    failures of the manager call are all logged; the original swallowed
    every error silently.
    """
    try:
        password, interface, queue = msg['body'][:3]
    except ValueError:
        self.log('Malformed QDEL packet from host: %s' % msg['host'])
        return
    if password != manager_password:
        self.log('Unauthorized access detected for host: %s' % msg['host'])
        return
    node = select_node()
    if node is None:
        self.log('No manager node available for QDEL request from host: %s' % msg['host'])
        return
    try:
        with node['lock']:
            node['protocol'].queueRemove(queue, interface)
            self.log('Request for deleting interface: %s from queue: %s // selected node: %s' % (interface, queue, node['host']))
    except Exception as error:
        # Best effort, as before, but the failure is no longer invisible.
        self.log('QDEL failed for interface %s in queue %s: %s' % (interface, queue, error))
def tapi_dial(self, msg):
    """DIAL handler: originate a call through one manager node.

    self -- the TAPI instance that dispatched the packet.
    msg  -- dispatched message; msg['body'] must hold, in order, the
            password, the call type, the channel, the context code, the
            extension and the caller name.

    Call type '1' appends "d" to the channel name. Context codes '0' and
    '1' map to 'office' and 'operators'; any other value is passed through
    unchanged. Any failure, including a malformed packet, is logged.
    """
    try:
        host = msg['host']
        port = msg['port']
        body = msg['body']
        password = body[0]
        call_type = body[1]
        channel = body[2]
        context = body[3]
        exten = body[4]
        caller = body[5]
        if call_type == '1':
            channel = "%sd" % channel
        context = {'0': 'office', '1': 'operators'}.get(context, context)
        callerid = "%s dialer <%s>" % (caller, exten)
        # Same value as before: a literal backslash followed by ';'.
        variable = "SIPADDHEADER=Call-info:\\;answer-after=0"
        if password != manager_password:
            self.log('Unauthorized access detected for host: %s' % msg['host'])
            return
        node = select_node()
        if node is None:
            self.log('No manager node available for DIAL request from host: %s' % host)
            return
        with node['lock']:
            # 'async' is a reserved word from Python 3.7 on, so the keyword
            # is supplied through a dict instead of literally.
            node['protocol'].originate(
                channel,
                context=context,
                exten=exten,
                priority='1',
                timeout="20000",
                callerid=callerid,
                variable=variable,
                **{'async': 'n'}
            )
            self.log('Origination %s <%s> context: <%s> from %s (%s:%s) // selected node: %s' % (exten, caller, context, channel, host, port, node['host']))
    except Exception as error:
        self.log('Unauthorized access to dial command or message looks like shit! %s' % error)
def tapi_unreg(self, msg):
    """UNREG handler: forget the registrations made from the sender's endpoint.

    self -- the TAPI instance that dispatched the packet.
    msg  -- dispatched message; only msg['host'] and msg['port'] are used.

    Every registration whose host and port match the sender is removed.
    The original stopped at the first match, so an endpoint registered
    under several phones left stale entries behind.
    """
    host = msg['host']
    port = msg['port']
    with self.lock:
        # Iterate over a snapshot so entries can be deleted while scanning.
        for phone, user in list(self.users.items()):
            if user.get('host') == host and user.get('port') == port:
                self.log("Unregistering user:%s with host: %s and port: %s" % (user.get('interface'), host, port))
                del self.users[phone]
def tapi_status(self, msg):
    """STATUS handler: re-send the current record of one channel.

    self -- the TAPI instance that dispatched the packet.
    msg  -- dispatched message; msg['body'][0] is the channel name.

    Every node whose thread tracks that channel is asked to send its record
    again. Nodes without a thread are skipped, and a failure on one node is
    logged without stopping the scan; the original swallowed all errors and
    stopped at the first one.
    """
    self.log("Status notification received")
    body = msg['body']
    if not body:
        self.log('STATUS packet without a channel name')
        return
    channel = body[0]
    for node in nodes.values():
        thread = node['thread']
        if thread is None or channel not in thread.channels:
            continue
        try:
            thread.send_stat(node['protocol'], thread.channels[channel])
        except Exception as error:
            self.log('Could not send status for channel %s: %s' % (channel, error))
def select_node():
    """Return the next online node in round-robin order, or None.

    Node ids are popped from rr['store'] and pushed onto rr['pool']; when
    the store runs empty the pool is reversed and becomes the new store. A
    node qualifies when its status is 'online' and it has a protocol.

    At most one full rotation is tried per call. The original looped until
    a node qualified, which never ends when every node is offline.
    """
    with rr['lock']:
        attempts = len(rr['store']) + len(rr['pool'])
        for _ in range(attempts):
            if not rr['store']:
                rr['pool'].reverse()
                rr['store'] = rr['pool']
                rr['pool'] = []
            node_id = rr['store'].pop()
            rr['pool'].append(node_id)
            candidate = nodes[node_id]
            if candidate['status'] == 'online' and candidate['protocol']:
                return candidate
    return None
# Manager endpoints keyed by node id. 'status', 'protocol' and 'thread' are
# filled in at run time by the t_node workers and the start-up loop below.
# NOTE(review): manager credentials are hard-coded here; consider loading
# them from configuration instead of keeping them in source.
nodes = {}
nodes[1] = {'id' : 1, 'host' : '10.1.3.10', 'port' : 5038, 'user' : 'manager', 'password' : 'klopokakikus', 'status' : 'offline', 'protocol' : None, 'thread' : None}
nodes[2] = {'id' : 2, 'host' : '10.1.3.11', 'port' : 5038, 'user' : 'manager', 'password' : 'klopokakikus', 'status' : 'offline', 'protocol' : None, 'thread' : None}

# Guards the status/protocol updates made by the t_node workers.
n_mtx = Lock()

# Round-robin state used by select_node. list() makes 'store' a real list
# (it is pop()ped) regardless of what dict.keys() returns.
rr = {'store' : list(nodes), 'pool' : [], 'lock' : Lock() }

logfile = '/var/log/tapi_tapi.log'
# Not used by any code visible in this file.
pidfile = '/var/run/tapi.pid'

# NOTE(review): shared secrets are hard-coded; confirm before deploying.
tapi_password = "12345678"
manager_password = "12345678"

tapi = TAPI(logfile)
tapi.RegisterCallback('REG', tapi_reg)
tapi.RegisterCallback('UNREG', tapi_unreg)
tapi.RegisterCallback('QADD', tapi_qadd)
tapi.RegisterCallback('QDEL', tapi_qdel)
tapi.RegisterCallback('QPAUSE', tapi_qpause)
tapi.RegisterCallback('DIAL', tapi_dial)
tapi.RegisterCallback('STATUS', tapi_status)

# Account code -> call direction, consulted by t_node.newacct.
directions = {'incoming_queue' : 'INPUT', 'incoming_operators' : 'INPUT', 'local_operators' : 'OUTPUT', 'outgoing_operators' : 'OUTPUT' }

# One worker thread and one lock per node.
for node_id in nodes:
    nodes[node_id]['thread'] = t_node(nodes[node_id])
    nodes[node_id]['lock'] = Lock()
    print(dir(nodes[node_id]['thread']))
    nodes[node_id]['thread'].start()

reactor.listenUDP(5041, tapi)
reactor.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement