Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import sys  # FIX: used by TrumpDataset (sys.maxsize) but was never imported
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader, Dataset  # FIX: Dataset is subclassed below but was not imported
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime
from pathlib import Path

# Support running both from inside student_tp5/src and from the repo root.
try:
    from textloader import *
except ImportError:  # FIX: bare `except:` also swallowed KeyboardInterrupt/SystemExit
    from student_tp5.src.textloader import *
try:
    from generate import *
except ImportError:
    from student_tp5.src.generate import *
# from student_tp4.src.exo4 import TrumpDataset
- # TODO:
class TrumpDataset(Dataset):
    """Dataset of Trump speech sentences encoded as character-index tensors.

    * text: raw source text
    * maxsent: maximum number of sentences kept (None = keep all)
    * maxlen: maximum sentence length in characters (falsy = unlimited)

    Each item is a (input, target) pair shifted by one character, left-padded
    with zeros up to the longest sentence length.
    """

    def __init__(self, text, maxsent=None, maxlen=None):
        limit = maxlen or sys.maxsize
        cleaned = normalize(text)
        self.phrases = [
            chunk[:limit].strip() + "."
            for chunk in cleaned.split(".")
            if len(chunk) > 0
        ]
        if maxsent is not None:
            self.phrases = self.phrases[:maxsent]
        self.MAX_LEN = max(len(p) for p in self.phrases)

    def __len__(self):
        return len(self.phrases)

    def __getitem__(self, i):
        codes = string2code(self.phrases[i])
        # Left-pad with 0 so every sequence has length MAX_LEN.
        padding = torch.zeros(self.MAX_LEN - codes.size(0), dtype=torch.long)
        padded = torch.cat([padding, codes])
        # Next-character prediction: input is all but last, target all but first.
        return padded[:-1], padded[1:]
cle = CrossEntropyLoss(reduction='none')

def maskedCrossEntropy(output: torch.Tensor, target: torch.LongTensor, padcar: int):
    """Cross-entropy averaged over non-padding positions only, with no Python loop.

    :param output: tensor length x batch x output_dim (scores per character)
    :param target: tensor length x batch (gold character indices)
    :param padcar: index of the padding character, excluded from the loss
    :return: scalar tensor, mean loss over non-padding tokens
    """
    mask = (target != padcar).reshape(-1)
    # FIX: use reshape instead of view — works even when the tensors are
    # non-contiguous (e.g. after a transpose upstream).
    flat_scores = output.reshape(-1, output.size(-1))
    flat_target = target.reshape(-1).long()
    per_token = cle(flat_scores, flat_target)
    n_valid = mask.sum()
    if n_valid == 0:
        # FIX: an all-padding batch used to yield 0/0 = NaN; return a zero
        # that keeps the autograd graph alive instead.
        return per_token.sum() * 0.0
    return (per_token * mask).sum() / n_valid
class RNN(nn.Module):
    """Elman RNN: h' = tanh(W1 x + W2 h), with a linear+softmax decoder.

    Expects batch-first input: seq is batch x length x dimX.
    """

    def __init__(self, dimH, dimX, dimY):
        super().__init__()
        self.dimH = dimH
        self.dimX = dimX
        self.w1 = nn.Linear(dimX, dimH)  # input-to-hidden
        self.w2 = nn.Linear(dimH, dimH)  # hidden-to-hidden
        self.wd = nn.Linear(dimH, dimY)  # hidden-to-output (decoder)
        self.device = None
        self.batchDim = 0
        self.lengthDim = 1

    def hzero(self, batch, dimH):
        # FIX: allocate the initial hidden state directly on the model's
        # device; it used to be created on CPU unconditionally.
        return torch.rand(batch, dimH, device=self.device)

    def one_step(self, xi, hi):
        """One recurrence step: xi is batch x dimX, hi is batch x dimH."""
        return torch.tanh(self.w1(xi) + self.w2(hi))

    def forward(self, seq, h=None):
        """Run the recurrence over seq (batch x length x dimX).

        :param h: optional initial hidden state (batch x dimH); random if None
        :return: all hidden states, batch x length x dimH
        """
        allH = torch.empty(seq.size(self.batchDim), seq.size(self.lengthDim),
                           self.dimH, device=self.device)
        if h is None:  # FIX: identity check, not `== None`
            h = self.hzero(seq.size(self.batchDim), self.dimH)
        # FIX: Tensor.to() is NOT in-place; the original discarded the result,
        # leaving h on the CPU when training on GPU.
        h = h.to(self.device)
        for i in range(seq.size(self.lengthDim)):
            x = seq[:, i, :].view(seq.size(self.batchDim), self.dimX)
            h = self.one_step(x, h)
            allH[:, i, :] = h
        return allH

    def decode(self, h):
        # NOTE(review): returns softmax probabilities, yet the training loss is
        # CrossEntropyLoss which applies log-softmax itself — double softmax.
        # Left unchanged because generation code may rely on probabilities;
        # confirm which is intended.
        return F.softmax(self.wd(h), 1)
class LSTM(RNN):
    # TODO: Implement an LSTM. Placeholder: currently inherits plain-RNN
    # behavior unchanged from the parent class.
    pass
class GRU(nn.Module):
    # TODO: Implement a GRU. Placeholder: unusable until forward() is defined
    # (note it inherits nn.Module directly, unlike LSTM which inherits RNN).
    pass
class State:
    """Training checkpoint: bundles model, optimizer and progress counters."""

    def __init__(self, model, optim, writePath):
        self.model = model
        self.optim = optim
        self.epoch, self.iteration = 0, 0
        self.writePath = writePath

    def save(self, path):
        # FIX: pickle the whole State (model + optimizer + counters) so that
        # State.load() round-trips. The original saved only model.state_dict(),
        # which load() could not turn back into a State object. This also
        # matches how the training loop checkpoints (torch.save(state, fp)).
        torch.save(self, path)

    @staticmethod
    def load(path):
        """Resume from a previously saved State (path: pathlib.Path)."""
        with path.open("rb") as fp:
            return torch.load(fp)  # restart from the saved checkpoint
class RNN_gen(torch.nn.Module):
    """Character-level generator: embedding -> recurrent network -> decoder.

    `reseau` is the recurrent-network class to instantiate (RNN, LSTM, ...);
    it must expose forward(seq) and decode(hidden).
    """

    def __init__(self, reseau, nbChar, dimEmb, dimH, dimX, dimY, device):
        super().__init__()
        self.emb = nn.Embedding(nbChar, dimEmb)
        self.rnn = reseau(dimH, dimX, dimY)
        self.rnn.device = device

    def forward(self, x):
        """x: integer character indices -> decoded output per position."""
        embedded = self.emb(x.long())
        hidden_states = self.rnn(embedded)
        return self.rnn.decode(hidden_states)
BATCH_SIZE = 32
LENGTH = 10
PATH = "data/"

with open(PATH + 'trump_full_speech.txt') as f:
    text = f.read()

ds = TrumpDataset(text, 200, LENGTH)
# NOTE(review): batch_size=len(ds) makes a single batch of the whole dataset;
# BATCH_SIZE above is unused — confirm which was intended.
data_train = DataLoader(ds, batch_size=len(ds), drop_last=True,
                        collate_fn=pad_collate_fn)

len_emb = len(id2lettre)
DIM_EMB = 50
DIM_H = 10
device = torch.device('cpu')
model = RNN_gen(RNN, len_emb, DIM_EMB, DIM_H, DIM_EMB, len_emb, device)

# NOTE(review): RNN.decode already applies softmax, and CrossEntropyLoss
# applies log-softmax again — the loss is computed on probabilities rather
# than logits. Confirm whether this double normalization is intended.
lossfunc = lambda yhat, y: maskedCrossEntropy(yhat, y, PAD_IX)

alpha = 0.0001                        # learning rate
epoch = 200                           # total number of epochs to train for
nameState = 'student_tp5/src/rnn7'    # checkpoint file stem
dataLoader = data_train
optmizer = torch.optim.SGD            # optimizer class (instantiated below)
writePath = "student_tp5/runs/trump" + datetime.now().strftime("%Y%m%d-%H%M%S")

savepath = Path(nameState + ".pch")
if savepath.is_file():
    # Resume: the pickled State carries model, optimizer and epoch counter.
    state = State.load(savepath)
    state.model = state.model.to(device)
else:
    model = model.to(device)
    optim = optmizer(params=model.parameters(), lr=alpha)
    state = State(model, optim, writePath)

# FIX: loop variable renamed `ep` — it used to shadow the `epoch` limit.
for ep in range(state.epoch, epoch):
    state.iteration = 0  # FIX: State defines `iteration`; `state.iter` silently created a new attribute
    for x, y in dataLoader:
        x = x.to(device)
        y = y.to(device)
        state.optim.zero_grad()
        predict = state.model(x)
        l = lossfunc(predict, y.long())
        l.backward()
        state.optim.step()
        state.iteration += 1
    # Checkpoint after every epoch so a crash can resume from here.
    with savepath.open("wb") as fp:
        state.epoch = ep + 1
        torch.save(state, fp)
    # Report the training loss on the (single-batch) training set.
    with torch.no_grad():
        x, y = next(iter(data_train))
        x = x.to(device)
        y = y.to(device)
        predict = state.model(x)
        l_train = lossfunc(predict, y.long()).item()
        print('epoch', ep, 'loss train', l_train)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement