import numpy as np
import cupy as cp
import copy
import gym
import os
import time
from collections import deque
from chainer import Chain, Variable, optimizers, initializers, serializers, cuda
import chainer.functions as F
import chainer.links as L
from abc import ABCMeta, abstractmethod

os.environ['PATH'] += ':/usr/local/cuda-8.0/bin'
class ConvNet(Chain):
    """Small convolutional Q-network: raw frame in, one Q-value per action out."""
    def __init__(self, n_out):
        initializer = initializers.HeNormal()
        super(ConvNet, self).__init__(
            c1=L.Convolution2D(3, 10, 10, stride=(4, 3), initialW=initializer),
            c2=L.Convolution2D(10, 10, 3, pad=1, initialW=initializer),
            l3=L.Linear(None, 160, initialW=initializer),
            l4=L.Linear(None, n_out, initialW=initializer)
        )

    def __call__(self, x):
        h = F.leaky_relu(self.c1(x))
        h = F.max_pooling_2d(h, 6, stride=3)
        h = F.leaky_relu(self.c2(h))
        h = F.max_pooling_2d(h, 7, stride=3)
        h = F.leaky_relu(self.l3(h))
        h = F.leaky_relu(self.l4(h))
        return h
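# Optional sanity check (an illustrative sketch, not part of the training flow):
# push one blank frame through the network on the CPU and confirm the output is
# one Q-value per action. The 3x210x160 input shape matches the MsPacman frames
# used below; the action count of 9 is an assumption of this check only.
_check_net = ConvNet(9)
_check_out = _check_net(np.zeros((1, 3, 210, 160), dtype=np.float32))
print("ConvNet output shape:", _check_out.data.shape)  # -> (1, 9)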
class Strategy(metaclass=ABCMeta):
    @abstractmethod
    def get_action(self, v_act, is_train=True):
        pass

    @abstractmethod
    def forward_step(self, step):
        pass
class EpsilonGreedy(Strategy):
    def __init__(self):
        self.epsilon = 1.0    # current exploration rate
        self.decay = 0.001    # amount subtracted from epsilon per step
        self.min = 0.0        # lower bound on epsilon
        self.n_search = 1000  # steps of pure exploration before decay starts

    def get_action(self, v_act, is_train=True):
        if is_train and cp.random.rand() < self.epsilon:
            return cp.random.randint(len(v_act))  # explore: random action
        else:
            return cp.argmax(v_act)  # exploit: greedy action

    def forward_step(self, step):
        if self.epsilon > self.min and step > self.n_search:
            self.epsilon -= self.decay
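# Shape of the schedule above, for reference: epsilon holds at 1.0 for the
# first n_search steps (pure exploration), then each forward_step() call
# lowers it by `decay`, so it reaches `min` roughly 1/decay = 1000 steps
# later, after which training actions are always greedy.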
class Agent(metaclass=ABCMeta):
    @abstractmethod
    def get_action(self, st, is_train=True):
        pass
class DQN(Agent):
    def __init__(self, strategy, n_act, n_mem=1000, seed=0):
        cp.random.seed(seed)
        self.n_act = n_act
        self.n_mem = n_mem            # replay-memory capacity
        self.n_batch = 100            # minibatch size for replay
        self.f_train = 50             # run experience replay every f_train steps
        self.f_target_update = 100    # sync the target network every f_target_update steps
        self.q_func = ConvNet(n_act)
        self.target_q_func = copy.deepcopy(self.q_func)
        self.q_func.to_gpu(0)  # for using GPU
        self.target_q_func.to_gpu(0)  # for using GPU
        self.optimizer = optimizers.Adam()
        self.optimizer.setup(self.q_func)
        self.strategy = strategy
        self.memory = deque(maxlen=self.n_mem)
        self.gamma = 0.99             # discount factor
        self.step = 0

    def stock_experience(self, st, act, r, st_next, is_term):
        self.memory.append((st, act, r, st_next, is_term))
    def experience_replay(self):
        # Regroup the memory into per-field arrays:
        # (states, actions, rewards, next states, terminal flags).
        memory = list(map(np.array, zip(*self.memory)))
        index = np.random.permutation(len(self.memory))
        for mask in np.split(index, self.n_mem // self.n_batch):  # np.split needs an int section count
            batch = list(map(lambda a: cuda.to_gpu(a[mask]), memory))
            self.q_func.cleargrads()
            loss = self.forward(*batch)
            loss.backward()
            self.optimizer.update()
    def forward(self, st, act, r, st_next, is_term):
        s, s_next = Variable(st), Variable(st_next)
        q_out = self.q_func(s)
        # Start the target from the current predictions and overwrite only the
        # entries for the actions actually taken, so every other action
        # contributes zero error.
        target = q_out.data.astype(cp.float32)
        # Bellman backup from the frozen target network; terminal transitions
        # get no bootstrapped value.
        q_max = self.target_q_func(s_next).data.max(axis=1)
        index = cp.arange(self.n_batch)
        target[index, act] = r + self.gamma * cp.where(is_term, 0.0, q_max)
        loss = F.mean_squared_error(q_out, Variable(target))
        return loss
    def get_action(self, st, is_train=True):
        s = Variable(cuda.to_gpu(np.array([st])))
        v_act = self.q_func(s)
        v_act = v_act.data[0]
        act = self.strategy.get_action(v_act, is_train)
        return int(act)  # plain Python int, so gym's step() accepts it
    def train(self):
        if len(self.memory) >= self.n_mem:  # wait until the replay memory is full
            if self.step % self.f_train == 0:
                self.experience_replay()
                self.strategy.forward_step(self.step)
            if self.step % self.f_target_update == 0:
                self.target_q_func = copy.deepcopy(self.q_func)
                self.target_q_func.to_gpu(0)  # for using GPU
        self.step += 1
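# For reference, forward() implements the standard Q-learning regression
#     y = r                                         (terminal transitions)
#     y = r + gamma * max_a' Q_target(s', a')       (otherwise)
# and minimizes the mean squared error between Q(s, a) and y, with Q_target
# held fixed between the periodic copies made in train().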
env = gym.make('MsPacman-v0')
n_act = env.action_space.n
agent = DQN(EpsilonGreedy(), n_act, 500)

print("starting train...")
for episode in range(100):
    observation = env.reset()
    score = 0
    while True:
        # Reorder the (210, 160, 3) RGB frame to the channels-first
        # (3, 210, 160) layout the ConvNet expects; transpose keeps pixels
        # intact, whereas a plain reshape would scramble the channels.
        state = observation.transpose(2, 0, 1).astype(np.float32)
        action = agent.get_action(state)
        observation, reward, is_term, _ = env.step(action)
        state_next = observation.transpose(2, 0, 1).astype(np.float32)
        agent.stock_experience(state, action, reward, state_next, is_term)
        agent.train()
        score += reward
        if is_term:
            break
    print("episode", episode, "is finished. score is", score, "points.")

name = input('Please input model filename... >>')
print("saving model...")
model = copy.deepcopy(agent.q_func)
model.to_cpu()
serializers.save_npz(name + ".npz", model)

print("starting test...")
observation = env.reset()
for t in range(1000):
    env.render()
    time.sleep(1.0 / 25)  # cap rendering at 25 fps
    state = observation.transpose(2, 0, 1).astype(np.float32)
    action = agent.get_action(state, False)
    observation, _, is_term, _ = env.step(action)
    if is_term:
        break
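# A minimal sketch of restoring the saved weights afterwards, assuming the
# same ConvNet definition is in scope; `name` is the filename entered above.
restored = ConvNet(n_act)
serializers.load_npz(name + ".npz", restored)
restored.to_gpu(0)  # optional: move back to the GPU before reuse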