import math
import random
import time

import gym
import matplotlib.pyplot as pyplot
import numpy as np
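
# A note on assumptions: this script targets the classic Gym API (gym <= 0.25),
# where env.reset() returns just the observation and env.step() returns a
# 4-tuple (observation, reward, done, info). 'LunarLander-v2' also requires
# the Box2D extra (pip install gym[box2d]).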

class QLearner:
    def __init__(self):
        self.knowledge = {}
        self.environment = gym.make('LunarLander-v2')
        self.attempt_no = 1
        self.reward_sum_history = []
        # Binning ranges for the six continuous observation components:
        # x, y, x velocity, y velocity, angle, angular velocity.
        self.upper_bounds = [
            1.0,
            1.0,
            2.0,
            2.0,
            2.0,
            math.radians(180)
        ]
        self.lower_bounds = [
            -1.0,
            -1.0,
            -2.0,
            -2.0,
            -2.0,
            -math.radians(180)
        ]

    def learn(self, max_attempts, config):
        self.config = config
        self.max_attempts = max_attempts
        self.alpha = self.config['initial_alpha']
        self.eps = self.config['initial_eps']
        return [self.attempt() for _ in range(max_attempts)]

    def attempt(self):
        # Play one episode, updating the Q-table after every step.
        observation = self.discretise(self.environment.reset())
        done = False
        reward_sum = 0.0
        while not done:
            # self.environment.render()
            action = self.pick_action(observation)
            new_observation, reward, done, info = self.environment.step(action)
            new_observation = self.discretise(new_observation)
            self.update_knowledge(action, observation, new_observation, reward)
            observation = new_observation
            reward_sum += reward
        # print(reward_sum)
        self.reward_sum_history.append(reward_sum)
        self.exp_update_learning_parameters()
        self.attempt_no += 1
        return reward_sum

    def discretise(self, observation):
        # Bucket each continuous component with np.digitize; the last two
        # components (the leg contact flags) are already discrete and pass
        # through unchanged.
        bucket_keys = [
            'x_bucket_size',
            'y_bucket_size',
            'v_x_bucket_size',
            'v_y_bucket_size',
            'angle_bucket_size',
            'v_angle_bucket_size'
        ]
        discretised = []
        for i, key in enumerate(bucket_keys):
            bins = np.linspace(self.lower_bounds[i], self.upper_bounds[i], self.config[key])
            discretised.append(np.digitize([observation[i]], bins)[0])
        return tuple(discretised) + (observation[6], observation[7])
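
    # Binning example for discretise() above: np.linspace(-1.0, 1.0, 2) yields
    # the edges [-1.0, 1.0], and np.digitize([0.3], [-1.0, 1.0]) returns [1];
    # values below -1.0 fall in bucket 0, values in [-1.0, 1.0) in bucket 1,
    # and values >= 1.0 in bucket 2.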

    def pick_action(self, observation):
        # Epsilon-greedy selection: explore with probability eps, otherwise
        # take the action with the highest Q-value (0.0 for unseen pairs).
        if random.random() < self.eps:
            return random.randint(0, 3)
        action_dict = {key: self.knowledge.get((observation, key), 0.0) for key in range(4)}
        return max(action_dict.keys(), key=lambda key: action_dict[key])

    def update_knowledge(self, action, observation, new_observation, reward):
        # Unseen state-action pairs default to 0.0, matching pick_action.
        old_value = self.knowledge.get((observation, action), 0.0)
        best_next = max(self.knowledge.get((new_observation, a), 0.0) for a in range(4))
        self.knowledge[(observation, action)] = (
            (1.0 - self.alpha) * old_value
            + self.alpha * (reward + self.config['gamma'] * best_next)
        )
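
    # The rule above is the standard tabular Q-learning update:
    #   Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * (r + gamma * max_a' Q(s', a'))
    # with alpha the learning rate and gamma the discount factor.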

    def exp_update_learning_parameters(self):
        # Decay the learning rate and exploration rate exponentially in the
        # number of attempts.
        self.alpha = self.config['initial_alpha'] * math.exp(-self.config['k_alpha'] * self.attempt_no)
        self.eps = self.config['initial_eps'] * math.exp(-self.config['k_eps'] * self.attempt_no)
        # print(self.alpha)
        # print(self.eps)

    def lin_update_learning_parameters(self):
        # Alternative linear epsilon decay (not called by attempt()); expects
        # 'eps_start', 'eps_end' and 'eps_perc' in the config.
        eps_start = self.config['eps_start']
        eps_end = self.config['eps_end']
        eps_perc = self.config['eps_perc']
        delta_eps = -(eps_start - eps_end) / (eps_perc * self.max_attempts)
        self.eps = max(eps_end, self.eps + delta_eps)

    def moving_average(self, y, N):
        # Moving average over a window of N samples, computed from cumulative sums.
        cumsum = np.cumsum(np.insert(y, 0, 0))
        return (cumsum[N:] - cumsum[:-N]) / float(N)
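
    # Example: moving_average([1, 2, 3, 4], 2) returns array([1.5, 2.5, 3.5]).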

    def reward_sum_history_stats(self):
        y = self.reward_sum_history
        std_dev = np.std(y)
        mean = np.mean(y)
        return mean, std_dev

    def format_config(self, config):
        # Render the config as 'key:value' pairs, two pairs per line.
        result = ''
        for i, (key, value) in enumerate(config.items(), start=1):
            result += '{}:{} '.format(key, value)
            if i % 2 == 0:
                result += '\n'
        return result

    def plot_reward_sum_history(self):
        y = self.reward_sum_history
        len_y = len(y)
        x = range(len_y)
        y_mean = [np.mean(y)] * len_y
        moving_avg = self.moving_average(y, 6)
        x_moving_avg = range(len(moving_avg))
        fig, ax = pyplot.subplots()
        # title = ax.set_title()
        fig.tight_layout()
        fig.subplots_adjust(top=0.70)
        ax.plot(x, y, label='Quality')
        ax.plot(x_moving_avg, moving_avg, label='Quality Moving Avg')
        # ax.plot(x, y_mean, 'r', label='Mean', linestyle='--')
        ax.legend(loc='upper left')
        # Strip the newlines format_config inserts so the filename stays valid.
        filename = self.format_config(self.config).replace('\n', '') + time.strftime("%Y%m%d-%H%M%S") + '.png'
        pyplot.savefig(filename)
        pyplot.close()


def evaluate_learning_process(config):
    learner = QLearner()
    learner.learn(2000, config)
    learner.plot_reward_sum_history()


def main():
    config = {
        'initial_alpha': 0.9,
        'initial_eps': 0.3,
        'gamma': 0.7,
        'k_alpha': 0.01,
        'k_eps': 0.009,
        'x_bucket_size': 2,
        'y_bucket_size': 2,
        'v_x_bucket_size': 6,
        'v_y_bucket_size': 10,
        'angle_bucket_size': 10,
        'v_angle_bucket_size': 8
    }
    evaluate_learning_process(config)


if __name__ == '__main__':
    main()
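
# Note: under the newer Gymnasium API, env.reset() returns (observation, info)
# and env.step() returns (observation, reward, terminated, truncated, info), so
# the reset and step calls above would need small adjustments to run there.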