# %%writefile load_data.py
from transformers import BertTokenizer, BertForSequenceClassification, BatchEncoding, GPT2Tokenizer, AdamW
from sklearn.utils.class_weight import compute_class_weight
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
from model_builder import ReviewRatingClassifier
from dataclasses import dataclass
from typing import Dict, List, Optional
import torch.nn as nn
import pandas as pd
import numpy as np
import datasets
import torch
import os

NUM_WORKERS = os.cpu_count()
@dataclass(frozen=True)
class CreateDataset(torch.utils.data.Dataset):
    reviews: np.ndarray
    labels: np.ndarray
    tokenizer: BertTokenizer | GPT2Tokenizer
    max_len: int

    def __len__(self) -> int:
        return len(self.reviews)

    def __getitem__(self, item: int) -> Dict[str, torch.Tensor]:
        review = str(self.reviews[item])
        label = self.labels[item]
        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        return {
            # 'review_text': review,
            # encode_plus with return_tensors='pt' yields (1, max_len) tensors;
            # flatten so the DataLoader batches to (batch_size, max_len).
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }
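
# Hedged usage sketch for CreateDataset (this map-style path is currently
# commented out in load_data_objs in favour of create_datasets below; the toy
# reviews and labels here are illustrative assumptions, not project data):
#   tok = GPT2Tokenizer.from_pretrained('gpt2'); tok.pad_token = tok.eos_token
#   ds = CreateDataset(np.array(["great", "awful"]), np.array([4, 0]), tok, 128)
#   ds[0]['input_ids'].shape  # torch.Size([128]) after the flatten above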
def create_datasets(x: np.ndarray, y: np.ndarray, tokenizer: BertTokenizer | GPT2Tokenizer) -> datasets.Dataset:
    # Build a Hugging Face datasets.Dataset from parallel review/label arrays.
    # Called once for the training split and once for the validation split.
    df = pd.DataFrame({'review': x, 'label': y})
    _dataset = datasets.Dataset.from_pandas(df)

    # Tokenize the dataset using map
    def tokenize_function(example) -> BatchEncoding:
        return tokenizer(example['review'], padding="max_length", truncation=True, max_length=128)

    _dataset = _dataset.map(tokenize_function, batched=True)
    # Remove columns that are not tensors ('review', since the DataLoader expects tensors)
    _dataset = _dataset.remove_columns(['review'])
    # Set format for PyTorch tensors
    _dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
    return _dataset
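
# Hedged example of what create_datasets yields (toy data, illustrative only):
#   tok = GPT2Tokenizer.from_pretrained('gpt2'); tok.pad_token = tok.eos_token
#   ds = create_datasets(np.array(["loved it", "meh"]), np.array([4, 2]), tok)
#   sorted(ds[0].keys())      # ['attention_mask', 'input_ids', 'label']
#   ds[0]['input_ids'].shape  # torch.Size([128])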
# Custom learning rate scheduler: a triangular cyclical LR policy.
# Implemented as a conventional _LRScheduler subclass; a frozen dataclass
# cannot work here because _LRScheduler.step() mutates instance state
# (last_epoch) and expects get_lr() to be a method, not a property.
class CyclicLRScheduler(torch.optim.lr_scheduler._LRScheduler):
    def __init__(
        self,
        optimizer: torch.optim.Optimizer,
        base_lr: float,
        max_lr: float,
        step_size_up: int = 2000,
        step_size_down: Optional[int] = None,
        mode: str = 'triangular',
        last_epoch: int = -1,
    ):
        self.base_lr = base_lr
        self.max_lr = max_lr
        self.step_size_up = step_size_up
        self.step_size_down = step_size_down if step_size_down is not None else step_size_up
        self.total_size = self.step_size_up + self.step_size_down
        self.mode = mode
        # _LRScheduler.__init__ calls self.get_lr() once, so our attributes
        # must be in place before the super() call.
        super().__init__(optimizer, last_epoch)

    def get_lr(self) -> List[float]:
        # Fractional position inside the current cycle, in [0, 1).
        cycle = np.floor(1 + self.last_epoch / self.total_size)
        x = 1 + self.last_epoch / self.total_size - cycle
        # Linear ramp up over the first half of the cycle, down over the second.
        scale_factor = x * 2 if x <= 0.5 else (1 - x) * 2
        return [self.base_lr + (self.max_lr - self.base_lr) * scale_factor
                for _ in self.optimizer.param_groups]
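
# Hedged sketch of driving the triangular schedule, stepped once per batch
# (the dummy parameter and step counts are assumptions for illustration):
#   opt = torch.optim.SGD([torch.zeros(1, requires_grad=True)], lr=1e-4)
#   sched = CyclicLRScheduler(opt, base_lr=1e-4, max_lr=1e-2, step_size_up=2000)
#   for _ in range(4000):
#       opt.step()
#       sched.step()  # LR ramps 1e-4 -> 1e-2 over 2000 steps, then back down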
def check_dataset_shapes(dataset: datasets.Dataset) -> None:
    # Sanity check: items from create_datasets are 1-D tensors padded/truncated
    # to tokenize_function's max_length of 128.
    for idx in range(len(dataset)):
        item = dataset[idx]
        assert item['input_ids'].shape[0] == 128, f"\nError: input_ids at index {idx} is not 128 in length"
        assert item['attention_mask'].shape[0] == 128, f"\nError: attention_mask at index {idx} is not 128 in length"
def load_data_objs(
    batch_size: int,
    rank: int,
    world_size: int,
    epochs: int,
    x_train_path: str,
    y_train_path: str,
    x_val_path: str,
    y_val_path: str,
    gpu: bool,
    gpu_id: int,
    learning_rate: float,
    num_workers: int,
    lr_scheduler: Optional[str] = None,
) -> tuple[DataLoader, DataLoader, nn.Module, nn.CrossEntropyLoss, torch.optim.Optimizer, Optional[torch.optim.lr_scheduler._LRScheduler]]:
    def load_tensor(path: str, name: str) -> np.ndarray:
        if not os.path.isfile(path):
            raise FileNotFoundError(f"{name} file not found: {path}")
        try:
            return np.load(path, allow_pickle=True)
        except Exception as er:
            raise RuntimeError(f"Error loading {name} from {path}: {str(er)}")
    try:
        xtrain = load_tensor(x_train_path, "X_train.npy")
        ytrain = load_tensor(y_train_path, "y_train.npy")
        xval = load_tensor(x_val_path, "X_val.npy")
        yval = load_tensor(y_val_path, "y_val.npy")
    except Exception as e:
        print(f"Error loading data: {str(e)}")
        raise

    # Ensure that the number of reviews matches the number of labels
    assert len(xtrain) == len(ytrain), "Mismatch between X_train and y_train lengths"
    assert len(xval) == len(yval), "Mismatch between X_val and y_val lengths"

    # tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
    # GPT-2 has no pad token, so reuse the end-of-sequence token for padding
    tokenizer.pad_token = tokenizer.eos_token

    # Compute class weights to counter label imbalance
    class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(ytrain), y=ytrain)

    # model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=5)
    model: ReviewRatingClassifier = ReviewRatingClassifier(num_classes=5, unfreeze_layers=10)
    # optimizer = torch.optim.AdamW(params=model.parameters(), lr=learning_rate, weight_decay=1e-4)
    optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=1e-4)

    # train_dts: torch.utils.data.Dataset = CreateDataset(xtrain, ytrain, tokenizer, 128)
    # val_dts: torch.utils.data.Dataset = CreateDataset(xval, yval, tokenizer, 128)
    train_dts = create_datasets(xtrain, ytrain, tokenizer)
    val_dts = create_datasets(xval, yval, tokenizer)
    # check_dataset_shapes(train_dts)
    if gpu:
        criterion = nn.CrossEntropyLoss(weight=torch.tensor(class_weights).float().to(gpu_id))
        # criterion = nn.CrossEntropyLoss()
        # DistributedSampler handles sharding per rank, so shuffle stays False
        train_dtl = DataLoader(train_dts, batch_size=batch_size, shuffle=False, pin_memory=True,
                               sampler=DistributedSampler(train_dts, num_replicas=world_size, rank=rank),
                               num_workers=num_workers)
        val_dtl = DataLoader(val_dts, batch_size=1, shuffle=False, pin_memory=True,
                             sampler=DistributedSampler(val_dts, num_replicas=world_size, rank=rank),
                             num_workers=num_workers)
    else:
        criterion = nn.CrossEntropyLoss(weight=torch.tensor(class_weights).float())
        train_dtl = DataLoader(train_dts, batch_size=batch_size,
                               shuffle=False, pin_memory=True, num_workers=num_workers)
        val_dtl = DataLoader(val_dts, batch_size=batch_size,
                             shuffle=False, pin_memory=True, num_workers=num_workers)
    scheduler = None
    if lr_scheduler:
        # Map each name to a factory so only the requested scheduler is built:
        # constructing a scheduler (e.g. OneCycleLR) mutates the optimizer's
        # learning rate, so instantiating all of them eagerly would be a bug.
        LR_SCHEDULER = {
            "cyclic_lr": lambda: CyclicLRScheduler(optimizer, base_lr=0.0001, max_lr=0.01,
                                                   step_size_up=2000, mode='triangular'),
            # requires a metric passed to step()
            "reduce_lr": lambda: torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max',
                                                                            factor=0.5, patience=2),
            "one_cycle_lr": lambda: torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.01, epochs=epochs,
                                                                        steps_per_epoch=len(train_dtl),
                                                                        anneal_strategy='cos'),
            "cosine": lambda: torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5),
        }
        if lr_scheduler in LR_SCHEDULER:
            scheduler = LR_SCHEDULER[lr_scheduler]()
        else:
            raise ValueError(f"Invalid lr_scheduler value: {lr_scheduler}. "
                             f"Valid options are: {list(LR_SCHEDULER.keys())}")

    return train_dtl, val_dtl, model, criterion, optimizer, scheduler
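
if __name__ == "__main__":
    # Minimal single-process smoke test. A hedged sketch: the .npy paths and
    # hyperparameters below are assumptions for illustration, not values that
    # ship with this file, and model_builder must be importable.
    train_dtl, val_dtl, model, criterion, optimizer, scheduler = load_data_objs(
        batch_size=32,
        rank=0,
        world_size=1,
        epochs=3,
        x_train_path="X_train.npy",
        y_train_path="y_train.npy",
        x_val_path="X_val.npy",
        y_val_path="y_val.npy",
        gpu=False,
        gpu_id=0,
        learning_rate=2e-5,
        num_workers=NUM_WORKERS,
        lr_scheduler="one_cycle_lr",
    )
    print(f"train batches: {len(train_dtl)}, val batches: {len(val_dtl)}")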