import os
import pickle

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable  # no-op wrapper since PyTorch 0.4; kept for compatibility
from sklearn.model_selection import train_test_split

torch.manual_seed(2)
# CUDA_LAUNCH_BLOCKING must be set in the environment before CUDA initializes;
# assigning a plain Python variable (as the original did) has no effect.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
torch.backends.cudnn.enabled = False
class ConvNet(nn.Module):
    """CNN for text regression: parallel Conv2d filters of several heights slide over
    the embedded token sequence; each is max-pooled and the results are concatenated."""

    def __init__(self):
        super(ConvNet, self).__init__()
        V = 50000             # vocabulary size
        D = 300               # embedding dimension
        C = 1                 # output dimension (single regression score)
        Ci = 1                # input channels
        Co = 100              # filters per kernel size
        Ks = [3, 4, 5, 5, 5]  # kernel heights; 5 is repeated, tripling the 5-gram filters

        self.embed = nn.Embedding(V, D)
        self.convs1 = nn.ModuleList([nn.Conv2d(Ci, Co, (K, D)) for K in Ks])
        self.dropout = nn.Dropout(0.2)
        self.fc1 = nn.Linear(len(Ks) * Co, C)

    def conv_and_pool(self, x, conv):
        x = F.relu(conv(x)).squeeze(3)             # (N, Co, W)
        x = F.max_pool1d(x, x.size(2)).squeeze(2)  # (N, Co)
        return x

    def forward(self, x):
        x = self.embed(x)   # (N, W, D)
        x = x.unsqueeze(1)  # (N, Ci, W, D)
        x = [F.relu(conv(x)).squeeze(3) for conv in self.convs1]  # len(Ks) x (N, Co, W)
        x = [F.max_pool1d(i, i.size(2)).squeeze(2) for i in x]    # len(Ks) x (N, Co)
        x = torch.cat(x, 1)                                       # (N, len(Ks) * Co)
        x = self.dropout(x)
        logit = self.fc1(x)  # (N, C)
        return logit
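
# A minimal shape check for ConvNet (illustrative sketch; the batch of 7 random token
# ids below is made up). Input sequences must be at least 5 tokens, the largest kernel height.
def _convnet_shape_check():
    net = ConvNet()
    tokens = torch.randint(0, 50000, (2, 7))  # (N, W) batch of token ids
    out = net(tokens)                         # -> (N, 1) predicted scores
    assert out.shape == (2, 1)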
class LSTMClassifier(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, use_gpu, batch_size, dropout=0.3, num_layers=5):
        super(LSTMClassifier, self).__init__()
        self.hidden_dim = hidden_dim
        self.use_gpu = use_gpu
        self.batch_size = batch_size
        self.num_layers = num_layers
        # Note: nn.LSTM only applies dropout between layers, so it has no effect when num_layers == 1.
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim,
                            dropout=dropout, num_layers=num_layers)
        # Matches the flattened hidden state only when num_layers == 1.
        self.hidden2label = nn.Linear(hidden_dim, 1)
        self.hidden = self.init_hidden()
        self.normalization = nn.BatchNorm1d(embedding_dim)

    def init_hidden(self):
        # Returns (h_0, c_0): the initial hidden and cell states.
        h0 = torch.zeros(self.num_layers, self.batch_size, self.hidden_dim, dtype=torch.float32)
        c0 = torch.zeros(self.num_layers, self.batch_size, self.hidden_dim, dtype=torch.float32)
        if self.use_gpu:
            # The original's GPU and CPU branches were identical; on the GPU the
            # states must actually live on the device.
            return (h0.cuda(), c0.cuda())
        return (h0, c0)

    def forward(self, review):
        # The embedding lookup from an earlier version is gone; `review` is expected
        # to already be a float feature vector of size embedding_dim per example.
        review = self.normalization(review)
        lstm_out, self.hidden = self.lstm(review.view(1, self.batch_size, -1), self.hidden)
        # Flatten the final hidden state across layers into one feature vector per example.
        hidden_flat = self.hidden[0].transpose(0, 1).contiguous().view(self.batch_size, -1)
        out = self.hidden2label(hidden_flat)
        return out
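
# A minimal shape check for LSTMClassifier (illustrative sketch; all sizes below are
# made up, and dropout=0 avoids the single-layer dropout warning). Note that
# vocab_size is currently unused because the embedding lookup is commented out.
def _lstm_shape_check():
    clf = LSTMClassifier(embedding_dim=8, hidden_dim=4, vocab_size=100,
                         use_gpu=False, batch_size=3, dropout=0, num_layers=1)
    feats = torch.randn(3, 8)  # (batch_size, embedding_dim) float features
    out = clf(feats)           # -> (batch_size, 1) predicted scores
    assert out.shape == (3, 1)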
def computeAccuracy(scores, Y, silent=False):
    # Despite the name, this returns (mean squared error, mean relative error);
    # the argmax-based accuracy of an earlier classification setup is gone.
    totalError = 0
    percentageError = 0
    for i in range(len(scores)):
        if not silent:
            print("Guess: ", scores[i].item())
            print("True value: ", Y[i].item())
        totalError += (Y[i] - scores[i]) ** 2
        # Clamp the denominator away from zero for near-zero true scores.
        percentageError += abs(Y[i] - scores[i]) / max(Y[i], 0.1)
    mse = totalError / len(scores)
    return mse, percentageError / len(scores)
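
# Worked example for computeAccuracy (illustrative sketch with made-up numbers):
# predictions [2.0, 4.0] vs targets [3.0, 4.0] give
#   MSE = ((3-2)^2 + (4-4)^2) / 2 = 0.5
#   mean relative error = (|3-2|/3 + 0) / 2 = 1/6
def _compute_accuracy_example():
    scores = torch.tensor([[2.0], [4.0]])
    Y = torch.tensor([[3.0], [4.0]])
    mse, rel = computeAccuracy(scores, Y, silent=True)
    print(mse.item(), rel.item())  # 0.5, ~0.1667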
def start(batch_size=100, hidden_dim=100, n_epochs=10,
          num_layers=1, lr=0.01, weight_decay=0.95, momentum=0.9,
          dropout=0, trainingExamples=3000, testExamples=400):
    with open("vectorizedReviews.pkl", "rb") as f:
        data, vocabLen = pickle.load(f)
    embedding_dim = 500
    training_data, test_data = train_test_split(data, test_size=0.2)

    # This was how we did it before, pre-GitHub: the raw text was vectorized here with
    # sklearn's CountVectorizer(analyzer="word", stop_words="english", max_features=1000).
    # The pickled data is already vectorized, so that step is gone.

    Xtrain = Variable(torch.zeros(trainingExamples, training_data[0]["content"].shape[0], dtype=torch.long))
    Xtest = Variable(torch.zeros(testExamples, test_data[0]["content"].shape[0], dtype=torch.long))
    Ytrain = Variable(torch.zeros(trainingExamples, dtype=torch.float32))
    Ytest = Variable(torch.zeros(testExamples, dtype=torch.float32))
    for i in range(trainingExamples):
        Xtrain[i] = torch.from_numpy(training_data[i]["content"])
        Ytrain[i] = training_data[i]["score"]
    for i in range(testExamples):
        Xtest[i] = torch.from_numpy(test_data[i]["content"])
        Ytest[i] = test_data[i]["score"]

    # Despite the variable name, the CPU path currently trains the ConvNet, not the LSTM.
    if torch.cuda.is_available():
        lstm = LSTMClassifier(embedding_dim, hidden_dim, vocabLen + 5, True, batch_size,
                              num_layers=num_layers, dropout=dropout)
    else:
        lstm = ConvNet()

    lossFunction = nn.MSELoss()
    optimizer = torch.optim.Adam(lstm.parameters(), lr=lr, weight_decay=weight_decay)

    lstm.train(True)
    for epoch in range(n_epochs):
        print(epoch)
        lossSum = 0
        testXBatch = Variable(Xtest[0:batch_size])
        testYBatch = Variable(Ytest[0:batch_size])
        for i in range(round(len(Xtrain) / batch_size) - 1):
            i_start = i * batch_size
            i_end = (i + 1) * batch_size
            Xbatch = Variable(Xtrain[i_start:i_end, :])
            Ybatch = Variable(Ytrain[i_start:i_end])
            scores = lstm(Xbatch)
            loss = lossFunction(scores, Ybatch.view(len(Ybatch), 1))
            # Track held-out loss without building a graph for it; the original
            # accumulated the raw tensor, which keeps every batch's graph alive.
            with torch.no_grad():
                testLoss = lossFunction(lstm(testXBatch), testYBatch.view(len(testYBatch), 1))
            lossSum += testLoss.item()
            lstm.zero_grad()
            loss.backward()
            optimizer.step()
        print(lossSum)

    lstm.eval()
    accuracy = []
    baseLineAccuracy = []
    percentageError = []
    baseLinePercentageError = []
    meanScore = Ytrain.mean().item()  # baseline: always predict the mean training score
    with torch.no_grad():
        for i in range(round(len(Xtest) / batch_size) - 1):
            print(i)
            i_start = i * batch_size
            i_end = (i + 1) * batch_size
            Xbatch = Xtest[i_start:i_end, :]
            Ybatch = Ytest[i_start:i_end]
            scores = lstm(Xbatch)
            baseLineScores = meanScore * torch.ones(batch_size, 1)
            result = computeAccuracy(scores, Ybatch.view(len(Ybatch), 1))
            baseLineResult = computeAccuracy(baseLineScores, Ybatch.view(len(Ybatch), 1), silent=True)
            accuracy.append(result[0])
            baseLineAccuracy.append(baseLineResult[0])
            percentageError.append(result[1])
            baseLinePercentageError.append(baseLineResult[1])
    totalAccuracy = sum(accuracy) / len(accuracy)
    baseLineTotalAccuracy = sum(baseLineAccuracy) / len(baseLineAccuracy)
    totalPercentageAccuracy = sum(percentageError) / len(percentageError)
    baseLineTotalPercentageAccuracy = sum(baseLinePercentageError) / len(baseLinePercentageError)
    print("totalAccuracy:", totalAccuracy.item())
    print("BaselineAccuracy", baseLineTotalAccuracy.item())
    print("average percentage error", totalPercentageAccuracy.item())
    print("base line percentage error", baseLineTotalPercentageAccuracy.item())
    return totalAccuracy.item()
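
# --- Assumed data format (not part of this paste) -------------------------------
# start() unpickles (data, vocabLen) and reads data[i]["content"] (a fixed-length
# numpy array of token ids) and data[i]["score"] (a number). The real
# "vectorizedReviews.pkl" is not included here, so this commented sketch of a
# compatible file is an assumption for illustration only:
#
#   import numpy as np
#   data = [{"content": np.random.randint(0, 50000, size=200),  # assumed length 200
#            "score": float(np.random.uniform(0, 10))}          # assumed score range
#           for _ in range(4000)]
#   vocabLen = 50000
#   with open("vectorizedReviews.pkl", "wb") as f:
#       pickle.dump((data, vocabLen), f)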
def main():
    trainingExamples = 80
    testExamples = 40
    batch_size = 20
    hidden_dim = 4
    n_epochs = 10
    num_layers = 1
    lr = 0.0005
    momentum = 0.95
    weight_decay = 0
    # Grid-search skeleton: the per-loop multipliers are commented out, so every
    # iteration currently reruns the same configuration.
    with open("cross-search.txt", "a") as outfile:
        for _ in range(9, 0, -1):
            # momentum *= 0.1
            for _ in range(0, 10, 1):
                # weight_decay *= 0.1
                for _ in range(1, 10, 1):
                    # lr *= 0.001
                    accuracy = start(trainingExamples=trainingExamples, testExamples=testExamples,
                                     batch_size=batch_size, hidden_dim=hidden_dim,
                                     n_epochs=n_epochs, num_layers=num_layers, lr=lr,
                                     weight_decay=weight_decay, momentum=momentum)
                    print("________________________________")
                    print("hidden_dim:", hidden_dim)
                    print("num_layers:", num_layers)
                    print("trainingExamples:", trainingExamples)
                    print("testExamples:", testExamples)
                    print("batch_size:", batch_size)
                    print("n_epochs:", n_epochs)
                    print("lr:", lr)
                    print("weight_decay:", weight_decay)
                    print("momentum:", momentum)
                    print("mean squared error:", accuracy)
                    outfile.write("________________________________\n")
                    outfile.write("hidden_dim: " + str(hidden_dim) + "\n")
                    outfile.write("num_layers: " + str(num_layers) + "\n")
                    outfile.write("trainingExamples: " + str(trainingExamples) + "\n")
                    outfile.write("testExamples: " + str(testExamples) + "\n")
                    outfile.write("batch_size: " + str(batch_size) + "\n")
                    outfile.write("n_epochs: " + str(n_epochs) + "\n")
                    outfile.write("lr: " + str(lr) + "\n")
                    outfile.write("weight_decay: " + str(weight_decay) + "\n")
                    outfile.write("momentum: " + str(momentum) + "\n")
                    outfile.write("mean squared error: " + str(accuracy) + "\n")

if __name__ == "__main__":
    main()