"""Prototype: custom DistilBERT encoder + linear head, with HF weight loading.

The module builds a DistilBERT-shaped encoder from plain ``torch.nn`` layers,
copies pretrained weights into it from a HuggingFace ``model.safetensors``
checkpoint, and runs a single demonstration training step on a handful of
sample comments.

NOTE: everything at the bottom of the file runs at import time (tokenizer
download, weight download, one optimizer step).
"""
import math
import os
import sys
from typing import Any

import torch
import torch.optim as optim
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file, save_file  # For loading and saving the safetensors
from torch import nn
from transformers import AutoTokenizer

WEIGHTS_DIR = "weights"
WEIGHTS_PATH = os.path.join(WEIGHTS_DIR, "model.safetensors")

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")


class Config(object):
    """Hyper-parameters shared by every layer of the custom encoder."""

    def __init__(self):
        self.n_heads = 12
        self.n_layers = 6
        # Read from the module-level tokenizer so padding rows of the word
        # embedding line up with the ids the tokenizer actually emits.
        self.pad_token_id = tokenizer.pad_token_id
        self.embedding_dim = 768
        self.ffn_dim = 3072
        self.max_position_embeddings = 512
        self.vocab_size = 30522
        self.eps = 1e-12
        self.attention_head_size = self.embedding_dim // self.n_heads
        self.all_head_size = self.n_heads * self.attention_head_size
        self.n_classes = 1
        self.device = 'cpu'


config = Config()


class DistilEmbeddings(nn.Module):
    """Word + learned position embeddings, followed by LayerNorm and dropout."""

    def __init__(self, config):
        super().__init__()
        # Word and Position embeddings
        self.word_embeddings = nn.Embedding(
            config.vocab_size, config.embedding_dim, padding_idx=config.pad_token_id
        )
        self.position_embeddings = nn.Embedding(
            config.max_position_embeddings, config.embedding_dim
        )

        # Layer Norm and Dropout
        self.LayerNorm = nn.LayerNorm(config.embedding_dim, eps=config.eps)
        self.dropout = nn.Dropout(0.1)

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        """
        Parameters:
        - input_ids (torch.Tensor): torch.tensor(bs, max_seq_length) The token ids to embed.

        Returns: torch.tensor(bs, max_seq_length, hidden_size) The embedded tokens
        (plus position embeddings and no segment embeddings for our task).
        """
        # 1. Word Embeddings
        input_embeds = self.word_embeddings(input_ids)  # (bs, max_seq_length, dim)
        seq_length = input_embeds.size(1)

        # 2. Position Embeddings
        position_ids = torch.arange(
            seq_length, dtype=torch.long, device=input_ids.device
        )  # (max_seq_length)
        position_ids = position_ids.unsqueeze(0).expand_as(input_ids)  # (bs, max_seq_length)
        position_embeddings = self.position_embeddings(position_ids)  # (bs, max_seq_length, dim)

        # 3. Combination
        embeddings = input_embeds + position_embeddings

        # 4. Normalization
        embeddings = self.LayerNorm(embeddings)  # (bs, max_seq_length, dim)
        return self.dropout(embeddings)


class MultiHeadSelfAttention(nn.Module):
    """Scaled dot-product self-attention split over ``config.n_heads`` heads."""

    def __init__(self, config):
        super().__init__()
        self.heads = config.n_heads
        self.head_dim = config.attention_head_size

        self.q = nn.Linear(config.embedding_dim, config.embedding_dim, bias=True)
        self.k = nn.Linear(config.embedding_dim, config.embedding_dim, bias=True)
        self.v = nn.Linear(config.embedding_dim, config.embedding_dim, bias=True)
        self.out = nn.Linear(config.embedding_dim, config.embedding_dim, bias=True)
        self.drop = nn.Dropout(0.1)

    def forward(self, x, attn_mask=None):
        """Attend over ``x``.

        Parameters:
        - x: tensor unpacked as (bs, s, embed_dim).
        - attn_mask: optional mask that is unsqueezed to (bs, 1, 1, s); truthy
          entries are kept, falsy entries are excluded from the softmax.

        Returns: tensor of shape (bs, s, embed_dim).
        """
        bs, s, embed_dim = x.shape

        # (bs, s, dim) -> (bs, n_heads, s, dim_per_head)
        q = self.q(x).view(bs, s, self.heads, self.head_dim).transpose(1, 2)
        k = self.k(x).view(bs, s, self.heads, self.head_dim).transpose(1, 2)
        v = self.v(x).view(bs, s, self.heads, self.head_dim).transpose(1, 2)

        # (bs, n_heads, q_length, k_length)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)

        if attn_mask is not None:
            if attn_mask.dtype != torch.bool:
                attn_mask = attn_mask.bool()
            attn_mask = attn_mask.unsqueeze(1).unsqueeze(2)  # (B,1,1,S)
            # Use the dtype's own minimum rather than a fixed -1e9: the literal
            # is not representable in float16 and would break half precision.
            scores = scores.masked_fill(~attn_mask, torch.finfo(scores.dtype).min)

        weights = torch.softmax(scores, dim=-1)  # (bs, n_heads, q_length, k_length)
        weights = self.drop(weights)

        context = torch.matmul(weights, v)  # (bs, n_heads, q_length, dim_per_head)
        context = context.transpose(1, 2).contiguous().view(bs, s, embed_dim)  # (bs, q_length, dim)
        return self.drop(self.out(context))


class FeedForward(nn.Module):
    """Position-wise MLP: Linear -> GELU -> dropout -> Linear -> dropout."""

    def __init__(self, config):
        super().__init__()
        self.fc1 = nn.Linear(config.embedding_dim, config.ffn_dim)
        self.fc2 = nn.Linear(config.ffn_dim, config.embedding_dim)
        self.drop = nn.Dropout(0.1)
        self.act = nn.GELU()

    def forward(self, x):
        """Apply the two-layer MLP to the last dimension of ``x``."""
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        return x


class TransformerBlock(nn.Module):
    """One post-LayerNorm encoder layer: attention sub-layer, then FFN sub-layer."""

    def __init__(self, config):
        super().__init__()
        self.attn = MultiHeadSelfAttention(config)
        self.attn_ln = nn.LayerNorm(config.embedding_dim, eps=config.eps)
        self.ffn = FeedForward(config)
        self.ffn_ln = nn.LayerNorm(config.embedding_dim, eps=config.eps)

    def forward(self, x, attn_mask=None):
        """Run both sub-layers; each one is residual-added and then normalized."""
        # 1. Attention Sub-layer (Input + Attention(Input) -> LayerNorm)
        attn_output = self.attn(x, attn_mask)
        x = self.attn_ln(x + attn_output)  # Residual + LayerNorm

        # 2. FFN Sub-layer (Input + FFN(Intermediate) -> LayerNorm)
        ffn_output = self.ffn(x)
        x = self.ffn_ln(x + ffn_output)  # Residual + LayerNorm
        return x


class DistilBertEncoder(nn.Module):
    """Embeddings followed by ``config.n_layers`` transformer blocks."""

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.emb = DistilEmbeddings(config)
        self.layers = nn.ModuleList(
            [TransformerBlock(config) for _ in range(config.n_layers)]
        )

    def forward(self, input_ids, attn_mask=None):
        """Embed ``input_ids`` and pass them through every block in order."""
        x = self.emb(input_ids)
        for blk in self.layers:
            x = blk(x, attn_mask)
        return x  # (bs, s, 768)


class Model(nn.Module):
    """Encoder plus a dropout + linear head applied to the first token."""

    def __init__(self, encoder, config):
        super().__init__()
        self.encoder = encoder
        self.drop = nn.Dropout(0.1)
        self.out = nn.Linear(config.embedding_dim, config.n_classes)

    def forward(self, input_ids, attn_mask=None):
        """Return raw scores of shape (bs, n_classes)."""
        h = self.encoder(input_ids, attn_mask)
        cls = h[:, 0, :]  # CLS token
        # Output are raw scores/predictions, no final activation (like sigmoid) needed
        # for the BCE loss.
        return self.out(self.drop(cls))  # scores (bs, n_classes)


# --- WEIGHT MAPPING FUNCTION ---
def _hf_prefix_pairs(n_layers: int) -> list:
    """List (custom_prefix, hf_prefix) pairs for the embeddings and every layer."""
    pairs = [
        ('emb.word_embeddings', 'distilbert.embeddings.word_embeddings'),
        ('emb.position_embeddings', 'distilbert.embeddings.position_embeddings'),
        ('emb.LayerNorm', 'distilbert.embeddings.LayerNorm'),
    ]
    for i in range(n_layers):
        hf_layer = f'distilbert.transformer.layer.{i}'
        custom_layer = f'layers.{i}'
        # Attention
        for hf_name, custom_name in [('q_lin', 'q'), ('k_lin', 'k'), ('v_lin', 'v'), ('out_lin', 'out')]:
            pairs.append((f'{custom_layer}.attn.{custom_name}', f'{hf_layer}.attention.{hf_name}'))
        # FFN (FeedForward)
        for hf_name, custom_name in [('lin1', 'fc1'), ('lin2', 'fc2')]:
            pairs.append((f'{custom_layer}.ffn.{custom_name}', f'{hf_layer}.ffn.{hf_name}'))
        # LayerNorms
        for custom_name, hf_name in [('attn_ln', 'sa_layer_norm'), ('ffn_ln', 'output_layer_norm')]:
            pairs.append((f'{custom_layer}.{custom_name}', f'{hf_layer}.{hf_name}'))
    return pairs


def map_hf_to_custom_keys(model: nn.Module, pretrained_state_dict: dict):
    """
    Maps keys from HuggingFace DistilBERT state_dict to our custom DistilBertEncoder.

    Parameters:
    - model: the encoder; only ``model.config.n_layers`` is read.
    - pretrained_state_dict: checkpoint dict keyed by ``distilbert.*`` names.

    Returns: a new dict holding the same tensors under the custom encoder's
    parameter names. Checkpoint keys that are absent (for example the
    non-existent ``.bias`` of an embedding) are skipped silently.
    """
    mapped_weights = {}
    for custom_prefix, hf_prefix in _hf_prefix_pairs(model.config.n_layers):
        for suffix in ('.weight', '.bias'):
            hf_key = hf_prefix + suffix
            if hf_key in pretrained_state_dict:
                # Save the weight/bias values under custom key
                mapped_weights[custom_prefix + suffix] = pretrained_state_dict[hf_key]
    return mapped_weights


# --- WEIGHT INITIALIZATION FUNCTION ---
def initialize_encoder_with_weights(encoder: nn.Module):
    """
    Initializes the custom encoder:
      - Tries to load local safetensors from WEIGHTS_PATH
      - If missing, downloads with huggingface_hub, saves to safetensors, then loads
      - Maps HF keys -> our keys and loads with strict=False

    Exits the process with status 1 (``SystemExit``) when the weights cannot be
    obtained or cannot be loaded into ``encoder``.
    """
    os.makedirs(WEIGHTS_DIR, exist_ok=True)

    # 1) Try local safetensors first
    pretrained_state_dict = None
    if os.path.exists(WEIGHTS_PATH):
        try:
            print(f"[weights] Loading local safetensors: {WEIGHTS_PATH}")
            pretrained_state_dict = load_file(WEIGHTS_PATH)  # dict[str, Tensor]
        except Exception as e:
            # Best effort: a corrupt cache should not block a fresh download.
            print(f"[weights][WARN] Could not load local file ({e}). Will try downloading.")

    # 2) Download with huggingface_hub if not found locally
    if pretrained_state_dict is None:
        try:
            print("[weights] Downloading DistilBERT base uncased via huggingface_hub...")
            # 'distilbert-base-uncased' stores weights in 'model.safetensors'
            hf_file = hf_hub_download(
                repo_id="distilbert-base-uncased",
                filename="model.safetensors"
            )
            pretrained_state_dict = load_file(hf_file)  # HF keys -> Tensors
        except Exception as e:
            print(f"[weights][FATAL] Failed to obtain pretrained weights: {e}")
            print("[weights] Aborting process.")
            # Non-zero status so shells / CI see the failure.
            sys.exit(1)

        # Persist for next runs (failure here is only a warning)
        try:
            save_file(pretrained_state_dict, WEIGHTS_PATH)
            print(f"[weights] Saved to {WEIGHTS_PATH}")
        except Exception as e_save:
            print(f"[weights][WARN] Could not cache weights to {WEIGHTS_PATH}: {e_save}")

    # 3) Map keys and load
    try:
        mapped = map_hf_to_custom_keys(encoder, pretrained_state_dict)
        if not mapped:
            # strict=False would otherwise accept an empty dict and leave the
            # encoder randomly initialized while reporting success.
            raise RuntimeError("no checkpoint key matched the expected 'distilbert.*' names")
        # Fill in what it can match, ignore the rest
        incompatible = encoder.load_state_dict(mapped, strict=False)
        print("[load] Missing keys:", incompatible.missing_keys)
        print("[load] Unexpected keys:", incompatible.unexpected_keys)
        print("[load] Pre-trained DistilBERT encoder weights successfully loaded.")
    except Exception as e:
        print(f"[load][FATAL] Loading mapped weights failed: {e}")
        print("[load] Aborting process.")
        sys.exit(1)


# 1. INPUT DATA SIMULATOR (Will be replaced by our load_data function later)
def data_simulator_generator(comments: list, tokenizer: Any, max_len: int, n_classes: int):
    """
    SIMULATOR: Yields tokenized data dictionaries and random regression labels (0.0 to 1.0).
    This mimics the required output of the load_data generator.

    Parameters:
    - comments: texts to tokenize, one yielded item per text.
    - tokenizer: callable invoked with ``max_length``, ``padding='max_length'``,
      ``truncation=True`` and ``return_tensors='pt'``.
    - max_len: value forwarded as ``max_length``.
    - n_classes: number of random label values generated per comment.

    Yields: ``(x_i, y_i)`` where ``x_i`` is the tokenizer output with the
    leading batch dimension squeezed away and ``y_i`` is ``torch.rand(n_classes)``.
    """
    for comment in comments:
        # Simulate tokenization using the requested parameters
        tokenized_output = tokenizer(
            comment,
            max_length=max_len,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        # Extract inputs (x_i)
        # Squeeze removes the batch dimension: (1, L) -> (L)
        x_i = {k: v.squeeze(0) for k, v in tokenized_output.items()}

        # Simulate labels (y_i): n_classes random toxicity scores (0.0 to 1.0)
        y_i = torch.rand(n_classes)

        yield x_i, y_i


# 2. TRAINING LOOP PROTOTYPE
def prototype_train_step(model, loss_fn, optimizer, data_gen):
    """
    Executes a single step to demonstrate the model's runnability.

    Parameters:
    - model: module called as ``model(input_ids, attn_mask)``.
    - loss_fn: callable applied to ``(predictions, labels)``.
    - optimizer: optimizer whose ``zero_grad`` / ``step`` are invoked.
    - data_gen: iterator yielding ``(x, labels)`` where ``x`` has the keys
      ``'input_ids'`` and ``'attention_mask'``.

    Returns: the loss as a Python float, or ``None`` when ``data_gen`` is exhausted.
    """
    model.train()

    # 1. Load Batch
    try:
        x, labels = next(data_gen)
    except StopIteration:
        print("Simulator exhausted.")
        return None

    # Add batch dimension (B=1) for model input: (L) -> (1, L)
    input_ids = x['input_ids'].unsqueeze(0)
    attn_mask = x['attention_mask'].unsqueeze(0)
    labels = labels.unsqueeze(0)

    # 2. Forward Pass
    optimizer.zero_grad()
    predictions = model(input_ids, attn_mask)

    # 3. Calculate Loss (whatever criterion the caller supplied)
    loss = loss_fn(predictions, labels)

    # 4. Backward Pass and Update
    loss.backward()
    optimizer.step()

    loss_value = loss.item()
    # Report the criterion actually used instead of a hard-coded label.
    print(f"Prototype training step completed ({type(loss_fn).__name__}). Loss: {loss_value:.6f}")
    return loss_value


# 3. EXECUTION OF PROTOTYPE
# Simulate comments taken from train.csv
SAMPLE_COMMENTS = [
    "I hope you die a miserable death.",
    "This movie was good.",
    "Why don't you people just leave?",
    "That is so stupid it makes me angry.",
    "Wow I'm gonna be sick.",
]

# Initialize Data Simulator
data_generator = data_simulator_generator(
    comments=SAMPLE_COMMENTS,
    tokenizer=tokenizer,
    max_len=128,
    n_classes=config.n_classes
)

# Initialize Encoder and Load pretrained Weights
encoder = DistilBertEncoder(config)
initialize_encoder_with_weights(encoder)

# Initialize Model
model = Model(encoder, config)

# Initialize Loss (Binary Cross-Entropy with Logits) and Optimizer (Adam)
loss_function = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Print total and trainable parameters
num_params = sum(p.numel() for p in model.parameters())
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total params: {num_params:,}, Trainable: {trainable:,}")

# Run the Prototype Training Step
print("--- Running Prototype Train Step ---")
prototype_train_step(model, loss_function, optimizer, data_generator)
print("------------------------------------")