import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # silence TensorFlow's C++ log spam (must be set before TF loads)
import warnings
warnings.filterwarnings("ignore")

import time
import random
from collections import deque

import cv2
import numpy as np
import gym
import tensorflow as tf
from tqdm import tqdm
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, MaxPool2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard
env = gym.make('Pong-v0')  # classic Gym API: reset() returns a frame, step() returns a 4-tuple

EPISODES = 300
MODEL_NAME = 'my_first_ddqn'
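
# Frame preprocessing helper used throughout: Pong-v0 observations are RGB
# uint8 arrays of shape (210, 160, 3), while the network below expects
# normalised single-channel input of shape (210, 160, 1).
def preprocess(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    return gray.astype(np.float32)[..., np.newaxis] / 255.0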
# Keras' TensorBoard callback creates a fresh log writer on every .fit() call.
# Since we call .fit() once per training step, we override it to keep a single
# writer and a single global step counter for the whole run.
class TensorBoard_for_DQN(TensorBoard):

    # Override init to set an initial step and writer (one log file for all .fit() calls)
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1  # custom step counter, not an attribute of the base TensorBoard class
        self.writer = tf.summary.create_file_writer(self.log_dir)
        self._log_write_dir = os.path.join(self.log_dir, MODEL_NAME)

    # Override to stop Keras from creating its default log writer
    def set_model(self, model):
        pass

    # Override to save logs with our own step number
    # (otherwise every .fit() call would start writing from step 0)
    def on_epoch_end(self, epoch, logs=None):
        self.update_stats(**logs)

    # Override: we train on a single batch only, nothing to save per batch
    def on_batch_end(self, batch, logs=None):
        pass

    # Override so the writer is not closed at the end of each .fit() call
    def on_train_end(self, _):
        pass

    def on_train_batch_end(self, batch, logs=None):
        pass

    # Custom method for logging our own metrics at the current step
    def update_stats(self, **stats):
        self._write_logs(stats, self.step)

    def _write_logs(self, logs, index):
        with self.writer.as_default():
            for name, value in logs.items():
                tf.summary.scalar(name, value, step=index)
                self.step += 1
                self.writer.flush()
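
# Dueling DQN architecture: a shared convolutional trunk feeds two heads, a
# scalar state value V(s) ("how good is this state") and per-action advantages
# A(s, a) ("how much better is each action than average"). They are combined as
#     Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)),
# where subtracting the mean advantage keeps V and A identifiable.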
class DQNModel(Model):

    def __init__(self):
        super().__init__()
        self.lr = 0.01
        # Shared convolutional trunk (input: batches of (210, 160, 1) frames)
        self.conv1 = Conv2D(filters=32, kernel_size=(3, 3), strides=1, padding='same', activation='elu')
        self.conv2 = Conv2D(filters=32, kernel_size=(3, 3), strides=1, padding='same', activation='elu')
        self.mp2 = MaxPool2D(pool_size=(3, 3), strides=1, padding='same')
        self.conv3 = Conv2D(filters=64, kernel_size=(3, 3), strides=1, padding='same', activation='elu')
        self.mp3 = MaxPool2D(pool_size=(3, 3), strides=1, padding='same')
        self.conv4 = Conv2D(filters=64, kernel_size=(3, 3), strides=1, padding='same', activation='elu')
        self.mp4 = MaxPool2D(pool_size=(3, 3), strides=1, padding='same')
        self.flat = Flatten()
        # Dueling heads
        self.value = Dense(1, activation=None)                        # V(s): how good is this state
        self.advantage = Dense(env.action_space.n, activation=None)   # A(s, a): which action is best
        self.compile(optimizer=Adam(learning_rate=self.lr), loss='mse', metrics=['accuracy'])

    # Shared trunk, reused by both call() and predict_advantage()
    def _features(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.mp2(x)
        x = self.conv3(x)
        x = self.mp3(x)
        x = self.conv4(x)
        x = self.mp4(x)
        return self.flat(x)

    # Defining call() is what lets Keras' .fit() train the full Q output
    def call(self, inputs):
        x = self._features(inputs)
        value = self.value(x)
        advantage = self.advantage(x)
        # Dueling aggregation: Q = V + (A - mean(A))
        return value + (advantage - tf.math.reduce_mean(advantage, axis=1, keepdims=True))

    def predict_q(self, state):
        # state: preprocessed batch of shape (N, 210, 160, 1)
        return self(state)

    def predict_advantage(self, state):
        # Advantages alone suffice for action selection (the argmax is unchanged)
        return self.advantage(self._features(state))
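
# The agent keeps two copies of the network: an online model that is trained
# every step, and a target model that is only refreshed every
# UPDATE_TARGET_EVERY terminal steps. Bootstrapping the TD target from the
# slowly-moving target network is what stabilises (double) DQN training.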
class Agent:

    def __init__(self):
        # Learning parameters
        self.discount = 0.9
        self.epsilon = 0.99
        self.epsilon_decay = 0.99
        self.min_epsilon = 0.1
        self.MINIBATCH_SIZE = 128
        # Replay memory
        self.REPLAY_MEMORY_SIZE = 5000
        self.MIN_REPLAY_MEMORY_SIZE = 1000
        self.replay_memory = deque(maxlen=self.REPLAY_MEMORY_SIZE)
        self.model_name = 'my_first_ddqn.h5'
        # Our models: online network and target network (each compiles itself)
        self.model = DQNModel()
        self.target_model = DQNModel()
        # When to copy the online weights into the target model
        self.UPDATE_TARGET_EVERY = 1000
        self.UPDATE_TARGET_COUNTER = 0
        # Visualisation
        self.tensorboard = TensorBoard_for_DQN(log_dir=f'logs/{MODEL_NAME}-{int(time.time())}')
    def update_replay_memory(self, transition):
        # transition = (current_state, action, reward, new_state, done)
        self.replay_memory.append(transition)

    def choose_action(self, state):
        if np.random.random() < self.epsilon:  # explore
            action = np.random.choice(env.action_space.n)
        else:  # exploit: the argmax over advantages equals the argmax over Q values
            advantages = self.model.predict_advantage(preprocess(state)[np.newaxis, ...])
            action = int(np.argmax(advantages, axis=1)[0])
        return action

    def decay_epsilon(self):
        if self.epsilon > self.min_epsilon:
            self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)
    def train(self, terminal_state):
        # Wait until the replay memory holds enough transitions to sample from
        if len(self.replay_memory) < self.MIN_REPLAY_MEMORY_SIZE:
            return

        minibatch = random.sample(self.replay_memory, k=self.MINIBATCH_SIZE)

        # Current states -> Q values from the online model
        current_states = np.array([preprocess(transition[0]) for transition in minibatch])
        current_qs_list = self.model.predict_q(current_states).numpy()

        # Successor states -> Q values from the frozen target model
        new_current_states = np.array([preprocess(transition[3]) for transition in minibatch])
        future_qs_list = self.target_model.predict_q(new_current_states).numpy()

        x = []
        y = []
        for idx, (current_state, action, reward, new_current_state, done) in enumerate(minibatch):
            # TD target: bootstrap from the target network unless the episode ended
            if not done:
                max_future_q = np.max(future_qs_list[idx])
                new_q = reward + self.discount * max_future_q
            else:
                new_q = reward
            # Only the Q value of the action actually taken is updated
            current_qs = current_qs_list[idx]
            current_qs[action] = new_q
            x.append(preprocess(current_state))
            y.append(current_qs)

        self.model.fit(np.array(x), np.array(y), batch_size=self.MINIBATCH_SIZE,
                       verbose=0, shuffle=False,
                       callbacks=[self.tensorboard] if terminal_state else None)

        # Count terminal steps and refresh the target network periodically
        if terminal_state:
            self.UPDATE_TARGET_COUNTER += 1
        if self.UPDATE_TARGET_COUNTER >= self.UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.UPDATE_TARGET_COUNTER = 0
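
# Training loop: play EPISODES games of Pong; every environment step stores
# the transition in replay memory and runs one training batch, and reward
# stats are pushed to TensorBoard every AGGREGATE_STATS_EVERY episodes.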
agent = Agent()
ep_rewards = []

SHOW_PREVIEW = False
AGGREGATE_STATS_EVERY = 1000
MIN_REWARD = 10
os.makedirs('models', exist_ok=True)  # agent.model.save() below writes here

for episode in tqdm(range(1, EPISODES + 1), ascii=True, unit='episodes'):
    agent.tensorboard.step = episode  # update the TensorBoard step once per episode
    episode_reward = 0  # reset the episode reward for each episode
    current_state = env.reset()  # reset the environment and get the initial frame
    done = False  # exit the inner loop when the episode ends
    while not done:
        # Pick an action (epsilon-greedy) and take it
        action = agent.choose_action(current_state)
        new_state, reward, done, info = env.step(action)
        episode_reward += reward

        if SHOW_PREVIEW and not episode % AGGREGATE_STATS_EVERY:
            env.render()

        # Every step we update replay memory and train the online network
        agent.update_replay_memory((current_state, action, reward, new_state, done))
        agent.train(done)
        current_state = new_state

    # Append episode reward to the list and log stats (every given number of episodes)
    ep_rewards.append(episode_reward)
    if not episode % AGGREGATE_STATS_EVERY or episode == 1:
        average_reward = sum(ep_rewards[-AGGREGATE_STATS_EVERY:]) / len(ep_rewards[-AGGREGATE_STATS_EVERY:])
        min_reward = min(ep_rewards[-AGGREGATE_STATS_EVERY:])
        max_reward = max(ep_rewards[-AGGREGATE_STATS_EVERY:])
        agent.tensorboard.update_stats(reward_avg=average_reward, reward_min=min_reward,
                                       reward_max=max_reward, epsilon=agent.epsilon)

        # Save the model, but only when the min reward is at least the set value
        if min_reward >= MIN_REWARD:
            agent.model.save(f'models/{MODEL_NAME}__{max_reward:_>7.2f}max_{average_reward:_>7.2f}avg_{min_reward:_>7.2f}min__{int(time.time())}.model')

    agent.decay_epsilon()
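
# Minimal sketch for watching a trained agent play greedily (epsilon = 0),
# assuming training above produced a saved model; the model_path argument is a
# hypothetical example of the files written to models/, not a guaranteed file.
def watch(model_path):
    trained = tf.keras.models.load_model(model_path)  # hypothetical saved-model path
    state, done = env.reset(), False
    while not done:
        q = trained(preprocess(state)[np.newaxis, ...])
        state, _, done, _ = env.step(int(np.argmax(q, axis=1)[0]))
        env.render()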