Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- print("Loading external modules...")
- print("Loading pytorch...")
- import torch
- import torch.nn as nn
- import torch.optim as optim
- from torch.utils.data import Dataset, DataLoader
- print("Finished loading pytorch!")
- print("Loading sklearn / scikit-learn...")
- from sklearn.feature_extraction.text import TfidfVectorizer
- from sklearn.preprocessing import LabelEncoder
- from sklearn.model_selection import train_test_split
- from sklearn.preprocessing import MultiLabelBinarizer
- print("Finished loading sklearn / scikit-learn...")
- print("Loading other modules...")
- import numpy as np
- import matplotlib.pyplot as plt
- import random
- print("Finished loading other modules")
- print("Finished loading all external modules!")
- print("Loading internal modules...")
- import ml.dataset
- import ml.config
- import ml.utils
- import ml.devices
- print("Finished loaded internal modules!")
# Ask the project's device helper whether a CUDA device is visible.
gpu_available = ml.devices.is_cuda_available()

# Probe PyTorch itself: torch.cuda.current_device() raises when this PyTorch
# build cannot initialise CUDA. Catch Exception rather than using a bare
# except so Ctrl-C / SystemExit are not swallowed by the probe.
try:
    torch.cuda.current_device()
except Exception:
    torch_cuda_compiled = False
    gpu_available = False
    print("[DEVICE] PyTorch not compiled with CUDA enabled")
else:
    torch_cuda_compiled = True
    print("[DEVICE] PyTorch compiled with CUDA enabled")

# Pick the device the rest of the script runs on.
if gpu_available and ml.config.use_gpu:
    # GPU requested and available.
    print("[GPU] CUDA Devices available:", ml.devices.get_device_count())
    print("[GPU] Current CUDA Device:", ml.devices.get_device_name(ml.devices.get_current_device()))
    device = torch.device("cuda")
elif ml.config.use_gpu:
    # GPU requested but unusable: explain why, then fall back to the CPU.
    if torch_cuda_compiled:
        print("[GPU] Failed to use GPU, CUDA device not found")
    else:
        print("[GPU] Failed to use GPU, PyTorch not compiled with CUDA enabled")
    print("[CPU] Defaulting to CPU")
    device = torch.device("cpu")
else:
    # GPU not requested. Only claim a device was found when one really was.
    if gpu_available:
        print("[GPU] CUDA Device found but not used, use GPU is false")
    print("[CPU] Defaulting to CPU")
    device = torch.device("cpu")

# Human-readable name of the selected device, used in status messages.
device_type = {"cpu": "CPU", "cuda": "GPU"}.get(device.type, "UNKNOWN")
print("[DEVICE] Using device:", device_type)
# Raw corpus: texts and their label vectors, as provided by ml.dataset.
inputs = ml.dataset.multi_texts
labels = ml.dataset.multi_labels

# Draw a fresh seed for this run and store it on the config so both splits
# below use the same value.
ml.config.random_split_seed = random.randint(0, 9999)

# Stage 1: carve the corpus into a training part and a held-out part.
(
    inputs_text_train,
    inputs_text_temp_test,
    labels_train,
    labels_temp_test,
) = train_test_split(
    inputs,
    labels,
    test_size=ml.config.test_split_ratio,
    random_state=ml.config.random_split_seed,
)

# Stage 2: carve the held-out part into validation and test sets.
(
    inputs_text_val,
    inputs_text_test,
    labels_val,
    labels_test,
) = train_test_split(
    inputs_text_temp_test,
    labels_temp_test,
    test_size=ml.config.validation_split_ratio,
    random_state=ml.config.random_split_seed,
)

# TF-IDF features: the vocabulary is learned from the training texts only and
# then applied unchanged to the validation and test texts.
vectorizer = TfidfVectorizer()
X_train = vectorizer.fit_transform(inputs_text_train)
X_val = vectorizer.transform(inputs_text_val)
X_test = vectorizer.transform(inputs_text_test)

# The labels are used as-is (they already resemble one-hot vectors), so the
# targets are simply the split label lists.
y_train, y_val, y_test = labels_train, labels_val, labels_test

# Binarizer fitted on the full label list; nothing visible in this script
# consumes it outside of commented-out code.
mlb = MultiLabelBinarizer()
mlb.fit(labels)
- # One-hot encoding
- """
- print("Converting labels to one-hot encodings...")
- mlb = MultiLabelBinarizer()
- y = mlb.fit_transform(labels)
- # Split original inputs into training and temporary testing sets
- inputs_text_train, inputs_text_temp_test, labels_train, labels_temp_test = train_test_split(inputs, labels, test_size=ml.config.test_split_ratio, random_state=ml.config.random_split_seed)
- # Split the temporary test set into validation and test sets
- inputs_text_val, inputs_text_test, labels_val, labels_test = train_test_split(inputs_text_temp_test, labels_temp_test, test_size=ml.config.validation_split_ratio, random_state=ml.config.random_split_seed)
- # Split original inputs into training and testing sets
- #inputs_text_train, inputs_text_test = train_test_split(inputs, test_size=ml.config.test_split_ratio, random_state=ml.config.random_split_seed)
- # Convert texts to TF-IDF vectors
- vectorizer = TfidfVectorizer()
- X_train = vectorizer.fit_transform(inputs_text_train)
- X_test = vectorizer.transform(inputs_text_test)
- X_val = vectorizer.transform(inputs_text_val)
- # Convert labels to integers
- if (ml.config.enable_timing): start = ml.utils.timer()
- print("Converting labels to integers...")
- #le = LabelEncoder()
- #y_train = le.fit_transform(labels_train)
- #y_test = le.transform(labels_test)
- #y_val = le.transform(labels_val)
- y_train = mlb.fit_transform(labels_train)
- y_test = mlb.transform(labels_test)
- y_val = mlb.transform(labels_val)
- print("Finished converting labels to integers!")
- if (ml.config.enable_timing): end = ml.utils.timer()
- if (ml.config.enable_timing): print(f"Converting labels took {ml.utils.elapsed_time(start, end)} seconds")
- """
# Distinct values occurring anywhere in the training label vectors.
# NOTE(review): if the labels are multi-hot vectors this is just {0, 1},
# not the set of classes - confirm what these two names are meant to hold.
unique_label_values = np.unique(y_train)
unique_int_labels = ml.utils.array_to_list(unique_label_values, "%i")
num_unique_labels = len(unique_label_values)

# Optional diagnostics over the TRAINING split. The split is shuffled, so the
# training texts/labels must be read from inputs_text_train / y_train; indexing
# the unsplit `inputs` / `labels` by position would report different samples.
label_counts = {}
if ml.config.display_train_labels or ml.config.display_train_label_counts:
    if ml.config.display_train_labels:
        print("\nLabels:")
    for train_text, train_label in zip(inputs_text_train, y_train):
        # Tally every value that appears in this sample's label vector.
        for label_value in train_label:
            label_counts[label_value] = label_counts.get(label_value, 0) + 1
        if ml.config.display_train_labels:
            print(f" {train_text}: {str(train_label)}")
    if ml.config.display_train_label_counts:
        for key, value in label_counts.items():
            print("Label:", key, "- Count:", value)

if ml.config.display_train_unique_labels:
    print("\nUnique Labels:")
    # TODO: listing the unique labels is disabled; the label names are not
    # available here because the label encoder is no longer used.
    print()
- # Split data into training and testing sets
- #X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=ml.config.test_split_ratio, random_state=ml.config.random_split_seed) # X: inputs, y: labels
# Define the Dataset
class TextDataset(Dataset):
    """In-memory dataset of feature vectors and their label vectors.

    Args:
        X: Feature matrix, one row per sample. Either a scipy sparse matrix
            (anything exposing ``toarray()``, e.g. TF-IDF output) or a dense
            array-like.
        y: Labels, one entry per sample, convertible with ``np.array``.

    Both are stored as float32 tensors. Indexing returns
    ``(features, labels, idx)`` so callers can map a sample back to its
    position in the source data.
    """

    def __init__(self, X, y):
        # Densify sparse input; copy dense input so the tensor never aliases
        # the caller's array.
        dense = X.toarray() if hasattr(X, "toarray") else np.array(X)
        self.X = torch.from_numpy(dense).float()
        self.y = torch.from_numpy(np.array(y)).float()  # Labels should be floats

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        # Return idx along with the data and labels.
        return self.X[idx], self.y[idx], idx
# Wrap each split in a TextDataset and expose it through a DataLoader.
if ml.config.enable_timing:
    start = ml.utils.timer()
print("Creating Dataloaders...")

# Datasets: training, test and validation splits.
train_data = TextDataset(X_train, y_train)
test_data = TextDataset(X_test, y_test)
val_data = TextDataset(X_val, y_val)

# Only the training loader takes a shuffle setting from the config; the test
# and validation loaders use the DataLoader default.
train_loader = DataLoader(
    train_data,
    batch_size=ml.config.train_batch_size,
    shuffle=ml.config.shuffle_train_data,
)
test_loader = DataLoader(test_data, batch_size=ml.config.test_batch_size)
val_loader = DataLoader(val_data, batch_size=ml.config.validation_batch_size)

print("Finished creating Dataloaders!")
if ml.config.enable_timing:
    end = ml.utils.timer()
    print(f"Created Dataloaders in {ml.utils.elapsed_time(start, end)} seconds")
# Long Short-Term Memory Model (LSTM)
class LSTMClassifier(nn.Module):
    """Unidirectional LSTM followed by a linear layer on the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        output_size: Number of output logits.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=False)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return logits of shape (batch, output_size).

        ``x`` is (batch, seq_len, input_size). A 2-D (batch, input_size)
        input - one feature vector per sample, as this script produces - is
        treated as a sequence of length 1.
        """
        if x.dim() == 2:
            x = x.unsqueeze(1)
        batch_size = x.size(0)
        # Zero initial hidden/cell states, created on the same device and
        # dtype as the input so the module does not depend on a global device.
        h0 = x.new_zeros(self.num_layers, batch_size, self.hidden_size)
        c0 = x.new_zeros(self.num_layers, batch_size, self.hidden_size)
        # out: (batch, seq_len, hidden_size)
        out, _ = self.lstm(x, (h0, c0))
        # Decode the hidden state of the last time step.
        return self.fc(out[:, -1, :])
# Multi-label classifier model
class MultiLabelClassifier(nn.Module):
    """Two-layer feed-forward network that emits one raw logit per label.

    Args:
        input_size: Number of input features.
        hidden_size: Width of the hidden layer.
        num_classes: Number of output logits (one per label).

    The output is NOT passed through a sigmoid; the script pairs this model
    with ``nn.BCEWithLogitsLoss`` and applies ``torch.sigmoid`` itself when it
    needs probabilities.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        print("Input size:", input_size)
        print("Hidden size:", hidden_size)
        print("Num classes:", num_classes)
        self.layer1 = nn.Linear(input_size, hidden_size)
        self.layer2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return logits of shape (batch, num_classes)."""
        out = self.layer1(x)
        out = nn.functional.relu(out)
        return self.layer2(out)

    def save(self, path):
        """Write this model's state dict to ``path`` via ``torch.save``."""
        # Save this instance's weights, not whatever the global `model` is.
        torch.save(self.state_dict(), path)
# Linear model
class LinearModel(nn.Module):
    """Two-layer network that returns log-probabilities over the classes.

    Args:
        input_dim: Number of input features.
        output_dim: Number of classes.
        hidden_units: Width of the hidden layer. Defaults to
            ``ml.config.hidden_units`` when omitted.

    ``forward`` already applies ``log_softmax``, so the matching loss is
    ``nn.NLLLoss``; ``nn.CrossEntropyLoss`` would apply log-softmax a second
    time.
    """

    def __init__(self, input_dim, output_dim, hidden_units=None):
        super().__init__()
        if hidden_units is None:
            hidden_units = ml.config.hidden_units
        self.fc1 = nn.Linear(input_dim, hidden_units)
        self.fc2 = nn.Linear(hidden_units, output_dim)

    def forward(self, x):
        """Return log-probabilities of shape (batch, output_dim)."""
        x = nn.functional.relu(self.fc1(x))
        x = self.fc2(x)
        return nn.functional.log_softmax(x, dim=1)
# Model, loss and optimizer.
#
# Active configuration: the multi-label classifier. Its input width is the
# TF-IDF vocabulary size and it emits one logit per entry of a label vector.
# BCEWithLogitsLoss scores each label independently; plain SGD does the updates.
#
# Disabled alternatives kept for reference:
#   LINEAR: LinearModel(X_train.shape[1], len(mlb.classes_)).to(device)
#           with nn.CrossEntropyLoss() and optim.SGD(model.parameters(), lr=ml.config.learning_rate)
#   LSTM:   LSTMClassifier(X_train.shape[1], ml.config.hidden_units, len(y_train[0]), num_layers=1).to(device)
model = MultiLabelClassifier(
    X_train.shape[1],
    ml.config.hidden_units,
    len(y_train[0]),
)
model = model.to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=ml.config.learning_rate)
# Train the model
def _count_correct_labels(logits, targets, threshold):
    """Return (correct, total) label-wise hits for one multi-label batch.

    A label counts as predicted when sigmoid(logit) > threshold. ``correct``
    is the number of individual labels that match ``targets`` and ``total``
    is the number of individual labels in the batch.
    """
    predicted = (torch.sigmoid(logits) > threshold).float()
    correct = int((predicted == targets).sum().item())
    return correct, targets.numel()


if ml.config.enable_timing:
    start = ml.utils.timer()
print("\nTraining model...")
total_step = len(train_loader)
step_iter = 0
loss_list = []  # Per-step losses of the current epoch
acc_list = []  # Per-step accuracies of the current epoch
epoch_loss_list = []  # Average training loss per epoch
epoch_acc_list = []  # Average training accuracy per epoch
val_epoch_loss_list = []  # Average validation loss per epoch
val_epoch_acc_list = []  # Average validation accuracy per epoch
val_loss_list = []  # Per-step validation losses of the current epoch
val_acc_list = []  # Per-step validation accuracies of the current epoch
epoch_timers = []

# NOTE(review): this runs num_epochs + 1 passes (epoch 0..num_epochs). The
# progress line below is written around that numbering, so it is kept as-is;
# confirm whether the extra pass is intended.
for epoch in range(ml.config.num_epochs + 1):
    if ml.config.enable_epoch_average_time:
        epoch_start = ml.utils.timer()
    epoch_correct = 0  # Correctly predicted labels in this epoch
    epoch_total = 0  # Labels processed in this epoch
    for i, data in enumerate(train_loader):
        inputs, labels, indices = data
        inputs = inputs.to(device)
        labels = labels.to(device).float()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss_list.append(loss.item())

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Track accuracy as counts. The fraction is taken exactly once, so
        # it is not divided by the batch size a second time.
        correct, total = _count_correct_labels(outputs.detach(), labels, ml.config.threshold)
        epoch_correct += correct
        epoch_total += total
        acc_list.append(correct / total)

        if ml.config.display_step_info:
            print(f'Epoch [{epoch}/{ml.config.num_epochs}] - Step [{i+1}/{total_step}] - Loss: {loss.item():.4f} - Accuracy: {correct}/{total} ({100*correct / total:.2f}%)')
        elif ml.config.display_train_progress:
            print(f"Training: {100*(epoch)/(ml.config.num_epochs):.2f}% - Epoch: [{epoch}/{ml.config.num_epochs}] - Step [{step_iter-total_step+1}/{(total_step*ml.config.num_epochs)}]", end="\r")
        step_iter += 1

    # Summarise the epoch BEFORE clearing the per-step lists; averaging the
    # already-emptied lists would report nan.
    epoch_loss = np.mean(loss_list)
    epoch_acc = np.mean(acc_list)
    epoch_loss_list.append(epoch_loss)
    epoch_acc_list.append(epoch_acc)
    if ml.config.display_epoch_info:
        print(f'Epoch {epoch} - Loss: {epoch_loss:.4f} - Accuracy: {epoch_correct}/{epoch_total} ({100 * epoch_acc:.2f}%)')
    loss_list = []
    acc_list = []

    if ml.config.enable_epoch_average_time:
        epoch_end = ml.utils.timer()
        epoch_timers.append(ml.utils.elapsed_time(epoch_start, epoch_end))

    # Validation pass for this epoch (no gradient tracking, eval mode).
    model.eval()
    with torch.no_grad():
        val_correct = 0
        val_total = 0
        for data in val_loader:
            inputs, labels, indices = data
            inputs = inputs.to(device)
            labels = labels.to(device).float()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss_list.append(loss.item())

            correct, total = _count_correct_labels(outputs, labels, ml.config.threshold)
            val_correct += correct
            val_total += total
            val_acc_list.append(correct / total)

    # Record this epoch's validation averages, then reset for the next epoch.
    val_epoch_loss_list.append(np.mean(val_loss_list))
    val_epoch_acc_list.append(np.mean(val_acc_list))
    val_loss_list = []
    val_acc_list = []
    model.train()  # Back to training mode for the next epoch

print("\nFinished training model!")
if ml.config.enable_timing:
    end = ml.utils.timer()
    print(f"Training model took {ml.utils.elapsed_time(start, end)} seconds")
if ml.config.enable_epoch_average_time:
    print(f"Average epoch training time: {ml.utils.list_average(epoch_timers, ml.config.time_average_precision)} seconds")
if ml.config.auto_show_graph:
    ml.utils.show_graph(epoch_loss_list, epoch_acc_list, val_epoch_loss_list, val_epoch_acc_list)
# Test the model on the held-out test data
if ml.config.enable_timing:
    start = ml.utils.timer()
print("\nTesting model on testing data...")
model.eval()
test_correct = 0  # Correctly predicted individual labels
test_total = 0  # Individual labels evaluated
with torch.no_grad():
    for data in test_loader:
        inputs, labels, indices = data  # indices: positions within the test split
        inputs = inputs.to(device)
        labels = labels.to(device).float()
        outputs = model(inputs)
        # Multi-label decision: threshold each label's sigmoid probability,
        # the same rule the training and validation passes use (an argmax
        # would pick a single class per sample).
        predicted = (torch.sigmoid(outputs) > ml.config.threshold).float()
        test_correct += int((predicted == labels).sum().item())
        test_total += labels.numel()
# Report the result instead of discarding the predictions.
if test_total:
    print(f"Test accuracy: {test_correct}/{test_total} ({100 * test_correct / test_total:.2f}%)")
print("Finished testing model!")
if ml.config.enable_timing:
    end = ml.utils.timer()
    print(f"Testing model took {ml.utils.elapsed_time(start, end)} seconds")
def predict(input_text):
    """Run the trained model on a single text and return its raw logits.

    The text is vectorised with the module-level TF-IDF ``vectorizer``, moved
    to ``device`` and passed through ``model``.

    Args:
        input_text: The text to classify.

    Returns:
        The model output for a batch of one (raw logits; apply
        ``torch.sigmoid`` to obtain per-label probabilities).
    """
    if ml.config.enable_timing:
        start = ml.utils.timer()
    print("Predicting using model...")
    features = vectorizer.transform([input_text])
    features = torch.tensor(features.toarray()).float().to(device)
    # Inference only: no gradients are needed for a prediction.
    with torch.no_grad():
        output = model(features)
    print("Finished predicting!")
    if ml.config.enable_timing:
        end = ml.utils.timer()
        print(f"Prediction took {ml.utils.elapsed_time(start, end)} seconds")
    return output
# Interactive prediction loop. Lines starting with "/" are commands:
#   /exit   - quit the program
#   /device - show which device is in use
#   /graph  - show the training/validation curves
# Anything else is classified by the model.
while True:
    prompt = input("\nPrompt: ")
    if prompt.startswith("/"):
        if prompt == "/exit":
            print("Exitting...")
            # Raise SystemExit directly; the exit() helper is provided by the
            # site module for interactive sessions and may be absent.
            raise SystemExit
        elif prompt == "/device":
            print(f"Device {device_type}")
        elif prompt == "/graph":
            ml.utils.show_graph(epoch_loss_list, epoch_acc_list, val_epoch_loss_list, val_epoch_acc_list)
        # Unrecognised commands are ignored.
        continue

    output = predict(prompt)
    # Per-label probabilities as a numpy array. (A previous call to
    # ml.utils.output_to_probabilities was dropped: its result was overwritten
    # by this sigmoid on the very next line.)
    probabilities = torch.sigmoid(output).detach().cpu().numpy()
    # Apply the threshold to get the predicted label flags.
    predicted_labels = (probabilities > ml.config.threshold).astype(int)
    print("Predictions:")
    # Iterate over the model's own outputs rather than relying on a `labels`
    # variable left over from an earlier loop.
    for flag, probability in zip(predicted_labels[0], probabilities[0]):
        prob = probability * 100
        print(f"{flag} - {ml.utils.round_down(prob, ml.config.probabilities_round_precision)}")
Advertisement
Add Comment
Please, Sign In to add comment