Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import torch
- import numpy as np
- import torch.nn as nn
- import torch.nn.functional as F
def normalized_columns_initializer(weights, std=1.0):
    """Return a random tensor shaped like ``weights`` with a fixed L2 norm.

    Values are drawn from a standard normal distribution and then rescaled
    so that the L2 norm taken over dim 1 equals ``std``. For a 2-D
    ``nn.Linear`` weight this means every row has norm ``std``.

    Args:
        weights: Tensor whose size, dtype and device the result copies.
            Its values are never read or modified.
        std: Target L2 norm over dim 1.

    Returns:
        A newly allocated tensor with the same size as ``weights``.
    """
    # Sample with the dtype/device of ``weights`` so that assigning the
    # result to ``param.data`` cannot silently change the parameter's dtype
    # or move it to a different device.
    out = torch.randn(
        weights.size(), dtype=weights.dtype, device=weights.device
    )
    # keepdim=True leaves a size-1 dim that broadcasts across dim 1.
    norm = out.pow(2).sum(1, keepdim=True).sqrt()
    out *= std / norm
    return out
def weights_init(m):
    """Initialise a Conv*/Linear* module in place; ignore every other module.

    The module is selected by substring match on its class name ('Conv' is
    checked before 'Linear'). Its weight is filled from a uniform
    distribution on ``[-b, b]`` with ``b = sqrt(6 / (fan_in + fan_out))``
    and its bias, when it has one, is set to zero. Intended to be passed to
    ``nn.Module.apply``.

    Args:
        m: Any module. Modules whose class name contains neither 'Conv' nor
            'Linear' are left untouched.
    """
    classname = m.__class__.__name__
    if 'Conv' in classname:
        weight_shape = list(m.weight.data.size())
        # Weight layout is (dim0, dim1, *kernel): fan_in spans dims 1..3,
        # fan_out spans dim 0 times the kernel dims.
        fan_in = np.prod(weight_shape[1:4])
        fan_out = np.prod(weight_shape[2:4]) * weight_shape[0]
    elif 'Linear' in classname:
        weight_shape = list(m.weight.data.size())
        fan_in = weight_shape[1]
        fan_out = weight_shape[0]
    else:
        return

    w_bound = float(np.sqrt(6. / (fan_in + fan_out)))
    m.weight.data.uniform_(-w_bound, w_bound)
    # Layers built with ``bias=False`` expose ``bias`` as None; skip them
    # instead of failing with AttributeError.
    if getattr(m, 'bias', None) is not None:
        m.bias.data.fill_(0)
class LSTM_GA(nn.Module):
    """Actor-critic network combining image features with an instruction.

    Pipeline, as implemented in ``forward``:
      1. three conv layers encode the image into a 64-channel feature map;
      2. an embedding + GRU encode the instruction into a 256-d vector;
      3. a sigmoid-gated linear layer turns that vector into 64 per-channel
         gates that multiply the image feature map (gated attention);
      4. the gated features go through a linear layer and an LSTM cell;
      5. the LSTM hidden state, concatenated with a learned embedding of the
         time step, feeds a 3-way actor head and a scalar critic head.

    The conv output must be 64 x 8 x 17 per sample, which matches the
    hard-coded ``64 * 8 * 17`` input of ``self.linear`` (168 x 300 images
    produce that shape with these conv settings).
    """

    # Per-sample shape of the conv3 output that the rest of the net expects.
    _IMAGE_FEATURE_SHAPE = (64, 8, 17)

    def __init__(self, max_n_steps, vocab_size):
        """Build all layers and apply the custom weight initialisation.

        Args:
            max_n_steps: Largest time-step index that will be embedded; the
                time embedding table has ``max_n_steps + 1`` rows.
            vocab_size: Number of rows of the instruction token embedding.
        """
        super(LSTM_GA, self).__init__()

        # Image processing
        self.conv1 = nn.Conv2d(3, 128, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(128, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=4, stride=2)

        # Instruction processing
        self.gru_hidden_size = 256
        self.input_size = vocab_size
        self.embedding = nn.Embedding(self.input_size, 32)
        self.gru = nn.GRU(32, self.gru_hidden_size, batch_first=True)

        # Gated-attention layer: one gate per conv3 output channel
        self.attn_linear = nn.Linear(self.gru_hidden_size, 64)

        # Time-embedding layer, helps in stabilizing value prediction
        self.time_emb_dim = 32
        self.time_emb_layer = nn.Embedding(
            max_n_steps + 1,
            self.time_emb_dim
        )

        # Recurrent core and actor/critic heads
        self.linear = nn.Linear(64 * 8 * 17, 256)
        self.lstm = nn.LSTMCell(256, 256)
        self.critic_linear = nn.Linear(256 + self.time_emb_dim, 1)
        self.actor_linear = nn.Linear(256 + self.time_emb_dim, 3)

        # Initializing weights: generic init for every Conv/Linear layer,
        # then norm-constrained overrides for the two heads.
        self.apply(weights_init)
        self.actor_linear.weight.data = normalized_columns_initializer(
            self.actor_linear.weight.data, 0.01
        )
        self.actor_linear.bias.data.fill_(0)
        self.critic_linear.weight.data = normalized_columns_initializer(
            self.critic_linear.weight.data, 1.0
        )
        self.critic_linear.bias.data.fill_(0)

        self.lstm.bias_ih.data.fill_(0)
        self.lstm.bias_hh.data.fill_(0)

    def _format(self, inputs):
        """Placeholder hook; currently does nothing and returns None."""
        pass

    def forward(self, inputs):
        """Compute action logits, state value and the next LSTM state.

        Args:
            inputs: ``(x, input_inst, (tx, hx, cx))`` where
                ``x`` is the image batch (first dim = number of workers),
                ``input_inst`` holds instruction token indices with the
                batch dim first (the original author's note says these are
                expected to be padded to a common length),
                ``tx`` holds time-step indices for ``time_emb_layer``,
                ``hx``/``cx`` are the LSTM cell's hidden and cell states.

        Returns:
            ``(logits, value, (new_hx, new_cx))``.

        Raises:
            ValueError: If the conv stack does not produce a 64 x 8 x 17
                feature map per sample.
        """
        x, input_inst, (tx, hx, cx) = inputs
        n_workers = x.size(0)

        # Image representation
        x = F.relu(self.conv1(x), inplace=False)
        x = F.relu(self.conv2(x), inplace=False)
        x_image_rep = F.relu(self.conv3(x), inplace=False)

        # Instruction representation: the whole embedded (padded)
        # instruction batch is fed to the GRU in a single call.
        embedded_instruction = self.embedding(input_inst)
        # Allocate the initial hidden state from the embedding output so it
        # lives on the same device (and has the same dtype) as the model.
        encoder_hidden = embedded_instruction.new_zeros(
            1, n_workers, self.gru_hidden_size
        )
        _, encoder_hidden = self.gru(embedded_instruction, encoder_hidden)
        x_instr_rep = encoder_hidden

        # Attention vector from the instruction representation
        x_attention = torch.sigmoid(self.attn_linear(x_instr_rep))

        # Gated attention: (1, n, 64) -> (n, 64, 1, 1) -> (n, 64, 8, 17)
        feature_shape = tuple(x_image_rep.shape[1:])
        if feature_shape != self._IMAGE_FEATURE_SHAPE:
            raise ValueError(
                "expected image features of shape {} per sample, got {}"
                .format(self._IMAGE_FEATURE_SHAPE, feature_shape)
            )
        x_attention = x_attention.squeeze(0).unsqueeze(2).unsqueeze(3)
        x_attention = x_attention.expand(
            n_workers, *self._IMAGE_FEATURE_SHAPE
        )
        x = x_image_rep * x_attention
        x = x.view(x.size(0), -1)

        # Recurrent core
        x = F.relu(self.linear(x), inplace=False)
        new_hx, new_cx = self.lstm(x, (hx, cx))

        # Append the time-step embedding before the two heads
        time_emb = self.time_emb_layer(tx)
        x = torch.cat((new_hx, time_emb.view(-1, self.time_emb_dim)), 1)

        return self.actor_linear(x), self.critic_linear(x), (new_hx, new_cx)

    def full_pass(self, inputs):
        """Sample actions and return everything needed for a policy update.

        Args:
            inputs: Same ``(x, input_inst, (tx, hx, cx))`` tuple that
                ``forward`` takes.

        Returns:
            ``(action, is_exploratory, logpa, entropy, value, (hx, cx))``:
              action: Python scalar for a single worker, otherwise a numpy
                  array with one entry per worker.
              is_exploratory: boolean numpy array, True where the sampled
                  action differs from the per-worker argmax of the logits.
              logpa: log-probability of the sampled action, trailing dim 1.
              entropy: entropy of the action distribution, trailing dim 1.
              value: critic output from ``forward``.
              (hx, cx): next LSTM state from ``forward``.
        """
        logits, value, (hx, cx) = self.forward(inputs)
        dist = torch.distributions.Categorical(logits=logits)
        action = dist.sample()
        logpa = dist.log_prob(action).unsqueeze(-1)
        entropy = dist.entropy().unsqueeze(-1)
        action = (
            action.item() if len(action) == 1
            else action.detach().cpu().numpy()
        )
        # axis=1: greedy action per worker (rows are workers).
        greedy = np.argmax(logits.detach().cpu().numpy(), axis=1)
        is_exploratory = action != greedy
        return action, is_exploratory, logpa, entropy, value, (hx, cx)

    def select_action(self, inputs):
        """Sample an action per worker from the policy.

        Args:
            inputs: Same tuple that ``forward`` takes.

        Returns:
            ``(action, (hx, cx))``; ``action`` is a Python scalar for a
            single worker, otherwise a numpy array. ``(hx, cx)`` is the next
            LSTM state, which the caller needs for the following step.
        """
        logits, _, (hx, cx) = self.forward(inputs)
        dist = torch.distributions.Categorical(logits=logits)
        action = dist.sample()
        action = (
            action.item() if len(action) == 1
            else action.detach().cpu().numpy()
        )
        return action, (hx, cx)

    def select_greedy_action(self, inputs):
        """Pick the highest-logit action per worker.

        Args:
            inputs: Same tuple that ``forward`` takes.

        Returns:
            ``(action, (hx, cx))``; ``action`` is a numpy integer scalar for
            a single worker, otherwise a numpy array with one entry per
            worker. ``(hx, cx)`` is the next LSTM state.
        """
        logits, _, (hx, cx) = self.forward(inputs)
        # axis=1 is required: without it argmax runs over the flattened
        # (workers * actions) array and yields one meaningless index
        # whenever there is more than one worker.
        greedy = np.argmax(logits.detach().cpu().numpy(), axis=1)
        action = greedy[0] if len(greedy) == 1 else greedy
        return action, (hx, cx)

    def evaluate_state(self, inputs):
        """Return the critic value and next LSTM state for ``inputs``."""
        _, value, (hx, cx) = self.forward(inputs)
        return value, (hx, cx)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement