Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from flask import Flask, render_template, jsonify, session, redirect, url_for
- import os, sys
- import ConfigParser
- import json
- import model
- from distutils.util import strtobool
- from werkzeug.serving import run_simple
- from werkzeug.wsgi import DispatcherMiddleware
- from datetime import date
- # get current names for directory and file
- dirname, filename = os.path.split(os.path.abspath(__file__))
- # py-asterisk
- sys.path.append(os.path.join(dirname, 'libs','py-asterisk'))
- from Asterisk.Manager import *
- # sqlalchemy
- sys.path.append(os.path.join(dirname, 'libs','sqlalchemy', 'lib'))
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy.orm.exc import NoResultFound
- # session db mysql
- Session_MySQL = sessionmaker(bind=model.engine_mysql)
- session_mysql = Session_MySQL()
- # session db freepbx
- Session_FreePBX = sessionmaker(bind=model.engine_freepbx)
- session_freepbx = Session_FreePBX()
- # config file
- cfg_file = 'config.ini'
- cfg = ConfigParser.ConfigParser()
- try:
- with open(os.path.join(dirname, cfg_file)) as f:
- cfg.readfp(f)
- except IOError:
- print 'Error open file config. Check if config.ini exists'
- sys.exit()
- def init_day(d = date.today()):
- return datetime.datetime(d.year, d.month, d.day, 0, 0, 0)
- def __connect_manager():
- host = cfg.get('manager', 'host')
- port = int(cfg.get('manager', 'port'))
- user = cfg.get('manager', 'user')
- password = cfg.get('manager', 'password')
- try:
- manager = Manager((host, port), user, password)
- return manager
- except:
- app.logger.info('Error to connect to Asterisk Manager. Check config.ini and manager.conf of asterisk')
- def is_debug():
- try:
- var = cfg.get('general', 'debug')
- v = True if strtobool(var) == 1 else False
- except:
- return False
- return v
- def port_bind():
- return int(__get_entry_ini_default('general', 'port', 5000))
- def host_bind():
- return __get_entry_ini_default('general', 'host', '0.0.0.0')
- def get_hide_config():
- tmp = __get_entry_ini_default('general', 'hide', '')
- tmp = tmp.replace('\'', '')
- return tmp.split(',')
- def __get_entry_ini_default(section, var, default):
- try:
- var = cfg.get(section, var)
- v = var
- except:
- return default
- return v
- def __get_data_queues_manager():
- manager = __connect_manager()
- try:
- data = manager.QueueStatus()
- except:
- app.logger.info('Error to connect to Asterisk Manager. Check config.ini and manager.conf of asterisk')
- data = []
- return data
- def get_data_queues(queue = None):
- data = parser_data_queue(__get_data_queues_manager())
- if queue is not None:
- data = data[queue]
- if is_debug():
- app.logger.debug(data)
- return data
- def hide_queue(data):
- tmp_data = {}
- hide = get_hide_config()
- for q in data:
- if q not in hide:
- tmp_data[q] = data[q]
- return tmp_data
- def rename_queue(data):
- tmp_data = {}
- for q in data:
- rename = __get_entry_ini_default('rename', q, None)
- if rename is not None:
- tmp_data[rename] = data[q]
- else:
- tmp_data[q] = data[q]
- return tmp_data
- def set_global_queues_freepbx():
- queues = {}
- try:
- data = session_freepbx.query(model.QueuesConfig).all()
- except NoResultFound, e:
- data = None
- for q in data:
- queues[q.extension] = q.descr
- session['queues'] = queues
- def if_need_set_queue_globals():
- try:
- if len(session['queues']) == 0:
- set_global_queues_freepbx()
- except:
- set_global_queues_freepbx()
- def parser_data_queue(data):
- data = hide_queue(data)
- data = rename_queue(data)
- # convert references manager to string
- for q in data:
- for e in data[q]['entries']:
- tmp = data[q]['entries'].pop(e)
- data[q]['entries'][str(e)] = tmp
- tmp = data[q]['entries'][str(e)]['Channel']
- data[q]['entries'][str(e)]['Channel'] = str(tmp)
- for m in data[q]['members']:
- #Asterisk 1.8 dont have StateInterface
- if 'StateInterface' not in data[q]['members'][m]:
- data[q]['members'][m]['StateInterface'] = m
- try:
- if q in session['queues']:
- data[q]['long_name'] = session['queues'][q]
- else:
- data[q]['long_name'] = None
- except:
- data[q]['long_name'] = None
- return data
- def first_queue():
- data = get_data_queues()
- if data:
- return data.keys()[0]
- else:
- return ''
- def events_queuelog_data_type(from_date, to_date, events=None, agent=None, queue=None):
- try:
- q = session_mysql.query(model.QueueLog)
- if from_date:
- q = q.filter(model.QueueLog.created >= from_date)
- if to_date:
- q = q.filter(model.QueueLog.created <= to_date)
- if events:
- q = q.filter(model.QueueLog.event.in_(events))
- if agent:
- q = q.filter(model.QueueLog.agent.in_(agent))
- if queue:
- q = q.filter(model.QueueLog.queuename == queue)
- return q.all()
- return q.order_by(model.QueueLog.id.asc()).all()
- except NoResultFound, e:
- return None
- def count_answered(from_date, to_date, agent=None, queue=None):
- # events = ['COMPLETECALLER', 'COMPLETEAGENT']
- events = ['CONNECT']
- data = events_queuelog_data_type(from_date, to_date, events, agent, queue)
- return len(data)
- def count_inbound(from_date, to_date, agent=None, queue=None):
- events = ['ENTERQUEUE']
- calls = []
- data = events_queuelog_data_type(from_date, to_date, events, agent, queue)
- for call in data:
- if call.callid not in calls:
- calls.append(call.callid)
- return len(calls)
- def count_voicemail(from_date, to_date, agent=None, queue=None):
- events = ['VOICEMAIL']
- data = events_queuelog_data_type(from_date, to_date, events, agent, queue)
- return len(data)
- def count_abandon(from_date, to_date, agent=None, queue=None):
- events = ['ABANDON']
- data = events_queuelog_data_type(from_date, to_date, events, agent, queue)
- return len(data)
- def seconds_wait_abandon(from_date, to_date, agent=None, queue=None):
- events = ['ABANDON']
- seconds = 0
- data = events_queuelog_data_type(from_date, to_date, events, agent, queue)
- for call in data:
- seconds = seconds + int(call.data3)
- return seconds
- def seconds_wait(from_date, to_date, agent=None, queue=None):
- events = ['CONNECT']
- seconds = 0
- data = events_queuelog_data_type(from_date, to_date, events, agent, queue)
- for call in data:
- seconds = seconds + int(call.data1)
- return seconds
- def seconds_talking(from_date, to_date, agent=None, queue=None):
- events = ['COMPLETECALLER', 'COMPLETEAGENT']
- seconds = 0
- data = events_queuelog_data_type(from_date, to_date, events, agent, queue)
- for call in data:
- seconds = seconds + int(call.data2)
- return seconds
- def data_queue(from_date, to_date, agent=None, queue=None):
- data = {}
- data['answered'] = count_answered(from_date, to_date, agent, queue)
- data['inbound'] = count_inbound(from_date, to_date, agent, queue)
- data['count_abandon'] = count_abandon(from_date, to_date, agent, queue)
- data['voicemail'] = count_voicemail(from_date, to_date, agent, queue)
- data['seconds_wait'] = seconds_wait(from_date, to_date, agent, queue)
- data['seconds_talking'] = seconds_talking(from_date, to_date, agent, queue)
- data['seconds_wait_abandon'] = seconds_wait_abandon(from_date, to_date, agent, queue)
- return data
- # Flask env
- APPLICATION_ROOT = __get_entry_ini_default('general', 'base_url', '/')
- app = Flask(__name__)
- app.secret_key = 'CHANGEME'
- app.config.from_object(__name__)
- @app.before_first_request
- def setup_logging():
- # issue https://github.com/benoitc/gunicorn/issues/379
- if not app.debug:
- app.logger.addHandler(logging.StreamHandler())
- app.logger.setLevel(logging.INFO)
- #Utilities helpers
- @app.context_processor
- def utility_processor():
- def format_id_agent(value):
- v = value.replace('/', '-')
- return v.replace('@', '_')
- return dict(format_id_agent=format_id_agent)
- @app.context_processor
- def utility_processor():
- def str_status_agent(value):
- try:
- value = int(value)
- except:
- value = 0
- unavailable = [0, 4, 5]
- free = [1]
- if value in unavailable:
- return 'unavailable'
- elif value in free:
- return 'free'
- else:
- return 'busy'
- return dict(str_status_agent=str_status_agent)
- @app.context_processor
- def utility_processor():
- def request_interval():
- return int(__get_entry_ini_default('general', 'interval', 5)) * 1000
- return dict(request_interval=request_interval)
- # ---------------------
- # ---- Routes ---------
- # ---------------------
- # home
- @app.route('/')
- def home():
- data = get_data_queues()
- return render_template('index.html', queues = data)
- @app.route('/queue/<name>')
- def queue(name = None):
- data = get_data_queues(name)
- return render_template('queue.html', data = data, name = name)
- @app.route('/queue/<name>.json')
- def queue_json(name = None):
- data = get_data_queues(name)
- return jsonify(
- name = name,
- data = data,
- current_time = time.strftime("%Y-%m-%d %H:%M:%S")
- )
- # data queue
- @app.route('/queues')
- def queues():
- data = get_data_queues()
- return jsonify(
- data = data
- )
- @app.route('/qstatus')
- def init_route_qstatus():
- return redirect(url_for('qstatus', name=first_queue()))
- @app.route('/qstatus/<name>')
- def qstatus(name):
- if_need_set_queue_globals()
- queues = get_data_queues()
- data = queues[name]
- return render_template('qstatus.html', data = data, queues = queues, name = name)
- @app.route('/qoper/<name>.json')
- def qoper_json(name = None):
- queue_values = data_queue(init_day(), None, None, name)
- data = get_data_queues(name)
- return jsonify(
- name = name,
- data = data,
- values = queue_values,
- current_time = time.strftime("%Y-%m-%d %H:%M:%S")
- )
- @app.route('/qoper')
- def init_route_qoper():
- return redirect(url_for('qoper', name=first_queue()))
- @app.route('/qoper/<name>')
- def qoper(name):
- if_need_set_queue_globals()
- queues = get_data_queues()
- data = queues[name]
- return render_template('qoper.html', data = data, queues = queues, name = name)
- # ---------------------
- # ---- Main ----------
- # ---------------------
- if __name__ == '__main__':
- if is_debug():
- app.config['DEBUG'] = True
- app.logger.debug(APPLICATION_ROOT)
- if APPLICATION_ROOT == '/':
- app.run(host=host_bind(), port=port_bind())
- else:
- application = DispatcherMiddleware(Flask('dummy_app'), {
- app.config['APPLICATION_ROOT']: app,
- })
- run_simple(host_bind(), port_bind(), application, use_reloader=True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement