from ple.games.flappybird import FlappyBird
from ple import PLE
import random
import math
import os
import pickle
import matplotlib.pyplot as plt
import numpy as np
import time

class MCAgent:
    def __init__(self, gamma=1, eps=0.1, alpha=0.1):
        self.Q = {}           # action-value estimates, keyed by (state, action)
        self.G = 0            # return accumulated over the current episode
        self.pol = {}         # policy: state -> (greedy-action probability, greedy action)
        self.scores = []
        self.frames = []
        self.sar_triple = []  # (state, action, reward) triples of the current episode
        self.gamma = gamma
        self.eps = eps
        self.alpha = alpha
        self.returns = {}     # all returns observed so far, keyed by (state, action)
        # Tabular setup: each of the four state features is discretized into
        # 15 bins, giving 15**4 = 50,625 states with two actions each.
        for i in range(15):
            for j in range(15):
                for k in range(15):
                    for l in range(15):
                        state = (i, j, k, l)
                        self.Q[state, 1] = 0
                        self.Q[state, 0] = 0
                        self.pol[state] = (1 - self.eps, 0)
                        self.returns[state, 1] = []
                        self.returns[state, 0] = []

    def reward_values(self, a, b, c):
        """ returns the reward values used for training

        Note: These are only the rewards used for training.
        The rewards used for evaluating the agent will always be
        1 for passing through each pipe and 0 for all other state
        transitions.
        """
        return {"positive": a, "tick": b, "loss": c}

    def observe(self, s1, a, r, s2, end):
        """ this function is called during training on each step of the game where
        the state transition is going from state s1 with action a to state s2 and
        yields the reward r. If s2 is a terminal state, end==True, otherwise end==False.

        Unless a terminal state was reached, two subsequent calls to observe will be for
        subsequent steps in the same episode. That is, s1 in the second call will be s2
        from the first call.
        """
        if end:
            return
        # Every-visit Monte Carlo: Q(s1, a) is the average of all returns
        # observed after taking action a in state s1.
        self.returns[s1, a].append(r)
        self.Q[s1, a] = sum(self.returns[s1, a]) / len(self.returns[s1, a])
        # Make the policy eps-greedy with respect to the updated Q values.
        argmax = 0 if self.Q[s1, 0] > self.Q[s1, 1] else 1
        self.pol[s1] = ((1 - self.eps) + (self.eps / 2), argmax)
        print("r: ", r, "Q", self.Q[s1, a], "pol", self.pol[s1][1])

    def state_translate(self, state):
        # Discretization of the environment: clamp each raw feature to its
        # valid range, then map it into one of 15 bins.
        if state["player_vel"] < -8:
            state["player_vel"] = -8
        if state["next_pipe_top_y"] < 0:
            state["next_pipe_top_y"] = 0
        if state["next_pipe_dist_to_player"] < 0:
            state["next_pipe_dist_to_player"] = 0
        if state["player_y"] < 0:
            state["player_y"] = 0
        player_y = math.floor(state["player_y"] * (15 / 513))
        next_pipe_top_y = math.floor(state["next_pipe_top_y"] * (15 / 513))
        next_pipe_dist_to_player = math.floor(
            state["next_pipe_dist_to_player"] * (15 / 310))
        player_vel = math.floor((state["player_vel"] + 8) * (15 / 19))
        return (player_y, next_pipe_top_y,
                next_pipe_dist_to_player, player_vel)
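
    # Worked example (illustrative raw values): player_y=256,
    # next_pipe_top_y=100, next_pipe_dist_to_player=150, player_vel=-3
    # discretizes to (floor(256*15/513), floor(100*15/513),
    # floor(150*15/310), floor((-3+8)*15/19)) = (7, 2, 7, 3).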

    def training_policy(self, state):
        """ Returns the index of the action that should be done in state while training the agent.
        Possible actions in Flappy Bird are 0 (flap the wing) or 1 (do nothing).

        training_policy is called once per frame in the game while training
        """
        discrete_state = self.state_translate(state)
        if random.uniform(0, 1) < self.pol[discrete_state][0]:
            return self.pol[discrete_state][1]
        else:
            return random.randint(0, 1)
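
    # With eps=0.1, a state's stored greedy probability is 0.95 after its
    # first update, and the random branch also picks the greedy action half
    # the time, so the greedy action is taken with overall probability
    # 0.95 + 0.05 * 0.5 = 0.975.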

    def writeToFile(self, file):
        if os.path.exists(file):
            os.remove(file)
        with open(file, 'wb') as f:
            pickle.dump(self.pol, f)

    def readFromFile(self, file):
        with open(file, 'rb') as f:
            self.pol = pickle.load(f)

    def policy(self, state):
        """ Returns the index of the action that should be done in state when training is completed.
        Possible actions in Flappy Bird are 0 (flap the wing) or 1 (do nothing).

        policy is called once per frame in the game (30 times per second in real-time)
        and needs to be sufficiently fast to not slow down the game.
        """
        return self.pol[self.state_translate(state)][1]


def run_game(nb_episodes, agent, a, b, c, train=True):
    """ Runs nb_episodes episodes of the game with agent picking the moves.
    An episode of FlappyBird ends with the bird crashing into a pipe or going off screen.
    """
    reward_values = agent.reward_values(a, b, c)
    env = PLE(FlappyBird(), fps=30, display_screen=(not train), force_fps=train,
              rng=None, reward_values=reward_values)
    env.init()

    oldState = agent.state_translate(env.game.getGameState())
    score = 0
    frame = 0
    count = 0
    while nb_episodes > 0:
        # Training uses the exploring policy; testing uses the greedy policy.
        frame += 1
        if train:
            action = agent.training_policy(env.game.getGameState())
            reward = env.act(env.getActionSet()[action])
            newState = agent.state_translate(env.game.getGameState())
            agent.sar_triple.append((oldState, action, reward))
            oldState = newState
        else:
            action = agent.policy(env.game.getGameState())
            reward = env.act(env.getActionSet()[action])
        score += reward

        # reset the environment if the game is over
        if env.game_over():
            if train:
                n = 0  # loop counter
                agent.sar_triple.reverse()  # iterate over the episode in reverse
                old_sar = (0, 0, 0)  # dummy triple; the first iteration handles the terminal state
                for sar in agent.sar_triple:
                    agent.G += sar[2]
                    end = True if n == 0 else False  # we want to look at 2 states at a time
                    n += 1
                    agent.observe(old_sar[0], old_sar[1], agent.G * agent.gamma**n, sar[0], end)
                    old_sar = sar
            agent.scores.append(score)
            agent.frames.append(frame)
            env.reset_game()
            oldState = agent.state_translate(env.game.getGameState())
            nb_episodes -= 1
            agent.sar_triple = []
            score = 0
            count += 1
            agent.G = 0
            print("Iteration ", count)


def run():
    agent = MCAgent(0.995, 0.1, 0.1)
    # reward structure
    a = 1.0
    b = 0.0
    c = -5.0
    run_game(5000, agent, a, b, c)

    # See the training results
    plt.plot(agent.frames, agent.scores)
    plt.show()

    # Save the policy
    agent.writeToFile('mc_agent.txt')

    # Test the policy
    agent.readFromFile('mc_agent.txt')
    input("Press enter to watch the agent try its best")
    run_game(50, agent, a, b, c, False)


if __name__ == "__main__":
    run()