import random, math
import numpy as np
import torch, torch.nn as nn
from collections import deque


class ReplayBuffer(object):
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # returns five parallel tuples: (states, actions, rewards, next_states, dones)
        return zip(*random.sample(self.buffer, batch_size))

    def __len__(self):
        return len(self.buffer)
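
# Experience replay: sampling transitions uniformly at random breaks the strong
# correlation between consecutive frames, which stabilizes Q-learning updates.
# The deque's maxlen silently discards the oldest transitions once capacity is hit.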

class DQN(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_size):
        super(DQN, self).__init__()
        self.num_actions = num_actions  # remembered so act() can explore correctly
        self.layers = nn.Sequential(
            nn.Linear(num_inputs, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, num_actions),
        )

    def forward(self, x):
        return self.layers(x)

    def act(self, state, epsilon):
        # epsilon-greedy: exploit argmax_a Q(s, a) with probability 1 - epsilon
        if random.random() > epsilon:
            with torch.no_grad():
                q_values = self.forward(torch.from_numpy(state))
            action = q_values.argmax().item()
        else:
            # explore: pick uniformly among the environment's actions
            action = random.randrange(self.num_actions)
        return action
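
# Note: DQN as published (Mnih et al., 2015) uses a conv net over a stack of
# recent frames; this two-layer MLP over a single preprocessed frame is a much
# lighter variant in the spirit of Karpathy's Pong-from-pixels setup.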


def compute_td_loss(model, batch, gamma=0.99):
    state, action, reward, nx_state, done = batch
    # to tensors (torch.autograd.Variable is deprecated; plain tensors suffice)
    state = torch.tensor(np.stack(state))
    action = torch.tensor(action, dtype=torch.long)
    reward = torch.tensor(reward, dtype=torch.float32)
    nx_state = torch.tensor(np.stack(nx_state))
    done = torch.tensor(done, dtype=torch.float32)
    # compute loss on the sampled batch: Q(s, a) for the actions actually taken
    q = model(state).gather(1, action.unsqueeze(1)).squeeze(1)
    max_nx_q = model(nx_state).max(1)[0].detach()  # no gradient through the target
    y = reward + gamma * max_nx_q * (1 - done)     # TD target; zero future value when done
    loss = (y - q).pow(2).mean()
    return loss
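
# Note: computing the bootstrap target with the online network itself (as here)
# is the plain DQN loss; a separate, periodically synced target network would
# further stabilize training but is omitted to keep the example minimal.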


def prepro(I):
    # prepro 210x160x3 uint8 frame into a 6400 (80x80) float vector, from Karpathy's code
    I = I[35:195]       # crop to the playing field
    I = I[::2, ::2, 0]  # downsample by factor of 2, keep one color channel
    I[I == 144] = 0     # erase background (type 1)
    I[I == 109] = 0     # erase background (type 2)
    I[I != 0] = 1       # paddles and ball become 1
    # return a plain float32 array so transitions can sit in the replay buffer
    # and be batched later (np.float no longer exists in current NumPy)
    return I.astype(np.float32).ravel()
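
# Shape check: a raw Pong frame is (210, 160, 3) uint8; after crop and 2x
# downsampling it is (80, 80), so prepro(frame).shape == (6400,).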


def main():
    # inits
    import gym
    env = gym.make("Pong-v0")  # classic gym (< 0.26) API is assumed throughout
    model = DQN(6400, env.action_space.n, hidden_size=128)
    optimizer = torch.optim.Adam(model.parameters())
    replay_buffer = ReplayBuffer(capacity=1000)
    # hyperparameters
    num_frames = 10000   # total environment steps
    batch_size = 64
    num_iters = 1000     # cap on steps per episode
    # epsilon greedy decay, driven by the global frame count so exploration
    # keeps shrinking across episodes instead of resetting every episode
    epsilon_start = 1.0
    epsilon_final = 0.01
    epsilon_decay = 500
    epsilon_by_frame = lambda f: epsilon_final + (epsilon_start - epsilon_final) * math.exp(-1. * f / epsilon_decay)
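    # decay examples: epsilon_by_frame(0) = 1.0, ~0.37 at frame 500,
    # ~0.14 at frame 1000, and ~0.01 (the floor) by frame 5000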

    frame_idx = 0
    while frame_idx < num_frames:
        state = prepro(env.reset())  # store preprocessed frames, not raw ones
        for _ in range(num_iters):
            frame_idx += 1
            # env.render()
            action = model.act(state, epsilon_by_frame(frame_idx))
            nx_obs, reward, done, _ = env.step(action)
            nx_state = prepro(nx_obs)
            replay_buffer.push(state, action, reward, nx_state, done)
            if len(replay_buffer) > batch_size:
                # learning stage: Q-values come from the sampled minibatch
                # inside compute_td_loss, not from the single current frame
                loss = compute_td_loss(model, replay_buffer.sample(batch_size))
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()  # apply the gradient update
            # update
            state = nx_state
            if done: break


if __name__ == "__main__":
    main()