Modules under eso.model cover the CNN architecture, the dataset pipeline, and the training and evaluation wrappers used by both the baseline and the per-chromosome models. See CNN training and fitness for the algorithmic context.
| Symbol | File | Role |
| --- | --- | --- |
| `BaseCNN` | `eso/model/cnn.py` | The shared CNN architecture (1 conv layer, max-pool, two FC layers). |
| `calc_back_conv` | `eso/model/cnn.py` | Backward computation of the minimum legal input size through a convolution. |
| `calc_back_pool` | `eso/model/cnn.py` | The pooling counterpart of `calc_back_conv`. |
| `Data` | `eso/model/data.py` | Audio to spectrogram pipeline, splits, and dataset caching. |
| `Model` | `eso/model/model.py` | Train, evaluate, save, and load wrapper around the CNN. |
The CNN architecture. The default is a simple stack: one Conv2d → ReLU → Dropout → MaxPool block followed by a flatten and two fully connected layers (each followed by ReLU and dropout), ending in a 2-unit output layer with softmax. Sizing is parameterised through ArchitectureConfig.
The module also exposes the helpers calc_back_conv and calc_back_pool, which walk the convolution and pool stack backwards to compute the smallest input shape the network can accept. ESO uses these helpers at startup to validate every gene's height against the architecture, so impossible configurations fail fast rather than during training.
def __init__(
    self,
    input_shape,
    conv_layers,
    conv_filters,
    dropout_rate,
    conv_kernel,
    max_pooling_size,
    fc_units,
    fc_layers,
    conv_padding=None,
    stride_maxpool=None,
):
    """Base CNN model for the classification of the images

    Parameters
    ----------
    input_shape : tuple
        The input shape of the images in the form of
        (n_channels, height, width)
    conv_layers : int
        The number of convolutional layers
    conv_filters : int
        The number of filters in the convolutional layers
    dropout_rate : float
        The dropout rate of the dropout layers
    conv_kernel : int
        The kernel size of the convolutional layers
    max_pooling_size : int
        The kernel size of the max pooling layers
    fc_units : int
        The number of units in the fully connected layers
    fc_layers : int
        The number of fully connected layers
    conv_padding : int, optional
        Padding of the first convolutional layer. ``None`` means 0.
    stride_maxpool : int, optional
        Stride of the first max pooling layer. ``None`` means
        ``max_pooling_size``.
    """
    super().__init__()

    self.input_shape = input_shape
    n_channels = input_shape[0]
    self.n_conv_layers = conv_layers
    self.conv_filters = conv_filters
    self.dropout_rate = dropout_rate
    self.conv_kernel = conv_kernel
    self.max_pooling_size = max_pooling_size
    self.fc_units = fc_units
    self.n_fc_layers = fc_layers

    # Resolve the optional hyper-parameters to their effective values.
    self.stride_maxpool = (
        max_pooling_size if stride_maxpool is None else stride_maxpool
    )
    self.conv_padding = 0 if conv_padding is None else conv_padding

    # Convolutional layers
    self.conv_layers = nn.Sequential()
    self.conv_layers.add_module(
        "conv0",
        nn.Conv2d(
            n_channels,
            self.conv_filters,
            kernel_size=self.conv_kernel,
            padding=self.conv_padding,
        ),
    )
    self.conv_layers.add_module("relu0", nn.ReLU())
    self.conv_layers.add_module("dropout0", nn.Dropout(self.dropout_rate))
    self.conv_layers.add_module(
        "maxpool0",
        nn.MaxPool2d(self.max_pooling_size, stride=self.stride_maxpool),
    )

    # NOTE(review): blocks 1..n-1 are built with the default padding and
    # the default pooling stride, unlike block 0 which uses conv_padding
    # and stride_maxpool. Kept as-is so existing checkpoints still load;
    # confirm whether this asymmetry is intended.
    for i in range(1, self.n_conv_layers):
        self.conv_layers.add_module(
            f"conv{i}",
            nn.Conv2d(
                self.conv_filters,
                self.conv_filters,
                kernel_size=self.conv_kernel,
            ),
        )
        self.conv_layers.add_module(f"relu{i}", nn.ReLU())
        self.conv_layers.add_module(
            f"dropout{i}", nn.Dropout(self.dropout_rate)
        )
        self.conv_layers.add_module(
            f"maxpool{i}", nn.MaxPool2d(self.max_pooling_size)
        )

    # Fully connected layers
    self.fc_layers = nn.Sequential()
    # Number of features after flattening the conv stack output; cast to a
    # plain int so nn.Linear does not receive a numpy scalar.
    input_units = int(np.prod(self._calc_cnn_output_dim()))
    for i in range(self.n_fc_layers):
        self.fc_layers.add_module(
            f"fc{i}", nn.Linear(input_units, self.fc_units)
        )
        self.fc_layers.add_module(f"relu{i}", nn.ReLU())
        self.fc_layers.add_module(
            f"dropout{i}", nn.Dropout(self.dropout_rate)
        )
        input_units = self.fc_units

    # Output layer. `input_units` equals fc_units whenever at least one FC
    # layer exists; with zero FC layers it is the flattened conv output, so
    # the layer still matches what forward() feeds it.
    self.output_layer = nn.Linear(input_units, 2)
    self.softmax = nn.Softmax(dim=1)
def calculate_min_input_size(self):
    """Compute the smallest (height, width) the conv/pool stack can process.

    Starts from a 1x1 output and walks the network's modules in reverse of
    the order returned by ``self.modules()``, inverting every Conv2d and
    MaxPool2d layer on the way. All other module types are ignored.

    Returns
    -------
    tuple
        ``(min_height, min_width)`` as plain ints
    """
    height = width = 1
    # modules() is a generator, so materialise it before reversing.
    for module in [*self.modules()][::-1]:
        if isinstance(module, nn.Conv2d):
            invert = calc_back_conv
        elif isinstance(module, nn.MaxPool2d):
            invert = calc_back_pool
        else:
            continue
        height = invert(height, module, 0)  # dim 0 -> height
        width = invert(width, module, 1)  # dim 1 -> width
    return int(height), int(width)
def forward(self, x):
    """Forward pass of the network

    Parameters
    ----------
    x : torch.Tensor
        The input tensor. Shape should be
        (batch_size, n_channels, height, width)

    Returns
    -------
    torch.Tensor
        The output tensor. Shape should be (batch_size, n_classes).
        Softmax is applied, so each row holds one probability per class.
    """
    feature_maps = self.conv_layers(x)
    # Collapse everything but the batch axis before the dense layers.
    batch_size = feature_maps.size(0)
    flattened = feature_maps.view(batch_size, -1)
    hidden = self.fc_layers(flattened)
    scores = self.output_layer(hidden)
    return self.softmax(scores)
def calc_back_conv(input_size, conv_layer, dim):
    """Calculate the input size of a Conv2d layer

    Reverse calculation of the output size of a Conv2d layer. This is used
    to calculate the minimum input size of a CNN. It inverts

        out = (in + 2 * padding - dilation * (kernel - 1) - 1) / stride + 1

    so dilation is taken into account; for the default dilation of 1 the
    result reduces to ``(out - 1) * stride - 2 * padding + kernel``.

    Parameters
    ----------
    input_size : int
        The output size of the Conv2d layer
    conv_layer : torch.nn.Conv2d
        The Conv2d layer to calculate the input size of
    dim : int
        The dimension to calculate the input size of.
        0 for height, 1 for width

    Returns
    -------
    int
        The input size of the Conv2d layer
    """
    kernel_size = conv_layer.kernel_size[dim]
    stride = conv_layer.stride[dim]
    padding = conv_layer.padding[dim]
    dilation = conv_layer.dilation[dim]

    # Extent actually covered by one (possibly dilated) kernel application.
    effective_kernel = dilation * (kernel_size - 1) + 1
    return (input_size - 1) * stride - 2 * padding + effective_kernel
def calc_back_pool(input_size, pool_layer, dim):
    """Calculate the input size of a MaxPool2d layer

    Reverse calculation of the output size of a MaxPool2d layer. This is
    used to calculate the minimum input size of a CNN. Only the kernel size
    and the stride of the layer are considered; its padding and dilation
    are not read.

    Parameters
    ----------
    input_size : int
        The output size of the MaxPool2d layer
    pool_layer : torch.nn.MaxPool2d
        The MaxPool2d layer to calculate the input size of
    dim : int
        The dimension to calculate the input size of.
        0 for height, 1 for width

    Returns
    -------
    int
        The input size of the MaxPool2d layer
    """
    # kernel_size and stride may each independently be an int or a
    # per-dimension sequence, so each one is resolved on its own type.
    kernel_size = pool_layer.kernel_size
    if not isinstance(kernel_size, int):
        kernel_size = kernel_size[dim]

    stride = pool_layer.stride
    if stride is None:
        # An unset stride means "same as the kernel size".
        stride = kernel_size
    elif not isinstance(stride, int):
        stride = stride[dim]

    return (input_size - 1) * stride + kernel_size
def get_conv_output_dim(layer: nn.Module, input_dim: tuple) -> tuple:
    """Calculate output dimension of a CNN layer

    The layer must expose per-dimension ``kernel_size``, ``stride``,
    ``padding`` and ``dilation`` sequences plus ``out_channels``.

    Parameters
    ----------
    layer : torch.nn.Module
        The CNN layer to calculate the output dimension of
    input_dim : tuple
        The input dimension of the CNN layer in the form of
        (n_channels, height, width)

    Returns
    -------
    tuple
        The output dimension of the CNN layer in the form of
        (n_channels, height, width)
    """
    kernel, stride = layer.kernel_size, layer.stride
    padding, dilation = layer.padding, layer.dilation
    _, in_height, in_width = input_dim

    def _out_size(size, axis):
        # Standard convolution output-size formula, truncated to an int.
        reach = dilation[axis] * (kernel[axis] - 1)
        return int((size + 2 * padding[axis] - reach - 1) / stride[axis] + 1)

    return (layer.out_channels, _out_size(in_height, 0), _out_size(in_width, 1))
The data pipeline. Data.create_datasets reads audio, parses annotations, segments to fixed-length windows, generates mel-spectrograms, applies optional class balancing via augmentation, and writes the train/validation/test splits to disk (or holds them in memory if keep_in_memory is set). The same splits are reused across the baseline and every chromosome's CNN training, so all individuals see identical data.
def __init__(
    self,
    apply_preprocessing: bool,
    force_recreate_dataset: bool,
    species_folder: str,
    keep_in_memory: bool,
    preprocessing_args: dict,
    train_size: float,
    test_size: float,
    positive_class: str,
    negative_class: str,
    reshuffle: bool = False,
    logger=None,
    log_path=None,
    log_level=10,
) -> None:
    """Initialize the Data class

    Only stores the data settings; no files are read or written here.

    Parameters
    ----------
    apply_preprocessing : bool
        Whether preprocessing is applied; also selects the
        "preprocessed" / "unpreprocessed" save folder
    force_recreate_dataset : bool
        Rebuild the datasets even if saved ones are found
    species_folder : str
        Root folder holding the data files and the saved datasets
    keep_in_memory : bool
        Keep a loaded dataset on the instance instead of reloading it
    preprocessing_args : dict
        Extra keyword arguments forwarded to ``Preprocessing``
    train_size : float
        Train share passed on when the file names are shuffled
    test_size : float
        Test share passed on when the file names are shuffled
    positive_class : str
        Name of the positive class
    negative_class : str
        Name of the negative class
    reshuffle : bool
        Re-create the file-name split even if one already exists
    logger : logging.Logger
        The logger object to log messages
    log_path : optional
        Passed through to ``setup_logger``
    log_level : int
        Passed through to ``setup_logger``
        (10 presumably is logging.DEBUG - TODO confirm)

    Returns
    -------
    None
    """
    # This should only contain the config for data settings
    self.logger = setup_logger(
        logger=logger, log_path=log_path, log_level=log_level
    )

    # Where the data lives and how it is prepared.
    self.species_folder = species_folder
    self.preprocessing_args = preprocessing_args
    self._preprocessing_flag = apply_preprocessing
    self._force_recreate_dataset = force_recreate_dataset
    self._keep_in_memory = keep_in_memory

    # Class labels.
    self._positive_class = positive_class
    self._negative_class = negative_class

    # File-name split settings.
    self._train_size = train_size
    self._test_size = test_size
    self._reshuffle = reshuffle
def create_datasets(self):
    """Create (or reuse) the train, validation and test datasets on disk.

    Steps, in order:

    1. Pick the save folder ``<species_folder>/SavedData/<preprocessed|
       unpreprocessed>`` and build the ``Preprocessing`` helper.
    2. Make sure the file-name split (``DataFiles/{train,validation,test}
       .txt``) exists; it is (re)created when missing or when the reshuffle
       flag is set.
    3. For every split, skip it if ``X.pkl`` already exists and recreation
       is not forced; otherwise build it (augmentation only for "train"),
       one-hot encode the labels and pickle ``X`` and ``Y``.

    Side effects: sets ``self.save_path``, ``self.image_shape`` (once) and,
    for splits that are actually created, ``self._distribution``.

    Raises
    ------
    Exception
        If a created split contains no samples.
    """
    splits = ("train", "validation", "test")

    folder_name = (
        "preprocessed" if self._preprocessing_flag else "unpreprocessed"
    )
    self.save_path = Path(self.species_folder, "SavedData", folder_name)

    preprocessing = Preprocessing(
        **self.preprocessing_args,
        apply_preprocessing=self._preprocessing_flag,
        species_folder=self.species_folder,
        positive_class=self._positive_class,
        negative_class=self._negative_class,
    )

    # --- file-name split -------------------------------------------------
    data_files = Path(self.species_folder, "DataFiles")
    split_lists_exist = all(
        os.path.exists(data_files / f"{split}.txt") for split in splits
    )
    if split_lists_exist and not self._reshuffle:
        self.logger.info(
            "Found already existing shuffled file names! Loading from memory.."
        )
    else:
        if split_lists_exist:
            self.logger.info(
                "Found Existing Files but reshuffle flag ist set. Reshuffling.."
            )
        else:
            # Files don't exist, create the split
            self.logger.info("Reshuffling file names for the first time...")
        preprocessing.shuffle_files_names(
            train_size=self._train_size, test_size=self._test_size
        )

    # --- per-split datasets ----------------------------------------------
    mapping_file = Path(self.save_path, "encoded_mapping.txt")
    for split in splits:
        save_type_path = str(Path(self.save_path) / split)

        # Reuse an existing dataset unless recreation is forced.
        if (
            os.path.exists(Path(save_type_path, "X.pkl"))
            and not self._force_recreate_dataset
        ):
            self.logger.info("The dataset already exists. Skipping...")
            if not hasattr(self, "image_shape"):
                # Load the dataset once just to learn the image shape.
                self.logger.debug("Loading dataset to set image shape...")
                X, Y = self._load_dataset(split)
                self.image_shape = X.shape[1:]
            continue

        os.makedirs(save_type_path, exist_ok=True)
        path = data_files / f"{split}.txt"
        self.logger.debug("File path: " + str(path))

        # Only the training split is augmented.
        is_train = split == "train"
        if is_train:
            self.logger.info("Creating the training dataset")
        else:
            self.logger.info(f"Creating the {split} dataset")
        X, Y = preprocessing.create_dataset(
            file_names=path,
            augmentation=is_train,
            annotation_folder="Annotations",
            sufix_file=".svl",
        )

        if not hasattr(self, "image_shape"):
            self.image_shape = X.shape[1:]

        if Y.shape[0] == 0:
            raise Exception(
                "The dataset is empty. Please check the data files."
            )

        Y = self._one_hot_encode(Y)

        if not os.path.exists(mapping_file):
            # Save encoded mapping as text file
            encoded_mapping = self.get_encoded_mapping()
            with open(mapping_file, "w") as f:
                f.write(str(encoded_mapping))

        # Save the dataset
        with open(Path(save_type_path, "X.pkl"), "wb") as f:
            pickle.dump(X, f)
        with open(Path(save_type_path, "Y.pkl"), "wb") as f:
            pickle.dump(Y, f)
        self.logger.info(
            "Dataset created and saved at " + save_type_path + "/X.pkl"
        )

        # Reflects the most recently created split only.
        self._distribution = preprocessing.check_distribution(Y)
def get_data(self, type="train") -> tuple:
    """Returns the dataset of the requested split

    With the keep-in-memory flag set, every split is loaded at most once
    and cached per split name, so asking for "validation" after "train"
    returns the validation data rather than the first split that happened
    to be loaded.

    Parameters
    ----------
    type : str
        Name of the split folder below ``self.save_path``
        (e.g. "train", "validation", "test")

    Returns
    -------
    X : ndarray
        The Images
    Y : ndarray
        The labels

    Raises
    ------
    Exception
        If no ``X.pkl`` exists for the requested split.
    """
    path = Path(self.save_path, type)

    # Check if the dataset exists
    if not os.path.exists(os.path.join(path, "X.pkl")):
        raise Exception(
            "The dataset does not exist. Please create the dataset first."
        )

    if not self._keep_in_memory:
        self.logger.debug("Loading dataset...")
        X, Y = self._load_dataset(type)
        return X, Y

    # Lazily create the per-split cache on first use.
    cache = getattr(self, "_split_cache", None)
    if cache is None:
        cache = self._split_cache = {}

    if type in cache:
        self.logger.debug("Dataset already loaded into memory.")
    else:
        self.logger.debug("Loading dataset into memory...")
        X, Y = self._load_dataset(type)
        cache[type] = (X, Y)

    # Keep the legacy attributes pointing at the split served last.
    self._X, self._Y = cache[type]
    return self._X, self._Y
def get_encoded_mapping(self):
    """Returns the encoded mapping of the labels

    Returns
    -------
    dict or str
        With a fitted encoder on the instance: a dict mapping each category
        to its one-hot row. Without one: the raw text of
        ``encoded_mapping.txt`` from the save folder.

    Raises
    ------
    Exception
        If no encoder is fitted and no mapping file exists.
    """
    if hasattr(self, "_encoder"):
        # The categories are stored in a list of lists
        labels = self._encoder.categories_[0]
        one_hot_rows = self._encoder.transform(labels.reshape(-1, 1)).toarray()
        return dict(zip(labels, one_hot_rows))

    # No fitted encoder: fall back to the mapping saved on disk.
    mapping_file = Path(self.save_path, "encoded_mapping.txt")
    if not os.path.exists(mapping_file):
        raise Exception(
            "The encoder is not fitted and no file found. Please fit the encoder first."
        )
    with mapping_file.open("r") as handle:
        return handle.read()
The training and inference wrapper around a CNN. Model is used both by ESO to train the baseline and by every Chromosome to train its own CNN on the extracted bands. It owns the optimiser, the loss function, and the early-stopping logic. It also implements get_number_of_parameters, which produces the parameter count used in the fitness equation.
def __init__(
    self,
    results_path,
    input_shape,
    optimizer_name: str,
    loss_function_name: str,
    batch_size: int,
    learning_rate: float,
    num_epochs: int,
    metric: str,
    architecture_args: dict,
    shuffle: bool = True,
    logger=None,
    use_chromosome=False,  # keep in case we allow different architecture for chromosome
    patience=3,
    min_delta=0.005,
):
    """Build the CNN and store the training configuration.

    Parameters
    ----------
    results_path
        Location used to save the model
    input_shape
        Input shape handed to ``BaseCNN``
    optimizer_name : str
        Name of the optimizer, resolved by ``_set_optimizer_and_loss``
    loss_function_name : str
        Name of the loss function, resolved by ``_set_optimizer_and_loss``
    batch_size : int
        Stored as ``self.batch_size``
    learning_rate : float
        Stored as ``self.learning_rate``
    num_epochs : int
        Stored as ``self.n_epochs``
    metric : str
        Stored as ``self.metric``
    architecture_args : dict
        Keyword arguments for ``BaseCNN`` (without ``input_shape``)
    shuffle : bool
        Stored as ``self.shuffle``
    logger : optional
        Logger used by the model; may be None
    use_chromosome : bool
        Currently unused
    patience : int
        Early-stopping patience
    min_delta : float
        Early-stopping minimum delta
    """
    self.cnn = BaseCNN(input_shape=input_shape, **architecture_args)
    # Full constructor arguments of the CNN, kept on a separate dict so the
    # caller's architecture_args is not modified.
    self._architecture = {**architecture_args, "input_shape": input_shape}

    # self.logger.info("Initializing Model...")

    # Get Device
    device_name = "cuda" if torch.cuda.is_available() else "cpu"
    self.device = torch.device(device_name)

    # to save the model
    self.results_path = results_path

    # Training configuration
    self.optimizer_name = optimizer_name
    self.learning_rate = learning_rate
    self.loss_name = loss_function_name
    self.batch_size = batch_size
    self.shuffle = shuffle
    self.n_epochs = num_epochs
    self.logger = logger
    self.metric = metric

    self._set_optimizer_and_loss()

    # Early stopping state
    self.patience = patience
    self.min_delta = min_delta
    self.counter = 0
    self.min_validation_loss = float("inf")
@staticmethod
def load_cnn(cnn_dict, device):
    """
    Load the CNN from a saved model dictionary.

    Parameters
    ----------
    cnn_dict : dict or str
        Either the model dictionary itself or the path to a file holding
        it. The dictionary must provide the keys "architecture" (keyword
        arguments for ``BaseCNN``) and "state_dict".
    device
        Passed as ``map_location`` when the dictionary is read from a
        file. The returned network is not moved to this device here.

    Returns
    -------
    BaseCNN
        The network with the saved weights loaded.

    Raises
    ------
    FileNotFoundError
        If ``cnn_dict`` is a path that does not exist.
    """
    # isinstance also accepts dict subclasses such as OrderedDict.
    if isinstance(cnn_dict, dict):
        dictionary = cnn_dict
    elif os.path.exists(cnn_dict):
        # NOTE: torch.load unpickles the file; only load checkpoints that
        # come from a trusted source.
        dictionary = torch.load(cnn_dict, map_location=device)
    else:
        raise FileNotFoundError(f"Model file {cnn_dict} not found")

    cnn = BaseCNN(**dictionary["architecture"])
    cnn.load_state_dict(dictionary["state_dict"])
    return cnn
def save_model(self, path, model_name):
    """Save the CNN model dictionary to ``<path>/<model_name>_cnn_state.pth``.

    A deep copy of ``self.get_model_dict()`` is kept on the instance as
    ``self._model_state_dict`` and written with ``torch.save``.

    Parameters
    ----------
    path : str or Path
        Folder to save into
    model_name : str
        Prefix of the file name
    """
    save_path = str(Path(path, model_name + "_cnn_state.pth"))
    self._model_state_dict = deepcopy(self.get_model_dict())
    torch.save(self._model_state_dict, save_path)
    # The logger is optional (defaults to None in the constructor), so a
    # missing logger must not turn a successful save into a crash.
    if self.logger is not None:
        self.logger.info(f"CNN model state dict saved to {save_path}!")