Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/local/anaconda3/envs/experiments/bin/python3
- #!/usr/bin/python3
- import torch
- import torch.nn as nn
- import time
- import random
- import sys
- import numpy
- from termcolor import colored
- import datetime
- import math
- import atexit
# Report once at startup which compute device torch will use.
print("CUDA" if torch.cuda.is_available() else "CPU")
sys.stdout.flush()

# Sample output characters greedily (argmax) instead of from the distribution.
use_greedy = True
- class GRU_TEST(nn.Module):
- def __init__(self, size, p, hidden, clip):
- super(GRU_TEST, self).__init__()
- self.size = size
- self.hidden_size = hidden
- self.clip = clip
- p -= 0
- self.r = torch.nn.Linear(hidden+(size*p), hidden)
- self.z = torch.nn.Linear(hidden+(size*p), hidden)
- self.h = torch.nn.Linear(hidden+(size*p), hidden)
- self.context = torch.zeros(hidden).cuda()
- self.tanh = torch.nn.Tanh()
- self.sigmoid = torch.nn.Sigmoid()
- self.context = torch.zeros(hidden).cuda()
- self.hidden = torch.zeros(hidden).cuda()
- def clear_internal_states(self):
- self.hidden = torch.zeros(hidden).cuda()
- self.outs = []
- self.targets = []
- def forward(self, inp):
- h = torch.cat((inp, self.hidden.detach()))
- #process layers
- rt = self.sigmoid(self.r(h))
- zt = self.sigmoid(self.z(h))
- ht = torch.cat(((self.hidden * rt), inp))
- ht = self.tanh(self.h(ht))
- self.hidden = (self.hidden * (1 - zt))
- self.hidden = self.hidden + zt * ht
- torch.nn.utils.clip_grad_norm_(self.parameters(), self.clip, 1)
- return self.hidden
class Model(nn.Module):
    """Full character model: GRU cell -> dropout -> linear decoder -> log-softmax.

    Parameters:
        size:    alphabet size (output width of the decoder).
        p:       number of previous characters fed per step.
        dropout: dropout probability applied to the GRU output.
        hidden:  hidden width of both GRU cells.
        clip:    gradient-clip value forwarded to the cells.
    """

    def __init__(self, size, p, dropout, hidden, clip):
        super(Model, self).__init__()
        # Fixed: the original ignored its own `size`/`p` parameters and read
        # the globals `alphabet_size`/`n_prev`; callers pass the same values,
        # so using the parameters is behavior-preserving and self-contained.
        self.gru1 = GRU_TEST(size, p, hidden, clip).float().cuda()
        # gru2 is constructed but currently unused in forward() (see below).
        self.gru2 = GRU_TEST(hidden, 1, hidden, clip).float().cuda()
        self.softmax = torch.nn.LogSoftmax(dim=0)
        self.dropout = torch.nn.Dropout(dropout)
        self.output_decoder = torch.nn.Linear(hidden, size).cuda()
        # Bookkeeping counters used by the training loop.
        self.epochs = 1
        self.batches = 1
        self.count = 0
        self.counter = 0
        self.outs = []
        self.targets = []
        self.output_counter = 0
        # NOTE(review): BCEWithLogitsLoss applies its own sigmoid, yet
        # forward() ends in LogSoftmax — confirm this pairing is intended.
        self.loss_function = torch.nn.BCEWithLogitsLoss().float()
        # NOTE(review): lr still comes from the module-level `rate` global.
        self.optimizer = torch.optim.Adam([
            {'params': self.parameters(),
             'weight_decay': 0.25}
        ], lr=rate)

    def clear_internal_states(self):
        # Reset both cells' recurrent state between sequences.
        self.gru1.clear_internal_states()
        self.gru2.clear_internal_states()

    def forward(self, inp):
        # inp is a list of one-hot tensors; flatten to one vector, detached
        # so each step is trained independently.
        x = torch.stack(inp).view(-1).detach()
        x = self.gru1(x)
        #x = self.gru2(x)
        x = self.dropout(x)
        x = self.output_decoder(x)
        x = self.softmax(x)
        return x
def splash(a):
    """Print the usage screen when *a* is truthy, otherwise the startup banner."""
    if not a:
        print("\nGRU Text Generator\n")
        print("2019\n")
        return
    print("GRU Text Generator\nUsage:\n\n-f --filename: filename of input text - required\n-h --hidden: number of hidden layers, default 1\n-r --rate: learning rate\n-p --prev: number of previous states to observe, default 0.05")
    print("\nExample usage: <command> -f input.txt -h 5 -r 0.025")
def getIndexFromLetter(letter, list):
    """Return the position of *letter* in *list*; raises ValueError if absent."""
    position = list.index(letter)
    return position
def getLetterFromIndex(i, list):
    """Return the element of *list* at position *i*."""
    element = list[i]
    return element
def parse(args, arg):
    """Scan *args* for any flag listed in *arg* and return the token after it.

    Returns the value string, "" when the flag is the last token (no value),
    or False when the flag is absent.  When the token after the flag looks
    like another flag, the usage screen is printed and scanning continues.
    """
    for i in range(len(args)):
        if args[i] in arg:
            # Fixed off-by-one: `len(args) < i + 1` can never be true for the
            # final token, so a trailing flag crashed on `args[i + 1]`.
            if i + 1 >= len(args):
                return ""
            if args[i + 1].startswith("-"):
                splash(True)
            else:
                return args[i + 1]
    return False
def get_output(inp, greed):
    """Choose an output character from score vector *inp*.

    Greedy mode takes the argmax; otherwise the scores are softmaxed and a
    character is sampled from the resulting distribution.  Returns the
    character and a masked copy of the scores with the chosen slot set to 1.
    """
    if greed:
        i = torch.argmax(inp)
        outchar = alphabet[i]
    else:
        inp = torch.nn.Softmax(dim=0)(inp)
        outchar = numpy.random.choice(alphabet, p=inp.cpu().detach().numpy())
        i = alphabet.index(outchar)
    # Keep only the chosen entry's score, then force it to exactly 1.
    mask = torch.zeros(len(inp)).cuda()
    mask[i] = 1
    out = mask * inp
    out[torch.argmax(out)] = 1
    return outchar, out
def savemodel():
    """Interactively offer to checkpoint model + optimizer state, then exit.

    Registered via atexit; prompts for a filename (timestamped default) and
    writes the state dicts under ./models/.
    """
    print("Save model parameters? [y/n]➡")
    answer = input()
    if answer.lower() in ('y', 'yes'):
        name = "Model-" + str(datetime.datetime.now()).replace(" ", "_")
        print("Save as filename [default: {}]➡".format(name))
        answer = input()
        if answer != "":
            name = "Model-" + str(answer).replace(" ", "_")
        print("Saving model as {}...".format(name))
        path = "./models/{}".format(name)
        torch.save({
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': model.optimizer.state_dict()
        }, path)
        quit()
def loadmodel():
    """Restore a checkpoint when `model_filename` is set, else arm save-on-exit.

    NOTE(review): this function is never invoked anywhere in this file —
    verify the intended call site.
    """
    if model_filename == False:
        # Fresh run: no checkpoint requested; save interactively on exit.
        print("New model")
        atexit.register(savemodel)
        return
    try:
        checkpoint = torch.load(model_filename)
        model.load_state_dict(checkpoint['model_state_dict'])
        model.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    except FileNotFoundError:
        print("Model not found.")
        quit()
# ---- Command-line options ------------------------------------------------
model_filename = None
try:
    model_filename = parse(sys.argv, ["--load", "-l"])
    filename = parse(sys.argv, ["--filename", "-f"])
    if not filename:
        # Fixed: original called splash() with no argument (TypeError) and
        # relied on the bare except below to show usage.  Same net effect:
        # print usage once and exit.
        splash(True)
        quit()

    def _opt(names, cast, default):
        # One CLI option: cast the raw parse() result; fall back to *default*
        # when the flag is absent or its value is falsy (matches the original
        # `if not value: value = default` pattern for every flag).
        value = cast(parse(sys.argv, names))
        return value if value else default

    rate = _opt(["--rate", "-r"], float, 0.05)
    hidden = _opt(["--hidden", "-h"], int, 1)
    clip = _opt(["--clip", "-c"], float, 1)
    epochs = _opt(["--epoch", "-e"], int, 1)
    nbatches = _opt(["--batch", "-b"], int, 1)
    momentum = _opt(["--momentum", "-m"], float, 0.4)
    n_prev = _opt(["--previous", "-p"], int, 5)
    dropout = _opt(["--dropout", "-d"], float, 0.0)
    sample_size = _opt(["--sample", "-s"], int, 1000)
except (TypeError, ValueError):
    # Narrowed from a bare `except:` which also swallowed NameError/typos
    # and SystemExit.  These two cover bad/missing option values.
    splash(True)
    quit()
# ---- Corpus load ---------------------------------------------------------
# Read the input file, fold newlines into "¶" so they survive one-hot
# encoding, then build the ordered character alphabet.
master_words = []
alphabet = []
#writer = SummaryWriter()
text = []
e = 0
c = 0
randomness = 0
with open(filename, "r") as f:
    for ch in f.read():
        if ch == "\n":
            ch = "¶"
        if ch == "\x1b":
            print("XXX")
        text.append(ch)
total = len(text)
for ch in text:
    sys.stdout.flush()
    if ch not in alphabet:
        alphabet.append(ch)
    print("\r{}% - {}/{}".format(int(c / total * 100), c, total), end="")
    c += 1
# NOTE(review): this overrides the -e/--epoch value parsed above — confirm.
epochs = 1
alphabet_size = len(alphabet)
splash(False)
model = Model(alphabet_size, n_prev, dropout, hidden, clip).float().cuda()
nchars = len(text)
replay_memory = []
graph_time = 0
mem_max = torch.cuda.max_memory_allocated()
print(mem_max)
def select(out):
    """Zero out the top-r entries of *out*, r drawn uniformly from [0, randomness]."""
    knockouts = random.randint(0, randomness)
    for _ in range(knockouts):
        out[torch.argmax(out)] = 0
    return out
def get_outchar(out):
    """Map a score vector to its highest-scoring alphabet character."""
    best = torch.argmax(out)
    return alphabet[best]
def get_next():
    """Build the next (inputs, target, done) training triple from the corpus.

    Reads n_prev consecutive characters starting at model.count, one-hot
    encodes each, advances the cursor by one (sliding window), and returns
    (list-of-one-hot-inputs, one-hot-target, False).  When the corpus is
    exhausted it bumps model.epochs and returns (None, None, True).
    """
    output = []
    try:
        for n in range(n_prev):
            letter = text[(model.count + n)]
            letter = letter.replace("\x1b", " ")
            if letter == "\n":
                letter = "¶"
            # Fixed: torch.autograd.Variable is deprecated; a tensor created
            # with requires_grad=True is the exact modern equivalent.
            o = torch.zeros(alphabet_size, requires_grad=True).cuda()
            if letter == "[":
                print(letter)
            o[alphabet.index(letter)] = 1
            output.append(o)
        model.count += 1
        target = torch.zeros(alphabet_size, requires_grad=True).cuda()
        # NOTE(review): the target encodes the LAST INPUT character, not the
        # following one — the commented line below suggests the next char
        # (model.count + n) may have been intended.  Behavior kept as-is.
        target[alphabet.index(letter)] = 1
        #target = alphabet.index(text[(model.count + n)])
        return output, target, False
    except IndexError:
        # Ran off the end of the text: count this as one finished epoch.
        print("ZZZ")
        model.epochs += 1
        return None, None, True
# ---- Main driver ---------------------------------------------------------
# Runs forever: each outer iteration consumes up to `steps` characters from
# the corpus (forward pass, accumulating outputs/targets), performs ONE
# optimizer step over the whole accumulated batch, then free-runs the model
# for 1000 characters to sample text.
model.train(True)
steps = 5000
t = 0
inp, target, done = get_next()
generate_init = 0
while True:
    #txt = colored("\nForward prop... Epoch: {} | Batch: {} | Working...".format(model.epochs, model.batches), attrs=['reverse'])
    #print(txt)
    torch.cuda.empty_cache()
    t = 0
    outs = []      # model outputs accumulated for this batch
    targets = []   # matching one-hot targets
    variation = [] # distinct characters emitted (diversity metric)
    print('\n')
    done = False
    start = datetime.datetime.now()
    # Forward pass over `steps` sliding windows of the corpus.
    while t < steps:
        inp, target, done = get_next()
        if done:
            # Corpus exhausted: rewind the cursor and close the batch early.
            model.count = 0
            break
        t += 1
        out = model.forward(inp)
        char, out = get_output(out, use_greedy)
        #print(char, end="")
        #sys.stdout.flush()
        outs.append(out)
        targets.append(target)
        #inp.append(out)
        #inp.pop(0)
        # Human-readable progress field: the input window, the emitted char,
        # and the expected char.
        field = ""
        for o in inp:
            field += alphabet[torch.argmax(o)]
        field += "| {} | {} |".format(char, alphabet[torch.argmax(target)])
        progress = int(100*(t/steps))
        mem_used = torch.cuda.memory_allocated()
        mem_avail = torch.cuda.memory_cached()
        percentage = 100 * (mem_used / mem_avail)
        if char not in variation:
            variation.append(char)
        txt = "\rForward prop... | Epoch: {} | Batch: {} | {}/{} | {} | Progress: {}% | Memory: {}/{} ({}%)".format(model.epochs, model.batches, t, steps, field, progress, mem_used, mem_avail, percentage)
        print(txt, end=" | ")
        #inp.append(out)
        #inp.pop(0)1000f
    end = datetime.datetime.now()
    print("\nTime: {} @ {}\n".format((end-start), end))
    i = 0
    txt = colored("\rBackward prop... | Epoch: {} | Batch: {} | Working...".format(model.epochs, model.batches),
                  attrs=['reverse'])
    print(txt,end="")
    # One optimizer step over the whole accumulated batch.
    # NOTE(review): the loss is BCEWithLogitsLoss applied to LogSoftmax
    # outputs that were further masked by get_output — confirm intended.
    out = torch.stack(outs).cuda()
    target = torch.stack(targets).cuda()
    model.optimizer.zero_grad()
    loss = model.loss_function(out, target)
    # NOTE(review): retain_graph=True keeps the whole graph alive across
    # batches — presumably needed because hidden state carries over; this is
    # a likely source of memory growth.
    loss.backward(retain_graph=True)
    model.optimizer.step()
    del loss, out, target
    i += 1
    targets.clear()
    outs.clear()
    time = datetime.datetime.now()
    # Diversity: percentage of the alphabet seen in this batch's output.
    variation_int = int(100 * (len(variation) / alphabet_size))
    print("\nVariation: {} | {}/{} | \nAt {} - Memory: {}/{} ({}%)".format(variation_int, len(variation), alphabet_size, time, mem_used, mem_avail, percentage))
    variation = []
    model.optimizer.zero_grad()
    i = 0
    model.batches += 1
    model.counter += 1
    model.count = 0
    # Seed the generator with a fresh window from the start of the corpus.
    inp, target, done = get_next()
    print("\n Generating text... \n")
    # Free-running generation: feed each emitted one-hot back as input.
    for i in range(1000):
        out = model.forward(inp)
        #sample argmax letter
        char, out = get_output(out, use_greedy)
        print(char, end="")
        sys.stdout.flush()
        inp.append(out)
        inp.pop(0)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement