Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/local/anaconda3/envs/experiments/bin/python3
- import torch
- import torch.nn as nn
- import sys
- import math
- from termcolor import colored
- import datetime
- import atexit
- import tensorboardX
- import os
# TensorBoard logger; the training loop writes 'loss', 'time', 'total_loss'
# and 'variety' scalars through it.
writer = tensorboardX.SummaryWriter()
#enable CUDA
# NOTE(review): this only *prints* which device is available; the .cuda()
# calls later in the file fail anyway on a CPU-only machine — confirm CUDA
# is actually required before running.
if torch.cuda.is_available():
    print("CUDA")
else:
    print("CPU")
sys.stdout.flush()
use_greedy = False  # NOTE(review): never read anywhere in this file
#print("\nParameters: ")
#for arg in sys.argv:
#    print(arg, " ", end="")
#print("\n")
class GRU_TEST(nn.Module):
    """Hand-rolled single GRU cell.

    The input to forward() is the flattened one-hot window
    (size*prev*batch_size values); the recurrent state `self.hidden`
    is carried across forward() calls until reset() is invoked.
    """

    def __init__(self, size, prev, batch_size, hidden, clip):
        super(GRU_TEST, self).__init__()
        self.size = size
        self.hidden_size = hidden
        self.clip = clip
        # Each gate sees the concatenation [input, hidden].
        self.r = torch.nn.Linear(size*prev*batch_size+hidden, hidden)
        self.z = torch.nn.Linear(size*prev*batch_size+hidden, hidden)
        self.h = torch.nn.Linear(size*prev*batch_size+hidden, hidden)
        self.context = torch.zeros(hidden).cuda()  # NOTE(review): never used elsewhere in this file
        self.tanh = torch.nn.Tanh()
        self.sigmoid = torch.nn.Sigmoid()
        self.hidden = torch.zeros(hidden).cuda()

    def reset(self):
        """Zero the recurrent state, staying on the state's current device."""
        # BUG FIX: this previously did `torch.zeros(hidden)`, reading the
        # *global* `hidden` hyperparameter instead of self.hidden_size —
        # it only worked by accident when run as part of the full script.
        # Also use the existing state's device instead of hard-coding .cuda().
        device = self.hidden.device
        del self.hidden
        self.hidden = torch.zeros(self.hidden_size, device=device)

    def forward(self, inp):
        """One GRU step: update self.hidden from `inp` and return it."""
        h = torch.cat((inp, self.hidden))
        #process layers
        rt = self.sigmoid(self.r(h))                      # reset gate
        zt = self.sigmoid(self.z(h))                      # update gate
        ht = torch.cat(((self.hidden * rt), inp))
        ht = self.tanh(self.h(ht))                        # candidate state
        self.hidden = (self.hidden * (1 - zt))
        self.hidden = self.hidden + zt * ht
        # NOTE(review): clipping gradients inside forward() is unusual — it
        # clips whatever grads are left from the previous backward pass,
        # using an L1 norm (third arg = 1). Normally this belongs between
        # loss.backward() and optimizer.step(). Kept to preserve behavior.
        torch.nn.utils.clip_grad_norm_(self.parameters(), self.clip, 1)
        return self.hidden
class Model(nn.Module):
    """GRU language model: custom GRU cell -> dropout -> linear decoder.

    Depends on script-level globals defined later in the file
    (`alphabet_size`, `rate`, `nbatches`).
    """

    def __init__(self, size, prev, batch_size, momentum, dropout, hidden, clip):
        super(Model, self).__init__()
        self.gru1 = GRU_TEST(size, prev, batch_size, hidden, clip).cuda()
        self.softmax = torch.nn.modules.activation.LogSoftmax(dim=0)  # NOTE(review): unused in forward()
        self.dropout = torch.nn.Dropout(dropout)
        self.output_decoder = torch.nn.Linear(hidden, alphabet_size*batch_size).cuda()
        # Bookkeeping counters read/written by the training loop.
        self.epochs = 1
        self.batches = 1
        self.counter = 0
        self.runs = 0
        self.loss_function = torch.nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam([
            {'params': self.parameters(),
             'weight_decay': 0.25}
        ], lr=rate)
        #self.optimizer = torch.optim.SGD(params=self.parameters(), lr=rate, momentum=momentum)
        # BUG FIX: weight initialization used to live inside
        # clear_internal_states(), which the training loop calls on EVERY
        # outer iteration — so all parameters were re-randomized each run
        # and the model could never learn. Initialize exactly once here.
        std = 1.0/math.sqrt(self.gru1.hidden_size)
        for p in self.parameters():
            p.data.uniform_(-std, std)
        self.gru1.reset()

    def clear_internal_states(self):
        """Reset only the recurrent hidden state; weights are untouched."""
        self.gru1.reset()

    def forward(self, inp):
        # NOTE(review): Variable is deprecated; detach() alone already
        # breaks the autograd graph. Kept for compatibility.
        x = torch.autograd.Variable((inp.detach()).view(-1))
        x = self.gru1(x)
        x = self.dropout(x)
        x = self.output_decoder(x)
        x = x.view(nbatches, -1)  # one row per batch for CrossEntropyLoss
        return x
def splash(a=True):
    """Print the usage text (a=True) or the run banner (a=False).

    BUG FIX: `a` now defaults to True because the argument-parsing code
    calls bare `splash()` when the required filename is missing; without
    the default that call raised TypeError (masked by a bare except).
    """
    if a:
        print("GRU Text Generator\nUsage:\n\n-f --filename: filename of input text - required\n-h --hidden: number of hidden layers, default 1\n-r --rate: learning rate\n-p --prev: number of previous states to observe, default 0.05")
        print("\nExample usage: <command> -f input.txt -h 5 -r 0.025")
    else:
        print("\nGRU Text Generator\n")
        print("Alphabet size: {}".format(alphabet_size))
        print("Hyperparameters:")
        # BUG FIX: `params = sys.argv` aliased the real argv list, so
        # pop(0) mutated sys.argv for the rest of the program; take a
        # slice copy of everything after the program name instead.
        params = sys.argv[1:]
        for a in list(params):
            print(a, " ",end="")
        os.system("clear")  # NOTE(review): clears the screen right after printing — order looks suspect; confirm intent
        print("\n")
        print(datetime.datetime.now())
        print("\n")
def getIndexFromLetter(letter, letters):
    """Return the index of `letter` in `letters` (raises ValueError if absent).

    FIX: renamed the second parameter from `list`, which shadowed the
    builtin; all call sites in this file pass it positionally.
    """
    return letters.index(letter)
def getLetterFromIndex(i, letters):
    """Return the letter stored at index `i` of `letters`.

    FIX: renamed the second parameter from `list`, which shadowed the
    builtin; all call sites in this file pass it positionally.
    """
    return letters[i]
def parse(args, arg):
    """Scan argv-style `args` for any flag spelling listed in `arg`.

    Returns the token following the flag, "" when the flag is the last
    token (no value), or False when the flag is absent. A following token
    that itself starts with "-" is treated as a missing value: the usage
    text is printed and scanning continues.
    """
    for i in range(len(args)):
        if args[i] in arg:
            # BUG FIX: was `if len(args) < i+1`, which is never true for a
            # valid index i, so a flag in last position crashed with
            # IndexError on args[i+1] instead of returning "".
            if i + 1 >= len(args):
                return ""
            if args[i+1].startswith("-"):
                splash(True)
            else:
                return args[i+1]
    return False
def savemodel():
    """Interactively offer to checkpoint model + optimizer state (atexit hook)."""
    print("Save model parameters? [y/n]β‘")
    answer = input()
    if answer.lower() in ('y', 'yes'):
        # Timestamp-based default name; spaces replaced for filesystem safety.
        name = "Model-" + str(datetime.datetime.now()).replace(" ", "_")
        print("Save as filename [default: {}]β‘".format(name))
        answer = input()
        if answer != "":
            name = "Model-" + str(answer).replace(" ", "_")
        print("Saving model as {}...".format(name))
        modelname = "./models/{}".format(name)
        state = {
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': model.optimizer.state_dict()
        }
        torch.save(state, modelname)
    # NOTE(review): indentation in the pasted source is ambiguous here —
    # quit() reconstructed at function level (runs whether or not we saved).
    quit()
def loadmodel():
    """Restore model/optimizer state from `model_filename` when one was given."""
    #load model parameters if checkpoint specified
    if model_filename != False:
        try:
            checkpoint = torch.load(model_filename)
            model.load_state_dict(checkpoint['model_state_dict'])
            model.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        except FileNotFoundError:
            print("Model not found.")
            quit()
    else:
        print("New model")
    # Arrange for the interactive save prompt to run when the process exits.
    atexit.register(savemodel)
# --- command-line argument parsing ------------------------------------------
# parse() returns False for an absent flag. float(False) evaluates to 0.0,
# which then falls through to the `if not ...` default below; float("") and
# int("") raise ValueError and land in the bare `except` at the bottom.
model_filename = None
try:
    model_filename = parse(sys.argv, ["--load", "-l"])
    filename = parse(sys.argv, ["--filename", "-f"])
    if not filename or filename == "":
        # NOTE(review): splash() is called with no argument here although it
        # is defined with a required parameter, and execution continues even
        # though the filename is missing — both look like latent bugs masked
        # by the bare except below.
        splash()
    rate = float(parse(sys.argv, ["--rate", "-r"]))
    if not rate or rate == "":
        rate = 0.05
    hidden = int(parse(sys.argv, ["--hidden", "-h"]))
    if not hidden or hidden == "":
        hidden = 1
    clip = float(parse(sys.argv, ["--clip", "-c"]))
    if not clip:
        clip = 1
    epochs = int(parse(sys.argv, ["--epoch", "-e"]))
    if not epochs:
        epochs = 1
    nbatches = int(parse(sys.argv, ["--batch", "-b"]))
    if not nbatches:
        nbatches = 1
    momentum = float(parse(sys.argv, ["--momentum", "-m"]))
    if not momentum:
        momentum = 0.4
    n_prev = int(parse(sys.argv, ["--previous", "-p"]))
    if not n_prev:
        n_prev = 5
    dropout = float(parse(sys.argv, ["--dropout", "-d"]))
    if not dropout:
        dropout = 0.0
    sample_size = int(parse(sys.argv, ["--sample", "-s"]))
    if not sample_size:
        sample_size = 1000
    temperature = float(parse(sys.argv, ["--temperature", "-t"]))
    if not temperature:
        temperature = 1.0
except:
    # NOTE(review): bare except — also swallows unrelated errors (e.g. the
    # splash() TypeError above), not just bad flag values.
    splash(True)
    quit()
#open file
master_words = []   # NOTE(review): never used after this point in the file
alphabet = []       # ordered list of unique characters; index = class id
#writer = SummaryWriter()
text = []           # full corpus as a list of single characters
e = 0
c = 0               # progress counter for the alphabet-building loop below
randomness = 0
with open(filename, "r") as f:
    # reads all lines and removes non alphabet words
    intext = f.read()
    # Replace newlines with a printable marker so they survive display/one-hot.
    for l in list(intext):
        if l == "\n": l = "ΒΆ"
        if l == "\x1b": print("XXX")
        text.append(l)
    # Build the alphabet (unique characters) with a progress readout.
    for l in text:
        sys.stdout.flush()
        if l not in alphabet:
            alphabet.append(l)
        print("\r{}% - {}/{}".format(int(c/len(text)*100), c, len(text)), end="")
        c+=1
# NOTE(review): this overrides whatever --epoch value was parsed above —
# looks like leftover debug code; confirm before relying on the -e flag.
epochs = 1
alphabet_size = len(alphabet)
splash(False)
model = Model(alphabet_size, n_prev, nbatches, momentum, dropout, hidden, clip).cuda()
nchars = len(text)
graph_time = 0
mem_max = torch.cuda.max_memory_allocated()
field = [" " for _ in range(10)]  # NOTE(review): overwritten before first use by the training loop
def one_hot(char):
    """Return a CUDA one-hot vector of length `alphabet_size` for `char`."""
    vec = torch.zeros(alphabet_size).cuda()
    idx = alphabet.index(char)
    vec[idx] = 1
    return vec
def get_input_vector(chars):
    """Stack one-hot encodings of `chars` into an (nbatches, len(chars), alphabet_size) CUDA tensor.

    Every batch row is the same encoded window (the batches are identical copies).
    """
    batches = []
    for _ in range(nbatches):
        encoded = [one_hot(ch) for ch in chars]
        batches.append(torch.stack(encoded))
    return torch.stack(batches).cuda()
def get_output(inp):
    """Sample one character from logits `inp` using temperature-scaled softmax."""
    probs = torch.nn.Softmax(dim=1)(inp / temperature)
    choice = torch.multinomial(probs, 1)[:]
    return alphabet[choice]
# --- main training loop -----------------------------------------------------
# Runs forever. Each outer iteration: `steps` forward passes accumulating
# loss (truncated-BPTT style), one backward pass + optimizer step, then a
# 1000-character greedy sample to eyeball progress.
model.counter = n_prev
while True:
    t = 0
    variation = []   # NOTE(review): written but never read (cf. `variety` below)
    done = False
    total_loss = 0
    total_time = 0
    mem = 0
    loss = 0
    model.optimizer.zero_grad()
    model.clear_internal_states()
    steps = 2500
    model.runs += 1
    # Sliding window of the n_prev most recent input characters.
    field = [text[model.counter+x] for x in range(n_prev)]
    start = datetime.datetime.timestamp(datetime.datetime.now())
    while t < steps:
        #get target char
        target = alphabet.index(text[(model.counter+1)%len(text)])
        #make target vector
        # Same class id repeated once per batch row.
        target = [target for _ in range(nbatches)]
        target = torch.tensor(target).cuda()
        #forward pass
        inp = get_input_vector(field)
        out = model.forward(inp)
        #get outputs
        # Sample one character per batch row (display only).
        char = []
        for o in out.split(1):
            a = get_output(o)
            char.append(a)
        f = ''.join(str(e) for e in field)
        c = ''.join(str(e) for e in char)
        progress = int(100*(t / steps))
        txt = colored("\r β² {} | Forward prop... | Progress: {}% | Epoch: {} | Batch: {} | {} | {} | ...".format(model.runs, progress, model.epochs, model.batches, f, c),
            attrs=['reverse'])
        print(txt,end="")
        model.counter += 1
        # Accumulate the loss; the single backward() happens after the loop.
        l = model.loss_function(out, target)
        writer.add_scalar('loss', l, model.counter)
        loss += l
        t += 1
        if model.counter > len(text):
            # Wrapped past the end of the corpus: start a new epoch.
            model.epochs += 1
            model.batches = 0
            model.counter = 0
            print("\nxz")
        # Slide the window forward using the *target* char (teacher forcing);
        # target[0] is a 1-element tensor used as a list index.
        field.append(alphabet[target[0]])
        field.pop(0)
    end = datetime.datetime.timestamp(datetime.datetime.now())
    total_time = int(end - start)
    writer.add_scalar('time', torch.tensor(total_time), model.runs)
    print("\nBackward prop...")
    sys.stdout.flush()
    model.batches += 1
    writer.add_scalar('total_loss', int(loss), model.counter)
    loss.backward()
    model.optimizer.step()
    # Reset the window to the start of the text for the sampling pass.
    field = [text[x] for x in range(n_prev)]
    if model.runs % 1 == 0:   # NOTE(review): always true — sampling runs every iteration
        variety = []
        print("\nGenerating...\n")
        for i in range(1000):
            inp = get_input_vector(field)
            out = model.forward(inp)
            char = []
            for o in out.split(1):
                #a = get_output(o)
                # Greedy decode (argmax) instead of temperature sampling.
                a = alphabet[torch.argmax(o)]
                char.append(a)
            print(char[0], end="")
            sys.stdout.flush()
            if char[0] not in variety:
                variety.append(char[0])
            field.append(char[0])
            field.pop(0)
        # Percentage of the alphabet that appeared in the generated sample.
        variety = int(100*(len(variety)/alphabet_size))
        print("\nVariety: {}\n".format(variety))
        writer.add_scalar('variety', variety, model.runs)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement