Advertisement
kmzkitties

sample

Aug 22nd, 2019
374
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.82 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. import re
  3. from datetime import datetime
  4. from random import choice, randint
  5. from urllib.parse import parse_qs, quote
  6.  
  7. import requests
  8. from flask import Flask, request, Response
  9.  
  10. from config import *
  11.  
  12.  
  13. class InvalidSourceUserException(Exception): pass
  14. class InvalidTargetUserException(Exception): pass
  15.  
  16.  
  17. def urban_dictionary_process(response):
  18. data = response.json()
  19.  
  20. if len(data['list']) == 0:
  21. return 'Term not found.'
  22.  
  23. item = data['list'][0]
  24. word = item['word']
  25. word_f = '**{}.**'.format(word)
  26. word_enc = quote(word)
  27.  
  28. definition = item['definition'].replace('\r', '').replace('\n', ' ').strip()
  29. definition = URBANDICTIONARY_REGEX.sub(r'\1', definition)
  30.  
  31. url = 'https://www.urbandictionary.com/define.php?term={}'.format(word_enc)
  32.  
  33. full_length = len(word_f) + 1 + len(definition) + 1 + len(url)
  34.  
  35. if full_length >= 400:
  36. limit = len(definition) - (full_length - 400)
  37. print('Limit', limit)
  38. definition = definition[:limit-1].strip() + '…'
  39.  
  40. output = '{} {} {}'.format(word_f, definition, url)
  41.  
  42. print('Length', len(output))
  43.  
  44. return output
  45.  
  46.  
  47. def tatsumaki_process(response):
  48. data = response.json()
  49.  
  50. if data is None:
  51. return 'API error: unable to decode JSON data.'
  52.  
  53. if isinstance(data, dict) and 'message' in data:
  54. return 'API error: {}'.format(data['message'])
  55.  
  56. return {
  57. int(x['user_id']): int(x['score'])
  58. for x in response.json()
  59. if x is not None
  60. }
  61.  
  62.  
  63. DISCORD_USER_ID_REGEX = re.compile(r'(?<=<@)\d+?(?=>)')
  64. TWITCH_USER_ID_REGEX = re.compile(r'@[a-z0-9][a-z0-9_]{3,24}', re.ASCII | re.IGNORECASE)
  65. URBANDICTIONARY_REGEX = re.compile(r'\[(.+?)\]')
  66. DATA_SOURCES = {
  67. 'tatsumaki_leaderboard': {
  68. 'request': lambda **kwargs: requests.Request(
  69. 'GET',
  70. 'https://api.tatsumaki.xyz/guilds/{}/leaderboard'.format(kwargs['guild_id']),
  71. params={'limit': TATSUMAKI_ROW_LIMIT},
  72. headers={'Authorization': TATSUMAKI_API_KEY}
  73. ),
  74. 'process': tatsumaki_process,
  75. 'lifetime': TATSUMAKI_CACHE_PERIOD,
  76. },
  77. 'urban_dictionary': {
  78. 'request': lambda **kwargs: requests.Request(
  79. 'GET',
  80. 'https://api.urbandictionary.com/v0/define',
  81. params={'term': kwargs['term']},
  82. ),
  83. 'process': urban_dictionary_process,
  84. 'lifetime': None,
  85. }
  86. }
  87.  
  88. app = Flask(__name__)
  89. cache = {}
  90. session = requests.Session()
  91.  
  92.  
  93. def retrieve(data_source_id, *args, **kwargs):
  94. if data_source_id not in DATA_SOURCES:
  95. raise KeyError('Invalid data source ID.')
  96.  
  97. data_source = DATA_SOURCES[data_source_id]
  98.  
  99. current_datetime = datetime.now()
  100.  
  101. if data_source['lifetime'] is not None and data_source_id in cache and current_datetime - cache[data_source_id]['updated'] < data_source['lifetime']:
  102. return cache[data_source_id]['data'], True
  103.  
  104. prepared_request = data_source['request'](*args, **kwargs).prepare()
  105. response = session.send(prepared_request, timeout=HTTP_REQUEST_TIMEOUT)
  106. data = data_source['process'](response)
  107. updated = current_datetime
  108.  
  109. if data_source['lifetime'] is not None:
  110. cache[data_source_id] = {'data': data, 'updated': updated}
  111.  
  112. return data, False
  113.  
  114.  
  115. def fight_discord(header, query):
  116. try:
  117. source_user = int(header['providerId'][0])
  118. except:
  119. raise InvalidSourceUserException()
  120.  
  121. try:
  122. target_user = int(DISCORD_USER_ID_REGEX.findall(query.replace('!', ''))[0])
  123. except:
  124. raise InvalidTargetUserException()
  125.  
  126. source_text = '<@{}>'.format(source_user)
  127. target_text = '<@{}>'.format(target_user)
  128.  
  129. if source_user == target_user:
  130. return source_text, target_text, 1, 1, True
  131.  
  132. try:
  133. leaderboard, is_cached = retrieve('tatsumaki_leaderboard', guild_id=DISCORD_SERVER_ID)
  134.  
  135. if isinstance(leaderboard, str):
  136. return leaderboard
  137. except:
  138. return 'API error: Unable to get leaderboard from Tatsumaki.'
  139.  
  140. if source_user in leaderboard:
  141. source_score = leaderboard[source_user]
  142. else:
  143. return "You're not in the leaderboard just yet, try fighting again later."
  144.  
  145. if target_user in leaderboard:
  146. target_score = leaderboard[target_user]
  147. else:
  148. return "You can't fight somebody with no experience at all. Shame on you!"
  149.  
  150. return source_text, target_text, source_score, target_score, is_cached
  151.  
  152.  
  153. def fight_twitch(header, query):
  154. try:
  155. source_user = '@' + header['name'][0].lower()
  156. except:
  157. raise InvalidSourceUserException()
  158.  
  159. try:
  160. target_user = TWITCH_USER_ID_REGEX.findall(query)[0].lower()
  161. except:
  162. raise InvalidTargetUserException()
  163.  
  164. return source_user, target_user, 1, 1, True
  165.  
  166.  
  167. @app.route('/fight/')
  168. def fight():
  169. # Headers
  170. if 'Nightbot-User' not in request.headers:
  171. return 'Required headers missing'
  172.  
  173. # Query arguments
  174. if 'target' not in request.args:
  175. return 'Required query arguments missing'
  176.  
  177. print('HEADER {}'.format(request.headers['Nightbot-User']))
  178. print('QUERY {}'.format(request.args['target']))
  179.  
  180. # Source user data
  181. try:
  182. request_header = parse_qs(request.headers['Nightbot-User'], strict_parsing=True)
  183. request_query = request.args['target']
  184. except:
  185. return 'Error while parsing header data'
  186.  
  187. try:
  188. provider = request_header['provider'][0]
  189.  
  190. if provider == 'discord':
  191. result = fight_discord(request_header, request_query)
  192. elif provider == 'twitch':
  193. result = fight_twitch(request_header, request_query)
  194. else:
  195. result = 'This platform does currently not support ranked fights.'
  196.  
  197. if isinstance(result, str):
  198. return result
  199. else:
  200. source_text, target_text, source_score, target_score, is_cached = result
  201. except InvalidSourceUserException:
  202. return "I don't recognize you, sorry."
  203. except InvalidTargetUserException:
  204. return "You specified an invalid user to fight. You have to mention them with an @."
  205.  
  206. if source_text == target_text:
  207. return '{} fought with themselves and are now in a state of quantum superposition.'.format(source_text)
  208.  
  209. # Calculate the fight result
  210. result = randint(1, source_score + target_score + 1)
  211. probability = source_score / (source_score + target_score)
  212. is_win = 1 <= result <= source_score
  213.  
  214. print('{} ({}) vs {} ({}) got {} prob {:.02%} cache {}'.format(
  215. source_text,
  216. source_score,
  217. target_text,
  218. target_score,
  219. result,
  220. probability,
  221. is_cached,
  222. ))
  223.  
  224. # Choose a weapon and win/loss message
  225. verb = choice(FIGHT_VERB)
  226. weapon = choice(FIGHT_WEAPONS)
  227. conclusion = choice(FIGHT_WIN) + '!' if is_win else choice(FIGHT_LOSS) + '.'
  228.  
  229. response_text = '{} {} {} using {} and {} ({:.01%} win chance)'.format(
  230. source_text,
  231. verb,
  232. target_text,
  233. weapon,
  234. conclusion,
  235. probability,
  236. )
  237.  
  238. resp = Response(response_text, mimetype='text/plain')
  239. resp.headers['X-Cache-Hit'] = 'HIT' if is_cached else 'MISS'
  240. return resp
  241.  
  242.  
  243. @app.route('/urban/')
  244. def urbandictionary():
  245. # Headers
  246. if 'Nightbot-User' not in request.headers:
  247. return 'Required headers missing'
  248.  
  249. # Query arguments
  250. if 'term' not in request.args:
  251. return 'Required query arguments missing'
  252.  
  253. result, is_cached = retrieve('urban_dictionary', term=request.args['term'])
  254.  
  255. resp = Response(result, mimetype='text/plain')
  256. resp.headers['X-Cache-Hit'] = 'HIT' if is_cached else 'MISS'
  257. return resp
  258.  
  259.  
  260. if __name__ == '__main__':
  261. app.run(host='0.0.0.0', port=44444)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement