import copy
import random
from collections import deque, namedtuple

import gym
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

# One stored transition: s, a, r, s', and whether the episode ended there.
Experience = namedtuple('Experience', 'state0, action, reward, state1, terminal')


def sample_batch_indexes(low, high, size):
    """Sample `size` distinct indexes uniformly from [low, high)."""
    return random.sample(range(low, high), size)
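# Example with hypothetical numbers: sample_batch_indexes(0, 1000, 32) draws
# 32 distinct indexes without replacement, so the replay buffer must already
# hold more entries than the requested batch size.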
class EpsGreedyQPolicy:
    """Epsilon-greedy action selection with multiplicative epsilon decay."""

    def __init__(self, eps=.1, eps_decay_rate=.99, min_eps=.1):
        self.eps = eps
        self.eps_decay_rate = eps_decay_rate
        self.min_eps = min_eps

    def select_action(self, q_values, is_training=True):
        nb_actions = q_values.shape[0]
        # Explore with probability eps while training; act greedily otherwise.
        if is_training and np.random.uniform() < self.eps:
            action = np.random.randint(0, nb_actions)
        else:
            action = np.argmax(q_values)
        return action

    def decay_eps_rate(self):
        self.eps = max(self.eps * self.eps_decay_rate, self.min_eps)
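# Sanity check on the schedule used in __main__ below (eps=1.0,
# eps_decay_rate=0.999, min_eps=0.1): solving 0.999**n = 0.1 gives
# n = ln(0.1) / ln(0.999) ≈ 2302, so epsilon hits its floor after roughly
# 2300 decayed training steps.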
def build_model(input_shape, nb_output):
    """Build a small MLP Q-network and wire it to a TF1 feed placeholder."""
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Dense(16, activation="relu", input_shape=tuple(input_shape)))
    model.add(tf.keras.layers.Dense(16, activation="relu"))
    model.add(tf.keras.layers.Dense(16, activation="relu"))
    model.add(tf.keras.layers.Dense(nb_output))  # linear head: one Q-value per action
    # Graph-mode TF1: states are fed through this placeholder, and calling the
    # Keras model on it yields the Q-value tensor.
    inputs = tf.placeholder(dtype=tf.float32, shape=[None] + list(input_shape), name="input")
    outputs = model(inputs)
    return inputs, outputs, model
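# Shape sketch for CartPole (illustrative values only): with input_shape=[4]
# and nb_output=2, `inputs` is a float32 placeholder of shape [None, 4] and
# `outputs` is the [None, 2] tensor of Q-values, one column per action.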
class Memory:
    """Replay memory backed by fixed-length deques (a simple ring buffer)."""

    def __init__(self, limit, maxlen):
        self.actions = deque(maxlen=limit)
        self.rewards = deque(maxlen=limit)
        self.terminals = deque(maxlen=limit)
        self.observations = deque(maxlen=limit)
        self.maxlen = maxlen
        self.recent_observations = deque(maxlen=maxlen)

    def sample(self, batch_size):
        # Exclude the last slot so observations[idx + 1] is always valid.
        batch_idxs = sample_batch_indexes(0, len(self.observations) - 1, size=batch_size)
        for i, idx in enumerate(batch_idxs):
            # Resample indexes that directly follow a terminal step (a
            # keras-rl-style guard against sampling across episode boundaries).
            terminal = self.terminals[idx - 1]
            while terminal:
                idx = sample_batch_indexes(0, len(self.observations) - 1, size=1)[0]
                terminal = self.terminals[idx - 1]
            batch_idxs[i] = idx
        experiences = []
        for idx in batch_idxs:
            state0 = self.observations[idx]
            action = self.actions[idx]
            reward = self.rewards[idx]
            terminal = self.terminals[idx]
            state1 = self.observations[idx + 1]
            experiences.append(Experience(state0=state0, action=action, reward=reward,
                                          state1=state1, terminal=terminal))
        return experiences

    def append(self, observation, action, reward, terminal=False):
        self.observations.append(observation)
        self.actions.append(action)
        self.rewards.append(reward)
        self.terminals.append(terminal)
        self.recent_observations.append(observation)
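# Index sketch (hypothetical buffer contents): if observations holds
# [o0, o1, o2, ...] and idx = 1 is sampled, the yielded transition is
# (state0=o1, action=a1, reward=r1, state1=o2, terminal=t1); it is the
# terminal flag, not the sampler, that stops bootstrapping across episode
# boundaries.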
class DQNAgent:
    def __init__(self, training=True, policy=None, gamma=.99, actions=None, memory=None,
                 memory_interval=1, train_interval=1, batch_size=32, nb_steps_warmup=300,
                 observation=None, input_shape=None):
        # `training` defaults to True; the driver toggles it off for greedy
        # evaluation episodes.
        self.training = training
        self.policy = policy
        self.actions = actions
        self.gamma = gamma
        self.recent_observation = observation
        self.previous_observation = observation
        self.memory = memory
        self.memory_interval = memory_interval
        self.batch_size = batch_size
        self.recent_action_id = 0
        self.nb_steps_warmup = nb_steps_warmup
        self.sess = tf.InteractiveSession()
        # Online network (trained every step) and target network (slowly
        # tracks the online weights).
        self.model_inputs, self.model_outputs, self.model = build_model(input_shape, len(self.actions))
        self.target_model_inputs, self.target_model_outputs, self.target_model = build_model(input_shape, len(self.actions))
        target_model_weights = self.target_model.trainable_weights
        model_weights = self.model.trainable_weights
        # Soft (Polyak) target update: target <- 0.999*target + 0.001*online.
        self.update_target_model = [
            target_model_weights[i].assign(.999 * target_model_weights[i] + .001 * model_weights[i])
            for i in range(len(target_model_weights))]
        self.train_interval = train_interval
        self.step = 0
    def compile(self):
        # `targets` holds one row per sample, zero everywhere except at the
        # taken action (built in experience_replay), so the masked error
        # below is non-zero only at that action.
        self.targets = tf.placeholder(dtype=tf.float32, shape=[None, len(self.actions)], name="target_q")
        self.inputs = tf.placeholder(dtype=tf.int32, shape=[None], name="action")
        actions_one_hot = tf.one_hot(indices=self.inputs, depth=len(self.actions),
                                     on_value=1.0, off_value=0.0, name="action_one_hot")
        pred_q = tf.multiply(self.model_outputs, actions_one_hot)
        error = self.targets - pred_q
        square_error = .5 * tf.square(error)
        loss = tf.reduce_mean(square_error, name="loss")
        optimizer = tf.train.AdamOptimizer(learning_rate=1e-3)
        self.train = optimizer.minimize(loss)
        self.sess.run(tf.global_variables_initializer())
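    # Worked example of the masked loss (hypothetical numbers): with two
    # actions, action index 1, Q(s) = [0.3, 0.7] and scalar target 1.2, the
    # one-hot mask gives pred_q = [0.0, 0.7] against a target row of
    # [0.0, 1.2], so the per-sample error reduces to 0.5 * (1.2 - 0.7)**2
    # at the taken action only.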
    def act(self):
        action_id = self.forward()
        return self.actions[action_id]

    def forward(self):
        q_values = self.compute_q_values(self.recent_observation)
        action_id = self.policy.select_action(q_values=q_values, is_training=self.training)
        self.recent_action_id = action_id
        return action_id

    def observe(self, observation, reward=None, is_terminal=None):
        self.previous_observation = copy.deepcopy(self.recent_observation)
        self.recent_observation = observation
        # A reward of None marks the first observation of an episode, for
        # which there is no completed transition to store.
        if self.training and reward is not None:
            if self.step % self.memory_interval == 0:
                self.memory.append(self.previous_observation, self.recent_action_id,
                                   reward, terminal=is_terminal)
            self.experience_replay()
            self.policy.decay_eps_rate()
        self.step += 1
    def experience_replay(self):
        if self.step > self.nb_steps_warmup and self.step % self.train_interval == 0:
            experiences = self.memory.sample(self.batch_size)
            state0_batch = []
            reward_batch = []
            action_batch = []
            state1_batch = []
            terminal_batch = []
            for e in experiences:
                state0_batch.append(e.state0)
                state1_batch.append(e.state1)
                reward_batch.append(e.reward)
                action_batch.append(e.action)
                terminal_batch.append(0. if e.terminal else 1.)  # 0 kills the bootstrap term
            reward_batch = np.array(reward_batch)
            terminal_batch = np.array(terminal_batch)
            # Bellman target: r + gamma * max_a' Q_target(s', a'), with the
            # discounted term zeroed on terminal transitions.
            target_q_values = np.array(self.compute_target_q_value(state1_batch))
            discounted_reward_batch = self.gamma * target_q_values * terminal_batch
            targets = reward_batch + discounted_reward_batch
            # Scatter each scalar target into a row that is zero except at
            # the taken action, matching the one-hot mask in compile().
            target_batch = np.zeros((self.batch_size, len(self.actions)))
            for idx, (action, target) in enumerate(zip(action_batch, targets)):
                target_batch[idx][action] = target
            self.train_on_batch(state0_batch, action_batch, target_batch)
            # Soft target-network update after every training step.
            self.sess.run(self.update_target_model)

    def train_on_batch(self, state_batch, action_batch, targets):
        self.sess.run(self.train, feed_dict={self.model_inputs: state_batch,
                                             self.inputs: action_batch,
                                             self.targets: targets})

    def compute_target_q_value(self, state1_batch):
        # max_a' Q_target(s', a') for every next state in the batch.
        q_values = self.sess.run(self.target_model_outputs,
                                 feed_dict={self.target_model_inputs: state1_batch})
        return np.max(q_values, axis=1)
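    # Note on the update schedule: rather than copying the online weights into
    # the target network every N steps (the hard update of the original DQN
    # paper), this implementation Polyak-averages after every training step
    # with tau = 0.001: target <- (1 - tau) * target + tau * online.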
    def compute_q_values(self, state):
        # Act from the online network; the target network is reserved for
        # computing bootstrap targets.
        q_values = self.sess.run(self.model_outputs, feed_dict={self.model_inputs: [state]})
        return q_values[0]

    def reset(self):
        self.recent_observation = None
        self.previous_observation = None
        self.recent_action_id = None
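# Driver: train on CartPole-v0 with an epsilon-greedy policy and, every
# `evaluate_interval` episodes, run a single greedy evaluation episode and
# record its length. CartPole-v0 caps episodes at 200 steps, so a curve
# pinned at 200 in result.png means consistently solved episodes.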
if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    random.seed(123)
    np.random.seed(123)
    env.seed(123)
    nb_actions = env.action_space.n
    actions = np.arange(nb_actions)
    obs = env.reset()
    # Anneal epsilon from 1.0 toward min_eps with a 0.999 per-step decay.
    policy = EpsGreedyQPolicy(1.0, 0.999)
    memory = Memory(limit=50000, maxlen=1)
    agent = DQNAgent(actions=actions, memory=memory, observation=obs,
                     input_shape=[len(obs)], policy=policy)
    agent.compile()
    nb_episode = 3000
    evaluate_interval = 10
    result = []
    evaluate_episode = []
    timestep_limit = env.spec.max_episode_steps  # 200 for CartPole-v0
    for episode in range(nb_episode):
        agent.reset()
        observation = copy.deepcopy(env.reset())
        agent.observe(observation)
        for t in range(timestep_limit):
            # env.render()
            action = agent.act()
            observation, reward, done, info = env.step(action)
            observation = copy.deepcopy(observation)
            agent.observe(observation, reward, done)
            if done:
                break
        # Greedy evaluation every `evaluate_interval` episodes.
        if episode % evaluate_interval == 0:
            evaluate_episode.append(episode)
            agent.training = False
            agent.reset()
            observation = env.reset()
            agent.observe(observation)
            for t in range(timestep_limit):
                # env.render()
                action = agent.act()
                observation, reward, done, info = env.step(action)
                agent.observe(observation)
                if done:
                    result.append(t)
                    break
            agent.training = True
    plt.ylabel("timesteps")
    plt.xlabel("episode")
    plt.ylim((0, 200))
    plt.plot(evaluate_episode, result)
    plt.savefig("result.png")