"""
dummy
"""
from typing import TypedDict, Optional, Dict, List, Union, Any
import json
import os
from pathlib import Path
import time
from datetime import datetime
from copy import deepcopy
import shutil
import psutil
import traceback
import pandas as pd
from tqdm import tqdm
import torch
from torch.utils.data import DataLoader
from .utils import (
load_component,
hash_args,
get_invalid_loc_queries,
Db,
Component,
)
from .context import get_caller, get_shared_data
class CompsDict(TypedDict):
    """
    Typed layout of the component mapping stored in ``PipeLine.comps``.

    Each entry is a ``Component`` as imported from ``.utils``.
    """
    # Network that is moved to the pipeline device and trained/evaluated.
    model: Component
    # Callable invoked as ``loss(logits, labels)`` during a forward pass.
    loss: Component
    # Optimizer built from the model parameters.
    optimizer: Component
    # NOTE(review): never populated with a real value in the visible code
    # (only reset to None) -- presumably the dataset component; confirm.
    dataset: Component
    # Metric name -> metric component, each invoked as ``metric(logits, labels)``.
    metrics: Dict[str, Component]
# _core.py
[docs]
class PipeLine:
"""
khgkjv
"""
def __init__(self, pplid=None):
    """
    Initialize the pipeline with default settings and empty components.

    Parameters
    ----------
    pplid : str, optional
        When given (and truthy), the matching experiment configuration is
        loaded immediately through `self.load(pplid=pplid)`.

    Notes
    -----
    - Settings are read once from `get_shared_data()`.
    - The experiment database is opened at ``<data_path>/ppls.db``.
    """
    # Read the shared settings exactly once; everything below depends on them.
    self.settings = get_shared_data()
    self.pplid = None
    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    self.comps: CompsDict = {}
    self.args = None
    self.trainDataLoader = None
    self.validDataLoader = None
    self.cnfg = None
    self._prepared = False
    self.__db = Db(db_path=f"{self.settings['data_path']}/ppls.db")
    # Best value of the monitored metric; filled in by prepare().
    self.__best = None
    if pplid:
        self.load(pplid=pplid)
def _save_config(self) -> None:
    """
    Persist `self.cnfg` as JSON at the experiment's config path.

    The write only happens when `self.verify(args=...)` resolves the stored
    arguments back to the experiment ID recorded in the configuration, which
    guards against overwriting a config whose arguments were altered.

    Raises
    ------
    ValueError
        If the arguments in `self.cnfg` no longer resolve to
        `self.cnfg["pplid"]`.
    """
    resolved_id = self.verify(args=self.cnfg["args"])
    # Guard clause: refuse to write when the arguments map to another id.
    if resolved_id != self.cnfg["pplid"]:
        raise ValueError(
            f"can not save config for Experiment: {self.cnfg['pplid']}."
            "\n it's args has been changed"
        )
    config_file = self.get_path(of="config")
    with open(config_file, "w", encoding="utf-8") as out_file:
        json.dump(self.cnfg, out_file, indent=4)
[docs]
def sync(self) -> None:
    """
    Merge the quick file of the current experiment into the main configuration.

    The JSON stored at the "quick" path is read, its top-level keys are
    written over the matching keys of `self.cnfg`, and the result is saved
    through `self._save_config()`.

    Side Effects
    ------------
    - Mutates `self.cnfg` in place.
    - Rewrites the config file via `_save_config`.
    - Prints a confirmation message.

    Raises
    ------
    ValueError
        Propagated from `_save_config` when the arguments no longer match.
    """
    quick_path = self.get_path(of="quick")
    with open(quick_path, "r", encoding="utf-8") as handle:
        quick_state = json.load(handle)
    overrides = {**quick_state}
    self.cnfg.update(overrides)
    self._save_config()
    print(f"{self.pplid} synced successfully")
[docs]
def get_path(
    self,
    of: str,
    pplid: Optional[str] = None,
    epoch: Optional[int] = None
) -> str:
    """
    Generate a standardized file path for an experiment artifact.

    The parent directory of the returned path is created if it is missing.

    Parameters
    ----------
    of : str
        The kind of artifact. Supported values:
        - "config": ``<data_path>/Configs/<pplid>.json``
        - "weight": ``<data_path>/Weights/<pplid>/<pplid>_e<epoch>.pth``
        - "history": ``<data_path>/Histories/<pplid>.csv``
        - "quick": ``<data_path>/Quicks/<pplid>.json``
    pplid : str, optional
        Experiment ID. If not provided, `self.pplid` is used.
    epoch : int, optional
        Epoch number, only used for "weight". When omitted for the currently
        loaded experiment, the epoch stored under ``best.epoch`` in
        `self.cnfg` is used.

    Returns
    -------
    str
        Path to the artifact, written with forward slashes.

    Raises
    ------
    ValueError
        If no experiment ID is available, if a weight path is requested
        without an epoch and none can be taken from the config, or if `of`
        is not one of the supported values.
    """
    pplid = pplid or self.pplid
    if not pplid:
        raise ValueError("Experiment ID (pplid) must be provided.")
    base_path = Path(self.settings["data_path"])
    if of == "config":
        path = base_path / "Configs" / f"{pplid}.json"
    elif of == "weight":
        if epoch is None and pplid == self.pplid:
            # The loaded config only describes the current experiment, so the
            # fallback is restricted to it.
            cnfg = getattr(self, "cnfg", None) or {}
            epoch = (cnfg.get("best") or {}).get("epoch")
        if epoch is None:
            raise ValueError(
                "Epoch must be specified or defined in config under 'best.epoch'."
            )
        path = base_path / "Weights" / pplid / f"{pplid}_e{epoch}.pth"
    elif of == "history":
        path = base_path / "Histories" / f"{pplid}.csv"
    elif of == "quick":
        path = base_path / "Quicks" / f"{pplid}.json"
    else:
        raise ValueError(
            f"Invalid value for 'of': {of}. Supported values: "
            "'config', 'weight', 'history', 'quick'."
        )
    path.parent.mkdir(parents=True, exist_ok=True)
    return path.as_posix()
[docs]
def load_model(self, epoch: Optional[Union[int, str]] = None) -> torch.nn.Module:
    """
    Load model weights from disk into the model component for a given epoch.

    Parameters
    ----------
    epoch : int or str, optional
        - 'last': uses ``self.cnfg["last"]["epoch"]``.
        - 'best': uses ``self.cnfg["best"]["epoch"]``.
        - int: uses that epoch directly.
        - None: passed through unchanged to `get_path`, which decides how a
          missing epoch is handled.

    Returns
    -------
    torch.nn.Module
        The model component stored in ``self.comps["model"]``.

    Raises
    ------
    ValueError
        Propagated from `get_path` when the weight path cannot be built.

    Notes
    -----
    - If the resolved epoch is 0 and no checkpoint file exists yet, the
      model's current state is saved to that path instead of being loaded.
    - Checkpoints are read with ``weights_only=True`` and mapped onto
      `self.device`, so weights written on a GPU host can be restored on a
      CPU-only host.
    - ``strict=False`` is used in `load_state_dict`, so missing or unexpected
      keys do not raise.
    """
    if epoch == "last":
        epoch = self.cnfg["last"]["epoch"]
    elif epoch == "best":
        epoch = self.cnfg["best"]["epoch"]
    weight_path = self.get_path(of="weight", pplid=self.pplid, epoch=epoch)
    model = self.comps["model"]
    if epoch == 0 and not os.path.exists(weight_path):
        # First use of a fresh experiment: persist the initial weights.
        torch.save(model.state_dict(), weight_path)
    else:
        state = torch.load(
            weight_path, map_location=self.device, weights_only=True
        )
        model.load_state_dict(state, strict=False)
    return model
[docs]
def load(self, pplid: str, prepare: bool = False) -> None:
    """
    Make the stored configuration of `pplid` the active one.

    Parameters
    ----------
    pplid : str
        Experiment ID whose configuration file is read.
    prepare : bool, optional
        When True, `self.prepare()` is called after the configuration has
        been loaded. Defaults to False.

    Raises
    ------
    ValueError
        If `self.verify(pplid=pplid)` reports that the ID is unknown.

    Side Effects
    ------------
    - Assigns the parsed JSON to `self.cnfg`.
    - Assigns `pplid` to `self.pplid`.
    """
    known = self.verify(pplid=pplid)
    if not known:
        raise ValueError(f"The pplid: {pplid} is not exists")
    config_path = self.get_path(of="config", pplid=pplid)
    with open(config_path, encoding="utf-8") as handle:
        loaded = json.load(handle)
    self.cnfg = loaded
    self.pplid = pplid
    if not prepare:
        return
    self.prepare()
def _setup_dataloaders(self, args):
    """
    Build the loss component and the training/validation data loaders.

    Parameters
    ----------
    args : dict
        Experiment arguments; ``args["loss"]`` is expanded into
        `load_component`, and the whole mapping is handed to
        `_adjust_loader_params` for each loader.
    """
    loss_component = load_component(**args["loss"])
    self.comps["loss"] = loss_component.to(self.device)

    train_params = self._adjust_loader_params(mode="train", args=args)
    self.trainDataLoader = DataLoader(**train_params)

    valid_params = self._adjust_loader_params(mode="val", args=args)
    self.validDataLoader = DataLoader(**valid_params)
[docs]
def reset(self):
    """
    Return the pipeline to its freshly constructed, unloaded state.

    Clears the experiment ID, configuration, components, data loaders and
    the cached best metric, re-reads the shared settings and reopens the
    experiment database.

    Notes
    -----
    The database is reopened at ``<data_path>/ppls.db``, the same file the
    constructor uses and the one holding the ``ppls`` table queried by the
    other methods.
    """
    self.settings = get_shared_data()
    self.pplid = None
    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    self.comps = {
        "model": None,
        "loss": None,
        "optimizer": None,
        "dataset": None,
        "metrics": {},
    }
    self.args = None
    self.trainDataLoader = None
    self.validDataLoader = None
    self.cnfg = None
    self._prepared = False
    self.__db = Db(db_path=f"{self.settings['data_path']}/ppls.db")
    # Drop the best value of the previously loaded experiment.
    self.__best = None
[docs]
def verify(
    self, *, pplid: Optional[str] = None, args: Optional[Dict] = None
) -> Union[str, bool]:
    """
    Look up an experiment in the ``ppls`` table by ID or by argument hash.

    Both parameters are keyword-only. When `pplid` is truthy it takes
    precedence and `args` is ignored.

    Parameters
    ----------
    pplid : str, optional
        Experiment ID to look for.
    args : dict, optional
        Experiment arguments; they are hashed with `hash_args` and matched
        against the ``args_hash`` column.

    Returns
    -------
    Union[str, bool]
        The matching experiment ID, or `False` when nothing matches or
        neither parameter is given.

    Examples
    --------
    >>> pipeline.verify(pplid="exp_001")
    'exp_001'
    >>> pipeline.verify(pplid="nonexistent_exp")
    False
    """
    if pplid:
        found = self.__db.query(
            "SELECT 1 FROM ppls WHERE pplid = ? LIMIT 1", (pplid,)
        )
        return pplid if len(found) > 0 else False
    if args:
        rows = self.__db.query(
            "SELECT pplid FROM ppls WHERE args_hash =? LIMIT 1",
            (hash_args(args),),
        )
        if rows:
            return rows[0][0]
    return False
[docs]
def log(self) -> None:
    """
    Placeholder for logging support planned for future versions.

    The method currently consists of this docstring only and returns None.
    """
def _check_args(self, args):
    """
    Reject arguments that reference unsaved components or duplicate an experiment.

    Parameters
    ----------
    args : dict
        Experiment arguments to validate.

    Raises
    ------
    ValueError
        If `get_invalid_loc_queries(args)` reports any entries (they are
        listed in the message), or if `self.verify(args=args)` finds an
        existing experiment with the same arguments.
    """
    unresolved = get_invalid_loc_queries(args)
    if unresolved:
        listing = ", ".join(unresolved)
        raise ValueError("Make sure all components are saved.\nReff: " + listing)

    existing_id = self.verify(args=args)
    if existing_id:
        raise ValueError(f"same configuration is already exists in: {existing_id}")
[docs]
def new(
    self,
    pplid: Optional[str] = None,
    args: Optional[Dict[str, Any]] = None,
    prepare: bool = False,
) -> None:
    """
    Create a new experiment configuration and initialize its tracking files.

    All validation runs before any state is changed or any file is written,
    so a rejected call leaves the pipeline and the data directory untouched.

    Parameters
    ----------
    pplid : str, optional
        Unique experiment identifier. Must be provided and must not exist yet.
    args : dict, optional
        Configuration arguments for the experiment.
    prepare : bool, optional
        If True, calls `self.prepare()` after creation. Defaults to False.

    Raises
    ------
    ValueError
        If `pplid` is missing or already exists, if `_check_args` rejects
        `args`, or if the strategy mode is neither 'min' nor 'max'.
    KeyError
        If 'metrics' is missing from (or empty in) the settings.

    Behavior
    --------
    - Validates the ID, the arguments, the metrics and the strategy mode.
    - Sets `self.pplid` and `self.cnfg`.
    - Creates an empty history CSV with an epoch column plus train/val
      columns for every metric, the loss and the duration.
    - Writes the quick file with identical initial "last" and "best"
      records; the monitored entry starts at 10000 ('min') or -10000 ('max').
    - Inserts the experiment ID and argument hash into the ``ppls`` table.
    - Saves the configuration file.
    - Optionally calls `self.prepare()`.
    """
    if not pplid:
        raise ValueError("Experiment ID (pplid) must be provided.")
    if self.verify(pplid=pplid):
        raise ValueError(f"{pplid} is already exists try different id")
    self._check_args(args)

    metrics = self.settings.get("metrics")
    if not metrics:
        raise KeyError("'metrics' is missing from settings.")
    strategy = self.settings["strategy"]
    if strategy["mode"] == "min":
        initial_best = float(10000)
    elif strategy["mode"] == "max":
        initial_best = -float(10000)
    else:
        raise ValueError("monitor should be min or max")

    keys = [
        *[f"train_{m}" for m in metrics],
        "train_loss",
        "train_duration",
        *[f"val_{m}" for m in metrics],
        "val_loss",
        "val_duration",
    ]

    # Everything is valid: only now touch instance state and the disk.
    self.pplid = pplid
    self.cnfg = {
        "pplid": pplid,
        "args": args,
        "used": "",
        "best": {"epoch": 0},
        "last": {"epoch": 0},
    }

    record = pd.DataFrame([], columns=["epoch", *keys])
    record.to_csv(self.get_path(of="history", pplid=self.pplid), index=False)

    initial = {"epoch": 0, **{key: 0 for key in keys}}
    initial[strategy["monitor"]] = initial_best
    # Separate copies so "last" and "best" never alias the same dict.
    quick = {"last": initial, "best": dict(initial)}
    with open(
        self.get_path(of="quick", pplid=pplid), "w", encoding="utf-8"
    ) as out_file:
        json.dump(quick, out_file, indent=4)

    # The row must exist before _save_config, which verifies the args hash.
    self.__db.execute(
        "INSERT INTO ppls (pplid, args_hash) VALUES (?, ?)",
        (pplid, hash_args(args)),
    )
    self._save_config()
    if prepare:
        self.prepare()
def _adjust_loader_params(self, mode: str, args: Optional[dict] = None) -> dict:
    """
    Build the keyword arguments for a `DataLoader` from the experiment arguments.

    Parameters
    ----------
    mode : str
        Either 'train' or 'val'. Selects ``<mode>_data_src`` and
        ``<mode>_batch_size`` from `args`.
    args : dict, optional
        Configuration dictionary with a ``dataset`` entry (``loc`` and
        ``args``) plus the per-mode data source and batch size.
        If None, `self.args` is used.

    Returns
    -------
    dict
        Keys: dataset, batch_size, shuffle, num_workers, collate_fn and
        pin_memory. Only the training loader is shuffled.

    Raises
    ------
    ValueError
        If mode is neither 'train' nor 'val'.

    Notes
    -----
    - A batch size larger than the dataset is reduced to the dataset size
      and written back into `args`.
    - The dataset arguments are copied before ``data_src`` is injected, so
      the caller's ``args["dataset"]["args"]`` is left unchanged.
    - While `self.args` is None the worker count is forced to 0.
    """
    args = self.args if args is None else args
    if mode not in {"val", "train"}:
        raise ValueError(mode + "_data_src is not found")

    dataset_cfg = args["dataset"]
    dsargs = {**dataset_cfg["args"], "data_src": args[f"{mode}_data_src"]}
    ds = load_component(loc=dataset_cfg["loc"], args=dsargs)
    collate_fn = getattr(ds, "collate_fn", None) or None
    batch_size = args[mode + "_batch_size"]
    # Validation data keeps its order; only training batches are shuffled.
    shuffle = mode == "train"

    # os.cpu_count() may return None when the count is undeterminable.
    num_cpu_cores = os.cpu_count() or 1
    if len(ds) < batch_size:
        batch_size = len(ds)
        print(
            "Warning: Dataset size is smaller than the batch size. "
            f"Adjusting batch size to {batch_size}."
        )
        args.update({mode + "_batch_size": batch_size})
        if self.args is not None:
            self._save_config()

    pin_memory = batch_size >= 32  # Larger batches benefit more from pin_memory
    if batch_size < 16:
        num_workers = max(1, num_cpu_cores // 2)  # Fewer workers for small batches
    elif batch_size < 64:
        num_workers = num_cpu_cores
    else:
        num_workers = min(num_cpu_cores * 2, 16)

    memory_threshold = 5 * 1024**3
    available_memory = psutil.virtual_memory().available
    if available_memory <= memory_threshold:
        num_workers = min(num_workers, 4)
        pin_memory = False  # Disable pin_memory to save memory
        print(
            f"memory available={available_memory}<={memory_threshold}"
            f" --> pin_memory={pin_memory}"
        )

    num_workers = 0 if self.args is None else num_workers
    return {
        "dataset": ds,
        "batch_size": batch_size,
        "shuffle": shuffle,
        "num_workers": num_workers,
        "collate_fn": collate_fn,
        "pin_memory": pin_memory,
    }
[docs]
def prepare(self) -> None:
    """
    Load model, optimizer, metrics, loss and data loaders for the experiment.

    Behavior
    --------
    - Returns early (after printing a notice) when no configuration is loaded.
    - Works on a deep copy of ``self.cnfg["args"]``.
    - Loads the model and moves it to `self.device`.
    - Loads the optimizer, passing the model parameters as the
      ``model_parameters`` argument.
    - Loads every metric and moves it to `self.device`.
    - Calls `_setup_dataloaders`, which loads the loss component and builds
      the training and validation loaders.
    - Restores the weights of the last epoch recorded in the quick file, the
      same record `train` resumes from.
    - Reads the best value of the monitored metric from the quick file.
    - Sets `_prepared` to True.
    """
    if not self.cnfg:
        print("not initiated")
        return
    args = deepcopy(self.cnfg["args"])

    model = load_component(**args["model"]).to(self.device)
    self.comps["model"] = model
    args["optimizer"]["args"]["model_parameters"] = model.parameters()
    self.comps["optimizer"] = load_component(**args["optimizer"])
    self.comps["metrics"] = {
        name: load_component(**comp).to(self.device)
        for name, comp in args["metrics"].items()
    }
    # The loss component is loaded inside _setup_dataloaders.
    self._setup_dataloaders(args=args)

    with open(self.get_path(of="quick"), encoding="utf-8") as quick_file:
        quick = json.load(quick_file)
    # The quick file is rewritten after every epoch, whereas cnfg["last"]
    # only changes on sync(); using it keeps weights and start epoch aligned.
    self.comps["model"] = self.load_model(epoch=quick["last"]["epoch"])
    monitor = self.settings["strategy"]["monitor"]
    self.__best = quick["best"][monitor]
    self._prepared = True
    print("Data loaders are successfully created")
[docs]
def update(self, data: dict) -> bool:
    """
    Record the results of one epoch and report whether it is a new best.

    Parameters
    ----------
    data : dict
        Must contain 'epoch' plus, for every configured metric ``m``,
        ``train_<m>`` and ``val_<m>``, as well as 'train_loss',
        'train_duration', 'val_loss' and 'val_duration'.

    Returns
    -------
    bool
        True when the monitored value is at least as good as the best value
        seen so far ('min': lower or equal, 'max': higher or equal);
        otherwise False. Any other strategy mode always yields False.

    Notes
    -----
    - Saves the model weights for ``data["epoch"]``.
    - Appends one row to the history CSV.
    - Rewrites the quick file: "last" always, "best" on improvement.
    - On improvement the in-memory best value is advanced as well, so later
      epochs are compared against the running best rather than the value
      read at prepare time.
    """
    torch.save(
        self.comps["model"].state_dict(),
        self.get_path(of="weight", epoch=data["epoch"]),
    )

    strategy = self.settings["strategy"]
    mode = strategy["mode"]
    best = False
    current = None
    if mode in ("min", "max"):
        current = data[strategy["monitor"]]
        if mode == "min":
            best = bool(self.__best >= current)
        else:
            best = bool(self.__best <= current)

    metric_names = list(self.settings["metrics"])
    columns = (
        ["epoch"]
        + [f"train_{name}" for name in metric_names]
        + ["train_loss", "train_duration"]
        + [f"val_{name}" for name in metric_names]
        + ["val_loss", "val_duration"]
    )
    record = pd.DataFrame([[data[col] for col in columns]], columns=columns)
    record.to_csv(
        self.get_path(of="history", pplid=self.pplid),
        mode="a",
        header=False,
        index=False,
    )

    with open(self.get_path(of="quick"), encoding="utf-8") as quick_file:
        quick = json.load(quick_file)
    quick["last"] = data
    if best:
        print(
            f"Best Model Weights Updated: Epoch {data['epoch']} - Val Loss: {data['val_loss']}"
        )
        quick["best"] = data
    with open(self.get_path(of="quick"), "w", encoding="utf-8") as out_file:
        json.dump(quick, out_file, indent=4)

    if best:
        # Advance the reference only after everything has been persisted.
        self.__best = current
    return best
[docs]
def train(
    self,
    num_epochs: int = 5,
    self_patience: Optional[int] = None,
    verbose: Optional[Union[List[str], str]] = None
) -> None:
    """
    Train the model for a number of epochs with optional early stopping.

    Parameters
    ----------
    num_epochs : int, optional
        Number of epochs to train. Default is 5.
    self_patience : int, optional
        Number of epochs without improvement tolerated before stopping.
        If None (or 0), `num_epochs` is used.
    verbose : list of str or str, optional
        Metrics shown live during the training pass. Every entry must be one
        of the configured metrics. Defaults to all configured metrics.

    Notes
    -----
    - Does nothing (besides printing a notice) when the pipeline is not
      prepared, when `verbose` is invalid, or when the ``runnings`` table
      already holds a row for this experiment.
    - Resumes from the last epoch recorded in the quick file; the
      epochs-without-improvement counter starts from that same record
      (last epoch minus best epoch).
    - Registers the run in the ``runnings`` table for its duration and
      always removes the row again, even on errors.
    - Stops between epochs once `should_running` turns False.
    - Exceptions raised inside the loop, including `BaseException`
      subclasses, are printed with their traceback and not re-raised.
    """
    if not self._prepared:
        print(
            "Preparation Error. Execute prepare() or set prepare=True before training."
        )
        return

    with open(self.get_path(of="quick"), encoding="utf-8") as quick_file:
        quick = json.load(quick_file)
    start_epoch = quick["last"]["epoch"]
    end_epoch = start_epoch + num_epochs
    patience = self_patience or num_epochs
    # Same source as start_epoch; cnfg["last"/"best"] only changes on sync().
    epochs_without_improvement = start_epoch - quick["best"]["epoch"]

    if verbose is None:
        verbose = list(self.settings["metrics"])
    if isinstance(verbose, str):
        verbose = [verbose]
    if not isinstance(verbose, list) or not all(
        item in self.settings["metrics"] for item in verbose
    ):
        print(f"Verbose should be in metrics: {list(self.settings['metrics'])}")
        return

    rows = self.__db.query(
        "SELECT logid FROM runnings WHERE pplid = ?", (self.pplid,)
    )
    if rows:
        print(f"pplid: {self.pplid} is running in logid: {rows[0][0]}")
        return

    try:
        self.__db.execute(
            "INSERT INTO runnings (pplid, logid) VALUES (?, ?)",
            (self.pplid, self.settings["logid"]),
        )
        for epoch in range(start_epoch, end_epoch):
            if epochs_without_improvement >= patience:
                print(f"Early stopping at epoch {epoch} due to no improvement.")
                return
            if not self.should_running:
                return
            train_metrics = self._forward(
                mode="train", epoch=epoch, verbose=verbose
            )
            val_metrics = self._forward(mode="valid", epoch=epoch)
            metrics_record = {
                "epoch": epoch + 1,
                **{f"train_{k}": v for k, v in train_metrics.items()},
                **{f"val_{k}": v for k, v in val_metrics.items()},
            }
            if self.update(metrics_record):
                epochs_without_improvement = 0
            else:
                epochs_without_improvement += 1
        print("Finished Training")
    except (RuntimeError, ValueError, KeyError) as e:
        print("Error in training loop:", e)
        traceback.print_exc()
    except BaseException as e:
        # NOTE(review): this also swallows KeyboardInterrupt/SystemExit;
        # kept as-is because callers may rely on train() never raising.
        print("Unexpected error in training loop:", type(e).__name__, e)
        traceback.print_exc()
    finally:
        self.__db.execute("DELETE FROM runnings WHERE pplid = ?", (self.pplid,))
def _forward(
    self,
    mode: str,
    epoch: int,
    verbose: Optional[List[str]] = None
) -> Dict[str, float]:
    """
    Perform a single pass over the training or validation loader.

    Parameters
    ----------
    mode : str
        "train" runs with gradient updates on the training loader, "valid"
        runs without gradients on the validation loader.
    epoch : int
        Current (zero-based) epoch; shown as ``epoch + 1`` in the progress
        bar and forwarded to the optimizer step.
    verbose : list of str, optional
        Metric names whose running average is shown in the progress bar.

    Returns
    -------
    Dict[str, float]
        Per-batch average of every configured metric and of the loss, plus
        the elapsed wall-clock time under "duration". All values are plain
        Python floats.

    Raises
    ------
    ValueError
        If mode is not 'train' or 'valid'.

    Notes
    -----
    - Each batch is read as ``datas[0]`` (inputs) and ``datas[1]`` (labels).
    - The optimizer component is stepped with ``loss`` and ``epoch`` keyword
      arguments. NOTE(review): this is not the signature of the stock torch
      optimizers; presumably a project wrapper -- confirm.
    """
    model = self.comps["model"]
    if mode == "train":
        model.train()
        loader = self.trainDataLoader
    elif mode == "valid":
        model.eval()
        loader = self.validDataLoader
    else:
        raise ValueError("mode only can be 'train' or 'valid'")

    is_train = mode == "train"
    loss_fn = self.comps["loss"]
    metric_names = list(self.settings["metrics"])
    loader_tqdm = tqdm(loader, desc=f"Epoch {epoch+1}", leave=True)
    running_metrics = {name: 0.0 for name in metric_names}
    running_metrics["loss"] = 0.0
    start_time = time.perf_counter()

    for batch_idx, datas in enumerate(loader_tqdm):
        inpts = datas[0].to(self.device)
        lbls = datas[1].to(self.device)
        if is_train:
            optimizer = self.comps["optimizer"]
            optimizer.zero_grad()
            logits = model(inpts)
            loss = loss_fn(logits, lbls)
            loss.backward()
            optimizer.step(loss=loss.item(), epoch=epoch)
        else:
            with torch.no_grad():
                logits = model(inpts)
                loss = loss_fn(logits, lbls)

        # Metrics are bookkeeping only: keep them out of autograd and store
        # plain floats so the totals stay JSON/CSV friendly.
        with torch.no_grad():
            for name in metric_names:
                value = self.comps["metrics"][name](logits, lbls)
                running_metrics[name] += float(value)
        running_metrics["loss"] += loss.item()

        if verbose:
            metrics_display = {
                name: total / (batch_idx + 1)
                for name, total in running_metrics.items()
                if name in verbose
            }
            loader_tqdm.set_postfix(**metrics_display)

    # Guard against an empty loader, which would otherwise divide by zero.
    num_batches = max(len(loader), 1)
    averaged = {k: v / num_batches for k, v in running_metrics.items()}
    averaged["duration"] = time.perf_counter() - start_time
    return averaged
[docs]
def is_running(self):
    """
    Report whether this experiment currently has a row in the `runnings` table.

    Returns:
        The `logid` stored in the first matching row, or `False` when no row
        exists for `self.pplid`.
    """
    matches = self.__db.query(
        "SELECT logid FROM runnings WHERE pplid = ?", (self.pplid,)
    )
    return matches[0][0] if matches else False
@property
def should_running(self):
    """
    Tell whether the run of this experiment may continue.

    Reads the `parity` column of the `runnings` row for `self.pplid`.

    Returns:
        bool: `False` only when a row exists and its first `parity` value is
        `'stop'`; `True` in every other case, including when no row exists.
    """
    rows = self.__db.query(
        "SELECT parity FROM runnings WHERE pplid = ?", (self.pplid,)
    )
    stop_requested = bool(rows) and rows[0][0] == 'stop'
    return not stop_requested
[docs]
def stop_running(self):
    """
    Ask a running training loop of this experiment to stop.

    When `is_running()` yields a `logid`, the `parity` column of the
    `runnings` rows with that `logid` is set to `'stop'` and a notice is
    printed. Otherwise only a message saying nothing is running is printed.

    Returns:
        None
    """
    logid = self.is_running()
    if not logid:
        print("it is not running anywhere")
        return
    self.__db.execute(
        "UPDATE runnings SET parity = ? WHERE logid = ?", ('stop', logid)
    )
    print(f"ppid:{self.pplid} will be stopped at logid:{logid} after current iteration")