import math
from typing import List, Optional

from torch.optim import Optimizer
from torch.optim.lr_scheduler import LRScheduler


class CosineAnnealingWarmupRestarts(LRScheduler):
    r"""CosineAnnealingWarmupRestarts.

    :param optimizer: Optimizer. wrapped optimizer instance.
    :param first_cycle_steps: int. first cycle step size.
    :param cycle_mult: float. cycle steps magnification.
    :param max_lr: float. first cycle's max learning rate.
    :param min_lr: float. min learning rate.
    :param warmup_steps: int. number of warmup steps.
    :param gamma: float. decrease rate of max_lr by cycle.
    :param last_epoch: int. the index of the last epoch.
    """

    def __init__(
        self,
        optimizer: Optimizer,
        first_cycle_steps: int,
        cycle_mult: float = 1.0,
        max_lr: float = 1e-4,
        min_lr: float = 1e-6,
        warmup_steps: int = 0,
        gamma: float = 0.9,
        last_epoch: int = -1,
    ):
        if warmup_steps >= first_cycle_steps:
            raise ValueError(
                f'[-] warmup_steps must be smaller than first_cycle_steps. {warmup_steps} < {first_cycle_steps}'
            )

        self.first_cycle_steps = first_cycle_steps
        self.cycle_mult = cycle_mult
        self.base_max_lr = max_lr
        self.max_lr = max_lr
        self.min_lr = min_lr
        self.warmup_steps = warmup_steps
        self.gamma = gamma

        self.cur_cycle_steps = first_cycle_steps
        self.step_in_cycle = last_epoch
        self.last_epoch = last_epoch
        self.cycle: int = 0
        self.base_lrs: List[float] = []

        super().__init__(optimizer, last_epoch)

        self.init_lr()

    def init_lr(self) -> None:
        self.base_lrs = []
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self.min_lr
            self.base_lrs.append(self.min_lr)

    def get_lr(self) -> List[float]:
        if self.step_in_cycle == -1:
            return self.base_lrs

        if self.step_in_cycle < self.warmup_steps:
            return [
                (self.max_lr - base_lr) * self.step_in_cycle / self.warmup_steps + base_lr for base_lr in self.base_lrs
            ]

        return [
            base_lr
            + (self.max_lr - base_lr)
            * (
                1
                + math.cos(
                    math.pi * (self.step_in_cycle - self.warmup_steps) / (self.cur_cycle_steps - self.warmup_steps)
                )
            )
            / 2.0
            for base_lr in self.base_lrs
        ]

    def step(self, epoch: Optional[int] = None):
        if epoch is None:
            epoch = self.last_epoch + 1
            self.step_in_cycle = self.step_in_cycle + 1
            if self.step_in_cycle >= self.cur_cycle_steps:
                self.cycle += 1
                self.step_in_cycle = self.step_in_cycle - self.cur_cycle_steps
                self.cur_cycle_steps = (
                    int((self.cur_cycle_steps - self.warmup_steps) * self.cycle_mult) + self.warmup_steps
                )
        elif epoch >= self.first_cycle_steps:
            if self.cycle_mult == 1.0:
                self.step_in_cycle = epoch % self.first_cycle_steps
                self.cycle = epoch // self.first_cycle_steps
            else:
                n: int = int(math.log((epoch / self.first_cycle_steps * (self.cycle_mult - 1) + 1), self.cycle_mult))
                self.cycle = n
                self.step_in_cycle = epoch - int(
                    self.first_cycle_steps * (self.cycle_mult ** n - 1) / (self.cycle_mult - 1)
                )  # fmt: skip
                self.cur_cycle_steps = self.first_cycle_steps * self.cycle_mult ** n  # fmt: skip
        else:
            self.cur_cycle_steps = self.first_cycle_steps
            self.step_in_cycle = epoch

        self.max_lr = self.base_max_lr * (self.gamma ** self.cycle)  # fmt: skip
        self.last_epoch = math.floor(epoch)

        lrs = self.get_lr()
        for param_group, lr in zip(self.optimizer.param_groups, lrs):
            param_group['lr'] = lr
        self._last_lr = lrs

    def get_last_lr(self) -> List[float]:
        # If _last_lr has not been set yet, return the freshly computed lrs
        if hasattr(self, '_last_lr'):
            return self._last_lr
        else:
            return self.get_lr()
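

# Usage sketch (not part of the original paste): a minimal loop that drives the
# base scheduler. The Linear model, AdamW optimizer, and hyperparameters below are
# illustrative assumptions; it is written as a function so importing this module
# stays side-effect free.
def _example_cosine_warmup_restarts() -> None:
    import torch

    model = torch.nn.Linear(10, 1)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
    scheduler = CosineAnnealingWarmupRestarts(
        optimizer,
        first_cycle_steps=100,  # length of the first cycle, including warmup
        cycle_mult=2.0,  # each restart doubles the cycle length
        max_lr=1e-4,
        min_lr=1e-6,
        warmup_steps=10,
        gamma=0.5,  # peak lr is halved after every restart
    )
    for _ in range(300):
        # optimizer.step() would normally run here
        scheduler.step()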


class CosineAnnealingWarmupRestartsPD(LRScheduler):
    r"""CosineAnnealingWarmupRestarts with per-param-group LR scaling.

    Each parameter group starts from its own initial lr, and the scheduler
    lowers/raises it relative to that value.
    """

    def __init__(
        self,
        optimizer: Optimizer,
        first_cycle_steps: int,
        cycle_mult: float = 1.0,
        max_lr: float = 1e-4,  # global maximum for the fastest group
        min_lr: float = 1e-6,  # global minimum for the fastest group
        warmup_steps: int = 0,
        gamma: float = 0.9,
        last_epoch: int = -1,
    ):
        if warmup_steps >= first_cycle_steps:
            raise ValueError(
                f"warmup_steps must be smaller than first_cycle_steps ({warmup_steps} < {first_cycle_steps})"
            )

        self.first_cycle_steps = first_cycle_steps
        self.cycle_mult = cycle_mult
        self.global_base_max_lr = max_lr
        self.global_min_lr = min_lr
        self.warmup_steps = warmup_steps
        self.gamma = gamma

        self.cur_cycle_steps = first_cycle_steps
        self.step_in_cycle = last_epoch
        self.last_epoch = last_epoch
        self.cycle = 0

        # Remember the initial lr of every param group
        self.init_base_lrs = [pg["lr"] for pg in optimizer.param_groups]

        # Per-group scaling factors: the fastest group maps to max_lr,
        # the remaining groups are scaled proportionally
        max_init_lr = max(self.init_base_lrs)
        self.scale_factors = [init_lr / max_init_lr for init_lr in self.init_base_lrs]

        super().__init__(optimizer, last_epoch)

    def get_lr(self) -> List[float]:
        """Compute the lr of each group while preserving the ratios of the initial values."""
        if self.step_in_cycle == -1:
            return self.init_base_lrs

        # Current global maximum (for the fastest group)
        current_global_max = self.global_base_max_lr * (self.gamma ** self.cycle)
        current_global_min = self.global_min_lr

        lrs = []
        for scale in self.scale_factors:
            # Per-group min/max
            group_max = current_global_max * scale
            group_min = current_global_min * scale

            if self.step_in_cycle < self.warmup_steps:
                # Linear warmup
                lr = group_min + (group_max - group_min) * self.step_in_cycle / self.warmup_steps
            else:
                # Cosine annealing
                lr = group_min + (group_max - group_min) * (
                    1 + math.cos(
                        math.pi * (self.step_in_cycle - self.warmup_steps)
                        / (self.cur_cycle_steps - self.warmup_steps)
                    )
                ) / 2.0

            lrs.append(lr)

        return lrs

    def step(self, epoch: Optional[int] = None):
        if epoch is None:
            epoch = self.last_epoch + 1
            self.step_in_cycle += 1
            if self.step_in_cycle >= self.cur_cycle_steps:
                self.cycle += 1
                self.step_in_cycle -= self.cur_cycle_steps
                self.cur_cycle_steps = (
                    int((self.cur_cycle_steps - self.warmup_steps) * self.cycle_mult)
                    + self.warmup_steps
                )
        elif epoch >= self.first_cycle_steps:
            if self.cycle_mult == 1.0:
                self.step_in_cycle = epoch % self.first_cycle_steps
                self.cycle = epoch // self.first_cycle_steps
            else:
                n = int(math.log((epoch / self.first_cycle_steps * (self.cycle_mult - 1) + 1), self.cycle_mult))
                self.cycle = n
                self.step_in_cycle = epoch - int(
                    self.first_cycle_steps * (self.cycle_mult**n - 1) / (self.cycle_mult - 1)
                )
                self.cur_cycle_steps = self.first_cycle_steps * (self.cycle_mult**n)
        else:
            self.cur_cycle_steps = self.first_cycle_steps
            self.step_in_cycle = epoch

        self.last_epoch = math.floor(epoch)

        lrs = self.get_lr()
        for pg, lr in zip(self.optimizer.param_groups, lrs):
            pg["lr"] = lr
        self._last_lr = lrs

    def get_last_lr(self) -> List[float]:
        return getattr(self, "_last_lr", self.get_lr())


class CosineAnnealingWarmupRestartsDTEFIX(LRScheduler):
    r"""CosineAnnealingWarmupRestarts with per-param-group LR scaling.

    Each parameter group starts from its own initial lr, but the minimum lr is
    shared by all groups: it is not scaled per group and is enforced as a floor.
    """

    def __init__(
        self,
        optimizer: Optimizer,
        first_cycle_steps: int,
        cycle_mult: float = 1.0,
        max_lr: float = 1e-4,  # global maximum for the fastest group
        min_lr: float = 1e-6,  # global minimum shared by all groups
        warmup_steps: int = 0,
        gamma: float = 0.9,
        last_epoch: int = -1,
    ):
        if warmup_steps >= first_cycle_steps:
            raise ValueError(
                f"warmup_steps must be smaller than first_cycle_steps ({warmup_steps} < {first_cycle_steps})"
            )

        self.first_cycle_steps = first_cycle_steps
        self.cycle_mult = cycle_mult
        self.global_base_max_lr = max_lr
        self.global_min_lr = min_lr
        self.warmup_steps = warmup_steps
        self.gamma = gamma

        self.cur_cycle_steps = first_cycle_steps
        self.step_in_cycle = last_epoch
        self.last_epoch = last_epoch
        self.cycle = 0

        # Remember the initial lr of every param group
        self.init_base_lrs = [pg["lr"] for pg in optimizer.param_groups]

        # Per-group scaling factors: the fastest group maps to max_lr,
        # the remaining groups are scaled proportionally
        max_init_lr = max(self.init_base_lrs)
        self.scale_factors = [init_lr / max_init_lr for init_lr in self.init_base_lrs]

        super().__init__(optimizer, last_epoch)

    def get_lr(self) -> List[float]:
        if self.step_in_cycle == -1:
            return self.init_base_lrs

        # max decays by gamma each cycle; min stays fixed
        current_global_max = self.global_base_max_lr * (self.gamma ** self.cycle)
        current_global_min = self.global_min_lr  # does not decrease

        lrs = []
        for scale in self.scale_factors:
            group_max = current_global_max * scale
            group_min = self.global_min_lr  # not scaled down per group

            if self.step_in_cycle < self.warmup_steps:
                lr = group_min + (group_max - group_min) * self.step_in_cycle / self.warmup_steps
            else:
                lr = group_min + (group_max - group_min) * (
                    1 + math.cos(
                        math.pi * (self.step_in_cycle - self.warmup_steps)
                        / (self.cur_cycle_steps - self.warmup_steps)
                    )
                ) / 2.0

            # guarantee the lr never drops below the global minimum
            lr = max(lr, self.global_min_lr)
            lrs.append(lr)

        return lrs

    def step(self, epoch: Optional[int] = None):
        if epoch is None:
            epoch = self.last_epoch + 1
            self.step_in_cycle += 1
            if self.step_in_cycle >= self.cur_cycle_steps:
                self.cycle += 1
                self.step_in_cycle -= self.cur_cycle_steps
                self.cur_cycle_steps = (
                    int((self.cur_cycle_steps - self.warmup_steps) * self.cycle_mult)
                    + self.warmup_steps
                )
        elif epoch >= self.first_cycle_steps:
            if self.cycle_mult == 1.0:
                self.step_in_cycle = epoch % self.first_cycle_steps
                self.cycle = epoch // self.first_cycle_steps
            else:
                n = int(math.log((epoch / self.first_cycle_steps * (self.cycle_mult - 1) + 1), self.cycle_mult))
                self.cycle = n
                self.step_in_cycle = epoch - int(
                    self.first_cycle_steps * (self.cycle_mult**n - 1) / (self.cycle_mult - 1)
                )
                self.cur_cycle_steps = self.first_cycle_steps * (self.cycle_mult**n)
        else:
            self.cur_cycle_steps = self.first_cycle_steps
            self.step_in_cycle = epoch

        self.last_epoch = math.floor(epoch)

        lrs = self.get_lr()
        for pg, lr in zip(self.optimizer.param_groups, lrs):
            pg["lr"] = lr
        self._last_lr = lrs

    def get_last_lr(self) -> List[float]:
        return getattr(self, "_last_lr", self.get_lr())


class CosineAnnealingWarmupRestartsMINLRFIX(LRScheduler):
    r"""CosineAnnealingWarmupRestarts with per-param-group LR scaling.

    min_lr is fixed and does not decrease across cycles.
    """

    def __init__(
        self,
        optimizer: Optimizer,
        first_cycle_steps: int,
        cycle_mult: float = 1.0,
        max_lr: float = 1e-4,  # global maximum for the fastest group
        min_lr: float = 1e-6,  # fixed global minimum
        warmup_steps: int = 0,
        gamma: float = 0.9,
        last_epoch: int = -1,
    ):
        if warmup_steps >= first_cycle_steps:
            raise ValueError(
                f"warmup_steps must be smaller than first_cycle_steps ({warmup_steps} < {first_cycle_steps})"
            )

        self.first_cycle_steps = first_cycle_steps
        self.cycle_mult = cycle_mult
        self.global_base_max_lr = max_lr
        self.global_min_lr = min_lr  # kept fixed
        self.warmup_steps = warmup_steps
        self.gamma = gamma

        self.cur_cycle_steps = first_cycle_steps
        self.step_in_cycle = last_epoch
        self.last_epoch = last_epoch
        self.cycle = 0

        # initial lrs of each group
        self.init_base_lrs = [pg["lr"] for pg in optimizer.param_groups]

        # per-group scaling factors
        max_init_lr = max(self.init_base_lrs)
        self.scale_factors = [init_lr / max_init_lr for init_lr in self.init_base_lrs]

        super().__init__(optimizer, last_epoch)

    def get_lr(self) -> List[float]:
        if self.step_in_cycle == -1:
            return self.init_base_lrs

        current_global_max = self.global_base_max_lr * (self.gamma ** self.cycle)
        current_global_min = self.global_min_lr  # fixed

        lrs = []
        for scale in self.scale_factors:
            group_max = current_global_max * scale
            group_min = current_global_min * scale

            if self.step_in_cycle < self.warmup_steps:
                lr = group_min + (group_max - group_min) * self.step_in_cycle / self.warmup_steps
            else:
                lr = group_min + (group_max - group_min) * (
                    1 + math.cos(
                        math.pi * (self.step_in_cycle - self.warmup_steps)
                        / (self.cur_cycle_steps - self.warmup_steps)
                    )
                ) / 2.0

            # guarantee the lr never drops below group_min
            lrs.append(max(lr, group_min))

        return lrs

    def step(self, epoch: Optional[int] = None):
        if epoch is None:
            epoch = self.last_epoch + 1
            self.step_in_cycle += 1
            if self.step_in_cycle >= self.cur_cycle_steps:
                self.cycle += 1
                self.step_in_cycle -= self.cur_cycle_steps
                self.cur_cycle_steps = (
                    int((self.cur_cycle_steps - self.warmup_steps) * self.cycle_mult)
                    + self.warmup_steps
                )
        elif epoch >= self.first_cycle_steps:
            if self.cycle_mult == 1.0:
                self.step_in_cycle = epoch % self.first_cycle_steps
                self.cycle = epoch // self.first_cycle_steps
            else:
                n = int(math.log((epoch / self.first_cycle_steps * (self.cycle_mult - 1) + 1), self.cycle_mult))
                self.cycle = n
                self.step_in_cycle = epoch - int(
                    self.first_cycle_steps * (self.cycle_mult**n - 1) / (self.cycle_mult - 1)
                )
                self.cur_cycle_steps = self.first_cycle_steps * (self.cycle_mult**n)
        else:
            self.cur_cycle_steps = self.first_cycle_steps
            self.step_in_cycle = epoch

        self.last_epoch = math.floor(epoch)

        lrs = self.get_lr()
        for pg, lr in zip(self.optimizer.param_groups, lrs):
            pg["lr"] = lr
        self._last_lr = lrs

    def get_last_lr(self) -> List[float]:
        return getattr(self, "_last_lr", self.get_lr())
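

# Usage sketch (not part of the original paste): drives the per-group variant with
# two param groups of different initial lrs. The model, optimizer, and hyperparameters
# are illustrative assumptions; swap in CosineAnnealingWarmupRestartsDTEFIX or
# CosineAnnealingWarmupRestartsMINLRFIX to compare how each variant treats min_lr.
if __name__ == "__main__":
    import torch

    model = torch.nn.Sequential(torch.nn.Linear(8, 8), torch.nn.Linear(8, 1))
    optimizer = torch.optim.AdamW(
        [
            {"params": model[0].parameters(), "lr": 1e-4},  # fastest group, reaches max_lr
            {"params": model[1].parameters(), "lr": 1e-5},  # scaled to 0.1 * max_lr
        ]
    )
    scheduler = CosineAnnealingWarmupRestartsPD(
        optimizer,
        first_cycle_steps=50,
        cycle_mult=1.0,
        max_lr=1e-4,
        min_lr=1e-6,
        warmup_steps=5,
        gamma=0.5,
    )
    for step in range(120):
        # optimizer.step() would normally run here
        scheduler.step()
        if step % 20 == 0:
            print(step, [f"{lr:.2e}" for lr in scheduler.get_last_lr()])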