# -*- coding: utf-8 -*-
"""
Created on Sun Dec 1 14:27:10 2019
@author: Aditya Saxena
"""

import datetime
import os
import numpy as np
import gym
from gym import wrappers
import pybullet_envs  # imported for its side effect: registers the Bullet envs with gym
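
# This script implements Augmented Random Search (ARS): a derivative-free RL
# algorithm that perturbs a linear policy in random directions, rolls the policy
# out with each perturbation applied in both the + and - direction, and then
# updates the weights along the reward-weighted average of the best directions.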

# Setting the Hyper Parameters
class Hp():

    def __init__(self):
        self.nb_steps = 1000              # number of training iterations
        self.episode_length = 1000        # maximum number of timesteps per episode
        self.learning_rate = 0.02         # step size of the policy update
        self.nb_directions = 32           # perturbation directions sampled per step
        self.nb_best_directions = 32      # top-scoring directions kept for the update
        assert self.nb_best_directions <= self.nb_directions
        self.noise = 0.03                 # scale of the Gaussian perturbations
        self.seed = 1
        self.env_name = 'HalfCheetahBulletEnv-v0'

# Normalizing the states
class Normalizer():

    def __init__(self, nb_inputs):
        self.n = np.zeros(nb_inputs)          # number of states observed so far
        self.mean = np.zeros(nb_inputs)       # running mean of each state dimension
        self.mean_diff = np.zeros(nb_inputs)  # running sum of squared deviations
        self.var = np.zeros(nb_inputs)        # running variance of each state dimension

    def observe(self, x):
        # Welford-style online update of the running mean and variance
        self.n += 1.
        last_mean = self.mean.copy()
        self.mean += (x - self.mean) / self.n
        self.mean_diff += (x - last_mean) * (x - self.mean)
        self.var = (self.mean_diff / self.n).clip(min=1e-2)

    def normalize(self, inputs):
        obs_mean = self.mean
        obs_std = np.sqrt(self.var)
        return (inputs - obs_mean) / obs_std
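
# Normalizing the states keeps every input dimension on a comparable scale,
# which matters because the policy below is a single linear map; the variance
# is floored at 1e-2 so normalize() never divides by a near-zero std.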

# Building the AI
class Policy():

    def __init__(self, input_size, output_size):
        # Linear policy: the action is theta (dot) state, starting from all zeros
        self.theta = np.zeros((output_size, input_size))

    def evaluate(self, input, delta=None, direction=None):
        # Act with the unperturbed weights, or with a +/- perturbation applied
        if direction is None:
            return self.theta.dot(input)
        elif direction == "positive":
            return (self.theta + hp.noise * delta).dot(input)
        else:
            return (self.theta - hp.noise * delta).dot(input)

    def sample_deltas(self):
        # One Gaussian perturbation matrix (same shape as theta) per direction
        return [np.random.randn(*self.theta.shape) for _ in range(hp.nb_directions)]

    def update(self, rollouts, sigma_r):
        # Move theta along the reward-weighted sum of the best directions,
        # scaled by the learning rate and the standard deviation of the rewards
        step = np.zeros(self.theta.shape)
        for r_pos, r_neg, d in rollouts:
            step += (r_pos - r_neg) * d
        self.theta += hp.learning_rate / (hp.nb_best_directions * sigma_r) * step
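
# The update above is the ARS step
#     theta <- theta + alpha / (b * sigma_R) * sum_k (r_k+ - r_k-) * delta_k
# with alpha = hp.learning_rate, b = hp.nb_best_directions, sigma_R the std of
# the collected rewards, and r_k+ / r_k- the returns obtained with the weights
# perturbed by +/- hp.noise * delta_k.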

# Exploring the policy on one specific direction and over one episode
def explore(env, normalizer, policy, direction=None, delta=None):
    state = env.reset()
    done = False
    num_plays = 0.
    sum_rewards = 0
    while not done and num_plays < hp.episode_length:
        normalizer.observe(state)
        state = normalizer.normalize(state)
        action = policy.evaluate(state, delta, direction)
        state, reward, done, _ = env.step(action)
        reward = max(min(reward, 1), -1)  # clip the reward to [-1, 1]
        sum_rewards += reward
        num_plays += 1
    return sum_rewards
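
# explore() returns the (clipped) episodic reward of one rollout; this single
# number per direction is the only feedback ARS needs, since the algorithm
# never computes gradients.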

# Training the AI
def train(env, policy, normalizer, hp):
    for step in range(hp.nb_steps):

        # Initializing the perturbation deltas and the positive/negative rewards
        deltas = policy.sample_deltas()
        positive_rewards = [0] * hp.nb_directions
        negative_rewards = [0] * hp.nb_directions

        # Getting the positive rewards in the positive directions
        for k in range(hp.nb_directions):
            positive_rewards[k] = explore(env, normalizer, policy, direction="positive", delta=deltas[k])

        # Getting the negative rewards in the negative/opposite directions
        for k in range(hp.nb_directions):
            negative_rewards[k] = explore(env, normalizer, policy, direction="negative", delta=deltas[k])

        # Gathering all the positive/negative rewards to compute the standard deviation of these rewards
        all_rewards = np.array(positive_rewards + negative_rewards)
        sigma_r = all_rewards.std()

        # Sorting the rollouts by max(r_pos, r_neg) and selecting the best directions
        scores = {k: max(r_pos, r_neg) for k, (r_pos, r_neg) in enumerate(zip(positive_rewards, negative_rewards))}
        order = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)[:hp.nb_best_directions]
        rollouts = [(positive_rewards[k], negative_rewards[k], deltas[k]) for k in order]

        # Updating our policy
        policy.update(rollouts, sigma_r)

        # Printing the final reward of the policy after the update
        reward_evaluation = explore(env, normalizer, policy)
        print('Step:', step, 'Reward:', reward_evaluation)
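
# Each training step therefore runs 2 * hp.nb_directions + 1 episodes: one
# rollout per positive direction, one per negative direction, and one final
# evaluation rollout of the unperturbed policy.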

# Running the main code
def mkdir(base, name):
    path = os.path.join(base, name)
    if not os.path.exists(path):
        os.makedirs(path)
    return path

work_dir = mkdir('exp', 'brs')
monitor_dir = mkdir(work_dir, 'monitor')

hp = Hp()
np.random.seed(hp.seed)
env = gym.make(hp.env_name)
env = wrappers.Monitor(env, monitor_dir, force=True)  # records videos of the runs into monitor_dir
nb_inputs = env.observation_space.shape[0]
nb_outputs = env.action_space.shape[0]
policy = Policy(nb_inputs, nb_outputs)
normalizer = Normalizer(nb_inputs)
train(env, policy, normalizer, hp)
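
# Note: this script targets the classic Gym API current when it was written:
# env.reset() returns just the observation, env.step() returns a 4-tuple
# (state, reward, done, info), and gym.wrappers.Monitor records rollouts.
# On recent Gym/Gymnasium releases, reset() returns (obs, info), step()
# returns a 5-tuple, and Monitor has been replaced by gym.wrappers.RecordVideo.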