#LSTM-RAY.py
#NOTE: the import of pytorch_lightning must be changed to lightning.pytorch in
#darts/models/forecasting (pl_forecasting_module.py and torch_forecasting_module.py)
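"""
Hyperparameter search for a Darts RNNModel (LSTM) forecaster, driven by Ray Tune
and a Ray TorchTrainer with Lightning. Scaled train/validation series are loaded
from a pickle, placed in the Ray object store, and each trial fits an LSTM whose
learning rate is seeded by lr_find and scheduled with CyclicLR; ASHA prunes
trials on the "val_loss" surfaced by RayTrainReportCallback.
"""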
import pickle
import ray
from ray import tune, train
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from darts import TimeSeries
from darts.models import RNNModel
from darts.dataprocessing.transformers import Scaler
from darts.metrics import rmse, smape
import lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping
from ray.tune.integration.pytorch_lightning import TuneReportCheckpointCallback
from ray.train.lightning import RayTrainReportCallback, RayLightningEnvironment, RayDDPStrategy, prepare_trainer
from torch.nn import MSELoss
from lightning.pytorch import Trainer
import ray.train.lightning
from ray.train.context import TrainContext
from lightning.pytorch.loggers import CSVLogger, TensorBoardLogger
from lightning.pytorch.plugins.environments import SLURMEnvironment
import torch
from torch import nn, Tensor
import torch.optim.lr_scheduler as lr
from torchmetrics import MeanAbsoluteError, SymmetricMeanAbsolutePercentageError, MetricCollection
from ray.train import RunConfig, ScalingConfig, CheckpointConfig, FailureConfig
from ray.train.torch import TorchTrainer, TorchConfig
import os

#os.environ["CUDA_VISIBLE_DEVICES"]=str(torch.cuda.device_count())
def loadScaledDataset():
    # Returns scaler, trainListScaled, valListScaled
    return pickle.load(open('large_reduced_scaler_train_val_scaled_dfs.pkl', 'rb'))
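# A minimal sketch of how that pickle could be produced (an assumption about the
# preprocessing step, which is not part of this script; the layout implied by
# main() is a (scaler, train_scaled, val_scaled) tuple of Darts objects):
#
#   series = TimeSeries.from_dataframe(df, time_col="time", value_cols="value")
#   train_raw, val_raw = series.split_after(0.8)
#   scaler = Scaler()
#   train_scaled = scaler.fit_transform(train_raw)
#   val_scaled = scaler.transform(val_raw)
#   with open('large_reduced_scaler_train_val_scaled_dfs.pkl', 'wb') as f:
#       pickle.dump((scaler, train_scaled, val_scaled), f)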
def train_model(config):
    # Retrieve the scaled series from the Ray object store
    train = ray.get(config["train_ref"])
    val = ray.get(config["val_ref"])
    MODELNAME = "lstm_optim_1"
    NUM_NODES = 1
    torch.set_float32_matmul_precision('high')
    accel = "cuda" if torch.cuda.is_available() else "cpu"
    if torch.cuda.is_available():
        print("torch device: " + str(torch.cuda.current_device()))
    modelParams = {
        "input_chunk_length": config["input_chunk_length"],
        "training_length": config["input_chunk_length"],
        "model": "LSTM",
        "hidden_dim": config["hidden_dim"],
        "n_rnn_layers": config["n_rnn_layers"],
        "dropout": config["dropout"],
        "batch_size": config["batch_size"],
        "n_epochs": config["n_epochs"],
        "loss_fn": nn.HuberLoss(),
        #"loss_fn": rmse, #see if this works
        "optimizer_cls": torch.optim.AdamW,
        #"random_state": 42,
        "log_tensorboard": True,
        "save_checkpoints": True,
        "force_reset": True
    }
    pl_trainer_kwargs = {
        "devices": "auto",
        "num_nodes": NUM_NODES,
        "gradient_clip_val": 0.1,
        "max_epochs": config["n_epochs"],
        "default_root_dir": "ckpts/",
    }
    initial_trainer = Trainer(**pl_trainer_kwargs,
                              strategy=ray.train.lightning.RayDDPStrategy(),
                              plugins=[ray.train.lightning.RayLightningEnvironment()],
                              callbacks=[ray.train.lightning.RayTrainReportCallback()],
                              enable_checkpointing=False)
    initial_trainer = prepare_trainer(initial_trainer)
    initial_model = RNNModel(
        **modelParams,
    )
    # Find the optimal learning rate
    lr_finder = initial_model.lr_find(series=train, val_series=val, trainer=initial_trainer)
    base_lr = lr_finder.suggestion()
    model = RNNModel(
        **modelParams,
        optimizer_kwargs={
            'lr': base_lr,
            'weight_decay': config["weight_decay"]
        },
        lr_scheduler_cls=lr.CyclicLR,
        lr_scheduler_kwargs={
            "base_lr": base_lr,
            # The cycle ceiling is a tuned multiple of the suggested rate
            "max_lr": base_lr * config["max_lr_scale"],
            "mode": 'exp_range',
            'gamma': 0.9,
            'cycle_momentum': False
        }
    )
    print("training")
    model.fit(series=train, val_series=val, trainer=initial_trainer)
    # Code will only get here if it's not terminated by ray's time limit
    # Use the first half of the validation set as input and forecast the rest of it
    val_true = val.drop_after(0.5)
    val_pred = model.predict(n=val_true.n_timesteps - 1, series=val_true, trainer=initial_trainer)
    print("reporting")
    # darts metrics intersect the series, so only the forecast horizon is scored
    errors = {"rmse": rmse(val, val_pred), "smape": smape(val, val_pred)}
    print(config)
    print(errors)
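    # Possible extension (a sketch, not part of the tuning loop above): the final
    # forecast errors are only printed; if they should also reach Ray Tune, they
    # could be reported explicitly, e.g.
    #   ray.train.report({"val_loss": errors["rmse"]})
    # Per-epoch "val_loss" is otherwise surfaced by RayTrainReportCallback.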
def get_ray_train_configs(config: dict = None, num_samples: int = 16):
    if config is None:
        raise ValueError("No config passed!")
    num_devices = torch.cuda.device_count()
    num_devices = 1 if num_devices == 0 else num_devices
    print("num_devices: " + str(num_devices))
    scaling_config = ScalingConfig(
        num_workers=num_devices, use_gpu=True,
        accelerator_type="A100",
        #resources_per_worker={"CPU": 2, "GPU": 1}
    )
    # Object-store references are not meaningful as reported columns
    col_list = list(config.keys())
    col_list.remove("train_ref")
    col_list.remove("val_ref")
    # Note: the reporter is built but not attached to the run below
    reporter = CLIReporter(
        parameter_columns=col_list,
        metric_columns=["error", "training_iteration"],
        metric="error", mode="min"
    )
    run_config = RunConfig(
        checkpoint_config=CheckpointConfig(
            num_to_keep=2,
            checkpoint_score_attribute="val_loss",
            checkpoint_score_order="min",
        ),
        failure_config=FailureConfig(max_failures=1), #Should retry failed runs once
        storage_path="/home/farooqzahid/transfer1/ray_results"
    )
    scheduler = ASHAScheduler(
        #time_attr = "time_total_s" or "total_time_s" #time option
        time_attr="time_total_s",
        metric="val_loss", #Scaled data, so average rmse is similar to average mape in application
        mode="min",
        max_t=80000, #default t is number of iterations #SET FOR ARC
        grace_period=20,
        reduction_factor=2
    )
    ray_trainer = TorchTrainer(
        train_loop_per_worker=train_model,
        torch_config=TorchConfig(backend="gloo"),
        scaling_config=scaling_config,
        run_config=run_config
    )
    tune_config = tune.TuneConfig(
        #metric = "HuberLoss", mode = "min", #might need to be HuberLoss OR error
        scheduler=scheduler,
        num_samples=num_samples,
        max_concurrent_trials=8
        #time_budget_s = 000 #Can set this on ARC
    )
    tuner = tune.Tuner(
        ray_trainer,
        param_space={"train_loop_config": config},
        tune_config=tune_config
    )
    return tuner
def main(filepath=None, time_col=None, value_col=None):
    # filepath/time_col/value_col are currently unused; the data comes from the pickle
    scaler, train, val = loadScaledDataset()
    del scaler
    #ray.init(ignore_reinit_error=True)
    # Put the (potentially large) series in the object store once and pass references
    train_ref = ray.put(train)
    val_ref = ray.put(val)
    config = {
        "input_chunk_length": tune.choice([24, 48, 92]),
        #"training_length": tune.sample_from(lambda spec: spec.config.input_chunk_length),
        "hidden_dim": tune.choice([10, 25, 50]),
        "n_rnn_layers": tune.choice([16, 128, 256]),
        "dropout": tune.uniform(0, 0.4),
        "batch_size": tune.choice([32, 256, 1024]),
        "n_epochs": tune.randint(50, 2000),
        "max_lr_scale": tune.uniform(1, 5),
        "weight_decay": tune.choice([1e-3, 1e-5, 1e-7]),
        "train_ref": train_ref,
        "val_ref": val_ref
    }
    tuner = get_ray_train_configs(config=config, num_samples=16)
    results = tuner.fit()
    return results
if __name__ == "__main__":
    main()
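# Minimal usage sketch (assumptions: a local or already-started Ray cluster and the
# pickle file named above in the working directory; the exact ARC/SLURM cluster
# setup is not shown in this file):
#
#   ray start --head                 # on the head node, or rely on Ray's local mode
#   python LSTM-RAY.py               # builds the Tuner above and runs tuner.fit()
#
# Results land under the storage_path configured in RunConfig.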