import torch
from torch import nn
import numpy as np
import matplotlib.pyplot as plt

torch.manual_seed(1)    # reproducible

# Hyper Parameters
TIME_STEP = 10          # rnn time step: length of each training window
INPUT_SIZE = 1          # rnn input size: one scalar per time step
LR = 0.02               # learning rate
class RNN(nn.Module):
    def __init__(self):
        super(RNN, self).__init__()
        self.hidden_size = 32
        self.rnn = nn.RNN(
            input_size=INPUT_SIZE,
            hidden_size=self.hidden_size,  # rnn hidden units
            num_layers=1,                  # number of rnn layers
            batch_first=True,              # input & output have batch size as the first dimension, e.g. (batch, time_step, input_size)
            nonlinearity="tanh",
        )
        # Fully connected head applied to every hidden state:
        # 32 -> 32 -> 32 -> 1, with tanh activations throughout.
        self.layer_sizes = [self.hidden_size, 32, 32, 1]
        D_in, H1, H2, D_out = self.layer_sizes
        self.linear1 = nn.Linear(D_in, H1)
        self.activation1 = nn.Tanh()
        self.linear2 = nn.Linear(H1, H2)
        self.activation2 = nn.Tanh()
        self.linear3 = nn.Linear(H2, D_out)
        self.activation3 = nn.Tanh()

    def out(self, x):
        # Map a batch of hidden states to scalar predictions.
        x = self.activation1(self.linear1(x))
        x = self.activation2(self.linear2(x))
        x = self.activation3(self.linear3(x))
        return x

    def forward(self, x, h_state):
        # x       (batch, time_step, input_size)
        # h_state (n_layers, batch, hidden_size)
        # r_out   (batch, time_step, hidden_size)
        r_out, h_state = self.rnn(x, h_state)
        # Flatten batch and time so the linear head sees one hidden state per row.
        r_out = r_out.view(-1, self.hidden_size)
        y = self.out(r_out)    # (batch * time_step, 1)
        return y, h_state
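
# A minimal shape sanity check (added sketch, not part of the original paste):
# with batch_first=True, an input of shape (1, TIME_STEP, INPUT_SIZE) should
# yield r_out (1, TIME_STEP, 32), h_state (1, 1, 32), and, after flattening,
# an output of shape (TIME_STEP, 1).
def _shape_check():
    model = RNN()
    x = torch.zeros(1, TIME_STEP, INPUT_SIZE)    # one zero-filled window
    y, h_state = model(x, None)                  # None = zero initial state
    assert y.shape == (TIME_STEP, 1)
    assert h_state.shape == (1, 1, model.hidden_size)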
def sample(x):
    # Target signal: a sine wave scaled to [-0.5, 0.5], comfortably inside
    # the output range of the final tanh activation.
    y = np.sin(x) / 2
    return y
def train_realtime(model, loss_fn, optimizer, n_steps=40):
    h_state = None    # initial hidden state; nn.RNN treats None as zeros
    for step in range(n_steps):
        # Each step covers the next 0.8*pi slice of the time axis.
        start, end = step * np.pi * 0.8, (step + 1) * np.pi * 0.8
        steps = np.linspace(start, end, TIME_STEP, dtype=np.float32)
        x_np = steps              # float32 converts to a torch FloatTensor
        y_np = sample(x_np)
        # shape (batch, time_step, input_size)
        x = torch.from_numpy(x_np[np.newaxis, :, np.newaxis])
        y = torch.from_numpy(y_np[np.newaxis, :, np.newaxis])
        prediction, h_state = model(x, h_state)    # rnn output
        # !! the next step is important !!
        # Detach the hidden state so gradients do not flow back into the
        # previous window (truncated backpropagation through time).
        h_state = h_state.detach()
        loss = loss_fn(prediction, y.view(-1, 1))  # match prediction's shape
        optimizer.zero_grad()     # clear gradients for this training step
        loss.backward()           # backpropagation, compute gradients
        optimizer.step()          # apply gradients
        # plotting
        plt.plot(steps, y_np.flatten(), 'r-')
        plt.plot(steps, prediction.detach().numpy().flatten(), 'b-')
        plt.draw()
        plt.pause(0.05)
    plt.ioff()
    plt.show()
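
# Hedged illustration of the detach step above (added; the layer sizes here
# are arbitrary). Detaching h_state cuts the autograd graph between windows,
# so each backward() only spans the current window (truncated BPTT). Without
# the detach, the second backward() would try to traverse graph nodes already
# freed by the first and raise a RuntimeError.
def _detach_demo():
    layer = nn.RNN(input_size=1, hidden_size=4, batch_first=True)
    h = None
    for _ in range(2):
        out, h = layer(torch.randn(1, 5, 1), h)
        out.pow(2).mean().backward()   # fails on the 2nd pass if h is kept attached
        h = h.detach()                 # break the graph between windows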
def test(model, n_steps=40):
    h_state = None    # initial hidden state
    for step in range(n_steps):
        start, end = step * np.pi * 0.8, (step + 1) * np.pi * 0.8
        steps = np.linspace(start, end, TIME_STEP, dtype=np.float32)
        x_np = steps
        y_np = sample(x_np)
        # shape (batch, time_step, input_size)
        x = torch.from_numpy(x_np[np.newaxis, :, np.newaxis])
        prediction, h_state = model(x, h_state)    # rnn output
        h_state = h_state.detach()    # carry the state forward without grad
        # plotting
        plt.plot(steps, y_np.flatten(), 'r-')
        plt.plot(steps, prediction.detach().numpy().flatten(), 'b-')
        plt.draw()
        plt.pause(0.05)
    plt.ioff()
    plt.show()
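
# Hedged note on inference (added): for pure evaluation the usual PyTorch
# idiom builds no autograd graph at all, which also makes the detach in
# test() unnecessary, e.g.:
#
#     with torch.no_grad():
#         prediction, h_state = model(x, h_state)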
if __name__ == "__main__":
    rnn = RNN()
    print(rnn)
    optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)   # optimize all rnn parameters
    loss_fn = nn.MSELoss()
    plt.figure(1, figsize=(12, 5))
    plt.ion()    # interactive mode so the plot updates continuously
    train_realtime(model=rnn, loss_fn=loss_fn, optimizer=optimizer)
    test(model=rnn)
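    # Hedged usage note (added): to persist the trained weights, the standard
    # state_dict pattern applies; the filename "rnn_sine.pt" is an arbitrary
    # choice for illustration.
    # torch.save(rnn.state_dict(), "rnn_sine.pt")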