Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- """
- glicko2
- ~~~~~~~
- The Glicko2 rating system.
- :copyright: (c) 2012 by Heungsub Lee
- :license: BSD, see LICENSE for more details.
- =========================
- Applied to Art-Battles by Wolfram
- """
- import math
- import io
- import operator
- import datetime
- #: The actual score for win
- WIN = 1.
- #: The actual score for draw
- DRAW = 0.5
- #: The actual score for loss
- LOSS = 0.
- MU = 1500.0
- PHI = 350.0
- SIGMA = 0.06
- TAU = 1.0
- EPSILON = 0.000001
- #: A constant which is used to standardize the logistic function to
- #: `1/(1+exp(-x))` from `1/(1+10^(-r/400))`
- Q = math.log(10) / 400
- DAYS = [None, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
- HAT = [u"Participant", u"Rate", u"RD", u"AB", u"123ranks"]
- class Rating(object):
- def __init__(self, mu=MU, phi=PHI, sigma=SIGMA):
- self.mu = mu
- self.phi = phi
- self.sigma = sigma
- def __repr__(self):
- args = (self.mu, self.phi, self.sigma)
- return '(mu=%.3f, phi=%.3f, sigma=%.3f)' % args
- def __lt__(self, other):
- return self.mu < other.mu
- class Glicko2(object):
- def __init__(self, mu=MU, phi=PHI, sigma=SIGMA, tau=TAU, epsilon=EPSILON):
- self.mu = mu
- self.phi = phi
- self.sigma = sigma
- self.tau = tau
- self.epsilon = epsilon
- def create_rating(self, mu=None, phi=None, sigma=None):
- if mu is None:
- mu = self.mu
- if phi is None:
- phi = self.phi
- if sigma is None:
- sigma = self.sigma
- return Rating(mu, phi, sigma)
- def scale_down(self, rating, ratio=173.7178):
- mu = (rating.mu - self.mu) / ratio
- phi = rating.phi / ratio
- return self.create_rating(mu, phi, rating.sigma)
- def scale_up(self, rating, ratio=173.7178):
- mu = rating.mu * ratio + self.mu
- phi = rating.phi * ratio
- return self.create_rating(mu, phi, rating.sigma)
- def reduce_impact(self, rating):
- """The original form is `g(RD)`. This function reduces the impact of
- games as a function of an opponent's RD.
- """
- return 1 / math.sqrt(1 + (3 * rating.phi ** 2) / (math.pi ** 2))
- def expect_score(self, rating, other_rating, impact):
- return 1. / (1 + math.exp(-impact * (rating.mu - other_rating.mu)))
- def determine_sigma(self, rating, difference, variance):
- """Determines new sigma."""
- phi = rating.phi
- difference_squared = difference ** 2
- # 1. Let a = ln(s^2), and define f(x)
- alpha = math.log(rating.sigma ** 2)
- def f(x):
- """This function is twice the conditional log-posterior density of
- phi, and is the optimality criterion.
- """
- tmp = phi ** 2 + variance + math.exp(x)
- a = math.exp(x) * (difference_squared - tmp) / (2 * tmp ** 2)
- b = (x - alpha) / (self.tau ** 2)
- return a - b
- # 2. Set the initial values of the iterative algorithm.
- a = alpha
- if difference_squared > phi ** 2 + variance:
- b = math.log(difference_squared - phi ** 2 - variance)
- else:
- k = 1
- while f(alpha - k * math.sqrt(self.tau ** 2)) < 0:
- k += 1
- b = alpha - k * math.sqrt(self.tau ** 2)
- # 3. Let fA = f(A) and f(B) = f(B)
- f_a, f_b = f(a), f(b)
- # 4. While |B-A| > e, carry out the following steps.
- # (a) Let C = A + (A - B)fA / (fB-fA), and let fC = f(C).
- # (b) If fCfB < 0, then set A <- B and fA <- fB; otherwise, just set
- # fA <- fA/2.
- # (c) Set B <- C and fB <- fC.
- # (d) Stop if |B-A| <= e. Repeat the above three steps otherwise.
- while abs(b - a) > self.epsilon:
- c = a + (a - b) * f_a / (f_b - f_a)
- f_c = f(c)
- if f_c * f_b < 0:
- a, f_a = b, f_b
- else:
- f_a /= 2
- b, f_b = c, f_c
- # 5. Once |B-A| <= e, set s' <- e^(A/2)
- return math.exp(1) ** (a / 2)
- def rate(self, rating, series):
- # Step 2. For each player, convert the rating and RD's onto the
- # Glicko-2 scale.
- rating = self.scale_down(rating)
- # Step 3. Compute the quantity v. This is the estimated variance of the
- # team's/player's rating based only on game outcomes.
- # Step 4. Compute the quantity difference, the estimated improvement in
- # rating by comparing the pre-period rating to the performance
- # rating based only on game outcomes.
- d_square_inv = 0
- variance_inv = 0
- difference = 0
- for actual_score, other_rating in series:
- other_rating = self.scale_down(other_rating)
- impact = self.reduce_impact(other_rating)
- expected_score = self.expect_score(rating, other_rating, impact)
- variance_inv += impact ** 2 * expected_score * (1 - expected_score)
- difference += impact * (actual_score - expected_score)
- d_square_inv += (
- expected_score * (1 - expected_score) *
- (Q ** 2) * (impact ** 2))
- difference /= variance_inv
- variance = 1. / variance_inv
- denom = rating.phi ** -2 + d_square_inv
- phi = math.sqrt(1 / denom)
- # Step 5. Determine the new value, Sigma', of the sigma. This
- # computation requires iteration.
- sigma = self.determine_sigma(rating, difference, variance)
- # Step 6. Update the rating deviation to the new pre-rating period
- # value, Phi*.
- phi_star = math.sqrt(phi ** 2 + sigma ** 2)
- # Step 7. Update the rating and RD to the new values, Mu' and Phi'.
- phi = 1 / math.sqrt(1 / phi_star ** 2 + 1 / variance)
- mu = rating.mu + phi ** 2 * (difference / variance)
- # Step 8. Convert ratings and RD's back to original scale.
- return self.scale_up(self.create_rating(mu, phi, sigma))
- def rate_1vs1(self, rating1, rating2, drawn=False):
- return (self.rate(rating1, [(DRAW if drawn else WIN, rating2)]),
- self.rate(rating2, [(DRAW if drawn else LOSS, rating1)]))
- def quality_1vs1(self, rating1, rating2):
- expected_score1 = self.expect_score(rating1, rating2, self.reduce_impact(rating1))
- expected_score2 = self.expect_score(rating2, rating1, self.reduce_impact(rating2))
- expected_score = (expected_score1 + expected_score2) / 2
- return 2 * (0.5 - abs(0.5 - expected_score))
- class Round(object):
- def __init__(self, game):
- self.date = game[0][5:]
- # Game result represents as 2-dim list: sublists are ranks that contain
- # all names of participants, sharing this rank
- self.result = map(lambda k: k.split("="), game[1:])
- def __repr__(self):
- c = type(self)
- args = (c.__module__, c.__name__, self.date, self.result)
- return '%s.%s(date=%s, result=%s)' % args
- class Combo(object):
- def __init__(self):
- self.content = []
- def __repr__(self):
- return u", ".join(map(unicode, self.content))
- def __lt__(self, other):
- return len(self.content) < len(other.content)
- def __len__(self):
- return len(self.content)
- def append(self, date):
- self.content.append(date)
- def copy(self, other):
- self.clear()
- for i in other.content:
- self.append(i)
- def clear(self):
- self.content = []
- def reverse(self):
- return u", ".join(map(unicode, reversed(self.content)))
- class Participant(object):
- def __init__(self, name):
- self.name = name
- self.date_of_last_game = None
- self.rating = Rating()
- self.games = 0
- self.ranks = [0, 0, 0]
- self.participation_combo = Combo()
- self.best_participation_combo = Combo()
- self.winning_combo = Combo()
- self.best_winning_combo = Combo()
- def update_game(self, last_game_date, curr_game_date, rank):
- self.games += 1
- if last_game_date == self.date_of_last_game:
- self.participation_combo.append(curr_game_date)
- else:
- self.participation_combo.clear()
- self.participation_combo.append(curr_game_date)
- if rank == 0:
- self.winning_combo.append(curr_game_date)
- else:
- self.winning_combo.clear()
- self.update_best(self.best_winning_combo, self.winning_combo)
- self.update_best(self.best_participation_combo, self.participation_combo)
- if 0 <= rank <= 2:
- self.ranks[rank] += 1
- self.date_of_last_game = curr_game_date
- def update_best(self, best, curr):
- if best < curr:
- best.copy(curr)
- def table_to_string(table, sorting_column=None, hat=HAT):
- if sorting_column is not None:
- table = sort_by_column(table, sorting_column)
- table.reverse()
- table = [hat] + table
- table = map(lambda l: map(lambda k: unicode(k), l), table)
- maxlen = [0 for _ in table[0]]
- for row in table:
- for i, j in enumerate(row):
- if len(j) > maxlen[i]:
- maxlen[i] = len(j)
- output_string = u""
- for row in table:
- for i, j in enumerate(row):
- output_string += j.ljust(maxlen[i]+1)
- output_string += u"\n"
- return output_string
- def print_table(filename, table, sorting_column=None, hat=HAT):
- output_file = io.open(filename, "w")
- output_file.write(table_to_string(table, sorting_column, hat))
- output_file.close()
- def sort_by_column(table, column):
- return sorted(table, key=operator.itemgetter(column))
- def date_difference(date1, date2):
- d1, m1, y1 = map(int, date1.split("."))
- d2, m2, y2 = map(int, date2.split("."))
- month_difference = 0
- if m1 > m2:
- m1, m2 = m2, m1
- inv = 1
- else:
- inv = -1
- for i in xrange(m1, m2):
- month_difference += DAYS[i]
- return (y1-y2)*365 + inv*month_difference + (d1-d2)
- def current_date():
- return unicode(datetime.datetime.now())[:10]
- def main(path="artbattle.txt", post_template="post_template.txt"):
- main_file = io.open(path)
- initial_lines = main_file.readlines()
- lines = map(lambda k: k.replace("\n", ""), initial_lines)
- games = [] # List of games: any game is list of strings
- current_game = None
- for line in lines:
- if line[:5] == "game ":
- if current_game is not None:
- games.append(current_game)
- current_game = []
- current_game.append(line)
- games.append(current_game)
- games = map(Round, games) # Convert to Round class
- player_dictionary = {} # Name: player
- art_battles_with_i_players = []
- glicko2 = Glicko2()
- change_rating_file = io.open("changes.txt", "w")
- log_file = io.open("log.txt", "w")
- ch_rate_table = []
- date_of_last_game = None
- for game in games:
- # Adding new players
- names = [item for sublist in game.result for item in sublist] # Flatten the 2-dim list
- while len(art_battles_with_i_players) <= len(names):
- art_battles_with_i_players.append(Combo())
- art_battles_with_i_players[len(names)].append(game.date)
- for name in names:
- if name not in player_dictionary:
- player_dictionary[name] = Participant(name)
- # Getting list of ratings
- ratings = map(lambda k: player_dictionary[k].rating, names)
- # New ratings counting
- new_ratings = []
- rating_sum = 0
- dispersion_sum = 0
- volatility_sum = 0
- for rating in ratings:
- rating_sum += rating.mu
- dispersion_sum += rating.phi**2
- volatility_sum += rating.sigma
- opponent_number = 1.*(len(names)-1)
- log_dict = {}
- for i in xrange(len(ratings)):
- rating = ratings[i]
- alpha = 0
- defeated = 0.
- for block in game.result:
- beta = alpha + len(block)
- if i >= beta:
- defeated += LOSS * len(block)
- elif i < alpha:
- defeated += WIN * len(block)
- else:
- defeated += DRAW * (len(block)-1)
- alpha = beta
- mean_opp_rating = Rating((rating_sum-rating.mu)/opponent_number,
- math.sqrt((dispersion_sum-rating.phi**2)/opponent_number),
- (volatility_sum-rating.sigma)/opponent_number)
- new_ratings.append(glicko2.rate(rating, [[defeated/opponent_number, mean_opp_rating]]))
- log_dict[names[i]] = [defeated/opponent_number, mean_opp_rating]
- # Updating ratings
- changes = {} # Rating changes of participants of the current game
- for i in xrange(len(ratings)):
- ch = int(new_ratings[i].mu-player_dictionary[names[i]].rating.mu)
- changes[names[i]] = ("+" if ch > 0 else "") + str(ch) # Convert to string
- player_dictionary[names[i]].rating = new_ratings[i]
- # Updating game
- for rank, people in enumerate(game.result):
- for player in people:
- player_dictionary[player].update_game(date_of_last_game, game.date, rank)
- date_of_last_game = game.date
- # Rating changes output
- change_rating_file.write(unicode("\ngame " + game.date + "\n"))
- log_file.write(unicode("\ngame " + game.date + "\n"))
- ch_rate_table = []
- for name in names:
- ch_rate_table.append([name, int(player_dictionary[name].rating.mu), changes[name]])
- change_rating_file.write(table_to_string(ch_rate_table, hat=[u"Participant", u"Rate", u"Change"]))
- dynamic_rate_table = []
- for name in names:
- r = player_dictionary[name].rating
- dynamic_rate_table.append([name, r.mu, r.phi, r.sigma]+log_dict[name])
- log_file.write(table_to_string(dynamic_rate_table,
- hat=[u"Participant", u"Rate", u"RD", u"Volat", u"Result", u"MeanRate"]))
- change_rating_file.close()
- log_file.close()
- # Rating changes comment
- change_comment_file = io.open("comment.txt", "w")
- change_comment_file.write(u'<span class="spoiler"><span class="spoiler-title">'
- u'Рейтинги</span><span class="spoiler-body"><pre>')
- change_comment_file.write(table_to_string(ch_rate_table, hat=[u"Participant", u"Rate", u"Change"]))
- change_comment_file.write(u'</pre><span class="spoiler"><span class="spoiler-title">'
- u'Таблица</span><span class="spoiler-body"><pre>')
- output_table = []
- participation_combo_table = []
- winning_combo_table = []
- for key, value in player_dictionary.items():
- output_table.append([key, int(value.rating.mu), int(value.rating.phi), value.games, value.ranks])
- participation_combo_table.append([key, len(value.best_participation_combo), value.best_participation_combo])
- winning_combo_table.append([key, len(value.best_winning_combo), value.best_winning_combo])
- # Data debugger: print list of participants, sorted by alphabet for searching repetitions
- participant_file = io.open("artists.txt", "w")
- for name in sorted(player_dictionary.keys()):
- participant_file.write(unicode(name+"\n"))
- participant_file.close()
- # Print filtered rating table
- new_out_table = filter(lambda k: k[3] >= 5, output_table)
- print_table("confirmed_ratings.txt", new_out_table, 1)
- new_out_table = filter(lambda k: date_difference(date_of_last_game,
- player_dictionary[k[0]].date_of_last_game) < 200, output_table)
- print_table("filtered_ratings.txt", new_out_table, 1)
- print_table("combo_p.txt", participation_combo_table, 1, [u"Participant", u"Combo", u"Dates"])
- print_table("combo_w.txt", winning_combo_table, 1, [u"Participant", u"Combo", u"Dates"])
- change_comment_file.write(table_to_string(new_out_table, 1))
- change_comment_file.write(u'</pre></span></span></span></span>')
- change_comment_file.close()
- art_battles_with_i_players.reverse()
- abwipr_string = ""
- max_players = len(art_battles_with_i_players)-1
- for i, ab in enumerate(art_battles_with_i_players):
- if len(ab) > 0:
- abwipr_string += u"%i[%i]: %s\n" % ((max_players-i), len(ab), ab.reverse())
- try:
- post_file = io.open(post_template)
- post_string = u"".join(post_file.readlines())
- post_file.close()
- post_file = io.open("post.txt", "w")
- post_file.write(post_string % (current_date(), table_to_string(output_table, 1), "".join(initial_lines),
- len(player_dictionary.keys()), len(games),
- table_to_string(filter(lambda k: k[4] != [0, 0, 0], output_table), 4),
- table_to_string(filter(lambda k: k[3] >= 4, output_table), 3),
- table_to_string(filter(lambda k: k[1] >= 2, participation_combo_table), 1,
- [u"Participant", u"Combo", u"Dates"]),
- table_to_string(filter(lambda k: k[1] >= 2, winning_combo_table), 1,
- [u"Participant", u"Combo", u"Dates"]),
- abwipr_string[:-1]))
- post_file.close()
- except IOError:
- print 'Warning: file "%s" not found.' % post_template
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement