Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from telegram.ext import (Updater, CommandHandler, MessageHandler,
- Filters, CallbackQueryHandler)
- from telegram import (ReplyKeyboardMarkup, InlineKeyboardButton,
- InlineKeyboardMarkup, ParseMode)
- import urllib.request as url
- import urllib.parse as parse
- from bs4 import BeautifulSoup
- import logging
- from json import loads
- logging.basicConfig(level=logging.INFO)
- MAX_SAVED = 10
- SAVED_PLACES = 'Сохраненные места'
- DELETE_SAVED_PLACE = 'Удалить сохраненное место'
- SETTINGS = 'Настройки'
- NO_SAVED = 'Пока нет сохраненных запросов.'
- SAVE = 'Сохранить запрос'
- START_TEXT = "Бот для получения погоды. \
- Напишите в сообщения ваш запрос на естественном языке."
- PLACES_FILE = 'places.txt'
- HELP_TEXT = 'Запрос выполняется в чат с ботом. Есть возможность сохранить \
- местоположение после выполнения запроса. Впоследствии список всех сохраненных \
- местоположений будет отображаться в меню "Сохраненные места". \
- Удалять из этого можно с помощью меню "Удалить сохраненное место". \
- С помощью меню "Настройки" настраиваются выбранные единицы измерения.'
- CACHE_FILE = 'cache.txt'
- INFO_FILE = 'info.txt'
- KEY = 'AIzaSyDtoJOHBCcbYQmVVnq9hmrrfa7ntO3N6LI'
- HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; rv:46.0) \
- Gecko/20800121 Firefox/46.0', 'Accept-Language': 'ru'}
- KEYBOARD_MARKUP = ReplyKeyboardMarkup([[SAVED_PLACES],
- [DELETE_SAVED_PLACE],
- [SETTINGS]],
- resize_keyboard=True)
- SETTINGS_MARKUP = (lambda user_id:
- InlineKeyboardMarkup([[InlineKeyboardButton('Температура',
- callback_data='T'
- + str(user_id))],
- [InlineKeyboardButton('Скорость ветра',
- callback_data='S' +
- str(user_id))]]))
- TEMP_MARKUP = (lambda user_id:
- InlineKeyboardMarkup([[InlineKeyboardButton('градус Цельсия',
- callback_data='C' +
- str(user_id))],
- [InlineKeyboardButton('градус Фаренгейта',
- callback_data='F' +
- str(user_id))]]))
- SPEED_MARKUP = (lambda user_id:
- InlineKeyboardMarkup([[InlineKeyboardButton('м/с',
- callback_data='R' +
- str(user_id))],
- [InlineKeyboardButton('миль/ч',
- callback_data='A' +
- str(user_id))]]))
- places = {}
- try:
- with open(PLACES_FILE) as f:
- places = eval(f.read())
- except FileNotFoundError:
- with open(PLACES_FILE, 'w') as f:
- f.write(str(places))
- cache1 = {}
- cache2 = {}
- curr_cache = 0
- try:
- with open(CACHE_FILE) as f:
- cache1, cache2 = eval(f.readline().strip()), eval(f.readline().strip())
- curr_cache = int(eval(f.readline().strip()))
- except FileNotFoundError:
- with open(CACHE_FILE, 'w') as f:
- f.write(str(cache1) + '\n' + str(cache2) + '\n' + str(curr_cache))
- info = {}
- try:
- with open(INFO_FILE) as f:
- info = eval(f.read())
- except FileNotFoundError:
- with open(INFO_FILE, 'w') as f:
- f.write(str(info))
- class WeatherInfo:
- def __init__(self, loc_, dts_, dc_, tm_, ttm_, pp_, hm_, ws_):
- self.loc, self.dts, self.dc, self.tm = (loc_, dts_, dc_,
- str(min(int(tm_), int(ttm_))))
- self.ttm, self.pp, self.hm, self.ws = (str(max(int(tm_), int(ttm_))),
- pp_, hm_, ws_)
- def get_info(self, user_id):
- if user_id not in info:
- info[user_id] = [0, 0]
- with open(INFO_FILE, 'w') as f:
- f.write(str(info))
- temp, wind = info[user_id][0], info[user_id][1]
- t_res = self.tm
- if temp == 1:
- t_res = self.ttm
- t_wind = self.ws
- if 'ми/ч' in self.ws:
- t_wind = t_wind.replace('ми/ч', 'миль/ч')
- if 'миль/ч' in t_wind and wind == 0:
- t_wind = int(t_wind.split()[0])
- t_wind = '{} м/с'.format(round(t_wind * 0.44704))
- if 'м/с' in t_wind and wind == 1:
- t_wind = int(t_wind.split()[0])
- t_wind = '{} миль/ч'.format(round(t_wind * 2.23693))
- return '\n'.join([self.loc, self.dts, self.dc,
- 'Температура: {}{}{}'.format(t_res, chr(176),
- 'C' if not temp
- else 'F'),
- 'Вероятность осадков: {}'.format(self.pp),
- 'Влажность: {}'.format(self.hm),
- 'Ветер: {}'.format(t_wind),
- '[{}]({})'.format(u"\u200B", get_image(self.loc))])
- def write_new_cache(location):
- global curr_cache
- if location in cache1:
- return
- cache1[location] = curr_cache
- cache2[curr_cache] = location
- curr_cache += 1
- with open(CACHE_FILE, 'w') as f:
- f.write(str(cache1) + '\n' + str(cache2) + '\n' + str(curr_cache))
- def get_cache(location):
- if location not in cache1:
- write_new_cache(location)
- return cache1[location]
- def str_cache(location):
- return str(get_cache(location))
- def get_location(cache):
- return cache2[int(cache)]
- def get_request(question, key):
- return 'http://www.google.ru/search?q={}\
- &cr=countryRU&lr=lang_ru&key={}'.format(parse.quote(question), key)
- def get_data(soup, id_search, type_search):
- return soup.find(type_search, {'id': id_search}).text
- def update_info():
- with open(INFO_FILE, 'w') as f:
- f.write(str(info))
- def get_image(location):
- req = url.Request('https://www.google.ru/search?q={}\
- &cr=countryRU&lr=lang_ru&tbm=isch&key={}'.format(parse.quote(location), KEY),
- headers=HEADERS)
- page = url.urlopen(req)
- r = page.read().decode('utf-8')
- soup = BeautifulSoup(r, 'lxml')
- im = soup.find('div', {'class': 'rg_bx rg_di rg_el ivg-i'})
- p = loads(im.text[im.text.find('{'):])['ou']
- return p
- def get_by_question(question, user_id):
- question = 'погода ' + question
- try:
- req = url.Request(get_request(question, KEY), headers=HEADERS)
- page = url.urlopen(req)
- soup = BeautifulSoup(page.read().decode('utf-8'), 'lxml')
- weather = soup.find('div', {'id': 'wob_wc'})
- if weather is None:
- return 'Запрос некорректен. Попробуйте переформулировать.'
- wob_loc = get_data(weather, 'wob_loc', 'div')
- wob_dts = get_data(weather, 'wob_dts', 'div')
- wob_dc = get_data(weather, 'wob_dc', 'span')
- wob_tm = get_data(weather, 'wob_tm', 'span')
- wob_ttm = get_data(weather, 'wob_ttm', 'span')
- wob_pp = get_data(weather, 'wob_pp', 'span')
- wob_hm = get_data(weather, 'wob_hm', 'span')
- wob_ws = get_data(weather, 'wob_ws', 'span')
- return WeatherInfo(wob_loc, wob_dts, wob_dc,
- wob_tm, wob_ttm, wob_pp,
- wob_hm, wob_ws).get_info(user_id)
- except Exception as e:
- return 'Произошла ошибка на сервере. Попробуйте позднее.'
- def start(bot, update):
- update.message.reply_text(START_TEXT, reply_markup=KEYBOARD_MARKUP)
- def help_bot(bot, update):
- update.message.reply_text(HELP_TEXT)
- def keyboard_by_user(user_id, erase=False):
- return InlineKeyboardMarkup([[InlineKeyboardButton(name,
- callback_data='1' + str_cache(name) if not
- erase else '2 {} {}'.format(user_id,
- str_cache(name)))]
- for name in places[user_id]])
- def save_location(user_id, location):
- if user_id not in places:
- places[user_id] = [location]
- else:
- if location in places[user_id]:
- return
- places[user_id].append(location)
- if len(places[user_id]) > MAX_SAVED:
- places[user_id].pop(0)
- with open(PLACES_FILE, 'w') as f:
- f.write(str(places))
- def reply_request(bot, update):
- query = update.message.text
- user_id = update.effective_user.id
- if query == SAVED_PLACES:
- if user_id not in places or not places[user_id]:
- update.message.reply_text(NO_SAVED)
- return
- update.message.reply_text(SAVED_PLACES,
- reply_markup=keyboard_by_user(user_id))
- elif query == DELETE_SAVED_PLACE:
- if user_id not in places or not places[user_id]:
- update.message.reply_text(NO_SAVED)
- return
- update.message.reply_text(DELETE_SAVED_PLACE,
- reply_markup=keyboard_by_user(user_id, True))
- elif query == SETTINGS:
- update.message.reply_text(SETTINGS,
- reply_markup=SETTINGS_MARKUP(user_id))
- else:
- ans = get_by_question(query, user_id)
- if len(ans.split('\n')) == 1:
- update.message.reply_text(ans)
- return
- location = ans.split('\n')[0]
- keyboard = None
- keyboard = InlineKeyboardMarkup([[InlineKeyboardButton(SAVE,
- callback_data='3 {} {}'.format(str_cache(location),
- str(user_id)))]])
- if user_id in places and location in places[user_id]:
- keyboard = None
- update.message.reply_text(ans, parse_mode=ParseMode.MARKDOWN,
- reply_markup=keyboard)
- def set_t(c):
- if c in 'CF':
- return 'Температура'
- else:
- return 'Скорость ветра'
- def new_val(c):
- if c == 'C':
- return 'градус Цельсия'
- elif c == 'F':
- return 'градус Фаренгейта'
- elif c == 'R':
- return 'м/с'
- else:
- return 'миль/ч'
- def button(bot, update):
- query = update.callback_query
- data = query.data
- if data[0] == 'T':
- user_id = int(data[1:])
- bot.edit_message_reply_markup(chat_id=query.message.chat_id,
- message_id=query.message.message_id,
- inline_message_id=query.inline_message_id,
- reply_markup=TEMP_MARKUP(user_id))
- elif data[0] == 'S':
- user_id = int(data[1:])
- bot.edit_message_reply_markup(chat_id=query.message.chat_id,
- message_id=query.message.message_id,
- inline_message_id=query.inline_message_id,
- reply_markup=SPEED_MARKUP(user_id))
- elif data[0] in 'CFRA':
- user_id = int(data[1:])
- if user_id not in info:
- info[user_id] = [0, 0]
- info[user_id][int(data[0] in 'RA')] = int(data[0] in 'FA')
- update_info()
- bot.send_message(text='*{}* установлено на *{}*'.format(set_t(data[0]),
- new_val(data[0])),
- chat_id=query.message.chat_id,
- parse_mode=ParseMode.MARKDOWN)
- elif data[0] == '1':
- bot.send_message(text=get_by_question(get_location(data[1:]),
- update.effective_user.id),
- chat_id=query.message.chat_id,
- parse_mode=ParseMode.MARKDOWN)
- elif data[0] == '2':
- data = data.split()
- user_id = int(data[1])
- location = get_location(data[2])
- places[user_id].remove(location)
- bot.send_message(text='*{}* удалено'.format(location),
- chat_id=query.message.chat_id,
- parse_mode=ParseMode.MARKDOWN)
- with open(PLACES_FILE, 'w') as f:
- f.write(str(places))
- else:
- data = data.split()
- user_id = int(data[-1])
- location = get_location(data[1])
- save_location(user_id, location)
- bot.send_message(text='*{}* сохранено'.format(location),
- chat_id=query.message.chat_id,
- parse_mode=ParseMode.MARKDOWN)
- def execute_bot():
- updater = Updater('569733577:AAGE6Rs5V8ORG7fpHNUKNqCasgL0A1Asfoo',
- request_kwargs={"proxy_url":
- "http://94.177.216.109:8888"})
- updater.dispatcher.add_handler(CommandHandler('start', start))
- updater.dispatcher.add_handler(CommandHandler('help', help_bot))
- updater.dispatcher.add_handler(MessageHandler(Filters.text, reply_request))
- updater.dispatcher.add_handler(CallbackQueryHandler(button))
- updater.start_polling()
- updater.idle()
- if __name__ == '__main__':
- execute_bot()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement