import torch
import torch.nn as nn
from torch.autograd import Variable  # deprecated no-op wrapper in PyTorch >= 0.4, kept for compatibility
import matplotlib.pyplot as plt
import numpy as np

BATCH_SIZE = 1
INPUT_DIM = 1
OUTPUT_DIM = 1
DTYPE = np.float64
TIMESTEPS = 50

class Net(nn.Module):
    # A single nn.RNN followed by a linear readout from hidden state to output.
    def __init__(self, input_dim, hidden_dim, output_dim, hidden_layers):
        super(Net, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.hidden_layers = hidden_layers
        self.rnn = nn.RNN(input_dim, hidden_dim, hidden_layers)
        self.h2o = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden):
        output, hidden = self.rnn(x, hidden)
        output = self.h2o(output)
        return output, hidden

    def init_hidden(self, batch_size):
        # Zero initial hidden state, cast to match the working precision.
        h_0 = Variable(torch.zeros(self.hidden_layers, batch_size, self.hidden_dim)).cuda()
        if DTYPE == np.float32:
            return h_0.float()
        else:
            return h_0.double()
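
# Shape conventions used throughout (PyTorch defaults for nn.RNN, batch_first=False):
#   x       -> (seq_len, batch, input_dim)
#   hidden  -> (hidden_layers, batch, hidden_dim)
#   output  -> (seq_len, batch, hidden_dim), mapped by h2o to (seq_len, batch, output_dim)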

def weights_init(m):
    # Xavier for input-to-hidden weights, orthogonal for hidden-to-hidden, zero biases.
    if isinstance(m, nn.RNN):
        nn.init.xavier_uniform_(m.weight_ih_l0.data)
        nn.init.orthogonal_(m.weight_hh_l0.data)
        nn.init.constant_(m.bias_ih_l0.data, 0)
        nn.init.constant_(m.bias_hh_l0.data, 0)
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight.data)
        nn.init.constant_(m.bias.data, 0)

def pad_sequence(sequence, timesteps):
    # Zero-pad the sequence so its length is a multiple of `timesteps`.
    # Also return the number of valid (non-padded) steps in the final chunk.
    if sequence.shape[0] % timesteps == 0:
        return sequence, timesteps
    else:
        pad = timesteps - (sequence.shape[0] % timesteps)
        return np.pad(sequence, ((0, pad), (0, 0)), 'constant'), timesteps - pad

# Mackey-Glass data: column 0 is the input x(t), column 1 is the prediction target.
data = np.loadtxt('data/mg17.csv', delimiter=',', dtype=DTYPE)
X_data = data[:, [0]]
Y_data = data[:, [1]]

# First 4000 steps for training, the next 1000 for validation.
trX_data = X_data[:4000, :]
trY_data = Y_data[:4000, :]
vlX = torch.from_numpy(np.expand_dims(X_data[4000:5000, :], axis=1)).cuda()
vlY = torch.from_numpy(np.expand_dims(Y_data[4000:5000, :], axis=1)).cuda()

if np.isnan(TIMESTEPS):
    # No truncation: the whole training series is a single chunk of shape (4000, 1, dim).
    trX = torch.from_numpy(np.expand_dims(trX_data, axis=1)).cuda()
    trY = torch.from_numpy(np.expand_dims(trY_data, axis=1)).cuda()
    tr_seq_lengths = [trX.shape[0]]
else:
    # Truncated BPTT: cut the training series into consecutive chunks of TIMESTEPS
    # steps and arrange them as (TIMESTEPS, num_chunks, dim), time axis first, so
    # each chunk keeps the original temporal order (4000 % 50 == 0, so no padding
    # is actually added for this data).
    trX, _ = pad_sequence(trX_data, TIMESTEPS)
    trX = np.ascontiguousarray(np.reshape(trX, (-1, TIMESTEPS, INPUT_DIM)).transpose(1, 0, 2))
    trX = torch.from_numpy(trX).cuda()
    trY, tr_last_len = pad_sequence(trY_data, TIMESTEPS)
    trY = np.ascontiguousarray(np.reshape(trY, (-1, TIMESTEPS, OUTPUT_DIM)).transpose(1, 0, 2))
    trY = torch.from_numpy(trY).cuda()
    tr_seq_lengths = np.full((trX.shape[1],), TIMESTEPS, dtype=int)
    tr_seq_lengths[-1] = tr_last_len  # valid (non-padded) length of the final chunk

loss_fcn = nn.MSELoss()

# Five independent runs with different random initializations.
for r in range(5):
    model = Net(INPUT_DIM, 10, OUTPUT_DIM, 1).cuda()
    if DTYPE == np.float32:
        model = model.float()
    else:
        model = model.double()
    model.apply(weights_init)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01, eps=2e-16)

    epochs = 2000
    tr_loss_plt = np.zeros((epochs,))
    vl_loss_plt = np.zeros((epochs,))

    for e in range(epochs):
        # Training: iterate over consecutive chunks, carrying the hidden state forward
        # but detaching it so gradients stop at chunk boundaries (truncated BPTT).
        hidden = model.init_hidden(BATCH_SIZE)
        model.train()
        tot_loss = 0
        for seq in range(trX.shape[1]):
            x = Variable(trX[:tr_seq_lengths[seq], [seq], :])
            y = Variable(trY[:tr_seq_lengths[seq], [seq], :])
            hidden.detach_()
            model.zero_grad()
            output, hidden = model(x, hidden)
            loss = loss_fcn(output, y)
            loss.backward()
            optimizer.step()
            tot_loss += loss.cpu().data
        tot_loss /= trX.shape[1]
        tr_loss_plt[e] = tot_loss

        # Validation: one forward pass over the held-out 1000 steps from a fresh hidden state.
        model.eval()
        hidden = model.init_hidden(BATCH_SIZE)
        x = Variable(vlX)
        y = Variable(vlY)
        output, _ = model(x, hidden)
        loss = loss_fcn(output, y)
        vl_loss_plt[e] = loss.cpu().data

        # print("Epoch", e + 1, "TR:", tr_loss_plt[e], "VL:", vl_loss_plt[e])

    # plt.clf()
    # plt.plot(tr_loss_plt)
    # plt.plot(vl_loss_plt)
    # plt.xlabel("epoch")
    # plt.ylabel("loss")
    # plt.legend(["TR", "VL"])
    # plt.savefig("pytorch-mg-rnn-trunk(" + str(r) + ").png")

    print("TR:", tr_loss_plt[-1], "VL:", vl_loss_plt[-1])