Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- import re
- from datetime import datetime
- from random import choice, randint
- from urllib.parse import parse_qs, quote
- import requests
- from flask import Flask, request, Response
- from config import *
class InvalidSourceUserException(Exception):
    """Raised when the user issuing the command cannot be identified."""
class InvalidTargetUserException(Exception):
    """Raised when no valid target user can be extracted from the query."""
def urban_dictionary_process(response):
    """Format the first Urban Dictionary result as a single chat line.

    Args:
        response: HTTP response object exposing ``json()``. The decoded
            payload is expected to be a dict holding the results under
            the ``'list'`` key.

    Returns:
        ``'Term not found.'`` when the payload has no results, otherwise
        ``'**<word>.** <definition> <url>'``. The definition is shortened
        (and suffixed with an ellipsis) so the whole line fits in 400
        characters whenever the word and URL leave room for that.
    """
    data = response.json()
    # A payload without a 'list' key is treated like an empty result set
    # instead of raising.
    entries = data.get('list') if isinstance(data, dict) else None
    if not entries:
        return 'Term not found.'

    item = entries[0]
    word = item['word']
    word_f = '**{}.**'.format(word)
    word_enc = quote(word)

    # Collapse the definition onto one line and unwrap [bracketed] terms.
    definition = item['definition'].replace('\r', '').replace('\n', ' ').strip()
    definition = URBANDICTIONARY_REGEX.sub(r'\1', definition)
    url = 'https://www.urbandictionary.com/define.php?term={}'.format(word_enc)

    max_length = 400  # presumably the chat message limit -- TODO confirm
    full_length = len(word_f) + 1 + len(definition) + 1 + len(url)
    # Only truncate when the line really is too long; a line of exactly
    # max_length characters already fits.
    if full_length > max_length:
        limit = len(definition) - (full_length - max_length)
        print('Limit', limit)
        # Clamp at zero: a negative slice end would count from the END of
        # the string and keep almost the entire definition.
        keep = max(limit - 1, 0)
        definition = definition[:keep].strip() + '…'

    output = '{} {} {}'.format(word_f, definition, url)
    print('Length', len(output))
    return output
def tatsumaki_process(response):
    """Turn a Tatsumaki leaderboard response into a ``{user_id: score}`` dict.

    Args:
        response: HTTP response object exposing ``json()``.

    Returns:
        An ``'API error: ...'`` string when the payload decodes to ``None``
        or is a dict carrying a ``'message'`` key; otherwise a dict mapping
        ``int(user_id)`` to ``int(score)`` for every non-``None`` entry.
    """
    data = response.json()
    if data is None:
        return 'API error: unable to decode JSON data.'
    if isinstance(data, dict) and 'message' in data:
        return 'API error: {}'.format(data['message'])
    # Reuse the payload decoded above instead of parsing the body again.
    return {
        int(entry['user_id']): int(entry['score'])
        for entry in data
        if entry is not None
    }
# Digits enclosed by ``<@`` and ``>`` (a Discord-style user mention).
DISCORD_USER_ID_REGEX = re.compile(r'(?<=<@)\d+?(?=>)')

# ``@`` followed by 4-25 ASCII letters, digits or underscores (the first
# character may not be an underscore), matched case-insensitively.
TWITCH_USER_ID_REGEX = re.compile(
    r'@[a-z0-9][a-z0-9_]{3,24}',
    flags=re.IGNORECASE | re.ASCII,
)

# ``[bracketed]`` text; group 1 is the text without the brackets.
URBANDICTIONARY_REGEX = re.compile(r'\[(.+?)\]')
def _tatsumaki_leaderboard_request(**kwargs):
    """Build the leaderboard request for the guild given as ``guild_id``."""
    endpoint = 'https://api.tatsumaki.xyz/guilds/{}/leaderboard'.format(kwargs['guild_id'])
    return requests.Request(
        'GET',
        endpoint,
        params={'limit': TATSUMAKI_ROW_LIMIT},
        headers={'Authorization': TATSUMAKI_API_KEY},
    )


def _urban_dictionary_request(**kwargs):
    """Build the definition lookup request for the word given as ``term``."""
    return requests.Request(
        'GET',
        'https://api.urbandictionary.com/v0/define',
        params={'term': kwargs['term']},
    )


# Registry of remote data sources. Each entry provides:
#   'request'  - callable taking keyword arguments, returning an unprepared
#                requests.Request
#   'process'  - callable converting the HTTP response into the final data
#   'lifetime' - how long a result may be served from the cache; None means
#                the result is never cached
DATA_SOURCES = {
    'tatsumaki_leaderboard': {
        'request': _tatsumaki_leaderboard_request,
        'process': tatsumaki_process,
        'lifetime': TATSUMAKI_CACHE_PERIOD,
    },
    'urban_dictionary': {
        'request': _urban_dictionary_request,
        'process': urban_dictionary_process,
        'lifetime': None,
    },
}
# Shared HTTP session used for every outgoing API request.
session = requests.Session()

# In-memory store of processed results, filled and read by retrieve().
cache = {}

# The Flask application that the route decorators below register on.
app = Flask(__name__)
def retrieve(data_source_id, *args, **kwargs):
    """Fetch and process data from a registered source, caching if allowed.

    Args:
        data_source_id: Key of the source in ``DATA_SOURCES``.
        *args: Positional arguments forwarded to the source's request factory.
        **kwargs: Keyword arguments forwarded to the source's request factory.

    Returns:
        A ``(data, is_cached)`` tuple: ``data`` is whatever the source's
        ``'process'`` callable produced, and ``is_cached`` tells whether it
        was served from the cache rather than fetched.

    Raises:
        KeyError: If ``data_source_id`` is not a registered source.
        TypeError: If a cacheable source is called with unhashable arguments.
    """
    if data_source_id not in DATA_SOURCES:
        raise KeyError('Invalid data source ID.')
    data_source = DATA_SOURCES[data_source_id]
    lifetime = data_source['lifetime']
    current_datetime = datetime.now()

    cache_key = None
    if lifetime is not None:
        # Key on the request arguments too; otherwise a call with different
        # arguments would be answered with another request's cached data.
        cache_key = (data_source_id, args, tuple(sorted(kwargs.items())))
        entry = cache.get(cache_key)
        if entry is not None and current_datetime - entry['updated'] < lifetime:
            return entry['data'], True

    prepared_request = data_source['request'](*args, **kwargs).prepare()
    response = session.send(prepared_request, timeout=HTTP_REQUEST_TIMEOUT)
    data = data_source['process'](response)

    if lifetime is not None:
        cache[cache_key] = {'data': data, 'updated': current_datetime}
    return data, False
def fight_discord(header, query):
    """Resolve both fighters and their leaderboard scores for Discord.

    Args:
        header: Parsed ``Nightbot-User`` header (values are lists); the
            caller's numeric ID is read from ``'providerId'``.
        query: Raw target text, expected to contain a ``<@id>`` mention.

    Returns:
        Either a user-facing error/notice string, or the tuple
        ``(source_text, target_text, source_score, target_score, is_cached)``.
        A self-fight returns scores ``1, 1`` without contacting the API.

    Raises:
        InvalidSourceUserException: The caller's ID is missing or not numeric.
        InvalidTargetUserException: No user mention was found in ``query``.
    """
    # Catch only lookup/parse failures so KeyboardInterrupt and SystemExit
    # are not swallowed the way a bare ``except:`` would.
    try:
        source_user = int(header['providerId'][0])
    except (KeyError, IndexError, TypeError, ValueError) as err:
        raise InvalidSourceUserException() from err

    try:
        # Nickname mentions look like <@!id>; drop the '!' before matching.
        target_user = int(DISCORD_USER_ID_REGEX.findall(query.replace('!', ''))[0])
    except (IndexError, AttributeError, TypeError, ValueError) as err:
        raise InvalidTargetUserException() from err

    source_text = '<@{}>'.format(source_user)
    target_text = '<@{}>'.format(target_user)

    if source_user == target_user:
        return source_text, target_text, 1, 1, True

    try:
        leaderboard, is_cached = retrieve('tatsumaki_leaderboard', guild_id=DISCORD_SERVER_ID)
    except Exception:
        # Best effort: any failure while fetching becomes a chat message.
        return 'API error: Unable to get leaderboard from Tatsumaki.'

    # The processing step reports API-level errors as a plain string.
    if isinstance(leaderboard, str):
        return leaderboard

    if source_user not in leaderboard:
        return "You're not in the leaderboard just yet, try fighting again later."
    if target_user not in leaderboard:
        return "You can't fight somebody with no experience at all. Shame on you!"

    return (
        source_text,
        target_text,
        leaderboard[source_user],
        leaderboard[target_user],
        is_cached,
    )
def fight_twitch(header, query):
    """Resolve both fighters for Twitch, where no scores are available.

    Args:
        header: Parsed ``Nightbot-User`` header (values are lists); the
            caller's name is read from ``'name'``.
        query: Raw target text, expected to contain an ``@username``.

    Returns:
        ``(source_user, target_user, 1, 1, True)`` with both names
        lower-cased and prefixed with ``@``; both scores are fixed at 1.

    Raises:
        InvalidSourceUserException: The caller's name is missing.
        InvalidTargetUserException: No ``@username`` was found in ``query``.
    """
    # Catch only lookup failures so KeyboardInterrupt and SystemExit are
    # not swallowed the way a bare ``except:`` would.
    try:
        source_user = '@' + header['name'][0].lower()
    except (KeyError, IndexError, AttributeError, TypeError) as err:
        raise InvalidSourceUserException() from err

    try:
        target_user = TWITCH_USER_ID_REGEX.findall(query)[0].lower()
    except (IndexError, TypeError) as err:
        raise InvalidTargetUserException() from err

    return source_user, target_user, 1, 1, True
@app.route('/fight/')
def fight():
    """Handle ``/fight/``: stage a score-weighted fight between two users.

    Reads the caller from the ``Nightbot-User`` header (a query-string
    encoded value providing ``provider`` plus provider-specific fields) and
    the opponent from the ``target`` query argument.

    Returns:
        A plain string for validation errors, lookup errors and self-fights;
        otherwise a ``text/plain`` response describing the outcome, with an
        ``X-Cache-Hit`` header set to ``HIT`` or ``MISS``.
    """
    # Headers
    if 'Nightbot-User' not in request.headers:
        return 'Required headers missing'

    # Query arguments
    if 'target' not in request.args:
        return 'Required query arguments missing'

    print('HEADER {}'.format(request.headers['Nightbot-User']))
    print('QUERY {}'.format(request.args['target']))

    # Source user data
    try:
        # strict_parsing makes parse_qs raise ValueError on malformed input.
        request_header = parse_qs(request.headers['Nightbot-User'], strict_parsing=True)
    except ValueError:
        return 'Error while parsing header data'
    request_query = request.args['target']

    # A header without a provider is answered like an unsupported platform
    # instead of failing with a KeyError.
    provider = request_header.get('provider', [None])[0]
    try:
        if provider == 'discord':
            result = fight_discord(request_header, request_query)
        elif provider == 'twitch':
            result = fight_twitch(request_header, request_query)
        else:
            result = 'This platform does currently not support ranked fights.'
    except InvalidSourceUserException:
        return "I don't recognize you, sorry."
    except InvalidTargetUserException:
        return "You specified an invalid user to fight. You have to mention them with an @."

    # The platform handlers return a plain string for errors and notices.
    if isinstance(result, str):
        return result
    source_text, target_text, source_score, target_score, is_cached = result

    if source_text == target_text:
        return '{} fought with themselves and are now in a state of quantum superposition.'.format(source_text)

    # Calculate the fight result
    total_score = source_score + target_score
    if total_score > 0:
        # randint() includes both endpoints, so drawing from 1..total makes
        # the real win chance equal to the probability shown to the user.
        roll = randint(1, total_score)
        probability = source_score / total_score
        is_win = 1 <= roll <= source_score
    else:
        # Nobody has any score: avoid dividing by zero and flip a coin.
        roll = randint(1, 2)
        probability = 0.5
        is_win = roll == 1

    print('{} ({}) vs {} ({}) got {} prob {:.02%} cache {}'.format(
        source_text,
        source_score,
        target_text,
        target_score,
        roll,
        probability,
        is_cached,
    ))

    # Choose a weapon and win/loss message
    verb = choice(FIGHT_VERB)
    weapon = choice(FIGHT_WEAPONS)
    if is_win:
        conclusion = choice(FIGHT_WIN) + '!'
    else:
        conclusion = choice(FIGHT_LOSS) + '.'

    response_text = '{} {} {} using {} and {} ({:.01%} win chance)'.format(
        source_text,
        verb,
        target_text,
        weapon,
        conclusion,
        probability,
    )
    resp = Response(response_text, mimetype='text/plain')
    resp.headers['X-Cache-Hit'] = 'HIT' if is_cached else 'MISS'
    return resp
@app.route('/urban/')
def urbandictionary():
    """Handle ``/urban/``: look up the ``term`` query argument.

    Returns:
        A plain string when the ``Nightbot-User`` header or the ``term``
        argument is missing or the lookup fails; otherwise a ``text/plain``
        response with the formatted definition and an ``X-Cache-Hit``
        header set to ``HIT`` or ``MISS``.
    """
    # Headers
    if 'Nightbot-User' not in request.headers:
        return 'Required headers missing'

    # Query arguments
    if 'term' not in request.args:
        return 'Required query arguments missing'

    try:
        result, is_cached = retrieve('urban_dictionary', term=request.args['term'])
    except Exception:
        # Best effort: answer with a readable message rather than letting a
        # timeout or malformed payload surface as an HTTP 500 error page.
        return 'API error: Unable to get definition from Urban Dictionary.'

    resp = Response(result, mimetype='text/plain')
    resp.headers['X-Cache-Hit'] = 'HIT' if is_cached else 'MISS'
    return resp
# Start Flask's built-in server only when this file is executed directly.
if __name__ == '__main__':
    app.run(
        host='0.0.0.0',  # listen on every network interface
        port=44444,
    )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement