Skip to content

Model and data

Modules under eso.model cover the CNN architecture, the dataset pipeline, and the training and evaluation wrappers used by both the baseline and the per-chromosome models. See CNN training and fitness for the algorithmic context.

Symbol File Role
BaseCNN eso/model/cnn.py The shared CNN architecture (1 conv layer, max-pool, two FC layers).
calc_back_conv eso/model/cnn.py Backward computation of the minimum legal input size through a convolution.
calc_back_pool eso/model/cnn.py The pooling counterpart of calc_back_conv.
Data eso/model/data.py Audio to spectrogram pipeline, splits, and dataset caching.
Model eso/model/model.py Train, evaluate, save, and load wrapper around the CNN.

eso.model.cnn

The CNN architecture. The default is a simple stack: one Conv2d → ReLU → MaxPool block followed by a flatten and two fully connected layers ending in a 2-unit softmax. Sizing is parameterised through ArchitectureConfig.

The module also exposes the helpers calc_back_conv and calc_back_pool, which walk the convolution and pool stack backwards to compute the smallest input shape the network can accept. ESO uses these helpers at startup to validate every gene's height against the architecture, so impossible configurations fail fast rather than during training.

BaseCNN

BaseCNN(
    input_shape,
    conv_layers,
    conv_filters,
    dropout_rate,
    conv_kernel,
    max_pooling_size,
    fc_units,
    fc_layers,
    conv_padding=None,
    stride_maxpool=None,
)

Bases: Module

Base CNN model for the classification of the images

Parameters:

Name Type Description Default
input_shape tuple

The input shape of the images in the form of (n_channels, height, width)

required
conv_layers int

The number of convolutional layers

required
conv_filters int

The number of filters in the convolutional layers

required
dropout_rate float

The dropout rate of the dropout layers

required
conv_kernel int

The kernel size of the convolutional layers

required
max_pooling_size int

The kernel size of the max pooling layers

required
fc_units int

The number of units in the fully connected layers

required
fc_layers int

The number of fully connected layers

required
Source code in eso/model/cnn.py
def __init__(
    self,
    input_shape,
    conv_layers,
    conv_filters,
    dropout_rate,
    conv_kernel,
    max_pooling_size,
    fc_units,
    fc_layers,
    conv_padding = None, 
    stride_maxpool = None, 
):
    """Base CNN model for the classification of the images

    Parameters
    ----------
    input_shape : tuple
        The input shape of the images in the form of (n_channels, height, width)
    conv_layers : int
        The number of convolutional layers
    conv_filters : int
        The number of filters in the convolutional layers
    dropout_rate : float
        The dropout rate of the dropout layers
    conv_kernel : int
        The kernel size of the convolutional layers
    max_pooling_size : int
        The kernel size of the max pooling layers
    fc_units : int
        The number of units in the fully connected layers
    fc_layers : int
        The number of fully connected layers
    """
    super(BaseCNN, self).__init__()
    self.input_shape = input_shape
    n_channels = input_shape[0]
    self.n_conv_layers = conv_layers
    self.conv_filters = conv_filters
    self.dropout_rate = dropout_rate
    self.conv_kernel = conv_kernel
    self.max_pooling_size = max_pooling_size
    self.fc_units = fc_units
    self.n_fc_layers = fc_layers
    self.conv_padding=conv_padding
    if stride_maxpool is None : 
        self.stride_maxpool= max_pooling_size
    else : self.stride_maxpool= stride_maxpool

    if conv_padding is None : 
        self.conv_padding=0
    else : self.conv_padding = conv_padding

    # Convolutional layers
    self.conv_layers = nn.Sequential()
    self.conv_layers.add_module(
        "conv0",
        nn.Conv2d(n_channels, self.conv_filters, kernel_size=self.conv_kernel, padding=self.conv_padding),
    )
    self.conv_layers.add_module("relu0", nn.ReLU())
    self.conv_layers.add_module("dropout0", nn.Dropout(self.dropout_rate))
    self.conv_layers.add_module("maxpool0", nn.MaxPool2d(self.max_pooling_size, stride= self.stride_maxpool))

    for i in range(1, self.n_conv_layers):
        self.conv_layers.add_module(
            f"conv{i}",
            nn.Conv2d(
                self.conv_filters, self.conv_filters, kernel_size=self.conv_kernel
            ),
        )
        self.conv_layers.add_module(f"relu{i}", nn.ReLU())
        self.conv_layers.add_module(f"dropout{i}", nn.Dropout(self.dropout_rate))
        self.conv_layers.add_module(
            f"maxpool{i}", nn.MaxPool2d(self.max_pooling_size)
        )

    # Fully connected layers
    self.fc_layers = nn.Sequential()
    # input_units = self.conv_filters * (128 // (self.max_pooling_size ** self.n_conv_layers)) * (76 // (self.max_pooling_size ** self.n_conv_layers))
    input_units = np.prod(self._calc_cnn_output_dim())
    for i in range(self.n_fc_layers):
        self.fc_layers.add_module(f"fc{i}", nn.Linear(input_units, self.fc_units))
        self.fc_layers.add_module(f"relu{i}", nn.ReLU())
        self.fc_layers.add_module(f"dropout{i}", nn.Dropout(self.dropout_rate))
        input_units = self.fc_units

    # Output layer
    self.output_layer = nn.Linear(self.fc_units, 2)
    self.softmax = nn.Softmax(dim=1)

input_shape instance-attribute

input_shape = input_shape

n_conv_layers instance-attribute

n_conv_layers = conv_layers

conv_filters instance-attribute

conv_filters = conv_filters

dropout_rate instance-attribute

dropout_rate = dropout_rate

conv_kernel instance-attribute

conv_kernel = conv_kernel

max_pooling_size instance-attribute

max_pooling_size = max_pooling_size

fc_units instance-attribute

fc_units = fc_units

n_fc_layers instance-attribute

n_fc_layers = fc_layers

conv_padding instance-attribute

conv_padding = conv_padding

stride_maxpool instance-attribute

stride_maxpool = max_pooling_size

conv_layers instance-attribute

conv_layers = Sequential()

fc_layers instance-attribute

fc_layers = Sequential()

output_layer instance-attribute

output_layer = Linear(fc_units, 2)

softmax instance-attribute

softmax = Softmax(dim=1)

calculate_min_input_size

calculate_min_input_size()
Source code in eso/model/cnn.py
def calculate_min_input_size(self):
    # Start with a size of 1 (minimum meaningful size)
    min_height, min_width = 1, 1
    # Convert the generator to a list and reverse iterate through the layers of the CNN
    for layer in reversed(list(self.modules())):
        if isinstance(layer, nn.Conv2d):
            min_height = calc_back_conv(min_height, layer, 0)  # for height
            min_width = calc_back_conv(min_width, layer, 1)  # for width
        elif isinstance(layer, nn.MaxPool2d):
            min_height = calc_back_pool(min_height, layer, 0)  # for height
            min_width = calc_back_pool(min_width, layer, 1)  # for width
    return int(min_height), int(min_width)

forward

forward(x)

Forward pass of the network

Parameters:

Name Type Description Default
x Tensor

The input tensor. Shape should be (batch_size, n_channels, height, width)

required

Returns:

Type Description
Tensor

The output tensor. Shape should be (batch_size, n_classes). Outputs a probability for each class.

Source code in eso/model/cnn.py
def forward(self, x):
    """Forward pass of the network

    Parameters
    ----------
    x : torch.Tensor
        The input tensor. Shape should be (batch_size, n_channels, height, width)

    Returns
    -------
    torch.Tensor
        The output tensor. Shape should be (batch_size, n_classes). Outputs a probability for each class.
    """
    x = self.conv_layers(x)
    x = x.view(x.size(0), -1)  # Flatten the tensor
    # print("x size: ", x.size())
    x = self.fc_layers(x)
    x = self.output_layer(x)
    x = self.softmax(x)
    return x

calc_back_conv

calc_back_conv(input_size, conv_layer, dim)

Calculate the input size of a Conv2d layer

Reverse calculation of the output size of a Conv2d layer. This is used to calculate the minimum input size of a CNN.

Parameters:

Name Type Description Default
input_size int

The output size of the Conv2d layer

required
conv_layer Conv2d

The Conv2d layer to calculate the input size of

required
dim int

The dimension to calculate the input size of. 0 for height, 1 for width

required

Returns:

Type Description
int

The input size of the Conv2d layer

Source code in eso/model/cnn.py
def calc_back_conv(input_size, conv_layer, dim):
    """Calculate the input size of a Conv2d layer

    Reverse calculation of the output size of a Conv2d layer. This is used to calculate the minimum input size of a CNN.

    Parameters
    ----------
    input_size : int
        The output size of the Conv2d layer
    conv_layer : torch.nn.Conv2d
        The Conv2d layer to calculate the input size of
    dim : int
        The dimension to calculate the input size of. 0 for height, 1 for width

    Returns
    -------
    int
        The input size of the Conv2d layer
    """
    kernel_size = conv_layer.kernel_size[dim]
    stride = conv_layer.stride[dim]
    padding = conv_layer.padding[dim]
    dilation = conv_layer.dilation[dim]

    #return ((input_size - 1) * stride) - 2 * padding + dilation * (kernel_size - 1) + 1
    #correction ? 
    return ((input_size - 1) * stride) - 2 * padding + kernel_size

calc_back_pool

calc_back_pool(input_size, pool_layer, dim)

Calculate the input size of a MaxPool2d layer

Reverse calculation of the output size of a MaxPool2d layer. This is used to calculate the minimum input size of a CNN.

Parameters:

Name Type Description Default
input_size int

The output size of the MaxPool2d layer

required
pool_layer MaxPool2d

The MaxPool2d layer to calculate the input size of

required
dim int

The dimension to calculate the input size of. 0 for height, 1 for width

required

Returns:

Type Description
int

The input size of the MaxPool2d layer

Source code in eso/model/cnn.py
def calc_back_pool(input_size, pool_layer, dim):
    """Calculate the input size of a MaxPool2d layer

    Reverse calculation of the output size of a MaxPool2d layer. This is used to calculate the minimum input size of a CNN.

    Parameters
    ----------
    input_size : int
        The output size of the MaxPool2d layer
    pool_layer : torch.nn.MaxPool2d
        The MaxPool2d layer to calculate the input size of
    dim : int
        The dimension to calculate the input size of. 0 for height, 1 for width

    Returns
    -------
    int
        The input size of the MaxPool2d layer
    """
    kernel_size = (
        pool_layer.kernel_size
        if isinstance(pool_layer.kernel_size, int)
        else pool_layer.kernel_size[dim]
    )
    stride=(pool_layer.stride
            if isinstance(pool_layer.kernel_size, int)
            else pool_layer.kernel_size[dim]
    )
    #return input_size * kernel_size
    #correction ?
    return ((input_size - 1) * stride) + kernel_size

get_conv_output_dim

get_conv_output_dim(layer: Module, input_dim: tuple) -> tuple

Calculate output dimension of a CNN layer

Parameters:

Name Type Description Default
layer Module

The CNN layer to calculate the output dimension of

required
input_dim tuple

The input dimension of the CNN layer in the form of (n_channels, height, width)

required

Returns:

Type Description
tuple

The output dimension of the CNN layer in the form of (n_channels, height, width)

Source code in eso/model/cnn.py
def get_conv_output_dim(layer: nn.Module, input_dim: tuple) -> tuple:
    """Calculate output dimension of a CNN layer

    Parameters
    ----------
    layer : torch.nn.Module
        The CNN layer to calculate the output dimension of
    input_dim : tuple
        The input dimension of the CNN layer in the form of (n_channels, height, width)

    Returns
    -------
    tuple
        The output dimension of the CNN layer in the form of (n_channels, height, width)
    """
    kernel_size = layer.kernel_size
    stride = layer.stride
    padding = layer.padding
    dilation = layer.dilation

    input_channels, input_height, input_width = input_dim

    output_channels = layer.out_channels
    output_height = (
        input_height + 2 * padding[0] - dilation[0] * (kernel_size[0] - 1) - 1
    ) / stride[0] + 1
    output_width = (
        input_width + 2 * padding[1] - dilation[1] * (kernel_size[1] - 1) - 1
    ) / stride[1] + 1

    return (output_channels, int(output_height), int(output_width))

eso.model.data

The data pipeline. Data.create_datasets reads audio, parses annotations, segments to fixed-length windows, generates mel-spectrograms, applies optional class balancing via augmentation, and writes the train/validation/test splits to disk (or holds them in memory if keep_in_memory is set). The same splits are reused across the baseline and every chromosome's CNN training, so all individuals see identical data.

Data

Data(
    apply_preprocessing: bool,
    force_recreate_dataset: bool,
    species_folder: str,
    keep_in_memory: bool,
    preprocessing_args: dict,
    train_size: float,
    test_size: float,
    positive_class: str,
    negative_class: str,
    reshuffle: bool = False,
    logger=None,
    log_path=None,
    log_level=10,
)

Class for handling the data. This class will be used by the eso class.

It initializes the preprocessing class and handles the creation of the dataset based on the preprocessing settings.

Initialize the Data class

Parameters:

Name Type Description Default
config dict

The config dictionary containing the settings for preprocessing

required
logger Logger

The logger object to log messages

None

Returns:

Type Description
None
Source code in eso/model/data.py
def __init__(
    self,
    apply_preprocessing: bool,
    force_recreate_dataset: bool,
    species_folder: str,
    keep_in_memory: bool,
    preprocessing_args: dict,
    train_size: float,
    test_size: float,
    positive_class: str,
    negative_class: str,
    reshuffle: bool = False,
    logger=None,
    log_path=None,
    log_level=10,
) -> None:
    """Initialize the Data class

    Parameters
    ----------
    config : dict
        The config dictionary containing the settings for preprocessing
    logger : logging.Logger
        The logger object to log messages

    Returns
    -------
    None

    """

    # This should only contain the confg for data settings
    self.logger = setup_logger(
        logger=logger, log_path=log_path, log_level=log_level
    )
    self._positive_class = positive_class
    self._negative_class = negative_class
    self._preprocessing_flag = apply_preprocessing
    self._force_recreate_dataset = force_recreate_dataset
    self.species_folder = species_folder
    self._keep_in_memory = keep_in_memory
    self._train_size = train_size
    self._reshuffle = reshuffle
    self._test_size = test_size
    self.preprocessing_args = preprocessing_args

logger instance-attribute

logger = setup_logger(logger=logger, log_path=log_path, log_level=log_level)

species_folder instance-attribute

species_folder = species_folder

preprocessing_args instance-attribute

preprocessing_args = preprocessing_args

create_datasets

create_datasets()
Source code in eso/model/data.py
def create_datasets(self):
    types = ["train", "validation", "test"]
    # self._shuffle_files_names()
    if self._preprocessing_flag:
        preproces_name = "preprocessed"
    else:
        preproces_name = "unpreprocessed"

    self.save_path = Path(self.species_folder, "SavedData", preproces_name)
    preprocessing = Preprocessing(
        **self.preprocessing_args,
        apply_preprocessing=self._preprocessing_flag,
        species_folder=self.species_folder,
        positive_class=self._positive_class,
        negative_class=self._negative_class,
    )

    train_path = Path(self.species_folder, "DataFiles", "train.txt")
    validation_path = Path(self.species_folder, "DataFiles", "validation.txt")
    test_path = Path(self.species_folder, "DataFiles", "test.txt")
    if (
        os.path.exists(train_path)
        and os.path.exists(validation_path)
        and os.path.exists(test_path)
    ):
        # This means the files have already been shuffled,
        # check if they should be reshuffled egein
        if self._reshuffle:
            self.logger.info(
                "Found Existing Files but reshuffle flag ist set. Reshuffling.."
            )
            preprocessing.shuffle_files_names(
                train_size=self._train_size, test_size=self._test_size
            )
        else:
            self.logger.info(
                "Found already existing shuffled file names! Loading from memory.."
            )
    else:
        # Files dont exist, create the split
        self.logger.info("Reshuffling file names for the first time...")
        preprocessing.shuffle_files_names(
            train_size=self._train_size, test_size=self._test_size
        )
    for type in types:
        save_type_path = str(Path(self.save_path) / type)
        # Check if the dataset already exists
        if (
            os.path.exists(Path(save_type_path, "X.pkl"))
            and not self._force_recreate_dataset
        ):
            self.logger.info("The dataset already exists. Skipping...")
            if not hasattr(self, "image_shape"):
                # Load the dataset to get the image shape
                self.logger.debug("Loading dataset to set image shape...")
                X, Y = self._load_dataset(type)
                self.image_shape = X.shape[1:]
            continue

        # Create the folder
        os.makedirs(save_type_path, exist_ok=True)
        path = Path(self.species_folder, "DataFiles", f"{type}.txt")
        self.logger.debug("File path: " + str(path))
        if type == "train":
            self.logger.info("Creating the training dataset")
            # Create the dataset WITH augmentation
            X, Y = preprocessing.create_dataset(
                file_names=path,
                augmentation=True,
                annotation_folder="Annotations",
                sufix_file=".svl",
            )
        else:
            self.logger.info("Creating the validation dataset")
            X, Y = preprocessing.create_dataset(
                file_names=path,
                augmentation=False,
                annotation_folder="Annotations",
                sufix_file=".svl",
            )

        if not hasattr(self, "image_shape"):
            self.image_shape = X.shape[1:]

        # Check if the dataset is empty
        if Y.shape[0] == 0:
            raise Exception("The dataset is empty. Please check the data files.")
        Y = self._one_hot_encode(Y)

        if not os.path.exists(Path(self.save_path, "encoded_mapping.txt")):
            # Save encoded mapping as text file
            encoded_mapping = self.get_encoded_mapping()
            with open(Path(self.save_path, "encoded_mapping.txt"), "w") as f:
                f.write(str(encoded_mapping))

        # Save the dataset
        with open(Path(save_type_path, "X.pkl"), "wb") as f:
            pickle.dump(X, f)
        with open(Path(save_type_path, "Y.pkl"), "wb") as f:
            pickle.dump(Y, f)

        self.logger.info(
            "Dataset created and saved at " + save_type_path + "/X.pkl"
        )
    self._distribution = preprocessing.check_distribution(Y)

get_image_shape

get_image_shape() -> tuple

Returns the shape of one image

Source code in eso/model/data.py
def get_image_shape(self) -> tuple:
    """Returns the shape of one image"""
    return self.image_shape

get_data

get_data(type='train') -> tuple

Returns the dataset

Returns:

Name Type Description
X ndarray

The Images

Y ndarray

The labels

Source code in eso/model/data.py
def get_data(self, type="train") -> tuple:
    """Returns the dataset
    Returns
    -------
    X : ndarray
        The Images
    Y : ndarray
        The labels
    """
    path = Path(self.save_path, type)
    # Check if the dataset exists
    if not os.path.exists(os.path.join(path, "X.pkl")):
        raise Exception(
            "The dataset does not exist. Please create the dataset first."
        )
    # Check keep in memory flag
    if self._keep_in_memory:
        # Check if the dataset is already loaded
        if not hasattr(self, "_X"):
            self.logger.debug("Loading dataset into memory...")
            self._X, self._Y = self._load_dataset(type)
        else:
            self.logger.debug("Dataset already loaded into memory.")
        X = self._X
        Y = self._Y
    else:
        self.logger.debug("Loading dataset...")
        X, Y = self._load_dataset(type)
    return X, Y

get_encoded_mapping

get_encoded_mapping()

Returns the encoded mapping of the labels

Source code in eso/model/data.py
def get_encoded_mapping(self):
    """Returns the encoded mapping of the labels"""
    # Check if the encoder is fitted
    if not hasattr(self, "_encoder"):
        if os.path.exists(Path(self.save_path, "encoded_mapping.txt")):
            with Path(self.save_path, "encoded_mapping.txt").open("r") as file:
                encoded_mapping = file.read()
            return encoded_mapping
        else:
            raise Exception(
                "The encoder is not fitted and no file found. Please fit the encoder first."
            )
    # The categories are stored in a list of lists
    categories = self._encoder.categories_[0]
    # Create a dictionary of the categories
    categories_one_hot = self._encoder.transform(
        categories.reshape(-1, 1)
    ).toarray()
    categories_dict = dict(zip(categories, categories_one_hot))
    return categories_dict

eso.model.model

The training and inference wrapper around a CNN. Model is used both by ESO to train the baseline and by every Chromosome to train its own CNN on the extracted bands. It owns the optimiser, the loss function, and the early-stopping logic. It also implements get_number_of_parameters, which produces the parameter count used in the fitness equation.

Model

Model(
    results_path,
    input_shape,
    optimizer_name: str,
    loss_function_name: str,
    batch_size: int,
    learning_rate: float,
    num_epochs: int,
    metric: str,
    architecture_args: dict,
    shuffle: bool = True,
    logger=None,
    use_chromosome=False,
    patience=3,
    min_delta=0.005,
)

Model class.

Source code in eso/model/model.py
def __init__(
    self,
    results_path,
    input_shape,
    optimizer_name: str,
    loss_function_name: str,
    batch_size: int,
    learning_rate: float,
    num_epochs: int,
    metric: str,
    architecture_args: dict,
    shuffle: bool = True,
    logger =None,
    use_chromosome=False,  #keep in case we allow different architecture for chromosome
    patience=3, min_delta=0.005
):


    architecture = architecture_args
    self.cnn = BaseCNN(input_shape=input_shape, **architecture)
    architecture = architecture.copy()
    architecture["input_shape"] = input_shape
    self._architecture = architecture

    # self.logger.info("Initializing Model...")
    # Get Device
    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    #to save the model 
    self.results_path=results_path
    self.optimizer_name = optimizer_name
    self.learning_rate = learning_rate
    self.loss_name = loss_function_name
    self.batch_size = batch_size
    self.shuffle = shuffle
    self.n_epochs = num_epochs
    self.logger = logger
    self.metric = metric

    self._set_optimizer_and_loss()

    #earlystopping
    self.patience = patience
    self.min_delta = min_delta
    self.counter = 0
    self.min_validation_loss = float('inf')

cnn instance-attribute

cnn = BaseCNN(input_shape=input_shape, **architecture)

device instance-attribute

device = device('cuda' if is_available() else 'cpu')

results_path instance-attribute

results_path = results_path

optimizer_name instance-attribute

optimizer_name = optimizer_name

learning_rate instance-attribute

learning_rate = learning_rate

loss_name instance-attribute

loss_name = loss_function_name

batch_size instance-attribute

batch_size = batch_size

shuffle instance-attribute

shuffle = shuffle

n_epochs instance-attribute

n_epochs = num_epochs

logger instance-attribute

logger = logger

metric instance-attribute

metric = metric

patience instance-attribute

patience = patience

min_delta instance-attribute

min_delta = min_delta

counter instance-attribute

counter = 0

min_validation_loss instance-attribute

min_validation_loss = float('inf')

load_cnn staticmethod

load_cnn(cnn_dict, device)

Load the model from a saved state dictionary of the CNN.

Parameters:

Name Type Description Default
cnn_dict_path str

Path to the saved cnn model dictionary.

required

Returns:

Type Description
Model

The loaded model.

Source code in eso/model/model.py
@staticmethod
def load_cnn(cnn_dict, device):
    """
    Load the model from a saved state dictionary of the CNN.

    Parameters
    ----------
    cnn_dict_path : str
        Path to the saved cnn model dictionary.

    Returns
    -------
    Model
        The loaded model.
    """
    # Check if its a path or a dictionary
    if type(cnn_dict) == dict:
        dictionary = cnn_dict
    else:
        if os.path.exists(cnn_dict):
            dictionary = torch.load(cnn_dict, map_location=device)
        else:
            raise FileNotFoundError(f"Model file {cnn_dict} not found")
    cnn = BaseCNN(**dictionary["architecture"])
    cnn.load_state_dict(dictionary["state_dict"])
    return cnn

load

load(model_dict)
Source code in eso/model/model.py
def load(self, model_dict):
    self.cnn = BaseCNN(**model_dict["architecture"])
    self.cnn.load_state_dict(model_dict["state_dict"])

get_model_dict

get_model_dict()
Source code in eso/model/model.py
def get_model_dict(self):
    return {"state_dict": self.cnn.state_dict(), "architecture": self._architecture}

get_number_of_parameters

get_number_of_parameters()
Source code in eso/model/model.py
def get_number_of_parameters(self):
    return sum(p.numel() for p in self.cnn.parameters() if p.requires_grad)

get_minimum_input_shape

get_minimum_input_shape()
Source code in eso/model/model.py
def get_minimum_input_shape(self):
    return self.cnn.calculate_min_input_size()

train

train(
    X_train,
    Y_train,
    X_val,
    Y_val,
    save=True,
    model_name="baseline",
    save_path=None,
    verbose=False,
)
Source code in eso/model/model.py
def train(self, X_train, Y_train, X_val, Y_val, save=True, model_name="baseline", save_path=None, verbose=False):
    # Create Dataloaders
    train_loader = self._create_dataloader(X_train, Y_train)
    val_loader = self._create_dataloader(X=X_val, Y=Y_val)
    val_losses=[]
    train_losses = []
    min_val_loss = torch.inf

    for epoch in range(self.n_epochs):
        epoch_train_loss=self._train_one_epoch(train_loader)
        train_losses.append(epoch_train_loss)

        epoch_val_loss = self._val_one_epoch(val_loader)
        val_losses.append(epoch_val_loss)

        if save: 
            if min_val_loss > epoch_val_loss:
                min_val_loss = epoch_val_loss
                # Saving State Dict
                self.save_model(self.results_path, model_name)


        if self._early_stop(epoch_val_loss):             
            break



    return train_losses, val_losses

evaluate

evaluate(X_val, Y_val, metric=None, threshold=None, print_report=False)
Source code in eso/model/model.py
def evaluate(self, X_val, Y_val, metric=None, threshold=None, print_report=False):
    if metric is None:
        metric = self.metric
    loader = self._create_dataloader(X=X_val, Y=Y_val)
    self.cnn.eval()

    with torch.no_grad():
        total_loss = 0
        targets = []
        predictions = []
        for batch_inputs, batch_targets in loader:
            batch_inputs, batch_targets = batch_inputs.to(
                self.device
            ), batch_targets.to(self.device)
            # print("targets: ", batch_targets)
            batch_preds = self.cnn.forward(batch_inputs)
            total_loss += self.criterion(batch_preds, batch_targets).item()

            #Predict true label if probability is greater than 0.7
            if threshold is not None:
                prediction = (batch_preds > threshold).float()
            else:
                prediction = batch_preds.argmax(dim=1).cpu()

            target = batch_targets.argmax(dim=1).cpu()
            # print("model prediction: ", batch_preds)
            # print("Prediction: ", prediction)
            # print("Target: ", target)

            predictions.extend(prediction.detach().numpy())
            targets.extend(target.detach().numpy())

        f1 = f1_score(targets, predictions)
        report = classification_report(targets, predictions)
        confusion = confusion_matrix(targets, predictions)
        if print_report:
            print(report)
            print(confusion)
        # print("F1: ", f1)
        # print("true positives: ", np.sum(np.logical_and(targets, predictions)))
        # print("true negatives: ", np.sum(np.logical_and(np.logical_not(targets), np.logical_not(predictions))))
        # print("false positives: ", np.sum(np.logical_and(np.logical_not(targets), predictions)))
        # print("false negatives: ", np.sum(np.logical_and(targets, np.logical_not(predictions))))
        accuracy = accuracy_score(targets, predictions)

    if metric == "f1":
        return (f1, "F1")
    elif metric == "accuracy":
        return (accuracy, "Accuracy")
    else:
        raise ValueError(f"Unsupported metric: {metric!r}. Use 'f1' or 'accuracy'.")

save_model

save_model(path, model_name)
Source code in eso/model/model.py
def save_model(self, path, model_name):
    save_path = os.path.join(Path(path, model_name + "_cnn_state.pth"))
    self._model_state_dict = deepcopy(self.get_model_dict())
    torch.save(self._model_state_dict, save_path)
    self.logger.info(f"CNN model state dict saved to {save_path}!")