# Advertisement
# Not a member of Pastebin yet?
# Sign Up,
# it unlocks many cool features!
import collections
import heapq
import math
import random
import time
from collections import deque

import numpy as np

import functions as fun
# Maximum number of iterations a search may run before it is aborted.
MAX_RUNS = float('inf')
# Maximum time (in seconds) a search may run before it is aborted.
TIME_LIMIT = float('inf')
# Markers used for the backtrace of bi-directional A*.
BY_START = 1
BY_END = 2
class Player:
    """Tank bot: shoot if possible, otherwise drive towards the nearest tank.

    The game engine drives the bot through the callback methods below; the
    bot answers every round with a move tuple ``(action, x, y)``.
    """

    def __init__(self):
        self.id = "Id1"
        self.name = "Grzegorz Wator"
        self.team = "Pink"
        self.cols_count = 0
        self.rows_count = 0
        self.x = 0
        self.y = 0
        self.tank_direction = fun.S
        self.gun_direction = fun.S
        self.map = ""

    # Executed at the beginning of the game.
    def set_init_values(self, player_id, team_name, cols_count, rows_count):
        """Store the identity and board dimensions handed over by the game."""
        self.id = player_id
        self.team = team_name
        self.cols_count = cols_count
        self.rows_count = rows_count

    # Executed at the beginning to get user name.
    def get_name(self):
        """Return the display name of this player."""
        return self.name

    # Executed after killing other player.
    def set_information_about_killing(self):
        pass

    # Executed after tank death.
    def set_information_about_death(self):
        pass

    # Executed when tank had fault. (TANK FAULT RATE is 10% by default)
    def set_information_about_tank_fault(self):
        pass

    # Executed when move was wrong, for example when player wants to move
    # out of map.
    def set_information_about_move_fault(self):
        pass

    # Executed on the end of the game.
    def set_information_about_game_end(self):
        pass

    # Executed at the beginning and after every move. Round kills is list of
    # tuples (killer_name, victim_name).
    def set_map_and_position(self, map, x, y, tank_direction, gun_direction,
                             round, round_kills):
        """Remember the current map, position and tank/gun directions.

        ``round`` and ``round_kills`` are accepted but not used.
        """
        self.map = map
        self.x = x
        self.y = y
        self.tank_direction = tank_direction
        self.gun_direction = gun_direction

    # Executed before function get_next_move at the beginning of every round.
    def get_radio_message(self):
        return None

    # Executed at the beginning of every round to get player move.
    def get_next_move(self, radio_messages):
        """Choose the move for this round.

        Order of preference: shoot a player on the line of fire, turn the
        gun (left, then right) if that would allow a shot, otherwise take a
        step along the A* route to the nearest tank. When no tank is
        reachable or no route exists, a random move is returned instead of
        raising.
        """
        players = fun.get_players_from_map(self.map, self.cols_count,
                                           self.rows_count)
        if fun.can_shoot_someone(self.map, self.cols_count, self.rows_count,
                                 self.x, self.y, self.gun_direction):
            for player in players:
                # NOTE(review): this passes player[0] and then reads
                # player.x / player.y, while distance_to_tank treats the
                # entries as (x, y, id) tuples - confirm the record layout
                # returned by fun.get_players_from_map.
                if fun.is_the_player_on_line_of_fire(
                        self.map, self.cols_count, self.rows_count,
                        self.x, self.y, self.gun_direction, player[0]):
                    return (fun.SHOOT, player.x, player.y)

        # No, lets check if I can kill someone after turning the gun.
        # First check turning gun to left, then to right.
        for turn in (fun.TURN_GUN_TO_LEFT, fun.TURN_GUN_TO_RIGHT):
            new_gun_direction = fun.get_gun_direction_after_gun_rotation(
                self.gun_direction, turn)
            if fun.can_shoot_someone(self.map, self.cols_count,
                                     self.rows_count, self.x, self.y,
                                     new_gun_direction):
                return (turn, 0, 0)

        # Find distances to all tanks. Distance 0 is this tank itself and
        # None means the BFS found no route, so both are skipped.
        distances_to_tanks = self.find_distances_to_tanks(players)
        reachable = [entry for entry in distances_to_tanks
                     if entry[1] is not None and entry[1] > 0]
        if not reachable:
            return self._random_move()

        nearest = min(reachable, key=lambda entry: entry[1])
        target = fun.get_position_of_player(self.map, self.cols_count,
                                            self.rows_count, nearest[0])
        next_square = self.getRouteToEnemy(*target)
        if next_square is None:
            return self._random_move()
        return self.determine_next_move_base_on_next_square(next_square)

    def _random_move(self):
        """Fallback move: randomly drive forward or turn the tank left."""
        return (random.choice([fun.MOVE_FORWARD, fun.TURN_TANK_TO_LEFT]), 0, 0)

    def getRouteToEnemy(self, enemy_x_pos, enemy_y_pos):
        """Return the first square after the start on the A* path.

        Returns an ``(x, y)`` tuple, or ``None`` when the path holds fewer
        than two squares (no route, or already at the target).
        """
        grid = Grid(matrix=self.convertMapToZerosAndOnes())
        start = grid.node(self.x, self.y)
        end = grid.node(enemy_x_pos, enemy_y_pos)
        finder = AStarFinder(diagonal_movement=DiagonalMovement.never)
        path, _runs = finder.find_path(start, end, grid)
        if len(path) < 2:
            return None
        return path[1]

    def find_distances_to_tanks(self, players):
        """Return a list of ``distance_to_tank`` results, one per player."""
        return [self.distance_to_tank(tank) for tank in players]

    def convertMapToZerosAndOnes(self):
        """Convert the map for A*: 1 = walkable, 0 = blocked."""
        return [self.mapToAStar(row) for row in self.map]

    def convertMapToBFSType(self):
        """Convert the map for BFS: 0 = walkable, 1 = blocked."""
        return [self.mapToBFS(row) for row in self.map]

    def mapToBFS(self, args):
        """Map one row to BFS cells; "_" and tuple cells count as walkable."""
        ALLOWED_CHARS = ["_"]
        return [0 if x in ALLOWED_CHARS or isinstance(x, tuple) else 1
                for x in args]

    def mapToAStar(self, args):
        """Map one row to A* cells; "_" and tuple cells count as walkable."""
        ALLOWED_CHARS = ["_"]
        return [1 if x in ALLOWED_CHARS or isinstance(x, tuple) else 0
                for x in args]

    def distance_to_tank(self, tank):
        """Return ``(tank[2], bfs_distance)`` from this tank to ``tank``.

        The BFS works on (row, column) points, hence the swapped (y, x)
        order. The distance is whatever ``solveBFS`` returns.
        """
        return tank[2], solveBFS(self.convertMapToBFSType(),
                                 (self.y, self.x), (tank[1], tank[0]))

    def determine_next_move_base_on_next_square(self, next_sqare):
        """Translate the next path square into a move tuple.

        If the tank is aligned with the axis it has to travel along, it
        moves forward or back; otherwise it turns left. When no rule
        applies, a random move is returned.
        """
        delta_x, delta_y = next_sqare[0] - self.x, next_sqare[1] - self.y
        direction = self.tank_direction
        # NOTE(review): the sign convention (E is +x, N is +y) is taken
        # as-is from the original code - confirm against the game's axes.
        if delta_x > 0 and direction == fun.E:
            return fun.MOVE_FORWARD, 0, 0
        elif delta_x > 0 and direction == fun.W:
            return fun.MOVE_BACK, 0, 0
        elif delta_x < 0 and direction == fun.E:
            return fun.MOVE_BACK, 0, 0
        elif delta_x < 0 and direction == fun.W:
            return fun.MOVE_FORWARD, 0, 0
        # Parenthesised on purpose: without the parentheses "and" binds
        # tighter than "or", so a south-facing tank would always turn and
        # the vertical branches below could never run for it.
        elif delta_x != 0 and (direction == fun.N or direction == fun.S):
            return fun.TURN_TANK_TO_LEFT, 0, 0
        elif delta_y > 0 and direction == fun.N:
            return fun.MOVE_FORWARD, 0, 0
        elif delta_y > 0 and direction == fun.S:
            return fun.MOVE_BACK, 0, 0
        elif delta_y < 0 and direction == fun.N:
            return fun.MOVE_BACK, 0, 0
        elif delta_y < 0 and direction == fun.S:
            return fun.MOVE_FORWARD, 0, 0
        elif delta_y != 0 and (direction == fun.E or direction == fun.W):
            return fun.TURN_TANK_TO_LEFT, 0, 0
        return self._random_move()
# Offsets of the four orthogonal neighbours; the two lists are paired
# index by index.
delta_x = [-1, 1, 0, 0]
delta_y = [0, 0, 1, -1]


def valid(map, x, y):
    """Return True if ``map[x][y]`` is inside the grid and is not 1."""
    if x < 0 or x >= len(map) or y < 0 or y >= len(map[x]):
        return False
    return map[x][y] != 1


def solveBFS(map, start, end):
    """Return the number of orthogonal steps from ``start`` to ``end``.

    ``map`` is a list of rows in which the value 1 marks a blocked cell;
    ``start`` and ``end`` are index tuples used as ``map[p[0]][p[1]]``.
    The start cell itself is not checked for being blocked.

    Returns ``None`` when ``end`` cannot be reached.
    """
    queue = deque([start])
    dist = {start: 0}
    while queue:
        cur_point = queue.popleft()
        cur_dist = dist[cur_point]
        if cur_point == end:
            return cur_dist
        for dx, dy in zip(delta_x, delta_y):
            next_point = (cur_point[0] + dx, cur_point[1] + dy)
            if next_point in dist or not valid(map, next_point[0], next_point[1]):
                continue
            dist[next_point] = cur_dist + 1
            queue.append(next_point)
    # Queue exhausted without reaching the end point.
    return None
# numpy arrays are only accepted as grid matrices when this flag is True.
USE_NUMPY = False
class DiagonalMovement:
    """Policy constants that control when diagonal neighbours are allowed."""

    always = 1
    never = 2
    if_at_most_one_obstacle = 3
    only_when_no_obstacle = 4
class Node(object):
    """A single grid cell together with its path-finding bookkeeping."""

    def __init__(self, x=0, y=0, walkable=True, weight=1):
        # Position on the grid.
        self.x = x
        self.y = y
        # Whether this node can be entered.
        self.walkable = walkable
        # Cost factor applied when stepping onto this node.
        self.weight = weight
        self.cleanup()

    def __lt__(self, other):
        """Order nodes by their f value so they can be kept in a heap."""
        return self.f < other.f

    def cleanup(self):
        """Reset all values calculated by a search so the node can be reused."""
        # Heuristic cost, cost from start, and their sum.
        self.h = 0.0
        self.g = 0.0
        self.f = 0.0
        self.opened = 0
        self.closed = False
        # Predecessor used for the backtrace.
        self.parent = None
        self.retain_count = 0
        self.tested = False
def build_nodes(width, height, matrix=None, inverse=False):
    """Create the ``height`` x ``width`` node rows for a grid.

    :param width: number of nodes per row
    :param height: number of rows
    :param matrix: optional tuple/list (or numpy array when USE_NUMPY is
        set) whose ``matrix[y][x]`` value becomes the node weight; without
        a usable matrix every node gets weight 1
    :param inverse: if False, weights >= 1 are walkable; if True, weights
        <= 0 are walkable instead
    :return: list of rows, each a list of Node objects, indexed ``[y][x]``
    """
    use_matrix = (isinstance(matrix, (tuple, list))) or \
        (USE_NUMPY and isinstance(matrix, np.ndarray) and matrix.size > 0)

    nodes = []
    for y in range(height):
        row = []
        for x in range(width):
            # 1, '1', True will be walkable while others will be obstacles
            # if inverse is False; otherwise it is the other way round.
            weight = int(matrix[y][x]) if use_matrix else 1
            walkable = weight <= 0 if inverse else weight >= 1
            row.append(Node(x=x, y=y, walkable=walkable, weight=weight))
        nodes.append(row)
    return nodes
class Grid(object):
    """Rectangular map of Node objects, addressed as ``nodes[y][x]``."""

    def __init__(self, width=0, height=0, matrix=None, inverse=False):
        """Build the grid.

        :param width: grid width, overridden by the matrix if one is given
        :param height: grid height, overridden by the matrix if one is given
        :param matrix: optional tuple/list (or numpy array when USE_NUMPY
            is set) of rows that defines size, weights and obstacles
        :param inverse: passed on to build_nodes to flip which weights
            count as walkable
        """
        self.width = width
        self.height = height
        if isinstance(matrix, (tuple, list)) or (
                USE_NUMPY and isinstance(matrix, np.ndarray) and
                matrix.size > 0):
            self.height = len(matrix)
            self.width = len(matrix[0]) if self.height > 0 else 0
        if self.width > 0 and self.height > 0:
            self.nodes = build_nodes(self.width, self.height, matrix, inverse)
        else:
            self.nodes = [[]]

    def node(self, x, y):
        """Return the node at column ``x``, row ``y``."""
        return self.nodes[y][x]

    def inside(self, x, y):
        """Return True if (x, y) lies within the grid bounds."""
        return 0 <= x < self.width and 0 <= y < self.height

    def walkable(self, x, y):
        """Return True if (x, y) is inside the grid and walkable."""
        return self.inside(x, y) and self.nodes[y][x].walkable

    def neighbors(self, node, diagonal_movement=DiagonalMovement.never):
        """Return the walkable neighbours of ``node``.

        Orthogonal neighbours are always considered; diagonal ones depend
        on ``diagonal_movement`` (a DiagonalMovement constant).
        """
        x = node.x
        y = node.y
        neighbors = []
        # s* record which orthogonal neighbours are walkable, d* which
        # diagonals may be tried.
        s0 = d0 = s1 = d1 = s2 = d2 = s3 = d3 = False

        # up
        if self.walkable(x, y - 1):
            neighbors.append(self.nodes[y - 1][x])
            s0 = True
        # right
        if self.walkable(x + 1, y):
            neighbors.append(self.nodes[y][x + 1])
            s1 = True
        # down
        if self.walkable(x, y + 1):
            neighbors.append(self.nodes[y + 1][x])
            s2 = True
        # left
        if self.walkable(x - 1, y):
            neighbors.append(self.nodes[y][x - 1])
            s3 = True

        if diagonal_movement == DiagonalMovement.never:
            return neighbors

        if diagonal_movement == DiagonalMovement.only_when_no_obstacle:
            d0 = s3 and s0
            d1 = s0 and s1
            d2 = s1 and s2
            d3 = s2 and s3
        elif diagonal_movement == DiagonalMovement.if_at_most_one_obstacle:
            d0 = s3 or s0
            d1 = s0 or s1
            d2 = s1 or s2
            d3 = s2 or s3
        elif diagonal_movement == DiagonalMovement.always:
            d0 = d1 = d2 = d3 = True

        # up-left
        if d0 and self.walkable(x - 1, y - 1):
            neighbors.append(self.nodes[y - 1][x - 1])
        # up-right
        if d1 and self.walkable(x + 1, y - 1):
            neighbors.append(self.nodes[y - 1][x + 1])
        # down-right
        if d2 and self.walkable(x + 1, y + 1):
            neighbors.append(self.nodes[y + 1][x + 1])
        # down-left
        if d3 and self.walkable(x - 1, y + 1):
            neighbors.append(self.nodes[y + 1][x - 1])
        return neighbors

    def cleanup(self):
        """Reset the search state of every node in the grid."""
        for y_nodes in self.nodes:
            for node in y_nodes:
                node.cleanup()

    def grid_str(self, path=None, start=None, end=None,
                 border=True, start_chr='s', end_chr='e',
                 path_chr='x', empty_chr=' ', block_chr='#',
                 show_weight=False):
        """Render the grid as a multi-line string.

        :param path: optional collection of (x, y) tuples or nodes to mark
        :param start: start node, drawn as ``start_chr``
        :param end: end node, drawn as ``end_chr``
        :param border: draw a frame around the grid
        :param path_chr: character for path cells
        :param empty_chr: character for walkable cells
        :param block_chr: character for blocked cells
        :param show_weight: draw the node weight ('+' for weights >= 10)
            instead of ``empty_chr`` on walkable cells
        :return: the rendered string
        """
        frame = '+{}+'.format('-' * len(self.nodes[0]))
        lines = []
        if border:
            lines.append(frame)
        for row in self.nodes:
            line = ''
            for node in row:
                if node == start:
                    line += start_chr
                elif node == end:
                    line += end_chr
                elif path and ((node.x, node.y) in path or node in path):
                    line += path_chr
                elif node.walkable:
                    # empty field
                    if show_weight:
                        line += str(node.weight) if node.weight < 10 else '+'
                    else:
                        line += empty_chr
                else:
                    line += block_chr  # blocked field
            if border:
                line = '|' + line + '|'
            lines.append(line)
        if border:
            lines.append(frame)
        return '\n'.join(lines)
class Finder(object):
    """Base class for path finders working on a Grid.

    Subclasses must provide ``check_neighbors``, which ``find_path`` calls
    once per iteration.
    """

    def __init__(self, heuristic=None, weight=1,
                 diagonal_movement=DiagonalMovement.never,
                 weighted=True,
                 time_limit=TIME_LIMIT,
                 max_runs=MAX_RUNS):
        """Store the search configuration.

        :param heuristic: callable ``(dx, dy) -> cost`` estimating the
            remaining distance
        :param weight: factor the heuristic value is multiplied with
        :param diagonal_movement: a DiagonalMovement constant
        :param weighted: if True, step costs are multiplied by the weight
            of the node that is entered
        :param time_limit: seconds after which the search is aborted
        :param max_runs: iterations after which the search is aborted
        """
        self.time_limit = time_limit
        self.max_runs = max_runs
        self.weighted = weighted
        self.diagonal_movement = diagonal_movement
        self.weight = weight
        self.heuristic = heuristic

    def calc_cost(self, node_a, node_b):
        """Return the cost from the start to ``node_b`` when going via ``node_a``."""
        if node_b.x - node_a.x == 0 or node_b.y - node_a.y == 0:
            # direct neighbor - distance is 1
            ng = 1
        else:
            # diagonal step
            ng = SQRT2
        if self.weighted:
            ng *= node_b.weight
        return node_a.g + ng

    def apply_heuristic(self, node_a, node_b, heuristic=None):
        """Return the heuristic value for the distance between two nodes."""
        if not heuristic:
            heuristic = self.heuristic
        return heuristic(
            abs(node_a.x - node_b.x),
            abs(node_a.y - node_b.y))

    def find_neighbors(self, grid, node, diagonal_movement=None):
        """Return the neighbours of ``node``; defaults to the finder's policy."""
        if not diagonal_movement:
            diagonal_movement = self.diagonal_movement
        return grid.neighbors(node, diagonal_movement=diagonal_movement)

    def keep_running(self):
        """Abort the search once the run or time limit has been reached.

        :raises ExecutionRunsException: when ``self.runs >= self.max_runs``
        :raises ExecutionTimeException: when the elapsed time since
            ``self.start_time`` has reached ``self.time_limit``
        """
        if self.runs >= self.max_runs:
            raise ExecutionRunsException(
                '{} run into barrier of {} iterations without '
                'finding the destination'.format(
                    self.__class__.__name__, self.max_runs))
        if time.time() - self.start_time >= self.time_limit:
            raise ExecutionTimeException(
                '{} took longer than {} seconds, aborting!'.format(
                    self.__class__.__name__, self.time_limit))

    def process_node(self, node, parent, end, open_list, open_value=True):
        """Update ``node`` if reaching it via ``parent`` is new or cheaper.

        :param node: the neighbour being evaluated
        :param parent: the node it is reached from
        :param end: the goal node (used for the heuristic)
        :param open_list: list of open nodes, updated in place
        :param open_value: value stored in ``node.opened`` when it is opened
        """
        ng = self.calc_cost(parent, node)
        if not node.opened or ng < node.g:
            node.g = ng
            # Reuse a heuristic value that was already computed (non-zero).
            node.h = node.h or \
                self.apply_heuristic(node, end) * self.weight
            node.f = node.g + node.h
            node.parent = parent
            if not node.opened:
                heapq.heappush(open_list, node)
                node.opened = open_value
            else:
                # The node is already open and just got a better f value:
                # take it out and push it again.
                open_list.remove(node)
                heapq.heappush(open_list, node)

    def find_path(self, start, end, grid):
        """Search a path from ``start`` to ``end`` on ``grid``.

        :return: tuple ``(path, runs)``; ``path`` is whatever
            ``check_neighbors`` returned, or an empty list if the open list
            ran dry first
        :raises ExecutionRunsException: see keep_running
        :raises ExecutionTimeException: see keep_running
        """
        self.start_time = time.time()  # execution time limitation
        self.runs = 0  # count number of iterations
        start.opened = True
        open_list = [start]
        while len(open_list) > 0:
            self.runs += 1
            self.keep_running()
            path = self.check_neighbors(start, end, grid, open_list)
            if path:
                return path, self.runs
        # failed to find path
        return [], self.runs
def manhatten(dx, dy):
    """Manhattan heuristic: the sum of the two axis distances."""
    return dx + dy
def octile(dx, dy):
    """Octile heuristic: the larger axis distance plus (sqrt(2) - 1) times the smaller."""
    f = SQRT2 - 1
    if dx < dy:
        return f * dx + dy
    else:
        return f * dy + dx
class AStarFinder(Finder):
    """A* path finder."""

    def __init__(self, heuristic=None, weight=1,
                 diagonal_movement=DiagonalMovement.never,
                 time_limit=TIME_LIMIT,
                 max_runs=MAX_RUNS):
        """Configure the finder.

        :param heuristic: callable ``(dx, dy) -> cost``; when omitted,
            manhatten is used without diagonal movement and octile otherwise
        :param weight: factor the heuristic value is multiplied with
        :param diagonal_movement: a DiagonalMovement constant
        :param time_limit: seconds after which the search is aborted
        :param max_runs: iterations after which the search is aborted
        """
        super(AStarFinder, self).__init__(
            heuristic=heuristic,
            weight=weight,
            diagonal_movement=diagonal_movement,
            time_limit=time_limit,
            max_runs=max_runs)

        if not heuristic:
            if diagonal_movement == DiagonalMovement.never:
                self.heuristic = manhatten
            else:
                self.heuristic = octile

    def check_neighbors(self, start, end, grid, open_list,
                        open_value=True, backtrace_by=None):
        """Expand the open node with the smallest f value.

        :param start: start node (not used here)
        :param end: goal node
        :param grid: grid that provides the neighbours
        :param open_list: list of open nodes, updated in place
        :param open_value: value stored in ``opened`` of newly opened nodes
        :param backtrace_by: BY_START or BY_END for bi-directional
            searches, None for a plain search
        :return: the path as a list of (x, y) tuples once it is complete,
            otherwise None
        """
        # pop node with minimum 'f' value
        node = heapq.nsmallest(1, open_list)[0]
        open_list.remove(node)
        node.closed = True

        # if reached the end position, construct the path and return it
        # (ignored for bi-directional A*, there we look for a neighbor that
        # is part of the oncoming path)
        if not backtrace_by and node == end:
            return backtrace(end)

        # get neighbors of the current node
        neighbors = self.find_neighbors(grid, node)
        for neighbor in neighbors:
            if neighbor.closed:
                # already visited last minimum f value
                continue
            if backtrace_by and neighbor.opened == backtrace_by:
                # found the oncoming path
                if backtrace_by == BY_END:
                    return bi_backtrace(node, neighbor)
                else:
                    return bi_backtrace(neighbor, node)

            # check if the neighbor has not been inspected yet, or can be
            # reached with smaller cost from the current node
            self.process_node(neighbor, node, end, open_list, open_value)

        # the end has not been reached (yet), keep the search going
        return None

    def find_path(self, start, end, grid):
        """Search a path from ``start`` to ``end`` on ``grid``.

        :return: tuple ``(path, runs)`` as returned by Finder.find_path
        """
        start.g = 0
        start.f = 0
        return super(AStarFinder, self).find_path(start, end, grid)
class ExecutionTimeException(Exception):
    """Raised when a search runs longer than its time limit."""
class ExecutionRunsException(Exception):
    """Raised when a search exceeds its maximum number of iterations."""
# Square root of 2: cost of one diagonal step, also used by octile.
SQRT2 = math.sqrt(2)
def backtrace(node):
    """Follow the parent links from ``node`` and return the path.

    :param node: the node to trace back from
    :return: list of (x, y) tuples, ordered from the node without a parent
        to ``node`` itself
    """
    path = [(node.x, node.y)]
    while node.parent is not None:
        node = node.parent
        path.append((node.x, node.y))
    path.reverse()
    return path
def bi_backtrace(node_a, node_b):
    """Join the backtraces of two nodes into one path.

    The backtrace of ``node_a`` is kept in order; the backtrace of
    ``node_b`` is reversed and appended to it.
    """
    path_a = backtrace(node_a)
    path_b = backtrace(node_b)
    path_b.reverse()
    return path_a + path_b
# Advertisement
# Add Comment
# Please, Sign In to add comment
# Advertisement