Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import datetime
import json
import time
import traceback
import urllib
import urllib.parse
from io import BytesIO

import requests

from dbhelper import DBHelper
# Configuration loaded once, when the module is imported.

# The bot token is kept out of the source code for security: it is read
# from the first line of 'token.txt'. The file is closed as soon as the
# token has been read.
with open('token.txt', 'r') as _token_file:
    TOKEN = _token_file.read().split('\n')[0].strip()

# Base URL used for every call to the Telegram Bot API.
URL = "https://api.telegram.org/bot{}/".format(TOKEN)

# Handle to the database.
db = DBHelper()

# Cached number of questions per survey phase
# (1: personal, 2: food, 3: activity).
# NOTE: the name is misspelled ("caterogy") but other code in this file
# refers to it by this exact name, so it is kept.
nq_caterogy = {1: 3, 2: 2, 3: 2}
###############
#
# ERROR LOG
#
###############
def log_entry(entry):
    """Append a timestamped entry to 'error.log'.

    entry: text to record. It is written below a line holding the current
    local date and time (minute resolution), surrounded by blank lines.
    """
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    # Append mode, so earlier entries are preserved.
    with open('error.log', 'a') as logfile:
        logfile.writelines(['\n\n', stamp, '\n'])
        logfile.write(entry + '\n\n')
#################
#
# BASIC TOOLS
#
#################
def build_keyboard(items):
    """Return the JSON for a one-time reply keyboard, one button per row.

    items: iterable of button labels; each label gets a row of its own.
    """
    rows = []
    for label in items:
        rows.append([label])
    return json.dumps({"keyboard": rows, "one_time_keyboard": True})
# Lowest-level helper used to receive data from the API.
def get_url(url):
    """Perform a GET request on `url` and return the response body as text.

    The raw response body is decoded as UTF-8.
    """
    raw = requests.get(url).content
    return raw.decode("utf8")
def get_json_from_url(url):
    """Fetch `url` with get_url and return its body parsed from JSON."""
    return json.loads(get_url(url))
def get_updates(offset=None):
    """Ask the bot API for new updates and return the parsed reply.

    offset: when given (and truthy) it is sent as the `offset` query
    parameter, so that updates already received are not requested again.
    """
    query = "?offset={}".format(offset) if offset else ""
    return get_json_from_url(URL + "getUpdates" + query)
def get_last_update_id(updates):
    """Return the highest `update_id` found in `updates['result']`.

    Update ids grow in the order the messages reach the bot, so the
    maximum tells how far processing has got.

    Raises ValueError when `updates['result']` is empty.
    """
    return max(int(item['update_id']) for item in updates['result'])
#################
#
# TELEGRAM API MACROS
#
#################
def send_message(text, chat_id, reply_markup=None):
    """Send `text` to the chat `chat_id`, parsed as Markdown.

    text: message body; it is URL-encoded before being sent.
    chat_id: identifier of the destination chat.
    reply_markup: optional JSON string describing a custom keyboard (as
        produced by the *_keyboard helpers). It is URL-encoded too, so
        labels containing characters such as '&', '#' or '+' cannot
        corrupt the query string.
    """
    url = URL + "sendMessage?text={}&chat_id={}&parse_mode=Markdown".format(
        urllib.parse.quote_plus(text), chat_id)
    # reply_markup is only sent when a special keyboard was requested.
    if reply_markup:
        url += "&reply_markup={}".format(urllib.parse.quote_plus(reply_markup))
    get_url(url)
def send_photo(localpath, chat_id):
    """Send the image stored at `localpath` to the chat `chat_id`.

    localpath: path of the image file. It is used exactly as given; no
        folder is prepended.
    chat_id: identifier of the destination chat.
    """
    url = URL + "sendPhoto"
    data = {'chat_id': chat_id}
    # The file is closed as soon as the upload finishes.
    with open(localpath, 'rb') as photo:
        requests.post(url, files={'photo': photo}, data=data)
def send_sticker(sticker_id, chat_id):
    """Send the sticker identified by `sticker_id` to the chat `chat_id`."""
    payload = {'chat_id': chat_id, 'sticker': sticker_id}
    # The response of the request is not used.
    requests.post(URL + "sendSticker", data=payload)
def forward(chat_from, msg_id, chat_id):
    """Forward message `msg_id` from chat `chat_from` to chat `chat_id`."""
    payload = {
        'chat_id': chat_id,
        'from_chat_id': chat_from,
        'message_id': msg_id,
    }
    requests.post(URL + "forwardMessage", data=payload)
def send_GIF(localpath, chat_id):
    """Send a short video, uploaded through `sendVideo`, to `chat_id`.

    localpath: file name relative to the 'img/' folder ('img/' is
        prepended here).
    chat_id: identifier of the destination chat.
    """
    url = URL + "sendVideo"
    data = {'chat_id': chat_id}
    # The file is closed as soon as the upload finishes.
    with open('img/' + localpath, 'rb') as video:
        requests.post(url, files={'video': video}, data=data)
###############################
#
# BOT SPECIFIC METHODS
#
###############################
def checknumber(str):
    """Return True when `str` can be converted with float(), else False.

    Anything float() accepts counts as a number, which includes forms
    such as '1e5', 'nan' and 'inf'. Values float() cannot handle at all
    (for example None) yield False instead of raising.

    The parameter name shadows the builtin `str`; it is kept so existing
    callers are unaffected.
    """
    try:
        float(str)
    except (TypeError, ValueError):
        return False
    return True
##############################
#
# MAIN FUNCTIONS
#
##############################
def filter_update(update):
    """Classify one update and extract what handle_updates needs.

    Always returns a 3-tuple `(text, chat_id, message_id)` where `text` is:

    * the stripped message text, for a new text message;
    * None, for a new or edited message that carries no text;
    * False, when nothing is left for the caller to do: either an edited
      text message (already stored through process_edit) or an update
      that holds neither 'message' nor 'edited_message'. In the latter
      case `chat_id` and `message_id` are None.
    """
    if 'edited_message' in update:
        edited = update['edited_message']
        ids = (edited['chat']['id'], edited['message_id'])
        if 'text' in edited:
            # Replace the stored answer with the edited text.
            process_edit(update)
            return (False,) + ids
        # Edited message without text: read the ids from the edited
        # message itself, there is no 'message' key in such an update.
        return (None,) + ids

    message = update.get('message')
    if message is None:
        # Update type this bot does not handle; tell the caller to skip it.
        return False, None, None

    ids = (message['chat']['id'], message['message_id'])
    if 'text' in message:
        return (message['text'].strip(),) + ids
    # Message without text.
    return (None,) + ids
def process_edit(update):
    """Store the new text of an edited message as the updated answer.

    update: an update whose 'edited_message' carries 'text'.
    """
    edited = update["edited_message"]
    new_text = edited["text"]
    # NOTE(review): the answer is looked up by message id alone; confirm
    # in DBHelper that this is unique enough across chats.
    db.update_response_edited(edited['message_id'], new_text)
def numeric_keyboard():
    """Return the JSON for a 0-9 keypad reply keyboard (not one-time)."""
    rows = [list('789'), list('456'), list('123'), ['0']]
    return json.dumps({"keyboard": rows, "one_time_keyboard": False})
def main_menu_keyboard():
    """Return the JSON for the one-time main-menu keyboard (two per row)."""
    commands = ['/personal', '/food', '/activity', '/social', '/risk', '/tip']
    rows = [commands[start:start + 2] for start in range(0, len(commands), 2)]
    return json.dumps({"keyboard": rows, "one_time_keyboard": True})
# Commands that open a survey, mapped to the phase they start
# (1: personal, 2: food, 3: activity).
_SURVEY_PHASES = {'/personal': 1, '/food': 2, '/activity': 3}


def _send_main_menu(chat):
    """Prompt the user with the main-menu keyboard."""
    send_message('Choose your option!', chat, main_menu_keyboard())


def _handle_start(chat):
    """Greet the user, register them or reset their phase, show the menu."""
    # Welcome sticker followed by the welcome text.
    send_sticker('CAADAgADBQYAAhhC7giZXSreX-e4UgI', chat)
    with open('text/en/welcome.txt', 'r') as welcome:
        send_message(welcome.read(), chat)
    # When db.check_start(chat) is truthy the user is registered;
    # otherwise their phase is reset to 0.
    if db.check_start(chat):
        try:
            db.register_user(chat)
        except Exception as err:
            # Registration stays best-effort, but the failure is recorded
            # instead of being silently discarded.
            log_entry('register_user failed for chat {}: {!r}'.format(chat, err))
    else:
        db.change_phase(newphase=0, id_user=chat)
    _send_main_menu(chat)


def _start_survey(phase, chat):
    """Move the user to `phase` and ask its first question."""
    db.change_phase(newphase=phase, id_user=chat)
    send_message(db.get_question(phase, 1), chat, numeric_keyboard())


def _all_surveys_completed(chat):
    """Reset the user's phase to 0 and tell whether all surveys are done."""
    completed = db.check_completed(chat)
    db.change_phase(newphase=0, id_user=chat)
    return completed[0] and completed[1] and completed[2]


def _handle_answer(text, chat, message_id):
    """Treat `text` as the answer to the question the user is currently on."""
    # status[0] is the phase, status[1] the question number.
    status = db.get_phase_question(chat)
    phase = status[0]
    if phase == 0:
        # Not inside any survey: show the menu again.
        _send_main_menu(chat)
        return
    if not checknumber(text):
        send_message('Sorry but your answer have to be a number!', chat)
        return
    question = status[1]
    # A truthy db.check_answer(...) leads to a new answer being added;
    # otherwise the existing response is updated.
    if db.check_answer(chat, phase=phase, question=question):
        db.add_answer(id_user=chat, phase=phase, question=question,
                      message_id=message_id, answer=text)
    else:
        db.update_response(id_user=chat, phase=phase, question=question,
                           message_id=message_id, answer=text)
    if nq_caterogy[phase] > question:
        # Not the last question: advance and ask the next one.
        db.next_question(chat)
        send_message(db.get_question(phase, question + 1), chat)
    else:
        # Last question: mark the survey completed and return to the menu.
        db.completed_survey(chat, phase)
        send_message('Thanks for completing the survey!', chat)
        _send_main_menu(chat)


def handle_updates(updates):
    """Process every update in `updates['result']`, one after another.

    Each update is classified with filter_update, then dispatched:
    '/start', the survey commands, '/tip', '/risk', or - for any other
    text - an answer to the question the user is currently on.

    NOTE(review): '/social' appears in the main menu but has no handler of
    its own, so it is treated like any other free text.
    """
    for update in updates["result"]:
        text, chat, message_id = filter_update(update)
        if text is False:
            # Already dealt with by filter_update.
            continue
        if text is None:
            send_message('This kind of message is not supported yet', chat)
            continue

        if text == '/start':
            _handle_start(chat)
        elif db.check_user(chat):
            # Reached for any non-/start text when db.check_user(chat) is
            # truthy; the user is told to run /start.
            send_message('An error ocurred, pls use /start to restart the bot!', chat)
        elif text in _SURVEY_PHASES:
            _start_survey(_SURVEY_PHASES[text], chat)
        elif text == '/tip':
            # NOTE(review): the menu is assumed to follow both branches
            # here and in '/risk' - confirm against the intended flow.
            if _all_surveys_completed(chat):
                send_message('This will be a "custom" tip depending on the user results', chat)
            else:
                send_message('User did not complete all the surveys. Generic tip.', chat)
            _send_main_menu(chat)
        elif text == '/risk':
            if _all_surveys_completed(chat):
                send_message('Im not able to calculate your obesity risk yet, but look at this cute avocado!', chat)
                send_photo('img/avo.jpg', chat)
            else:
                send_message('User did not complete all the surveys.', chat)
            _send_main_menu(chat)
        else:
            _handle_answer(text, chat, message_id)
def main():
    """Run the bot: poll for updates forever and dispatch them.

    Any error raised while fetching or handling a batch is printed,
    written to the error log (received data plus traceback) and the loop
    carries on. KeyboardInterrupt and SystemExit are not caught, so the
    bot can still be stopped.
    """
    # Id of the next update to request; None until a batch has been seen.
    last_update_id = None
    db.setup()
    while True:
        updates = None
        try:
            # Fetch the updates not seen yet.
            updates = get_updates(last_update_id)
            if updates["result"]:
                # The offset is advanced before handling, so a batch that
                # fails is skipped rather than requested again.
                last_update_id = get_last_update_id(updates) + 1
                handle_updates(updates)
        except Exception:
            print('Error ocurred, watch log!')
            log_entry('{}\n{}'.format(updates, traceback.format_exc()))
        # Pause on every iteration, including after an error, so the
        # Telegram servers are never hit in a tight loop.
        time.sleep(0.5)


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement