# based on
# https://github.com/pytorch/examples/blob/master/reinforcement_learning/actor_critic.py
# http://rail.eecs.berkeley.edu/deeprlcourse-fa17/f17docs/lecture_5_actor_critic_pdf.pdf
import argparse
import gym
import numpy as np
from itertools import count
from collections import namedtuple

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

import os
os.environ["WANDB_BASE_URL"] = "http://api.wandb.ai"
import wandb
# ------------------------------------------------------------------------------------------------------------~
# Cart Pole
parser = argparse.ArgumentParser(description='PyTorch actor-critic example')
# parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
#                     help='discount factor (default: 0.99)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--render', action='store_true',
                    help='render the environment')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                    help='interval between training status logs (default: 10)')
args = parser.parse_args()

hyperparameter_defaults = dict(
    policy_lr=1e-2,
    n_episodes=2000,
    gamma_lr=1e-1,  # used for the 'Adapt' option
    alpha_lr=1e0,   # used for the 'SigmoidAdapt' option
    gamma_type='Adapt',  # 'Adapt' | 'SigmoidAdapt'
    use_paired=True,
    env_name='CartPole-v1',  # 'Acrobot-v1' | 'CartPole-v1'
    normalize_r2go=True,  # False | True
)

wandb.init(config=hyperparameter_defaults, project="GammaAdapt")
config = wandb.config
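
# This script trains an actor-critic agent while *adapting* the discount factor gamma by
# gradient descent. When use_paired is True, gamma is updated on the regret between a
# Protagonist and an Antagonist policy (presumably in the spirit of PAIRED-style regret
# objectives); the PAIRED interpretation is an assumption based on the flag name.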
# ------------------------------------------------------------------------------------------------------------~
env = gym.make(config.env_name)
n_actions = env.action_space.n
n_inputs = env.observation_space.shape[0]
env.seed(args.seed)
torch.manual_seed(args.seed)
eps = np.finfo(np.float32).eps.item()
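# eps (float32 machine epsilon) is added to the std when normalizing the reward-to-go below,
# to avoid division by zero for episodes whose returns are all equal.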
# ------------------------------------------------------------------------------------------------------------~
class DiscountFactor:
    def __init__(self):
        self.init_gamma = 0.001 if config.use_paired else 0.99
        self.max_gamma = 0.99
        self.gamma_type = config.gamma_type

    def get_tensor(self):
        # Return the tensor of gamma (possible to backprop); implemented by the subclasses
        raise NotImplementedError

    def get_item(self):
        # Return the current value of gamma as a simple number (not for backprop)
        gamma = self.get_tensor()
        return gamma.item()  # detach from the computation graph
# ------------------------------------------------------------------------------------------------------------~
class DiscountFactorAdaptSimple(DiscountFactor):
    def __init__(self):
        super().__init__()
        self.gamma = torch.tensor([self.init_gamma], requires_grad=True)
        self.gamma_optimizer = optim.Adam([self.gamma], lr=config.gamma_lr)

    def gd_step(self, loss):
        # Gradient Descent Step
        self.gamma_optimizer.zero_grad()
        loss.backward()
        self.gamma_optimizer.step()
        self.gamma.data = torch.clamp(self.gamma.data, 0, self.max_gamma)

    def get_tensor(self):
        # Return the tensor of gamma (possible to backprop)
        return self.gamma
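
# Usage sketch for the adaptive gamma (hypothetical loss; see main() for the real one):
#   discount = DiscountFactorAdaptSimple()
#   loss = some_scalar_loss_of(discount.get_tensor())   # hypothetical function
#   discount.gd_step(loss)  # one Adam step on gamma, then clamp into [0, 0.99]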
# ------------------------------------------------------------------------------------------------------------~
class DiscountFactorAdaptSigmoid(DiscountFactor):
    # Represent gamma = 1 / (1 + exp(-alpha)), in which case you never need to clip,
    # perhaps making the optimization more stable.
    def __init__(self):
        super().__init__()
        init_alpha = np.log(self.init_gamma / (1 - self.init_gamma))
        self.alpha = torch.tensor([init_alpha], requires_grad=True)
        self.alpha_optimizer = optim.Adam([self.alpha], lr=config.alpha_lr)

    def gd_step(self, loss):
        # Gradient Descent Step
        self.alpha_optimizer.zero_grad()
        loss.backward()
        self.alpha_optimizer.step()

    def get_tensor(self):
        # Return the tensor of gamma (possible to backprop)
        gamma = 1 / (1 + torch.exp(-self.alpha))
        return gamma
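
# Worked example of the sigmoid parameterization (values approximate):
#   gamma = sigmoid(alpha) = 1 / (1 + exp(-alpha)),  so  alpha = log(gamma / (1 - gamma))
#   init_gamma = 0.001  ->  alpha = log(0.001 / 0.999) ~ -6.91,  sigmoid(-6.91) ~ 0.001
#   gamma = 0.99        ->  alpha = log(0.99 / 0.01)   ~ +4.60
# Since the sigmoid maps the reals to (0, 1), no clipping is needed; note, however, that unlike
# the clamped variant above, gamma is not restricted to stay below max_gamma = 0.99 here.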
# ------------------------------------------------------------------------------------------------------------~
class Policy(nn.Module):
    """
    implements both actor and critic in one model
    """
    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(n_inputs, 128)

        # actor's layer
        self.action_head = nn.Linear(128, n_actions)

        # critic's layer
        self.value_head = nn.Linear(128, 1)

        # buffers (save info from the last trajectory)
        self.ep_states = []     # list of states s_t in the last run trajectory
        self.ep_actions = []    # list of actions a_t in the last run trajectory
        self.ep_rewards = []    # list of rewards r_t in the last run trajectory
        self.ep_log_probs = []  # list of log(Pi(a_t|s_t)) in the last run trajectory
        self.ep_value_est = []  # list of estimated V(s_t) in the last run trajectory
        self.ep_len = None      # length of the last run trajectory

    def forward(self, x):
        """
        forward of both actor and critic
        """
        x = F.relu(self.affine1(x))

        # actor: chooses the action to take from state s_t
        # by returning the probability of each action
        action_prob = F.softmax(self.action_head(x), dim=-1)

        # critic: evaluates being in the state s_t
        state_values = self.value_head(x)

        # return values for both actor and critic as a tuple of 2 values:
        # 1. a list with the probability of each action over the action space
        # 2. the value of state s_t
        return action_prob, state_values

    def run_model(self, state):
        state = torch.from_numpy(state).float()
        probs, state_value = self.forward(state)
        return probs, state_value

    def set_requires_grad(self, flag):
        for param in self.parameters():
            param.requires_grad = flag
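
# Architecture note: a single shared hidden layer of 128 ReLU units feeds both the actor head
# (softmax over n_actions) and the critic head (a scalar state-value estimate). The ep_* buffers
# are filled by run_trajectory() below and consumed by get_episode_objective().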
# ------------------------------------------------------------------------------------------------------------~
def select_action(probs):
    # create a categorical distribution over the list of probabilities of actions
    m = Categorical(probs)

    # and sample an action using the distribution
    action = m.sample()
    log_prob = m.log_prob(action)
    return action.item(), log_prob
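
# Note: log_prob is returned as a tensor still attached to the policy network's computation
# graph, so the -log_prob * advantage terms in the loss backpropagate into the actor's
# parameters; the action is returned as a plain int for env.step().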
# ------------------------------------------------------------------------------------------------------------~
def run_trajectory(pol: Policy):
    episode_length_limit = 10000  # don't loop forever while learning

    # clear the memory of the previous episode
    del pol.ep_states[:]
    del pol.ep_actions[:]
    del pol.ep_rewards[:]
    del pol.ep_log_probs[:]
    del pol.ep_value_est[:]

    ep_tot_reward = 0
    t = 0

    # Init environment
    state = env.reset()
    for t in range(1, episode_length_limit):
        probs, value_est = pol.run_model(state)
        action, log_prob = select_action(probs)
        state, reward, done, _ = env.step(action)
        if args.render:
            env.render()
        pol.ep_states.append(state)
        pol.ep_actions.append(action)
        pol.ep_rewards.append(reward)
        pol.ep_log_probs.append(log_prob)
        pol.ep_value_est.append(value_est)
        ep_tot_reward += reward
        if done:
            break

    ep_len = t
    pol.ep_len = ep_len
    return ep_tot_reward, ep_len
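
# Note on the buffers: `state` is appended *after* env.step(), so ep_states holds successor
# states rather than the states the actions were taken from; ep_states and ep_actions are not
# used by the loss below, which only needs ep_rewards, ep_log_probs, and ep_value_est.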
# ------------------------------------------------------------------------------------------------------------~
def get_episode_objective(pol, gamma):
    """
    Training code. Calculates the combined actor and critic loss.
    """
    # calculate the "reward-to-go"
    # (the cumulative discounted return starting at each time step until the end of the episode)
    r2go = 0.0
    r2go_ep = torch.zeros(pol.ep_len)  # the reward-to-go per time step in the episode
    for t in reversed(range(pol.ep_len)):  # go from the end of the episode to the start
        r = pol.ep_rewards[t]
        r2go = r + gamma * r2go
        r2go_ep[t] = r2go

    if config.normalize_r2go:
        # Normalize the returns (for variance reduction)
        # Note: this is an unjustified heuristic
        r2go_ep = (r2go_ep - r2go_ep.mean()) / (r2go_ep.std() + eps)

    # calculate the loss (objective)
    policy_losses = []  # list to save actor (policy) per-step losses
    value_losses = []   # list to save critic (value) per-step losses
    for t in range(pol.ep_len):
        r2go = r2go_ep[t]
        value_est = pol.ep_value_est[t]
        log_prob = pol.ep_log_probs[t]
        advantage = r2go - value_est.item()
        # note: we don't backprop through the value estimate in the advantage (it is detached via .item())

        # calculate actor (policy) loss
        policy_losses.append(-log_prob * advantage)

        # calculate critic (value) loss using the smooth L1 loss
        value_losses.append(F.smooth_l1_loss(value_est.squeeze(), r2go))

    # sum up all the values of policy_losses and value_losses
    actor_loss = torch.stack(policy_losses).sum()
    critic_loss = torch.stack(value_losses).sum()
    loss = actor_loss + critic_loss
    return loss
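
# The per-episode objective, spelled out (comments only):
#   R_t = r_t + gamma * R_{t+1}                          (reward-to-go, optionally normalized)
#   loss = sum_t [ -log pi(a_t|s_t) * (R_t - V(s_t)) ]   (actor / policy-gradient term)
#        + sum_t smooth_l1(V(s_t), R_t)                  (critic / value-regression term)
# When gamma is passed as a tensor with requires_grad=True (discount.get_tensor()), the
# reward-to-go recursion keeps gamma in the graph, so the loss is differentiable w.r.t. gamma.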
# ------------------------------------------------------------------------------------------------------------~
def main():
    all_rewards = []
    gammas = []
    running_reward = 0

    # Initialize gamma
    if config.gamma_type == 'Adapt':
        discount = DiscountFactorAdaptSimple()
    elif config.gamma_type == 'SigmoidAdapt':
        discount = DiscountFactorAdaptSigmoid()
    else:
        raise ValueError

    # Initialize policies:
    polP = Policy()  # Protagonist policy
    polA = Policy()  # Antagonist policy
    polP_optimizer = optim.Adam(polP.parameters(), lr=config.policy_lr)
    polA_optimizer = optim.Adam(polA.parameters(), lr=config.policy_lr)

    # run config.n_episodes episodes
    for i_episode in range(config.n_episodes):
        polA.set_requires_grad(True)
        polP.set_requires_grad(True)

        # get Protagonist loss and update the Protagonist policy
        ep_tot_rewardP, ep_lenP = run_trajectory(polP)
        lossP = get_episode_objective(polP, discount.get_item())
        polP_optimizer.zero_grad()
        lossP.backward()  # perform backprop
        polP_optimizer.step()

        if config.use_paired:
            # get Antagonist loss and update the Antagonist policy
            ep_tot_rewardA, ep_lenA = run_trajectory(polA)
            lossA = get_episode_objective(polA, discount.get_item())
            polA_optimizer.zero_grad()
            lossA.backward()  # perform backprop
            polA_optimizer.step()

            polA.set_requires_grad(False)
            polP.set_requires_grad(False)

            # Regret
            # get Protagonist loss
            ep_tot_rewardP, ep_lenP = run_trajectory(polP)
            lossP = get_episode_objective(polP, discount.get_tensor())

            # get Antagonist loss
            ep_tot_rewardA, ep_lenA = run_trajectory(polA)
            lossA = get_episode_objective(polA, discount.get_tensor())

            neg_regret = lossA - lossP  # notice the signs flipped because here we talk about a minimization objective

            # Take a gradient step with gamma
            discount.gd_step(neg_regret)
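
            # Regret-based gamma update, spelled out: gd_step minimizes (lossA - lossP),
            # i.e. it moves gamma so as to increase the Protagonist's loss relative to the
            # Antagonist's. Reading this as a PAIRED-style "maximize regret" adversary is an
            # assumption based on the use_paired flag; the code itself only performs the
            # single Adam step on gamma (or alpha) shown above.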
            # -------------------------------------------
            # neg_regret.backward()  # perform backprop
            #
            # # take a step with polP to maximize -Regret (i.e., minimize +Regret)
            # polP.grad_step(lossP)
            #
            # # take a step with polA to maximize +Regret (i.e., minimize -Regret)
            # polA.grad_step(lossA)
            #
            # ######################
            # # get Protagonist loss
            # ep_tot_rewardP, ep_lenP = run_trajectory(polP)
            # lossP = get_episode_AC_objective(polP, gamma)
            #
            # # get Antagonist loss
            # ep_tot_rewardA, ep_lenA = run_trajectory(polA)
            # lossA = get_episode_AC_objective(polA, gamma)
            # ######################
            # # take an optimizer step with gamma to maximize +Regret (i.e., minimize -Regret)
            # gamma_optimizer.zero_grad()
            # gamma_optimizer.step()
            # gamma = torch.clip(gamma, 0, 0.99)

        # update cumulative reward
        all_rewards.append(ep_tot_rewardP)
        gammas.append(discount.get_item())
        running_reward = 0.05 * ep_tot_rewardP + (1 - 0.05) * running_reward

        # log results
        if i_episode % args.log_interval == 0:
            print('Protagonist: Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                i_episode, ep_tot_rewardP, running_reward))
            print(f'gamma = {discount.get_item():.4f}')
            wandb.log({'Protagonist reward': np.mean(all_rewards)}, step=i_episode)
            wandb.log({'Gamma': np.mean(gammas)}, step=i_episode)
            gammas = []
            all_rewards = []

        # check if we have "solved" the cart pole problem
        # if running_reward > 0:  # env.spec.reward_threshold:
        #     print("Solved! Running reward is now {} and "
        #           "the last episode runs to {} time steps!".format(running_reward, ep_lenP))
        #     break


if __name__ == '__main__':
    main()
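
# Usage sketch (the filename below is hypothetical; save the paste under any name):
#   python gamma_adapt.py --seed 543 --log-interval 10
# Requires torch, numpy, wandb, and an older Gym API (env.seed() and a 4-tuple from env.step()).
# The values in hyperparameter_defaults act as defaults for wandb.config and can be overridden
# by a wandb sweep.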