Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- import json
- import logging
- import os
- import telegram
- from queue import Queue, Empty as EmptyQueue
- import sys
- import threading
- import time
- # from urllib.parse import urlparse, urljoin
- import exc
- if ((3, 0) <= sys.version_info <= (3, 9)):
- from urllib.parse import urlparse, urljoin
- elif ((2, 0) <= sys.version_info <= (2, 9)):
- from urlparse import urlparse, urljoin
- from flask import Flask, request, render_template
- from flask_cors import CORS
- from flask_sse import sse
- from redis import StrictRedis
- import requests
- # from roboloide_modelo import utils
- import subprocess
- app = Flask(__name__)
- CORS(app)
- app.config["REDIS_URL"] = "redis://localhost:6379"
- app.register_blueprint(sse, url_prefix='/notificador-stream')
- app.config['SECRET_KEY'] = os.urandom(24)
- thread = None
- LOGGER = logging.getLogger('roboloide_notificador')
- LOGGER.setLevel(level=logging.DEBUG)
- handler = logging.StreamHandler(sys.stdout)
- handler.setFormatter(logging.Formatter(
- '------ %(levelname)s %(name)s - %(pathname)s::%(lineno)dn'
- '%(asctime)s: %(message)s'))
- LOGGER.addHandler(handler)
- # bot = telegram.Bot(token='563825372:AAGhYPGYeci6p2s5SFUtey2XFL2b8gFNfEA')
- def _parse_args(argv, progname=None):
- progname = progname or 'roboloide-notificador'
- parser = argparse.ArgumentParser(prog=progname,
- description='Notificador de eventos')
- parser.add_argument('--debug', '-d', action='store_true',
- help='Enable debug mode')
- parser.add_argument('--port', '-p', default=8000,
- help='TCP port')
- parser.add_argument('--host', '-h', default='0.0.0.0',
- help='Host name')
- return parser.parse_args(argv)
- # def main(argv=None, progname=None):
- # # def _parse_args(argv, progname=None):
- # progname = progname or 'roboloide-modelo'
- # argv = argv or sys.argv[1:]
- # parser = argparse.ArgumentParser(
- # prog=progname, description='Modelo de dados do Roboloide')
- # parser.add_argument('--hostname', default='0.0.0.0',
- # help='Hostname')
- # parser.add_argument('--port', default=8000, type=int,
- # help='TCP port')
- # parser.add_argument('--debug', action='store_true', help='Debug mode')
- # args = parser.parse_args(argv)
- # if args.debug:
- # utils.redis_cache.invalidar_tudo()
- # app.run(
- # host=args.hostname,
- # port=args.port,
- # debug=True
- # )
- # else:
- # new_env = os.environ.copy()
- # new_env['PYTHONUNBUFFERED'] = '1'
- # subprocess.call(
- # ['gunicorn', 'roboloide_modelo.notificador:app', '--reload',
- # '--worker-class', 'gevent', '--workers', os.environ.get('6', '9'),
- # '--bind', '{args.hostname}:{args.port}'.format(args=args),
- # '--config', 'python:gunicorn_conf'],
- # stderr=sys.stderr, stdout=sys.stdout, stdin=sys.stdin,
- # env=new_env
- # )
- def is_safe_url(target):
- ref_url = urlparse(request.host_url)
- test_url = urlparse(urljoin(request.host_url, target))
- return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc
- def escutar_eventos():
- try:
- print('ouvindo')
- redis = StrictRedis()
- channel = redis.pubsub()
- canais = [
- 'roboloide-captura-eventos',
- 'roboloide-captura-captura',
- 'roboloide-ajuste-eventos',
- 'roboloide-modelo-eventos',
- 'roboloide-controlador-eventos',
- 'roboloide-notificador-eventos',
- 'roboloide-modelo-captura'
- ]
- channel.subscribe(*canais)
- except exc.RoboNotificadorEncerrou('Robo notificador foi encerrado.'):
- msg = 'ERRO!n'
- msg += 'Tipo: Módulo caiun'
- msg += 'Módulo: Notificadorn'
- msg += 'Descrição: Módulo Notificador foi encerrado.n'
- # bot.send_message(chat_id='-205358775', text=msg)
- raise
- print ("saiu")
- def inicializar_thread():
- queue = Queue()
- thread = threading.Thread(target = escutar_eventos, args=(queue),)
- thread.start()
- time.sleep(1)
- @app.route('/notificador/',methods=['GET','POST'])
- def test_page():
- return render_template('index.html')
- @app.route('/notificador/publicar', methods=['GET','POST'])
- def publicar_evento():
- # TODO restringir o acesso a este viewpoint para apenas este módulo
- print(request.get_json())
- data = request.get_json()
- sse.publish(data, type='event')
- return 'mensagem publicada'
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement