import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
# Load the data
data = pd.read_csv(r"C:/Users/crucillf/OneDrive - STMicroelectronics/Documents/CSV/329/dsrv.rule.diags.aut.2024-02-01_03_55_56.506428.csv")

# Print the column names as a sanity check
print("CSV file columns:", data.columns)

# The column to inspect is 'P'
column_to_display = 'P'

# Make sure the column exists in the DataFrame
if column_to_display in data.columns:
    # Display the column
    print(data[column_to_display])
else:
    print(f"Error: '{column_to_display}' does not exist in the DataFrame.")
    exit()
# Prepare the data for training
class LogDataset(Dataset):
    def __init__(self, logs, labels):
        self.logs = logs
        self.labels = labels

    def __len__(self):
        return len(self.logs)

    def __getitem__(self, idx):
        log = self.logs[idx]
        label = self.labels[idx]
        return log, label
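
# Note: __getitem__ yields a (string, int) pair; the default DataLoader
# collate function turns a batch of these into a tuple of strings plus a
# LongTensor of labels, which is what the training loop below relies on.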

# Both inputs and targets come from column 'P'; every value is converted
# to a string so it can be treated as a character sequence
logs = data['P'].astype(str).tolist()
labels = data['P'].astype(str).tolist()

# Encode the labels as integer class indices
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(labels)
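
# The original string labels stay recoverable through the encoder's inverse
# mapping (standard scikit-learn API), which is handy for inspecting results:
print("First decoded labels:", label_encoder.inverse_transform(labels[:5]))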

# Split the data into training and test sets
logs_train, logs_test, labels_train, labels_test = train_test_split(logs, labels, test_size=0.2, random_state=42)

# Find the maximum sequence length
max_length = max(len(log) for log in logs)

# Function to pad sequences with trailing spaces
def pad_sequence(seq, max_length):
    return seq + ' ' * (max_length - len(seq))
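
# For example, pad_sequence("abc", 5) returns "abc  " (two trailing spaces);
# a sequence already at max_length is returned unchanged.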

# Pad the sequences in both datasets
logs_train = [pad_sequence(log, max_length) for log in logs_train]
logs_test = [pad_sequence(log, max_length) for log in logs_test]

# Build the set of all unique characters; the padding space is added
# explicitly so the lookups below can never miss it
unique_chars = set(''.join(logs_train + logs_test)) | {' '}
char_to_index = {char: idx for idx, char in enumerate(unique_chars)}
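
# Quick sanity check of the character vocabulary (this count is also the
# embedding input size used further down):
print(f"Vocabulary size: {len(char_to_index)} unique characters")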

# Create the datasets and dataloaders
train_dataset = LogDataset(logs_train, labels_train)
test_dataset = LogDataset(logs_test, labels_test)
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Define the RNN model
class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(RNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.embedding(x)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        # Classify from the last time step's hidden state
        out = self.fc(out[:, -1, :])
        return out
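
# Shape walk-through for a batch of B padded logs of length T:
#   input x: (B, T) int64 -> embedding: (B, T, hidden_size)
#   -> LSTM output: (B, T, hidden_size) -> last step: (B, hidden_size)
#   -> fc logits: (B, output_size), fed directly to CrossEntropyLoss.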

# Model parameters
input_size = len(unique_chars)  # Number of unique characters
hidden_size = 64
output_size = len(set(labels))
num_layers = 1

# Initialize the model, the loss function, and the optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = RNNModel(input_size, hidden_size, output_size, num_layers).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001)

# Train the model
model.train()
num_epochs = 3  # Number of epochs
for epoch in range(num_epochs):
    epoch_loss = 0
    for logs, labels in train_dataloader:
        # Encode each character to its vocabulary index
        logs = torch.tensor([[char_to_index[char] for char in log] for log in logs], dtype=torch.long).to(device)
        # The dataloader already yields label tensors; just move them to the device
        labels = labels.long().to(device)
        optimizer.zero_grad()
        outputs = model(logs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    avg_loss = epoch_loss / len(train_dataloader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')
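
# Optional sketch: persist the trained weights so inference can be re-run
# without retraining (the file name "rnn_model.pt" is an arbitrary choice,
# not part of the original script):
# torch.save(model.state_dict(), "rnn_model.pt")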

# Evaluate the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for logs, labels in test_dataloader:
        logs = torch.tensor([[char_to_index[char] for char in log] for log in logs], dtype=torch.long).to(device)
        labels = labels.long().to(device)
        outputs = model(logs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print(f'Accuracy: {100 * correct / total:.2f}%')
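
# A per-class breakdown could be added here by accumulating predictions
# across batches; a minimal sketch, assuming scikit-learn (already a
# dependency above) and hypothetical accumulators all_labels / all_preds
# filled inside the loop:
# from sklearn.metrics import classification_report
# print(classification_report(all_labels, all_preds))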

# Filter the data from 12 January 2024 onward, with a minimum threshold of 1.0
data['P'] = pd.to_numeric(data['P'], errors='coerce')
data['Date'] = pd.to_datetime(data['P'], errors='coerce')  # Use column 'P' as the date
start_date = pd.to_datetime('2024-01-12')
filtered_data = data[(data['Date'] >= start_date) & (data['P'] > 1.0)]

# Create a new DataFrame with the filtered data
filtered_data = filtered_data[['P']]

# Prepare the new logs for prediction
new_logs = filtered_data['P'].astype(str).tolist()
new_logs = [pad_sequence(log, max_length) for log in new_logs]

# Handle unknown characters by falling back to the padding space
def char_to_index_safe(char):
    return char_to_index.get(char, char_to_index[' '])

new_logs_tensor = torch.tensor([[char_to_index_safe(char) for char in log] for log in new_logs], dtype=torch.long).to(device)

# The tensor is already batched as (num_logs, seq_len); only a single
# un-batched sequence would need an extra batch dimension
if new_logs_tensor.dim() == 1:
    new_logs_tensor = new_logs_tensor.unsqueeze(0)

# Run inference on the filtered logs (the model is still in eval mode)
with torch.no_grad():
    outputs = model(new_logs_tensor)
predictions = torch.argmax(outputs, dim=1)
print(predictions)
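
# Follow-up sketch: map the predicted class indices back to the original 'P'
# strings (the inverse of the LabelEncoder fit above); decoded_predictions is
# a new name introduced here for illustration.
decoded_predictions = label_encoder.inverse_transform(predictions.cpu().numpy())
print(decoded_predictions)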