# -*- coding: utf-8 -*-
"""
Created on Sun Dec 1 14:27:10 2019

@author: Aditya Saxena
"""

import datetime
import os
import numpy as np
import gym
from gym import wrappers
import pybullet_envs
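
# Note (added comment): pybullet_envs is imported only for its side effect of
# registering the Bullet environments (such as HalfCheetahBulletEnv-v0) with gym;
# it is never referenced directly below. The script also assumes the classic gym
# API, i.e. a gym release that still provides gym.wrappers.Monitor and an
# env.step() that returns 4 values.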

# Setting the Hyper Parameters

class Hp():

    def __init__(self):
        self.nb_steps = 1000
        self.episode_length = 1000
        self.learning_rate = 0.02
        self.nb_directions = 32
        self.nb_best_directions = 32
        assert self.nb_best_directions <= self.nb_directions
        self.noise = 0.03
        self.seed = 1
        self.env_name = 'HalfCheetahBulletEnv-v0'
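
# These are the hyperparameters of Augmented Random Search (ARS): nb_directions
# perturbations are sampled per training step, the nb_best_directions
# highest-scoring ones are kept for the update (here both are 32, so every
# direction is used), learning_rate is the update step size and noise is the
# standard deviation of the exploration perturbations. A quick smoke test could
# use smaller, hypothetical values, e.g.:
#     hp = Hp()
#     hp.nb_steps = 10
#     hp.episode_length = 200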

# Normalizing the states

class Normalizer():

    def __init__(self, nb_inputs):
        self.n = np.zeros(nb_inputs)
        self.mean = np.zeros(nb_inputs)
        self.mean_diff = np.zeros(nb_inputs)
        self.var = np.zeros(nb_inputs)

    def observe(self, x):
        self.n += 1.
        last_mean = self.mean.copy()
        self.mean += (x - self.mean) / self.n
        self.mean_diff += (x - last_mean) * (x - self.mean)
        self.var = (self.mean_diff / self.n).clip(min=1e-2)

    def normalize(self, inputs):
        obs_mean = self.mean
        obs_std = np.sqrt(self.var)
        return (inputs - obs_mean) / obs_std
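
# observe() is a Welford-style online update of the running mean and variance:
#     mean_n = mean_{n-1} + (x - mean_{n-1}) / n
#     M2_n   = M2_{n-1} + (x - mean_{n-1}) * (x - mean_n),   var_n = M2_n / n
# Clipping the variance at 1e-2 keeps the division in normalize() well behaved
# when an observation dimension has barely varied yet. A minimal sketch of the
# intended use (illustrative only, not part of the training loop):
#     norm = Normalizer(nb_inputs=2)
#     norm.observe(np.array([1.0, -1.0]))
#     norm.normalize(np.array([1.0, -1.0]))  # -> array([0., 0.]), the sample equals the running mean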

# Building the AI

class Policy():

    def __init__(self, input_size, output_size):
        self.theta = np.zeros((output_size, input_size))

    def evaluate(self, input, delta=None, direction=None):
        if direction is None:
            return self.theta.dot(input)
        elif direction == "positive":
            return (self.theta + hp.noise * delta).dot(input)
        else:
            return (self.theta - hp.noise * delta).dot(input)

    def sample_deltas(self):
        return [np.random.randn(*self.theta.shape) for _ in range(hp.nb_directions)]

    def update(self, rollouts, sigma_r):
        step = np.zeros(self.theta.shape)
        for r_pos, r_neg, d in rollouts:
            step += (r_pos - r_neg) * d
        self.theta += hp.learning_rate / (hp.nb_best_directions * sigma_r) * step
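
# The policy is a plain linear map: the action is theta.dot(state), with theta
# initialized to zeros. evaluate() optionally perturbs theta by +/- noise * delta
# to probe one random direction, and update() applies the ARS update
#     theta <- theta + (learning_rate / (nb_best_directions * sigma_r))
#                      * sum_k (r_k_pos - r_k_neg) * delta_k
# i.e. a finite-difference step averaged over the best directions and scaled by
# the standard deviation of the collected rewards.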

# Exploring the policy on one specific direction and over one episode

def explore(env, normalizer, policy, direction=None, delta=None):
    state = env.reset()
    done = False
    num_plays = 0.
    sum_rewards = 0
    while not done and num_plays < hp.episode_length:
        normalizer.observe(state)
        state = normalizer.normalize(state)
        action = policy.evaluate(state, delta, direction)
        state, reward, done, _ = env.step(action)
        reward = max(min(reward, 1), -1)
        sum_rewards += reward
        num_plays += 1
    return sum_rewards
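
# Each rollout updates the shared normalizer statistics as it goes, clips every
# per-step reward to [-1, 1] so that a single transition cannot dominate the
# return, and truncates the episode after hp.episode_length steps.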

# Training the AI

def train(env, policy, normalizer, hp):

    for step in range(hp.nb_steps):

        # Initializing the perturbations deltas and the positive/negative rewards
        deltas = policy.sample_deltas()
        positive_rewards = [0] * hp.nb_directions
        negative_rewards = [0] * hp.nb_directions

        # Getting the positive rewards in the positive directions
        for k in range(hp.nb_directions):
            positive_rewards[k] = explore(env, normalizer, policy, direction="positive", delta=deltas[k])

        # Getting the negative rewards in the negative/opposite directions
        for k in range(hp.nb_directions):
            negative_rewards[k] = explore(env, normalizer, policy, direction="negative", delta=deltas[k])

        # Gathering all the positive/negative rewards to compute the standard deviation of these rewards
        all_rewards = np.array(positive_rewards + negative_rewards)
        sigma_r = all_rewards.std()

        # Sorting the rollouts by the max(r_pos, r_neg) and selecting the best directions
        scores = {k: max(r_pos, r_neg) for k, (r_pos, r_neg) in enumerate(zip(positive_rewards, negative_rewards))}
        order = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)[:hp.nb_best_directions]
        rollouts = [(positive_rewards[k], negative_rewards[k], deltas[k]) for k in order]

        # Updating our policy
        policy.update(rollouts, sigma_r)

        # Printing the final reward of the policy after the update
        reward_evaluation = explore(env, normalizer, policy)
        print('Step:', step, 'Reward:', reward_evaluation)
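
# Each training step therefore runs 2 * nb_directions rollouts (one pair per
# perturbation), ranks the directions by max(r_pos, r_neg), and feeds only the
# best ones to the update. In the degenerate case where every rollout returns
# the same reward, sigma_r is 0 and the update would divide by zero; a small
# guard such as sigma_r = max(sigma_r, 1e-6) (not in the original code) would
# avoid that, though it rarely matters in practice.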

# Running the main code

def mkdir(base, name):
    path = os.path.join(base, name)
    if not os.path.exists(path):
        os.makedirs(path)
    return path


work_dir = mkdir('exp', 'brs')
monitor_dir = mkdir(work_dir, 'monitor')

hp = Hp()
np.random.seed(hp.seed)
env = gym.make(hp.env_name)
env = wrappers.Monitor(env, monitor_dir, force=True)
nb_inputs = env.observation_space.shape[0]
nb_outputs = env.action_space.shape[0]
policy = Policy(nb_inputs, nb_outputs)
normalizer = Normalizer(nb_inputs)
train(env, policy, normalizer, hp)
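
# The Monitor wrapper writes episode statistics (and videos, when rendering is
# available) under exp/brs/monitor, overwriting any previous run since
# force=True. A hypothetical invocation, assuming the script is saved as
# ars_halfcheetah.py:
#     python ars_halfcheetah.py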