Advertisement
Guest User

Extracted the queue worker from DuelArena and added some unit tests

a guest
Oct 21st, 2017
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 11.49 KB | None | 0 0
  1. # DuelArena will start automatically if at least 3 players
  2. # opted in (!duel or !d) to the queue.
  3. # DuelArena will be deactivated automatically if connected players
  4. # exceed the player_limit (default 5), or if there are only 2 players left, or
  5. # if too many players opted out.
  6.  
  7. import minqlx
  8. from hamcrest import *
  9. import unittest
  10.  
  11. MIN_ACTIVE_PLAYERS = 3  # with <3 connected and subscribed players we deactivate DuelArena
  12. MAX_ACTIVE_PLAYERS = 5  # with >5 connected players we deactivate DuelArena
  13.  
  14.  
  15. class TestDuelArenaWorker(unittest.TestCase):
  16.     def setUp(self):
  17.         self.duelarena = DuelArenaWorker()
  18.  
  19.     def test_enqueue_players(self):
  20.         self.enqueue("Player 1", "Player 2", "Player 3")
  21.  
  22.         assert_that(self.duelarena.queue, contains("Player 3", "Player 2", "Player 1"))
  23.  
  24.     def test_nextPlayer(self):
  25.         self.enqueue("Player 1", "Player 2", "Player 3")
  26.  
  27.         newplayer = self.duelarena.playerLost("Player 4")
  28.  
  29.         assert_that(newplayer, "Player 1")
  30.  
  31.     def enqueue(self, *items):
  32.         for item in items:
  33.             self.duelarena.enqueuePlayer(item)
  34.  
  35.     def test_lost_player_gets_requeued(self):
  36.         self.enqueue("Player 1", "Player 2", "Player 3")
  37.  
  38.         self.duelarena.playerLost("Player 4")
  39.  
  40.         assert_that(self.duelarena.queue, contains("Player 4", "Player 3", "Player 2"))
  41.  
  42.     def test_removing_a_player(self):
  43.         self.enqueue("Player 1", "Player 2", "Player 3")
  44.  
  45.         self.duelarena.dequeuePlayer("Player 3")
  46.  
  47.         assert_that(self.duelarena.queue, contains("Player 2", "Player 1"))
  48.  
  49.     def test_removing_a_non_existing_player(self):
  50.         self.enqueue("Player 1", "Player 2", "Player 3")
  51.  
  52.         self.duelarena.dequeuePlayer("Player 4")
  53.  
  54.         assert_that(self.duelarena.queue, contains("Player 3", "Player 2", "Player 1"))
  55.  
  56.  
  57. class DuelArenaWorker():
  58.     def __init__(self):
  59.         self.queue = []
  60.  
  61.     def enqueuePlayer(self, player):
  62.         self.queue.insert(0, player)
  63.  
  64.     def dequeuePlayer(self, player):
  65.         try:
  66.             self.queue.remove(player)
  67.         except ValueError:
  68.             pass
  69.  
  70.     def nextPlayer(self):
  71.         return self.queue.pop()
  72.  
  73.     def playerLost(self, player):
  74.         self.enqueuePlayer(player)
  75.         return self.nextPlayer()
  76.  
  77.  
  78. class duelarena(minqlx.Plugin):
  79.     def __init__(self):
  80.  
  81.         self.initialize_hooks_and_commands()
  82.         self.duelarena = DuelArenaWorker()
  83.  
  84.         self.duelmode = False  # global gametype switch
  85.         self.initduel = False  # initial player setup switch
  86.         self.psub = []  # steam_ids of players subscribed to DuelArena
  87.         self.red_score = 0  # previous score to determine winner team
  88.         self.blue_score = 0  # previous score to determine winner team
  89.         self.player_red = 0  # force spec exception for this player
  90.         self.player_blue = 0  # force spec exception for this player
  91.  
  92.     def initialize_hooks_and_commands(self):
  93.         self.add_hook("team_switch", self.handle_switch)
  94.         self.add_hook("player_disconnect", self.handle_player_disco)
  95.         self.add_hook("player_connect", self.handle_player_connect)
  96.         self.add_hook("game_start", self.handle_game_start)
  97.         self.add_hook("round_end", self.handle_round_end)
  98.         self.add_hook("game_end", self.handle_game_end)
  99.         self.add_command(("duel", "d"), self.cmd_duel)
  100.         self.add_command(("queue", "q"), self.cmd_printqueue)
  101.  
  102.     def handle_switch(self, player, old, new):
  103.         # don't allow anyone to join manually when DuelArena is activ
  104.         if self.game and new in ['red', 'blue'] and self.duelmode and self.game.state != "warmup":
  105.             # except we auto join him
  106.             if player == self.player_red or player == self.player_blue:
  107.                 return
  108.             player.put("spectator")
  109.             player.tell(
  110.                 "^3Server is in DuelArena mode. You will automatically join. Type ^2!duel ^3or ^2!d ^3to enter or to leave the queue")
  111.  
  112.     @minqlx.delay(3)
  113.     def handle_player_connect(self, player):
  114.  
  115.         playercount = self.connected_players()
  116.  
  117.         if playercount == 3 or playercount == 5 and not self.duelmode:
  118.             self.center_print("^3Type ^2!d ^3for DuelArena!")
  119.             self.msg("^3Type ^2!d ^3for DuelArena!")
  120.  
  121.         self.duelarena_switch()
  122.  
  123.         dbg = "echo DUELARENA DBG: playercount {} duelmode: {} psub: {}"
  124.         minqlx.console_command(dbg.format(playercount, self.duelmode, len(self.psub)))
  125.  
  126.     def handle_player_disco(self, player, reason):
  127.  
  128.         playercount = self.connected_players()
  129.  
  130.         if playercount == 3 or playercount == 5 and not self.duelmode:
  131.             self.center_print("^3Type ^2!d ^3for DuelArena!")
  132.             self.msg("^3Type ^2!d ^3for DuelArena!")
  133.  
  134.         self.duelarena_switch()
  135.  
  136.         self.duelarena.dequeuePlayer(player)
  137.  
  138.     @minqlx.delay(3)
  139.     def handle_game_start(self, game):
  140.  
  141.         if self.duelmode:
  142.             self.red_score = 0
  143.             self.blue_score = 0
  144.             self.init_duel()
  145.  
  146.     def handle_game_end(self, *args, **kwargs):
  147.  
  148.         # put both players back to the queue, winner first position, loser last position
  149.         if self.duelmode:
  150.  
  151.             teams = self.teams()
  152.  
  153.             if self.game.blue_score > self.blue_score:
  154.                 empty_team = "red"
  155.                 winner_team = "blue"
  156.             elif self.game.red_score > self.red_score:
  157.                 empty_team = "blue"
  158.                 winner_team = "red"
  159.             else:
  160.                 return
  161.  
  162.             for _p in teams[empty_team]:
  163.                 loser = _p
  164.  
  165.             self.duelarena.playerLost(loser)
  166.  
  167.     @minqlx.delay(1.5)
  168.     def handle_round_end(self, round_number):
  169.  
  170.         # Not in CA? Do nothing
  171.         if (self.game is None) or (self.game.type_short != "ca"): return
  172.  
  173.         if self.initduel:
  174.             self.init_duel()
  175.             return
  176.  
  177.         if self.duelmode:
  178.  
  179.             if self.game.blue_score > self.blue_score:
  180.                 empty_team = "red"
  181.             elif self.game.red_score > self.red_score:
  182.                 empty_team = "blue"
  183.             else:
  184.                 return  # Draw or first round? Do nothing
  185.  
  186.             self.blue_score = self.game.blue_score
  187.             self.red_score = self.game.red_score
  188.  
  189.             next_player = self.duelarena.nextPlayer()
  190.  
  191.             cancelduel = True
  192.  
  193.             teams = self.teams()
  194.  
  195.             for _p in teams[empty_team]:
  196.                 loser = _p
  197.  
  198.             for _p in teams['spectator']:
  199.                 if _p.steam_id == next_player.steam_id:
  200.                     self.player_blue = _p
  201.                     self.player_red = _p
  202.                     _p.put(empty_team)
  203.                     self.queue.insert(0, loser.steam_id)
  204.                     loser.put("spectator")
  205.                     cancelduel = False
  206.  
  207.             if cancelduel: self.duelmode = False  # no specs found? Deactivate DuelArena
  208.  
  209.     def cmd_duel(self, player, msg, channel):
  210.  
  211.         if self.connected_players() > MAX_ACTIVE_PLAYERS:
  212.             player.tell(
  213.                 "^3!duel command not available with {} or more players connected".format(MAX_ACTIVE_PLAYERS + 1))
  214.             return
  215.  
  216.         if player.steam_id not in self.psub:
  217.             if player.steam_id not in self.queue: self.queue.insert(0, player.steam_id)
  218.             self.psub.append(player.steam_id)
  219.             indicator = len(self.queue) - self.queue.index(player.steam_id)
  220.             player.tell(
  221.                 "^3You successfully opted in to the DuelArena queue. Type ^2!queue ^3or ^2!q ^3to get your queue position. Your position is ^2{}.".format(
  222.                     indicator))
  223.             countdown = 3 - len(self.psub)
  224.             if not self.duelmode and countdown > 0:
  225.                 self.msg(
  226.                     "{} ^3entered the DuelArena queue. ^1{} ^3more players needed to start DuelArena. Type ^2!duel ^3or ^2!d ^3to enter DuelArena queue.".format(
  227.                         player.name, countdown))
  228.             else:
  229.                 self.msg(
  230.                     "{} ^3entered the DuelArena. Type ^2!duel ^3or ^2!d ^3to join DuelArena queue.".format(player.name))
  231.         elif player.steam_id in self.psub:
  232.             if player.steam_id in self.queue: self.queue.remove(player.steam_id)
  233.             self.psub.remove(player.steam_id)
  234.             self.msg("{} ^3left DuelArena.".format(player.name))
  235.  
  236.         self.duelarena_switch()
  237.  
  238.     def cmd_printqueue(self, player, msg, channel):
  239.  
  240.         if self.connected_players() > MAX_ACTIVE_PLAYERS:
  241.             player.tell(
  242.                 "^3!queue command not available with {} or more players connected".format(MAX_ACTIVE_PLAYERS + 1))
  243.             return
  244.  
  245.         qstring = ""
  246.  
  247.         for s_id in self.queue:
  248.             p = self.player(s_id)
  249.             indicator = len(self.queue) - self.queue.index(s_id)
  250.             if indicator == 1:
  251.                 place = "1st"
  252.             elif indicator == 2:
  253.                 place = "2nd"
  254.             elif indicator == 3:
  255.                 place = "3rd"
  256.             elif indicator == 4:
  257.                 place = "4th"
  258.             elif indicator == 5:
  259.                 place = "5th"
  260.             qstring = qstring + ("^3{}^7: ^2{} ".format(place, p.name))
  261.  
  262.         if qstring != "":
  263.             self.msg(qstring)
  264.         else:
  265.             self.msg("^3There's no one in the queue yet. Type ^2!d ^3or ^2!duel ^3to enter the queue.")
  266.  
  267.     def init_duel(self):
  268.  
  269.         self.checklists()
  270.  
  271.         teams = self.teams()
  272.  
  273.         self.player_red = self.player(self.queue.pop())
  274.         self.player_blue = self.player(self.queue.pop())
  275.         self.player_red.put("red")
  276.         self.player_blue.put("blue")
  277.  
  278.         # put all other players to spec
  279.         for _p in teams['red'] + teams['blue']:
  280.             if _p != self.player_red and _p != self.player_blue:
  281.                 _p.put("spectator")
  282.  
  283.         self.initduel = False
  284.  
  285.     def duelarena_switch(self):
  286.  
  287.         self.checklists()
  288.  
  289.         if self.duelmode:
  290.             player_count = self.connected_players()
  291.             if player_count > MAX_ACTIVE_PLAYERS or player_count < MIN_ACTIVE_PLAYERS or len(
  292.                     self.psub) < MIN_ACTIVE_PLAYERS:
  293.                 self.duelmode = False
  294.                 self.msg("^3DuelArena has been deactivated! You are free to join.")
  295.         elif not self.duelmode:
  296.             if len(self.psub) >= MIN_ACTIVE_PLAYERS and len(self.psub) <= MAX_ACTIVE_PLAYERS:
  297.                 self.duelmode = True
  298.                 self.msg("^3DuelArena activated! Type ^2!d ^3to join or leave the queue.")
  299.                 self.center_print("^3DuelArena activated! Type ^2!d ^3to join.")
  300.                 if self.game and self.game.state == "in_progress":
  301.                     self.initduel = True
  302.                     self.red_score = self.game.blue_score
  303.                     self.red_score = self.game.red_score
  304.  
  305.     def checklists(self):
  306.  
  307.         for sid in self.queue:
  308.             if not self.player(sid): self.queue.remove(sid)
  309.  
  310.         for sid in self.psub:
  311.             if not self.player(sid): self.psub.remove(sid)
  312.  
  313.         for p in self.players():
  314.             if p.ping >= 990:
  315.                 if p.steam_id in self.queue: self.queue.remove(p.steam_id)
  316.                 if p.steam_id in self.psub: self.psub.remove(p.steam_id)
  317.  
  318.  
  319.                 ## Helper functions
  320.  
  321.     def connected_players(self):
  322.         teams = self.teams()
  323.         players = int(len(teams["red"] + teams["blue"] + teams["spectator"] + teams["free"]))
  324.         return players
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement