- """
- PPO MLPlay Module
- This module implements a Proximal Policy Optimization (PPO) agent for Unity games
- using the MLGame3D framework and Unity ML-Agents PPO implementation.
- """
import os
import time
from typing import Any, Dict, List, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

# Use CUDA if available, otherwise fall back to CPU
device = (
    torch.device("cuda") if torch.cuda.is_available()
    else torch.device("cpu")
)
# Resume training configuration
RESUME_TRAINING = False
MODEL_SAVE_DIR = "./models/20250527_162128" if RESUME_TRAINING else f"./models/{time.strftime('%Y%m%d_%H%M%S')}"
MODEL_LOAD_PATH = f"{MODEL_SAVE_DIR}/model_latest.pt"
OPTIMIZER_LOAD_PATH = f"{MODEL_SAVE_DIR}/optimizer_latest.pt"

# Built-in configuration
TRAINING_MODE = True  # Set to False to use a pre-trained model

# PPO hyperparameters
LEARNING_RATE = 3e-4
GAMMA = 0.99                # Discount factor
GAE_LAMBDA = 0.95           # GAE parameter
CLIP_RATIO = 0.2            # Lower clip ratio to encourage safer early updates
VALUE_COEF = 0.5            # Value loss coefficient
ENTROPY_COEF = 0.022        # Initial entropy coefficient (best found so far: 0.03)
FINAL_ENTROPY_COEF = 0.003  # Floor for the decayed entropy coefficient
DECAY_RATE = 0.97           # Per-update multiplicative decay of the entropy coefficient
MAX_GRAD_NORM = 0.5         # Gradient clipping threshold
UPDATE_EPOCHS = 6           # Optimization epochs per policy update
BUFFER_SIZE = 2048          # Experience buffer capacity
BATCH_SIZE = 64             # Mini-batch size
UPDATE_FREQUENCY = 2048     # Steps between policy updates
SAVE_FREQUENCY = 5          # Save frequency (episodes)
DECAY_STEPS = 100000        # Step horizon for the (disabled) linear entropy decay

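# Worked example of the entropy schedule used in _update_policy() below
# (illustrative arithmetic only, not part of the training logic): the coefficient
# decays multiplicatively once per policy update and is clamped at FINAL_ENTROPY_COEF.
#
#   entropy_coef(k) = max(FINAL_ENTROPY_COEF, ENTROPY_COEF * DECAY_RATE ** k)
#   where k = total_steps / UPDATE_FREQUENCY (number of updates so far).
#
#   k = 0  -> 0.022
#   k = 20 -> 0.022 * 0.97**20 ≈ 0.0120
#   k = 66 -> 0.022 * 0.97**66 ≈ 0.0029, so the 0.003 floor takes over at roughly
#             the 66th update (about 135k environment steps at UPDATE_FREQUENCY=2048).
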
# Model parameters
HIDDEN_SIZE = 128

# Reward shaping weights
REWARD_WEIGHTS = {
    "checkpoint": 1.5,   # Passing a checkpoint
    "progress": 2,       # Moving toward the goal
    "health": 0.5,       # Health change
    "item_pickup": 0,    # Picking up items
    "item_use": 0,       # Using items
    "completion": 2      # Completing the level
}

class ObservationProcessor:
    """Class for processing game observations"""

    def __init__(self):
        # Dummy observation used to compute the flattened observation size at init time
        dummy_obs = {
            "agent_position": [0.0, 0.0, 0.0],
            "target_position": [0.0, 0.0, 0.0],
            "agent_forward_direction": [0.0, 1.0],
            "terrain_grid": [[{"terrain_type": 0.0} for _ in range(5)] for _ in range(5)],
            "agent_health_normalized": 1.0,
            "last_checkpoint_index": -1.0,
            "agent_velocity": [0.0, 0.0],
            "other_players": [
                {"relative_position": [0.0, 0.0, 0.0]},
                {"relative_position": [0.0, 0.0, 0.0]},
                {"relative_position": [0.0, 0.0, 0.0]}
            ]
        }
        self.observation_size = self._calculate_observation_size(dummy_obs)
        print(f"Observation size calculated: {self.observation_size}")
    def process(self, observations: Dict[str, Any]) -> torch.Tensor:
        """
        Process observation data

        Args:
            observations: Game observation dictionary

        Returns:
            Processed observation tensor: scaled (dx, dz) to the target, normalized
            forward direction, 5x5 terrain types, scaled mud positions, and scaled
            relative positions of up to three other players
        """
        flattened = self._flatten_observations(observations)
        return torch.tensor(flattened, dtype=torch.float32)

    def get_size(self) -> int:
        """Return the size of processed observations"""
        return self.observation_size
    def _flatten_observations(self, observations: Dict[str, Any]) -> List[float]:
        agent = observations["agent_position"]
        target = observations["target_position"]
        dx = target[0] - agent[0]
        dz = target[2] - agent[2]
        # Scaled displacement to the target instead of a normalized direction
        flattened = [dx / 30.0, dz / 30.0]

        # Normalized forward direction
        agent_forward = np.array(observations["agent_forward_direction"])
        agent_forward_normalized = agent_forward / (np.linalg.norm(agent_forward) + 1e-6)
        flattened.extend(agent_forward_normalized.tolist())

        # Disabled features kept for reference (health, last checkpoint index):
        # flattened.append(observations.get("agent_health_normalized", 1.0))
        # checkpoint_index = observations.get("last_checkpoint_index", -1.0)
        # normalized_checkpoint = checkpoint_index / 10.0
        # flattened.append(normalized_checkpoint)

        # 5x5 terrain grid, terrain types shifted up by one
        terrain_types = []
        for row in observations.get("terrain_grid", []):
            for cell in row:
                terrain_types.append(cell["terrain_type"])
        shifted_terrain = [terrain_type + 1 for terrain_type in terrain_types]
        flattened.extend(shifted_terrain)

        # Relative positions of nearby mud objects, padded/truncated to 5 objects (10 values)
        mud_positions = []
        for obj in observations.get("nearby_map_objects", []):
            if obj["object_type"] == 1.0:  # mud
                mud_dx, mud_dz = obj["relative_position"]
                mud_positions.append(mud_dx / 10.0)
                mud_positions.append(mud_dz / 10.0)
        mud_positions = mud_positions[:10]  # keep the feature length fixed
        while len(mud_positions) < 10:
            mud_positions.append(1.1)  # sentinel for "no mud object"
        flattened.extend(mud_positions)

        # Scaled relative positions of up to three other players
        other_players_positions = []
        for player in observations.get("other_players", [])[:3]:
            other_dx = player["relative_position"][0] / 30.0
            other_dz = player["relative_position"][2] / 30.0
            other_players_positions.append(other_dx)
            other_players_positions.append(other_dz)
        flattened.extend(other_players_positions)

        # Disabled: normalized agent velocity
        # velocity = np.array(observations.get("agent_velocity", [0.0, 0.0]))
        # velocity_norm = np.linalg.norm(velocity)
        # normalized_velocity = (velocity / velocity_norm) if velocity_norm > 0 else np.array([0.0, 0.0])
        # flattened.extend(normalized_velocity.tolist())

        return flattened
    def _calculate_observation_size(self, sample_obs: Dict[str, Any]) -> int:
        """
        Calculate the size of the observation space

        Returns:
            Size of the flattened observation vector
        """
        obs = dict(sample_obs)
        obs.setdefault("agent_health_normalized", 1.0)
        obs.setdefault("last_checkpoint_index", -1.0)
        return len(self._flatten_observations(obs))
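
# For reference: with the dummy observation above, the flattened vector has
# 2 (scaled dx, dz) + 2 (forward direction) + 25 (5x5 terrain grid) + 10 (mud slots)
# + 6 (three other players) = 45 features. This count assumes at most five mud
# objects and exactly three other players, matching the dummy observation.
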
class ActionProcessor:
    """Class for processing actions"""

    def __init__(self, action_space_info=None):
        self.action_space_info = action_space_info
        self.action_size = 2  # Output: 2 continuous actions (ax, az)

    def create_action(self, network_output):
        """
        Convert network output to a game action

        Args:
            network_output: Neural network output tensor of shape (2,),
                the sampled continuous action (ax, az)

        Returns:
            Tuple[np.ndarray, np.ndarray]: continuous action and fixed discrete action
        """
        if network_output.dim() > 1:
            network_output = network_output.squeeze(0)
        continuous_action = network_output.cpu().numpy().astype(np.float32)
        # Discrete actions: fixed [0, 0]
        discrete_action = np.array([0, 0], dtype=np.int32)
        return continuous_action, discrete_action

    def get_size(self):
        """Return the size of the action space output"""
        return self.action_size  # only 2 continuous outputs

    def _process_discrete_action(self, network_output):
        """(Unused) Process discrete actions from logits"""
        discrete_logits = network_output[-2:]
        discrete_probs = torch.sigmoid(discrete_logits)
        return [int(prob > 0.5) for prob in discrete_probs]
class RewardCalculator:
    """Class for calculating shaped rewards"""

    def __init__(self):
        self.reward_weights = REWARD_WEIGHTS
        self.prev_checkpoint_index = -1
        self.prev_distance_to_target = float('inf')
        self.prev_health = 0
        self.prev_inventory_count = 0

    def calculate(self, observations, reward, done, info, prev_observations):
        additional_reward = 0.0
        agent_pos = np.array(observations["agent_position"])
        target_pos = np.array(observations["target_position"])
        curr_dist = np.linalg.norm(agent_pos[[0, 2]] - target_pos[[0, 2]])
        prev_agent_pos = np.array(prev_observations["agent_position"])
        prev_dist = np.linalg.norm(prev_agent_pos[[0, 2]] - target_pos[[0, 2]])
        current_health = observations["agent_health_normalized"]
        prev_health = prev_observations["agent_health_normalized"]
        current_checkpoint = observations.get("last_checkpoint_index", -1)
        prev_checkpoint = prev_observations.get("last_checkpoint_index", -1)

        # Checkpoint reward
        if current_checkpoint > prev_checkpoint:
            additional_reward += self.reward_weights["checkpoint"]

        # Progress reward (only while alive)
        if prev_health > 0.0:
            progress = prev_dist - curr_dist
            additional_reward += self.reward_weights["progress"] * progress

        # Health reward: penalty on death
        if prev_health > 0.0 and current_health == 0.0:
            additional_reward += -2.0 * self.reward_weights["health"]

        # Completion reward
        if current_checkpoint == 1.0 and prev_checkpoint == 0.0:
            additional_reward += self.reward_weights["completion"]

        return additional_reward
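
    # Illustrative arithmetic (not executed): if the agent passes a checkpoint and
    # moves 0.5 units closer to the target on the same step while alive, the shaped
    # reward is checkpoint + progress = 1.5 + 2 * 0.5 = 2.5. Dying on a step adds a
    # further penalty of -2.0 * 0.5 = -1.0 on top of any other terms.
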
class ExperienceBuffer:
    """Experience buffer for PPO rollouts"""

    def __init__(self, capacity, model):
        self.observations = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.next_observations = []
        self.action_log_probs = []
        self.values = []
        self.capacity = capacity
        self.model = model

    def add(self, observation, action, reward, done, next_observation, action_log_prob, value):
        """Add a single transition"""
        self.observations.append(observation)
        self.actions.append(action)
        self.rewards.append(reward)
        self.dones.append(done)
        self.next_observations.append(next_observation)
        self.action_log_probs.append(action_log_prob)
        self.values.append(value)

    def clear(self):
        """Clear the buffer"""
        self.observations.clear()
        self.actions.clear()
        self.rewards.clear()
        self.dones.clear()
        self.next_observations.clear()
        self.action_log_probs.clear()
        self.values.clear()

    def get_batches(self, batch_size):
        """Yield shuffled mini-batches. The batch indices are yielded as well so that
        advantages/returns computed over the whole buffer can be aligned per sample."""
        indices = np.arange(len(self.observations))
        np.random.shuffle(indices)
        for start in range(0, len(indices), batch_size):
            end = start + batch_size
            batch_indices = indices[start:end]
            yield (
                batch_indices,
                [self.observations[i] for i in batch_indices],
                [self.actions[i] for i in batch_indices],
                [self.rewards[i] for i in batch_indices],
                [self.dones[i] for i in batch_indices],
                [self.next_observations[i] for i in batch_indices],
                [self.action_log_probs[i] for i in batch_indices],
                [self.values[i] for i in batch_indices],
            )

    def compute_advantages(self, gamma, lam):
        """Compute GAE advantages; returns (advantages, returns), not standardized.

        delta_t = r_t + gamma * V(s_{t+1}) * (1 - done_t) - V(s_t)
        A_t     = delta_t + gamma * lam * (1 - done_t) * A_{t+1}
        """
        advantages = []
        returns = []
        gae = 0
        for t in reversed(range(len(self.rewards))):
            next_value = self.values[t + 1] if t + 1 < len(self.values) else 0
            delta = self.rewards[t] + gamma * next_value * (1 - self.dones[t]) - self.values[t]
            gae = delta + gamma * lam * (1 - self.dones[t]) * gae
            advantages.insert(0, gae)
            returns.insert(0, gae + self.values[t])
        return advantages, returns

    def __len__(self):
        return len(self.observations)
class PPOModel(nn.Module):
    def __init__(self, observation_size, action_size):
        super(PPOModel, self).__init__()
        # Shared feature extractor
        self.feature_extractor = nn.Sequential(
            nn.Linear(observation_size, HIDDEN_SIZE),
            nn.Tanh(),
            nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE),
            nn.Tanh()
        )

        # Disabled hybrid/discrete heads, kept for reference:
        # if isinstance(action_size, tuple):
        #     continuous_size, discrete_size = action_size
        #     self.continuous_policy = nn.Sequential(
        #         nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE),
        #         nn.Tanh(),
        #         nn.Linear(HIDDEN_SIZE, continuous_size * 2)  # mean and log_std
        #     )
        #     self.discrete_policy = nn.Sequential(
        #         nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE),
        #         nn.Tanh(),
        #         nn.Linear(HIDDEN_SIZE, discrete_size)
        #     )
        #     self.action_type = "hybrid"
        # elif isinstance(action_size, int):
        #     if action_size > 10:
        #         self.policy = nn.Sequential(
        #             nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE),
        #             nn.Tanh(),
        #             nn.Linear(HIDDEN_SIZE, action_size * 2)
        #         )
        #         self.action_type = "continuous"
        #     else:
        #         self.policy = nn.Sequential(
        #             nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE),
        #             nn.Tanh(),
        #             nn.Linear(HIDDEN_SIZE, action_size)
        #         )
        #         self.action_type = "discrete"

        # Continuous Gaussian policy head: outputs mean and log_std per action dimension
        self.policy = nn.Sequential(
            nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE),
            nn.Tanh(),
            nn.Linear(HIDDEN_SIZE, action_size * 2)
        )
        self.action_type = "continuous"

        # Value head
        self.value = nn.Sequential(
            nn.Linear(HIDDEN_SIZE, HIDDEN_SIZE),
            nn.Tanh(),
            nn.Linear(HIDDEN_SIZE, 1)
        )

    def forward(self, x):
        features = self.feature_extractor(x)
        if self.action_type == "hybrid":
            continuous_params = self.continuous_policy(features)
            discrete_logits = self.discrete_policy(features)
            value = self.value(features)
            return continuous_params, discrete_logits, value
        else:
            policy_output = self.policy(features)
            value = self.value(features)
            return policy_output, value
    def act(self, x):
        output, value = self.forward(x)
        if self.action_type == "continuous":
            mean, log_std = torch.chunk(output, 2, dim=-1)
            std = torch.exp(log_std.clamp(min=-2, max=0.7))
            dist = torch.distributions.Normal(mean, std)
            action = dist.sample()
            log_prob = dist.log_prob(action).sum(dim=-1)

            # Debug logging of policy outputs and the first two observation features (dx, dz)
            mean_list = mean.squeeze(0).tolist() if mean.dim() > 1 else mean.tolist()
            log_std_list = log_std.squeeze(0).tolist() if log_std.dim() > 1 else log_std.tolist()
            value_item = value.item() if isinstance(value, torch.Tensor) else value
            dx_dz_str = ""
            if x is not None and x.size(-1) >= 2:
                dx = x[0].item() if x.dim() == 1 else x[0, 0].item()
                dz = x[1].item() if x.dim() == 1 else x[0, 1].item()
                dx_dz_str = f",{dx:.6f},{dz:.6f}"
            with open("act_debug.csv", "a") as f:
                f.write(",".join([f"{v:.6f}" for v in mean_list + log_std_list]) + f",{value_item:.6f}{dx_dz_str}\n")

            return action, log_prob, value
        else:
            dist = torch.distributions.Categorical(logits=output)
            action = dist.sample()
            log_prob = dist.log_prob(action)
            return action, log_prob, value.squeeze(-1)
    def evaluate_actions(self, x, actions):
        output, value = self.forward(x)
        if self.action_type == "continuous":
            mean, log_std = torch.chunk(output, 2, dim=-1)
            std = torch.exp(log_std.clamp(min=-2, max=0.7))
            dist = torch.distributions.Normal(mean, std)
            log_probs = dist.log_prob(actions).sum(dim=-1)
            entropy = dist.entropy().sum(-1)
        elif self.action_type == "discrete":
            dist = torch.distributions.Categorical(logits=output)
            log_probs = dist.log_prob(actions)
            entropy = dist.entropy()
        else:  # hybrid (unused with the current continuous-only head)
            continuous_params, discrete_logits, _ = output
            mean, log_std = torch.chunk(continuous_params, 2, dim=-1)
            std = torch.exp(log_std.clamp(-2, 0.7))
            continuous_dist = torch.distributions.Normal(mean, std)
            continuous_actions, discrete_actions = actions
            continuous_log_probs = continuous_dist.log_prob(continuous_actions).sum(-1)
            continuous_entropy = continuous_dist.entropy().sum(-1)
            discrete_dist = torch.distributions.Categorical(logits=discrete_logits)
            discrete_log_probs = discrete_dist.log_prob(discrete_actions)
            discrete_entropy = discrete_dist.entropy()
            log_probs = continuous_log_probs + discrete_log_probs
            entropy = continuous_entropy + discrete_entropy
        return log_probs, entropy, value

    def save(self, path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        torch.save(self.state_dict(), path)

    def load(self, path):
        if os.path.exists(path):
            try:
                self.load_state_dict(torch.load(path, map_location=device))
                print(f"Model loaded from {path}")
                return True
            except RuntimeError as e:
                print(f"Error loading model from {path}: {e}")
                return False
        else:
            print(f"Model file not found: {path}")
            return False
class MLPlay:
    """
    MLPlay class using the PPO algorithm

    This class implements the PPO agent, which can either train during gameplay
    or run a pre-trained model.
    """

    def __init__(self, action_space_info=None):
        """
        Initialize MLPlay instance

        Args:
            action_space_info: Action space information
        """
        # Set name
        self.name = "PPO_MLPlay"

        # Initialize components
        self.observation_processor = ObservationProcessor()
        self.action_processor = ActionProcessor(action_space_info)
        self.reward_calculator = RewardCalculator()

        # Training mode
        self.training_mode = TRAINING_MODE

        # Initialize state
        self.real_prev_observations = None
        self.prev_observations = None
        self.prev_action = None
        self.prev_action_log_prob = None
        self.prev_value = None
        self.episode_rewards = []
        self.total_steps = 0
        self.accumulate_steps = 0
        self.episode_count = 0

        # Create model directory
        os.makedirs(MODEL_SAVE_DIR, exist_ok=True)

        # Model, optimizer and buffer are created lazily on the first observation
        self.model = None
        self.optimizer = None
        self.experience_buffer = None

        # Start fresh log files unless resuming (headers match the rows written below)
        if not RESUME_TRAINING:
            for fname, header in [
                ("reward_log.csv", "episode,reward,policy_loss,value_loss,entropy,advantage,advantage_std\n"),
                ("log_std_monitor.csv", "episode,policy_loss,value_loss,entropy_loss,entropy_coef\n"),
                ("value_debug.csv", "step,value_mean,target_mean,value_loss,grad_norm\n"),
                ("act_debug.csv", "mean_x,mean_z,log_std_x,log_std_z,value,dx,dz\n")
            ]:
                with open(fname, "w") as f:
                    f.write(header)

        print(f"PPO MLPlay initialized (Training mode: {self.training_mode})")
    def reset(self):
        """Reset MLPlay instance"""
        self.prev_observations = None
        self.prev_action = None
        self.prev_action_log_prob = None
        self.prev_value = None
        self.episode_rewards = []

    def update(self,
               observations: Dict[str, np.ndarray],
               done: bool = False,
               info: Dict[str, Any] = None) -> Tuple[np.ndarray, np.ndarray]:
        """
        Process observations and return (continuous_action, discrete_action)
        """
        try:
            raw_obs = observations

            # Episode timer exhausted: output a zero action and drop the pending transition
            if observations["current_time_normalized"] >= 1.0:
                self.real_prev_observations = observations
                self.prev_observations = None
                return np.array([0.0, 0.0], dtype=np.float32), np.array([0, 0], dtype=np.int32)

            # At the last checkpoint for two consecutive frames: stand still
            if observations["last_checkpoint_index"] == 1.0:
                if self.real_prev_observations is not None and self.real_prev_observations["last_checkpoint_index"] == 1.0:
                    self.real_prev_observations = observations
                    self.prev_observations = None
                    return np.array([0.0, 0.0], dtype=np.float32), np.array([0, 0], dtype=np.int32)

            # Skip frames where the agent is still dead, is above y = 4, or is stationary at y > 1.4
            if self.real_prev_observations is not None:
                prev_pos = self.real_prev_observations["agent_position"]
                prev_hp = self.real_prev_observations["agent_health_normalized"]
                curr_pos = raw_obs["agent_position"]
                curr_hp = raw_obs["agent_health_normalized"]
                if (prev_hp == 0.0 and curr_hp == 0.0) or (curr_pos[1] > 4) or (prev_pos[0] == curr_pos[0] and prev_pos[2] == curr_pos[2] and prev_pos[1] > 1.4):
                    self.real_prev_observations = raw_obs
                    return np.array([0.0, 0.0], dtype=np.float32), np.array([0, 0], dtype=np.int32)

            current_obs = self.observation_processor.process(raw_obs).to(device)

            if self.model is None:
                # Initialize model once we know input/output sizes
                self._initialize_model(self.observation_processor.get_size(), self.action_processor.get_size())

            with torch.no_grad():
                action_tensor, log_prob, value = self.model.act(current_obs)
            action, discrete_action = self.action_processor.create_action(action_tensor)

            if self.prev_observations is not None and self.training_mode:
                shaped_reward = self.reward_calculator.calculate(
                    raw_obs, reward=0.0, done=done, info=info, prev_observations=self.prev_observations
                )
                self.experience_buffer.add(
                    observation=self.observation_processor.process(self.prev_observations),
                    action=self.prev_action,
                    reward=shaped_reward,
                    done=done,
                    next_observation=current_obs,
                    action_log_prob=self.prev_action_log_prob.detach() if self.prev_action_log_prob is not None else None,
                    # Value estimate of the stored observation (prev_observations), not of the current one
                    value=self.prev_value if self.prev_value is not None else value.item()
                )
                self.episode_rewards.append(shaped_reward)
                self.total_steps += 1
                self.accumulate_steps += 1
                print(f"[Step {self.total_steps}] reward: {shaped_reward}, action_tensor: {action_tensor.cpu().numpy()}, log_prob: {log_prob.item():.4f}")
                if self.total_steps % UPDATE_FREQUENCY == 0:
                    self._update_policy()
                    self._save_model()
                    self.experience_buffer.clear()

            self.real_prev_observations = raw_obs
            self.prev_observations = raw_obs
            self.prev_action = action_tensor
            self.prev_action_log_prob = log_prob
            self.prev_value = value.item()
            # print(f"[Step {self.total_steps}] direction: {action}")
            return action, discrete_action
        except Exception:
            import traceback
            traceback.print_exc()
            return np.array([0.0, 0.0], dtype=np.float32), np.array([0, 0], dtype=np.int32)
    def _initialize_model(self, observation_size, action_size):
        """Initialize model and related components"""
        self.model = PPOModel(observation_size, action_size).to(device)
        self.base_learning_rate = LEARNING_RATE
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.base_learning_rate)
        self.experience_buffer = ExperienceBuffer(BUFFER_SIZE, self.model)

        if RESUME_TRAINING and os.path.exists(MODEL_LOAD_PATH) and os.path.exists(OPTIMIZER_LOAD_PATH):
            loaded = self.model.load(MODEL_LOAD_PATH)
            checkpoint = torch.load(OPTIMIZER_LOAD_PATH, map_location=device)
            self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            # Resume counters from the checkpoint (fall back to an estimate from the episode index)
            self.total_steps = checkpoint.get('step', (checkpoint.get('episode', 0) + 1) * UPDATE_FREQUENCY)
            self.episode_count = checkpoint.get('episode', 0)
        elif not self.training_mode:
            loaded = self.model.load(MODEL_LOAD_PATH)
            if not loaded:
                print(f"Failed to load model from {MODEL_LOAD_PATH}. Will run with an untrained model.")
    def _update_policy(self):
        """Update the PPO policy"""
        # Exponentially decayed entropy coefficient with a lower bound
        decay_factor = DECAY_RATE ** (self.total_steps / UPDATE_FREQUENCY)
        entropy_coef = max(FINAL_ENTROPY_COEF, ENTROPY_COEF * decay_factor)
        # Disabled linear decay alternative:
        # decay_ratio = self.total_steps / DECAY_STEPS
        # entropy_coef = ENTROPY_COEF - (ENTROPY_COEF - FINAL_ENTROPY_COEF) * decay_ratio

        advantages, returns = self.experience_buffer.compute_advantages(GAMMA, GAE_LAMBDA)
        advantages = torch.tensor(advantages, dtype=torch.float32).to(device)
        returns = torch.tensor(returns, dtype=torch.float32).to(device)

        # Accumulators for logging
        total_policy_loss = 0
        total_value_loss = 0
        total_entropy = 0
        batch_count = 0

        for _ in range(UPDATE_EPOCHS):
            for batch in self.experience_buffer.get_batches(BATCH_SIZE):
                batch_indices, obs_b, act_b, rew_b, done_b, next_obs_b, logp_b, val_b = batch
                obs_b = torch.stack(obs_b).to(device)
                act_b = torch.stack(act_b).to(device)
                logp_b = torch.stack(logp_b).to(device)
                val_b = torch.tensor(val_b, dtype=torch.float32).to(device)

                # Align advantages/returns with the shuffled batch samples
                adv_b = advantages[batch_indices]
                adv_b = (adv_b - adv_b.mean()) / (adv_b.std() + 1e-8)
                vt_b = returns[batch_indices]

                new_log_probs, entropy, values = self.model.evaluate_actions(obs_b, act_b)

                # Clipped surrogate objective
                ratio = torch.exp(new_log_probs - logp_b)
                surr1 = ratio * adv_b
                surr2 = torch.clamp(ratio, 1.0 - CLIP_RATIO, 1.0 + CLIP_RATIO) * adv_b
                policy_loss = -torch.min(surr1, surr2).mean()
                value_loss = F.mse_loss(values.squeeze(-1), vt_b.squeeze(-1))
                entropy_loss = entropy.mean()
                loss = policy_loss + VALUE_COEF * value_loss - entropy_coef * entropy_loss

                with open("log_std_monitor.csv", "a") as f:
                    f.write(
                        f"{self.episode_count},{policy_loss.item():.7f},{value_loss.item():.7f},{entropy_loss.item():.7f},{entropy_coef:.7f}\n"
                    )

                self.optimizer.zero_grad()
                torch.autograd.set_detect_anomaly(True)  # debugging aid; slows down backward passes
                loss.backward()
                critic_last_layer = self.model.value[-1]
                grad_norm = critic_last_layer.weight.grad.norm().item() if critic_last_layer.weight.grad is not None else 0.0
                nn.utils.clip_grad_norm_(self.model.parameters(), MAX_GRAD_NORM)
                self.optimizer.step()

                total_policy_loss += policy_loss.item()
                total_value_loss += value_loss.item()
                total_entropy += entropy_loss.item()
                batch_count += 1

                with open("value_debug.csv", "a") as f:
                    f.write(f"{self.total_steps},{values.mean().item():.4f},{vt_b.mean().item():.4f},{value_loss.item():.4f},{grad_norm:.6f}\n")

        # Log statistics and the total reward of the latest update window to reward_log.csv
        total_reward = sum(self.episode_rewards)
        if batch_count > 0:
            avg_policy_loss = total_policy_loss / batch_count
            avg_value_loss = total_value_loss / batch_count
            avg_entropy = total_entropy / batch_count
        else:
            avg_policy_loss = 0.0
            avg_value_loss = 0.0
            avg_entropy = 0.0
        avg_advantage = advantages.mean().item()
        advantage_std = advantages.std().item()
        with open("reward_log.csv", "a") as f:
            f.write(
                f"{self.episode_count},{total_reward:.2f},{avg_policy_loss:.4f},{avg_value_loss:.4f},{avg_entropy:.6f},{avg_advantage:.4f},{advantage_std:.4f}\n"
            )
        self.episode_rewards = []
        self.episode_count += 1
    def _save_model(self):
        """Save model and optimizer state"""
        if self.model is not None:
            # Save latest and per-update snapshots
            self.model.save(f"{MODEL_SAVE_DIR}/model_latest.pt")
            self.model.save(f"{MODEL_SAVE_DIR}/model_{self.episode_count}.pt")
            checkpoint = {
                'optimizer_state_dict': self.optimizer.state_dict(),
                'step': self.total_steps,
                'episode': self.episode_count
            }
            torch.save(checkpoint, f"{MODEL_SAVE_DIR}/optimizer_latest.pt")
            torch.save(checkpoint, f"{MODEL_SAVE_DIR}/optimizer_{self.episode_count}.pt")
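
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the MLGame3D integration). This is a
# minimal smoke test showing the expected observation layout and how update()
# is driven; the dummy values below are assumptions for demonstration only.
# Running it writes the usual log/model files to the working directory.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    def make_dummy_observation():
        # Mirrors the keys accessed by ObservationProcessor and MLPlay.update()
        return {
            "current_time_normalized": 0.1,
            "last_checkpoint_index": -1.0,
            "agent_position": [0.0, 1.0, 0.0],
            "agent_health_normalized": 1.0,
            "target_position": [10.0, 1.0, 10.0],
            "agent_forward_direction": [0.0, 1.0],
            "terrain_grid": [[{"terrain_type": 0.0} for _ in range(5)] for _ in range(5)],
            "nearby_map_objects": [],
            "other_players": [{"relative_position": [1.0, 0.0, 1.0]} for _ in range(3)],
            "agent_velocity": [0.0, 0.0],
        }

    player = MLPlay()
    for _ in range(3):
        continuous_action, discrete_action = player.update(make_dummy_observation())
        print("continuous:", continuous_action, "discrete:", discrete_action)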