# Description: Classification models

from transformers import AutoModel, AutoTokenizer, BatchEncoding, TrainingArguments, Trainer
from functools import partial
from huggingface_hub import snapshot_download
from huggingface_hub.constants import HF_HUB_CACHE
from accelerate import Accelerator
from accelerate.utils import find_executable_batch_size as auto_find_batch_size
from datasets import load_dataset, Dataset
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import json
import os
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    recall_score
)

BASE_PATH = os.path.dirname(os.path.abspath(__file__))
class MultiHeadClassification(nn.Module):
    """
    MultiHeadClassification

    An easy-to-use multi-head classification model. It takes a backbone model and a dictionary of head configurations.
    It can be used to train multiple classification tasks at once using a single backbone model.

    Apart from joint training, it also supports training individual heads separately, providing a simple way to freeze
    and unfreeze heads.

    Example:
        >>> from transformers import AutoModel, AutoTokenizer
        >>> from torch.optim import AdamW
        >>> import torch
        >>> import time
        >>> import torch.nn as nn
        >>>
        >>> # Manually load backbone model to create model
        >>> backbone = AutoModel.from_pretrained('BAAI/bge-m3')
        >>> model = MultiHeadClassification(backbone, {'binary': 2, 'sentiment': 3, 'something': 4}).to('cuda')
        >>> print(model)
        >>> # Load tokenizer for data preprocessing
        >>> tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-m3')
        >>> # Tokenize a single example
        >>> inputs = tokenizer("Hello, my dog is cute", return_tensors="pt", padding=True, truncation=True)
        >>> optimizer = AdamW(model.parameters(), lr=5e-4)
        >>> # Some training data
        >>> samples = tokenizer(["Hello, my dog is cute", "Hello, my dog is cute", "I like turtles"], return_tensors="pt", padding=True, truncation=True).to('cuda')
        >>> labels = {'binary': torch.tensor([0, 0, 1]), 'sentiment': torch.tensor([0, 1, 2]), 'something': torch.tensor([0, 1, 2])}
        >>> model.freeze_backbone()
        >>> model.train(True)
        >>> for i in range(10):
        ...     optimizer.zero_grad()
        ...     outputs = model(samples)
        ...     loss = sum([nn.CrossEntropyLoss()(outputs[name].cpu(), labels[name]) for name in model.heads.keys()])
        ...     loss.backward()
        ...     optimizer.step()
        ...     print(loss.item())
        ...     #time.sleep(1)
        ...     print(model(samples))
        >>> # Save full model
        >>> model.save('model.pth')
        >>> # Save head only
        >>> model.save_head('binary', 'binary.pth')
        >>> # Load full model (the head config must match the saved state dict)
        >>> model = MultiHeadClassification(backbone, {'binary': 2, 'sentiment': 3, 'something': 4}).to('cuda')
        >>> model.load('model.pth')
        >>> # Load head only (a missing head is created automatically)
        >>> model = MultiHeadClassification(backbone, {}).to('cuda')
        >>> model.load_head('binary', 'binary.pth')
        >>> # Adding new head
        >>> model.add_head('new_head', 3)
        >>> print(model)
        >>> # extend dataset with data for new head
        >>> labels['new_head'] = torch.tensor([0, 1, 2])
        >>> # Freeze all heads and backbone
        >>> model.freeze_all()
        >>> # Only unfreeze new head
        >>> model.unfreeze_head('new_head')
        >>> # Re-create the optimizer so it includes the new head's parameters
        >>> optimizer = AdamW(model.parameters(), lr=5e-4)
        >>> model.train(True)
        >>> for i in range(10):
        ...     optimizer.zero_grad()
        ...     outputs = model(samples)
        ...     loss = sum([nn.CrossEntropyLoss()(outputs[name].cpu(), labels[name]) for name in model.heads.keys()])
        ...     loss.backward()
        ...     optimizer.step()
        ...     print(loss.item())
        >>> print(model(samples))

    Args:
        backbone (transformers.PreTrainedModel): A pretrained transformer model
        head_config (dict): A dictionary with head configurations. The key is the head name and the value is the number
            of classes for that head.
    """
    def __init__(self, backbone, head_config, dropout=0.1, l2_reg=0.01):
        super().__init__()
        self.backbone = backbone
        self.num_heads = len(head_config)
        self.heads = nn.ModuleDict({
            name: nn.Linear(backbone.config.hidden_size, num_classes)
            for name, num_classes in head_config.items()
        })
        self.do = nn.Dropout(dropout)
        self.l2_reg = l2_reg
        self.device = 'cpu'
        self.torch_dtype = torch.float16
        self.head_config = head_config
    def forward(self, x, head_names=None) -> dict:
        """
        Forward pass of the model.

        Requires tokenizer output as input. The input should be a dictionary with keys 'input_ids', 'attention_mask'.

        Args:
            x (dict): Tokenizer output
            head_names (list): (optional) List of head names to return logits for. If None, returns logits for all heads.

        Returns:
            dict: A dictionary with head names as keys and logits as values
        """
        # Use the first ([CLS]) token representation as the pooled sentence embedding
        x = self.backbone(**x, return_dict=True, output_hidden_states=True).last_hidden_state[:, 0, :]
        x = self.do(x)
        if head_names is None:
            return {name: head(x) for name, head in self.heads.items()}
        return {name: head(x) for name, head in self.heads.items() if name in head_names}
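
    # Illustrative note (not from the original code): with inputs produced by
    # tokenizer(["a", "b"], return_tensors='pt', padding=True) and heads {'binary': 2, 'sentiment': 3},
    # forward() returns a dict of logits such as {'binary': Tensor[2, 2], 'sentiment': Tensor[2, 3]}.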
    def get_l2_loss(self):
        """
        Getter for L2 regularization loss

        Returns:
            torch.Tensor: L2 regularization loss
        """
        l2_loss = torch.tensor(0.).to(self.device)
        for param in self.parameters():
            if param.requires_grad:
                l2_loss += torch.norm(param, 2)
        return (self.l2_reg * l2_loss).to(self.device)
    def to(self, *args, **kwargs):
        super().to(*args, **kwargs)
        # Track the requested dtype/device so later loads and newly added heads can be moved accordingly
        if args:
            if isinstance(args[0], torch.dtype):
                self.torch_dtype = args[0]
            elif isinstance(args[0], (str, torch.device)):
                self.device = args[0]
        return self
    def load_head(self, head_name, path):
        """
        Load head from a file

        Args:
            head_name (str): Name of the head
            path (str): Path to the file

        Returns:
            None
        """
        model = torch.load(path, map_location=self.device)
        if head_name in self.heads:
            num_classes = model['weight'].shape[0]
            self.heads[head_name].load_state_dict(model)
            self.to(self.torch_dtype).to(self.device)
            self.head_config[head_name] = num_classes
            return
        assert model['weight'].shape[1] == self.backbone.config.hidden_size
        num_classes = model['weight'].shape[0]
        self.heads[head_name] = nn.Linear(self.backbone.config.hidden_size, num_classes)
        self.heads[head_name].load_state_dict(model)
        self.head_config[head_name] = num_classes
        self.to(self.torch_dtype).to(self.device)
    def save_head(self, head_name, path):
        """
        Save head to a file

        Args:
            head_name (str): Name of the head
            path (str): Path to the file
        """
        torch.save(self.heads[head_name].state_dict(), path)

    def save(self, path):
        """
        Save the full model to a file

        Args:
            path (str): Path to the file
        """
        torch.save(self.state_dict(), path)

    def load(self, path):
        """
        Load the full model from a file

        Args:
            path (str): Path to the file
        """
        self.load_state_dict(torch.load(path, map_location=self.device))
        self.to(self.torch_dtype).to(self.device)

    def save_backbone(self, path):
        """
        Save the backbone to a file

        Args:
            path (str): Path to the file
        """
        self.backbone.save_pretrained(path)

    def load_backbone(self, path):
        """
        Load the backbone from a file

        Args:
            path (str): Path to the file
        """
        self.backbone = AutoModel.from_pretrained(path)
        self.to(self.torch_dtype).to(self.device)

    def freeze_backbone(self):
        """ Freeze the backbone """
        for param in self.backbone.parameters():
            param.requires_grad = False

    def unfreeze_backbone(self):
        """ Unfreeze the backbone """
        for param in self.backbone.parameters():
            param.requires_grad = True

    def freeze_head(self, head_name):
        """
        Freeze a head by name

        Args:
            head_name (str): Name of the head
        """
        for param in self.heads[head_name].parameters():
            param.requires_grad = False

    def unfreeze_head(self, head_name):
        """
        Unfreeze a head by name

        Args:
            head_name (str): Name of the head
        """
        for param in self.heads[head_name].parameters():
            param.requires_grad = True

    def freeze_all_heads(self):
        """ Freeze all heads """
        for head_name in self.heads.keys():
            self.freeze_head(head_name)

    def unfreeze_all_heads(self):
        """ Unfreeze all heads """
        for head_name in self.heads.keys():
            self.unfreeze_head(head_name)

    def freeze_all(self):
        """ Freeze all """
        self.freeze_backbone()
        self.freeze_all_heads()

    def unfreeze_all(self):
        """ Unfreeze all """
        self.unfreeze_backbone()
        self.unfreeze_all_heads()

    def add_head(self, head_name, num_classes):
        """
        Add a new head to the model

        Args:
            head_name (str): Name of the head
            num_classes (int): Number of classes for the head
        """
        self.heads[head_name] = nn.Linear(self.backbone.config.hidden_size, num_classes)
        self.heads[head_name].to(self.torch_dtype).to(self.device)
        self.head_config[head_name] = num_classes

    def remove_head(self, head_name):
        """
        Remove a head from the model
        """
        if head_name not in self.heads:
            raise ValueError(f'Head {head_name} not found')
        del self.heads[head_name]
        del self.head_config[head_name]
    @classmethod
    def from_pretrained(cls, model_name, head_config=None, dropout=0.1, l2_reg=0.01):
        """
        Load a pretrained model from the Huggingface model hub

        Args:
            model_name (str): Name of the model
            head_config (dict): Head configuration
            dropout (float): Dropout rate
            l2_reg (float): L2 regularization rate
        """
        if head_config is None:
            head_config = {}
        # Check if the model already exists in the local HF cache
        hf_cache_dir = HF_HUB_CACHE
        model_path = os.path.join(hf_cache_dir, model_name)
        if os.path.exists(model_path):
            return cls._from_directory(model_path, head_config, dropout, l2_reg)
        model_path = snapshot_download(repo_id=model_name, cache_dir=hf_cache_dir)
        return cls._from_directory(model_path, head_config, dropout, l2_reg)
    @classmethod
    def _from_directory(cls, model_path, head_config, dropout=0.1, l2_reg=0.01):
        """
        Load a model from a directory

        Args:
            model_path (str): Path to the model directory
            head_config (dict): Head configuration
            dropout (float): Dropout rate
            l2_reg (float): L2 regularization rate
        """
        backbone = AutoModel.from_pretrained(os.path.join(model_path, 'pretrained/backbone'))
        instance = cls(backbone, head_config, dropout, l2_reg)
        instance.load(os.path.join(model_path, 'multi-head-sequence-classification-model-model.pth'))
        # nn.Linear weights are (out_features, in_features), so the class count is dimension 0
        instance.head_config = {k: v.weight.shape[0] for k, v in instance.heads.items()}
        return instance
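

# Minimal usage sketch (illustrative, not part of the original module): it wires the pieces above
# into a single training loop. The 'BAAI/bge-m3' id comes from the class docstring; the toy texts,
# labels, and hyperparameters below are assumptions for demonstration only.
if __name__ == '__main__':
    backbone = AutoModel.from_pretrained('BAAI/bge-m3')
    tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-m3')
    model = MultiHeadClassification(backbone, {'binary': 2, 'sentiment': 3})

    # Train only the heads; keep the backbone frozen.
    model.freeze_backbone()
    optimizer = optim.AdamW([p for p in model.parameters() if p.requires_grad], lr=5e-4)

    texts = ["Hello, my dog is cute", "I like turtles"]
    labels = {'binary': torch.tensor([0, 1]), 'sentiment': torch.tensor([0, 2])}
    batch = tokenizer(texts, return_tensors='pt', padding=True, truncation=True)

    model.train(True)
    for _ in range(3):
        optimizer.zero_grad()
        outputs = model(batch)
        # Sum the per-head cross-entropy losses and add the L2 penalty from get_l2_loss().
        loss = sum(nn.CrossEntropyLoss()(outputs[name], labels[name]) for name in model.heads)
        loss = loss + model.get_l2_loss()
        loss.backward()
        optimizer.step()
        print(loss.item())

    # Persist the full model and a single head.
    model.save('model.pth')
    model.save_head('binary', 'binary.pth')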