#!/usr/bin/python3
import pygame
import random
import numpy as np
import math
import cv2
# TensorFlow/Keras are only needed if the commented-out neural-network model
# in DQNAgent below is re-enabled; the active agent is a tabular Q-learner.
import tensorflow as tf
from tensorflow.keras import layers, models
from collections import deque

# Game environment parameters
render = False  # Set to True to draw the game while training
ACTION_SPACE = 7  # Actions: [NOOP, MOVE_UP, MOVE_DOWN, MOVE_LEFT, MOVE_RIGHT, SHOOT_DOWN, SHOOT_UP]

# Initialize Pygame
pygame.init()

# Screen dimensions
WIDTH, HEIGHT = 400, 300
IMG_SHAPE = (84, 84)  # Desired image size for CNN input

# Colors
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
RED = (255, 0, 0)
GREEN = (0, 255, 0)

# Game variables
clock = pygame.time.Clock()
score = 0
font = pygame.font.Font(None, 36)

class FPSGameEnv:
    def __init__(self):
        pygame.init()
        self.screen = pygame.display.set_mode((WIDTH, HEIGHT))
        pygame.display.set_caption("Simple FPS Game with DQN")
        self.clock = pygame.time.Clock()
        self.reset()

    def reset(self):
        # Reset player, enemies, and bullets
        self.player_pos = [WIDTH // 2, HEIGHT // 2]
        self.player_speed = 5
        self.player_size = 25
        self.player_health = 5
        self.score = 0
        self.enemies = []
        self.bullets = []
        self.steps = 0
        # Enemy variables
        self.enemy_size = 20
        self.enemy_speed = 2
        self.spawn_rate = 25  # Increase to spawn enemies faster
        # Bullet variables
        self.bullet_size = 5
        self.bullet_speed = 10
        return self.get_state()
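
    # Note: reset() returns the compact vector state from get_state(); a
    # pixel-based variant would return get_screen_image() here instead.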

    def spawn_enemy(self):
        # Spawn just above the visible area so enemies drift in from the top
        x = random.randint(0, WIDTH - self.enemy_size)
        y = random.randint(-100, -40)
        self.enemies.append([x, y])

    def draw_enemies(self):
        for enemy in self.enemies:
            pygame.draw.rect(self.screen, RED, (enemy[0], enemy[1], self.enemy_size, self.enemy_size))

    def handle_bullets(self):
        for bullet in self.bullets[:]:
            bullet[0] += bullet[2]
            bullet[1] += bullet[3]
            # Remove the bullet once it leaves the screen
            if not (0 <= bullet[0] <= WIDTH and 0 <= bullet[1] <= HEIGHT):
                self.bullets.remove(bullet)

    def draw_bullets(self):
        for bullet in self.bullets:
            pygame.draw.circle(self.screen, GREEN, (int(bullet[0]), int(bullet[1])), self.bullet_size)

    def update_score(self):
        score_text = font.render(f"Score: {self.score}", True, WHITE)
        health_text = font.render(f"Health: {self.player_health}", True, WHITE)
        self.screen.blit(score_text, (10, 10))
        self.screen.blit(health_text, (10, 40))

    def get_screen_image(self):
        # Capture the game screen and shrink it to the CNN input shape.
        # surfarray.array3d returns a (width, height, 3) array, so transpose
        # to the usual (height, width, 3) layout before handing it to OpenCV.
        screen_data = pygame.surfarray.array3d(self.screen).transpose(1, 0, 2)
        gray_screen = cv2.cvtColor(screen_data, cv2.COLOR_RGB2GRAY)  # Convert to grayscale
        resized_screen = cv2.resize(gray_screen, IMG_SHAPE)  # Resize to IMG_SHAPE
        return np.expand_dims(resized_screen, axis=-1)  # Add channel dimension
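
    # get_screen_image() is unused by the tabular agent below, which learns
    # from the 4-value vector state instead. A pixel-based agent would stack
    # a few consecutive frames before feeding a CNN; a minimal sketch
    # (hypothetical helper, not called anywhere in this script):
    def stack_frames(self, frames):
        # frames: list of (84, 84, 1) grayscale arrays from get_screen_image()
        return np.concatenate(frames, axis=-1)  # e.g. 4 frames -> (84, 84, 4)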

    def get_state(self):
        # Compact observation: player x, player y, score, health
        return np.array([self.player_pos[0], self.player_pos[1], self.score, self.player_health])

    def step(self, action):
        # Apply the chosen action (movement is bounds-checked against the screen edges)
        if action == 1 and self.player_pos[1] > 0:
            self.player_pos[1] -= self.player_speed
        elif action == 2 and self.player_pos[1] < HEIGHT:
            self.player_pos[1] += self.player_speed
        elif action == 3 and self.player_pos[0] > 0:
            self.player_pos[0] -= self.player_speed
        elif action == 4 and self.player_pos[0] < WIDTH:
            self.player_pos[0] += self.player_speed
        elif action == 5:
            # Shoot towards a random point below the player
            self.shoot(random.randint(0, WIDTH), random.randint(self.player_pos[1], HEIGHT))
        elif action == 6:
            # Shoot towards a random point above the player
            self.shoot(random.randint(0, WIDTH), random.randint(0, self.player_pos[1]))
        # Update game elements
        reward, done = self.update_game()
        return self.get_state(), reward, done

    def shoot(self, target_x, target_y):
        # Fire a bullet from the player towards (target_x, target_y)
        dx, dy = target_x - self.player_pos[0], target_y - self.player_pos[1]
        dist = math.hypot(dx, dy) + 1e-6  # Avoid division by zero
        dx, dy = dx / dist, dy / dist
        self.bullets.append([self.player_pos[0], self.player_pos[1],
                             dx * self.bullet_speed, dy * self.bullet_speed])
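
    # step() mirrors the classic Gym API minus the trailing `info` dict.
    # A Gym-style wrapper would just pad the tuple (hypothetical sketch,
    # not used below):
    #   def gym_step(self, action):
    #       state, reward, done = self.step(action)
    #       return state, reward, done, {}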

    def detect_collision(self, obj1, obj2, size1, size2):
        dx = obj1[0] - obj2[0]
        dy = obj1[1] - obj2[1]
        distance = math.hypot(dx, dy)
        return distance < (size1 + size2) / 2
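
    # The test above treats both objects as circles whose diameters are the
    # given sizes: they overlap when the centre distance is below the sum of
    # the radii, (size1 + size2) / 2. For the axis-aligned squares actually
    # drawn, a pygame.Rect overlap test would be tighter, e.g.:
    #   pygame.Rect(*obj1, size1, size1).colliderect(pygame.Rect(*obj2, size2, size2))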

    def update_game(self):
        # Spawn enemies occasionally (roughly a 9% chance per step)
        if random.randint(1, 100) < 10:
            self.spawn_enemy()
        # Move enemies down, culling any that leave the screen; iterate over
        # a copy so removals do not skip elements
        for enemy in self.enemies[:]:
            enemy[1] += self.enemy_speed
            if enemy[1] > HEIGHT:
                self.enemies.remove(enemy)
        # Move bullets (and drop the off-screen ones)
        self.handle_bullets()
        reward = 1  # Base reward for surviving this step
        # Collision detection
        for enemy in self.enemies[:]:
            if self.detect_collision(enemy, self.player_pos, self.enemy_size, self.player_size):
                self.enemies.remove(enemy)
                self.player_health -= 1
                if self.player_health <= 0:
                    print("Game Over")
                    return -100, True  # End the episode when health is depleted
            else:
                for bullet in self.bullets[:]:
                    if self.detect_collision(bullet, enemy, self.bullet_size, self.enemy_size):
                        self.bullets.remove(bullet)
                        self.enemies.remove(enemy)
                        self.score += 2
                        reward += 2  # Bonus for shooting down an enemy
                        break
        # Track the running score but return only the per-step reward;
        # returning the cumulative score would overweight late transitions
        self.score += 1
        return reward, False

    def close(self):
        pygame.quit()

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95  # Discount factor (neural-network replay path, currently disabled)
        self.epsilon = 1.0  # Exploration rate (replay path)
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        #self.model = self._build_model()
        # Parameters for the active tabular Q-learning path
        self.discount_factor = 0.99
        self.exploration_rate = 1.0
        self.exploration_decay = 0.995
        self.qvalues = {}

    def __hash(self, state, action):
        # Dictionary key for a state-action pair; tobytes() makes the
        # numpy array hashable
        return hash((state.tobytes(), action))

    def get_q_value(self, state, action):
        # Return the Q-value for a state-action pair, defaulting to 0.0
        state = np.reshape(state, [1, self.state_size])
        return self.qvalues.get(self.__hash(state, action), 0.0)

    def update_q_value(self, state, action, reward, next_state):
        # Maximum Q-value over actions in the next state
        max_next_q = max(self.get_q_value(next_state, a) for a in range(self.action_size))
        # Q-learning update:
        #   Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
        current_q = self.get_q_value(state, action)
        new_q = current_q + self.learning_rate * (reward + self.discount_factor * max_next_q - current_q)
        self.qvalues[self.__hash(state, action)] = new_q
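
    # Worked example: with learning_rate = 0.001, discount_factor = 0.99,
    # current_q = 0, reward = 1, and max_next_q = 0, the update gives
    # new_q = 0 + 0.001 * (1 + 0.99 * 0 - 0) = 0.001, so estimates creep
    # toward the observed returns one small step at a time.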

    def get_best_action(self, state):
        # Retrieve the action with the highest Q-value for the current state
        q_values = [self.get_q_value(state, action) for action in range(self.action_size)]
        max_q = max(q_values)
        # Break ties between equally good actions at random
        best_actions = [action for action, q in enumerate(q_values) if q == max_q]
        return random.choice(best_actions)

    def choose_action(self, state):
        # Epsilon-greedy action selection
        if random.random() < self.exploration_rate:
            return random.randint(0, self.action_size - 1)  # Random action
        else:
            return self.get_best_action(state)

    def decay_exploration(self):
        # Decay the exploration rate towards a floor of 0.01
        self.exploration_rate = max(0.01, self.exploration_rate * self.exploration_decay)
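
    # The decay is geometric: after n calls the rate is max(0.01, 0.995 ** n),
    # e.g. about 0.61 after 100 episodes, reaching the 0.01 floor around
    # episode 920 (0.995 ** 920 is roughly 0.01).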

    # def _build_model(self):
    #     # Create a neural network using TensorFlow and Keras
    #     model = models.Sequential([
    #         layers.Dense(24, input_dim=self.state_size, activation='relu'),
    #         layers.Dense(24, activation='relu'),
    #         layers.Dense(self.action_size, activation='linear')
    #     ])
    #     # A convolutional variant for pixel input could look like the
    #     # following (note the Flatten before the Dense head):
    #     # model = models.Sequential([
    #     #     layers.Input(shape=(WIDTH, HEIGHT, 3)),
    #     #     layers.Conv2D(32, 5, strides=2, activation="relu"),
    #     #     layers.Conv2D(32, 3, activation="relu"),
    #     #     layers.Flatten(),
    #     #     layers.Dense(self.action_size, activation='linear')
    #     # ])
    #     model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
    #                   loss='mse')
    #     model.summary()
    #     return model

    def act(self, state):
        state = np.reshape(state, [1, self.state_size])
        # Epsilon-greedy action selection
        return self.choose_action(state)

    def remember(self, state, action, reward, next_state, done):
        # Store the experience for replay
        self.memory.append((state, action, reward, next_state, done))

    def replay(self, batch_size):
        # Re-train on a random minibatch of stored experiences. With the
        # Keras model disabled, this simply re-applies the tabular Q-update;
        # the original network-based targets are kept commented out below.
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            next_state = np.reshape(next_state, [1, self.state_size])
            self.update_q_value(state, action, reward, next_state)
            #target = reward
            #if not done:
            #    target = (reward + self.gamma *
            #              np.amax(self.model.predict(next_state)[0]))
            #target_f = self.model.predict(np.reshape(state, [1, self.state_size]))
            #target_f[0][action] = target
            #self.model.fit(np.reshape(state, [1, self.state_size]), target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# Instantiate the environment and agent
env = FPSGameEnv()
agent = DQNAgent(state_size=4, action_size=ACTION_SPACE)
batch_size = 32
episodes = 5000

for e in range(episodes):
    env.screen.fill(BLACK)
    state = env.reset()
    state = np.reshape(state, [1, 4])
    agent.decay_exploration()
    done = False
    while not done:
        action = agent.act(state)
        next_state, reward, done = env.step(action)
        next_state = np.reshape(next_state, [1, 4])
        agent.update_q_value(state, action, reward, next_state)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            # Report the active (tabular) exploration rate; agent.epsilon
            # only changes when the disabled replay path is used
            print(f"episode: {e}/{episodes}, score: {env.score}, e: {agent.exploration_rate:.2}")
            break
        #if len(agent.memory) > batch_size:
        #    agent.replay(batch_size)
        # Draw everything
        if render:
            env.screen.fill(BLACK)
            pygame.draw.rect(env.screen, WHITE,
                             (env.player_pos[0] - env.player_size // 2,
                              env.player_pos[1] - env.player_size // 2,
                              env.player_size, env.player_size))
            env.draw_enemies()
            env.draw_bullets()
            env.update_score()
            pygame.display.flip()
            #clock.tick(30)

env.close()
#print(agent.qvalues)
#agent.model.save("fps_game.h5")
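
# A minimal sketch for persisting the learned table instead, assuming the
# standard-library pickle module ("fps_qvalues.pkl" is a hypothetical name):
#import pickle
#with open("fps_qvalues.pkl", "wb") as f:
#    pickle.dump(agent.qvalues, f)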