eso.ESO

The top-level orchestrator. Pass the path of a JSON settings file (or a dictionary with the same content) and call run(). Data preparation, baseline training, the evolution loop, and evaluation are all driven from inside this class.

A typical invocation looks like:

from eso import ESO

e = ESO(settings="settings/my_experiment.json")
e.run()

What run() does

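The run() method walks the training and optimization pipeline. Going by the source listed further down this page, it makes the following calls in order. evaluate() is not called by run() and has to be invoked separately.

Step | Call | Description
--- | --- | ---
1 | setup_tensorboard | Create the TensorBoard writer from tensorboard_log_dir.
2 | _get_baseline_results | Obtain the results of the baseline CNN (F1 and parameter count).
3 | _write_baseline_results_to_config | Write the baseline results into the config.
4 | _check_minimum_image_shape | Check the minimum image shape.
5 | optimize() | Iterate up to max_generations generations of training, selection, mutation, crossover, and reproduction.
6 | _save_results | Save the results if results_path is not None.
7 | writer.add_hparams, writer.add_text | If tensorboard_log_dir is set, log the hyperparameters and the best chromosome, then close the writer.
8 | _retrain_full | Retrain for the full number of epochs and return the best chromosome.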

The methods marked as private (prefixed with _) are filtered out of the public reference below, but the public surface (run, optimize, evolve_population, save, evaluate) is documented in full.

Settings

settings is the only required argument of ESO(...). It is either the path of a JSON file or a dictionary, and it is parsed into the dataclass hierarchy in eso.utils.settings. See Configuration for every field.

Reference

ESO

ESO(
    settings,
    stop_event=None,
    logger=None,
    population_file_path=None,
    log_level=0,
    log_path=None,
    tensorboard_log_dir=None,
    results_path="results",
    progress_handler=None,
)

The main class for the ESO algorithm.

This class is responsible for training the baseline model and running the genetic algorithm that finds the optimal band positions and heights. It is initialized with either the path to a JSON settings file or a dictionary holding the same parameters. The settings must contain the following sections:

- data: The parameters for the data
- preprocessing: The parameters for the preprocessing
- model: The parameters for the model
- chromosome: The parameters for the chromosome
- gene: The parameters for the gene
- population: The parameters for the population
- selection_operator: The parameters for the selection operator
- genetic_operator: The parameters for the genetic operator
- algorithm: The parameters for the algorithm

Check the documentation for the parameters of each class.
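
Before constructing ESO, it can be useful to confirm that a settings dictionary has every section listed above. The following is a minimal sketch and not part of the eso package: the names REQUIRED_SECTIONS and missing_sections are hypothetical, and the empty section contents are placeholders. It checks only the top-level section names, not the fields inside them.

```python
# Hypothetical helper, not part of eso: checks only the top-level section names.
REQUIRED_SECTIONS = (
    "data",
    "preprocessing",
    "model",
    "chromosome",
    "gene",
    "population",
    "selection_operator",
    "genetic_operator",
    "algorithm",
)


def missing_sections(settings: dict) -> list[str]:
    """Return the required top-level sections absent from `settings`, in order."""
    return [name for name in REQUIRED_SECTIONS if name not in settings]


# Placeholder settings: every section is left empty and "gene" is omitted on purpose.
example = {name: {} for name in REQUIRED_SECTIONS if name != "gene"}
print(missing_sections(example))  # ['gene']
```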

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
settings | str or dict | The path to the settings file, or a dictionary containing the parameters. | required
stop_event | Event | The event used to stop the algorithm. Used if run from the GUI. | None
logger | Logger | The logger to use. If None, a logger is created. | None
population_file_path | str | The path to the population file. If None, a new population is created. | None
log_level | int | The log level to use. Check the logging module for the different log levels. | 0
log_path | str | The path to the log file. If None, the log is not saved to a file. | None
tensorboard_log_dir | str | The directory to log to TensorBoard. If None, TensorBoard is not used. | None
results_path | str | The path where the results are saved. | 'results'
progress_handler | object | The handler that receives progress updates (set_main_max, set_main_value, notify_best_chromosome_image_updated). Only used if run from the GUI. | None
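
The source listed on this page calls three methods on progress_handler: set_main_max, set_main_value, and notify_best_chromosome_image_updated. A minimal stand-in for use outside the GUI could look like the sketch below. The class name RecordingProgressHandler and its fraction_done helper are hypothetical. Population.train_population also receives the handler and may call further methods that are not shown on this page, so treat this as a starting point rather than a complete interface.

```python
class RecordingProgressHandler:
    """Hypothetical stand-in that records progress instead of updating a GUI."""

    def __init__(self):
        self.main_max = None
        self.main_value = 0
        self.best_image_path = None

    def set_main_max(self, value):
        # Called by run() with max_generations.
        self.main_max = value

    def set_main_value(self, value):
        # Called by run() with 0 and by optimize() with epoch + 1.
        self.main_value = value

    def notify_best_chromosome_image_updated(self, image_path):
        # Called by optimize() after a new all-time best chromosome is plotted.
        self.best_image_path = image_path

    def fraction_done(self):
        # Convenience helper (not called by ESO): progress in [0, 1].
        if not self.main_max:
            return 0.0
        return self.main_value / self.main_max


handler = RecordingProgressHandler()
handler.set_main_max(10)
handler.set_main_value(3)
print(handler.fraction_done())  # 0.3
```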

Raises:

Type | Description
--- | ---
ImportError | If the baseline.json file exists but could not be loaded.
ValueError | If max_generations is not specified in the settings file or as an argument to the method.
ValueError | If the sum of mutation_rate, crossover_rate and reproduction_rate is not 1.
ValueError | If the number of new chromosomes does not match the population size.

Examples:

>>> from eso import ESO
>>> eso = ESO(settings="settings.json")
>>> eso.optimize(max_generations=100)
Source code in eso/eso.py
def __init__(
    self,
    settings,
    stop_event=None,
    logger=None,
    population_file_path=None,
    log_level=0,
    log_path=None,
    tensorboard_log_dir=None,
    results_path="results",
    progress_handler=None,
):

    self.logger = setup_logger(
        logger=logger, log_path=log_path, log_level=log_level, name="eso"
    )
    self.evolution_logger = setup_logger(
        logger=None,
        log_path=log_path,
        log_level=log_level,
        name="evolution",
        add_stream_handler=False,
    )
    self.population_logger = setup_logger(
        logger=None,
        log_path=log_path,
        log_level=log_level,
        name="population",
        add_stream_handler=False,
    )




    init_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    self.logger.info(f"{init_time} Initializing ESO...")
    self.results_path = results_path
    # Create the results directory before anything is written into it
    os.makedirs(self.results_path, exist_ok=True)
    # Save the settings (if `settings` is a path string, the path itself is written)
    with open(os.path.join(self.results_path, "settings.json"), "w") as f:
        json.dump(settings, f, indent=4)

    self.config = Config(settings)
    self.logger.debug(f"Config: {self.config}")
    self.stop_event = stop_event
    self.population_file_path = population_file_path
    # If tensorboard_log_dir is None, self.writer is None
    self.tensorboard_log_dir = tensorboard_log_dir
    self.progress_handler = progress_handler
    self._all_time_best_fitness = -np.inf
    self._best_chromosome = None

    # Allow -1 as a special value for the GUI, meaning "automatic sizing"
    if self.config.gene.band_height == -1:
        self.config.gene.band_height = None

    # A setting counts as fixed only if it holds a concrete value (not None, not -1)
    self.band_height_fixed = self.config.gene.band_height is not None

    num_genes = self.config.chromosome.num_genes
    self.nb_genes_fixed = num_genes is not None and num_genes != -1

    band_position = self.config.gene.band_position
    self.band_position_fixed = band_position is not None and band_position != -1

logger instance-attribute

logger = setup_logger(logger=logger, log_path=log_path, log_level=log_level, name="eso")

evolution_logger instance-attribute

evolution_logger = setup_logger(
    logger=None,
    log_path=log_path,
    log_level=log_level,
    name="evolution",
    add_stream_handler=False,
)

population_logger instance-attribute

population_logger = setup_logger(
    logger=None,
    log_path=log_path,
    log_level=log_level,
    name="population",
    add_stream_handler=False,
)

results_path instance-attribute

results_path = results_path

config instance-attribute

config = Config(settings)

stop_event instance-attribute

stop_event = stop_event

population_file_path instance-attribute

population_file_path = population_file_path

tensorboard_log_dir instance-attribute

tensorboard_log_dir = tensorboard_log_dir

progress_handler instance-attribute

progress_handler = progress_handler

band_height_fixed instance-attribute

band_height_fixed = True

nb_genes_fixed instance-attribute

nb_genes_fixed = True

band_position_fixed instance-attribute

band_position_fixed = True
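
The True values listed for band_height_fixed, nb_genes_fixed, and band_position_fixed are only one branch of the constructor logic: each flag is computed from the settings, and both None and -1 count as "not fixed". The helper below is a sketch of that rule. The name is_fixed is hypothetical and does not exist in eso.

```python
def is_fixed(value):
    """Return True when a setting holds a concrete value.

    None and the sentinel -1 both mean the value is not fixed.
    """
    return value is not None and value != -1


print(is_fixed(8))     # True
print(is_fixed(-1))    # False
print(is_fixed(None))  # False
```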

run

run()
Source code in eso/eso.py
def run(self):
    max_generations = self.config.algorithm.max_generations
    self.writer = setup_tensorboard(self.tensorboard_log_dir, self.logger)
    base_results = self._get_baseline_results()
    self._write_baseline_results_to_config(base_results)
    self._check_minimum_image_shape()
    if self.progress_handler:
        self.progress_handler.set_main_value(0)
        self.progress_handler.set_main_max(max_generations)

    self.optimize(max_generations)
    # Clean up and save
    if self.results_path is not None:
        self._save_results()
    if self.tensorboard_log_dir is not None:
        # Save hyperparameters
        metric_dict = {
            "metric": self._best_chromosome.get_metric(),
            "fitness": self._best_chromosome.get_fitness(),
            "trainable_params": self._best_chromosome.get_trainable_parameters(),
        }

        self.writer.add_hparams(
            self.config.get_params(),
            metric_dict=metric_dict,
        )
        # add dict as text
        self.writer.add_text(
            "Best Chromosome",
            str(self._best_chromosome),
            0,
        )
        # add hyperparams as text
        self.writer.add_text(
            "Hyperparameters",
            str(self.config.get_params()),
            0,
        )
        self.writer.close()

    self.logger.info("Algorithm finished!")
    self.logger.info("All-time best Chromosome:")
    self.logger.info(self._best_chromosome)
    self.logger.info("Now retraining for full epochs...")
    self._retrain_full()
    return self._best_chromosome

optimize

optimize(max_generations, log_evolution=False)

Perform Genetic Algorithm to find optimal band positions and heights.

This method builds the datasets for the chromosomes, creates the population (or loads it from population_file_path), and then runs the genetic algorithm to find the optimal band positions and heights. In each generation (called an epoch in the logs), the population is trained and then evolved, and the best chromosome is logged to TensorBoard.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
max_generations | int | The maximum number of generations to run the algorithm for. | required
log_evolution | bool | If True, an empty evolution.log file is created in results_path (an existing file is truncated). | False

Raises:

Type | Description
--- | ---
ValueError | If max_generations is not specified in the settings file or as an argument to the method.
ValueError | If the sum of mutation_rate, crossover_rate and reproduction_rate is not 1.
ValueError | If the number of new chromosomes does not match the population size.
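
In the source listed below, stop_event is polled at the start of every generation, so a stop request takes effect between generations as far as this loop is concerned (train_population also receives the event). The sketch below reproduces that polling pattern with a stand-in loop. It assumes the event is a threading.Event, which the page does not state explicitly, and run_generations is a hypothetical function, not part of eso.

```python
import threading


def run_generations(max_generations, stop_event=None, stop_after=None):
    """Stand-in for the generation loop: returns the generations completed."""
    completed = []
    for epoch in range(max_generations):
        # Same check as at the top of each generation in optimize()
        if stop_event is not None and stop_event.is_set():
            break
        completed.append(epoch)
        # Simulate an external stop request (e.g. a GUI button) arriving
        if stop_after is not None and epoch == stop_after:
            stop_event.set()
    # optimize() also sets the event once the loop has ended
    if stop_event is not None:
        stop_event.set()
    return completed


event = threading.Event()
print(run_generations(5, stop_event=event, stop_after=1))  # [0, 1]
print(event.is_set())  # True
```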

Source code in eso/eso.py
def optimize(self, max_generations, log_evolution=False):
    """Perform Genetic Algorithm to find optimal band positions and heights.

    This method builds the datasets for the chromosomes, creates or loads the population, and then performs the genetic algorithm to find the optimal band positions and heights.
    At each epoch (generation), the population is trained and then evolved. The best chromosome is logged to tensorboard.

    Parameters
    ----------
    max_generations : int
        The maximum number of generations to run the algorithm for
    log_evolution : bool, optional
        If True, create (or truncate) evolution.log in results_path, by default False

    Raises
    ------
    ValueError
        If max_generations is not specified in the settings file or as an argument to the method
    ValueError
        If the sum of mutation_rate, crossover_rate and reproduction_rate is not 1
    ValueError
        If the number of new chromosomes does not match the population size
    """
    if log_evolution:
        with open(os.path.join(self.results_path, "evolution.log"), "w"):
            pass
    # TODO refactor this
    # only implement the optimization here
    self.logger.info("Optimizing...")
    data_dict = self.config.data.dict().copy()
    data_dict["force_recreate_dataset"] = False
    data = Data(
        apply_preprocessing=False,
        logger=self.logger,
        preprocessing_args=self.config.preprocessing.dict(),
        **data_dict,
    )
    self.logger.debug("Creating datasets for chromosomes...")
    data.create_datasets()
    # Check distribution
    self.logger.info(f"Encoding: {data.get_encoded_mapping()}")
    # Initialize Population
    if self.population_file_path is not None:
        # NOTE maybe this breaks if Baseline was trained again
        self.population = Population.load(
            self.population_file_path, data=data, logger=self.logger
        )
        self.logger.info(f"Loaded population from {self.population_file_path}")
        self.population_logger.info(f"Loaded population from {self.population_file_path}")
        self.population_logger.info(self.population)
    else:
        self.population = Population(self.results_path,
            **self.config.population.dict(),
            chromosome_args=self.config.chromosome.dict(),
            gene_args=self.config.gene.dict(),
            model_args=self.config.model.dict(),
            architecture_args=self.config.cnn_architecture.dict(),
            logger=self.logger,
            data=data,
        )
        self.logger.info("Creating Population from scratch.")
        self.population_logger.info("Creating Population from scratch.")
        self.population_logger.info(self.population)

    # Initialize Selection operator
    self.parent_selector = SelectionOperator(
        **self.config.selection_operator.dict(), 
    )

    # Initialize Genetic Operator
    self.genetic_operator = GeneticOperator(
        self.band_height_fixed,
        self.band_position_fixed,
        self.config.gene.spec_height,
        **self.config.genetic_operator.dict(),
    )
    start_eso_loop = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    self.logger.info(f"{start_eso_loop} Starting ESO...")
    for epoch in range(max_generations):
        # Check if the stop event is set
        if self.stop_event is not None:
            if self.stop_event.is_set():
                self.logger.info("Stopping ESO...")
                break

        if self.progress_handler:
            self.progress_handler.set_main_value(epoch + 1) # +1 because epoch is 0-indexed

        if epoch != 0:
            self.population.reset_trained_flags()
        # Train the population
        self.logger.info(f"---------- Epoch {epoch} / {max_generations} ----------")
        self.evolution_logger.info(
            f"---------- Epoch {epoch} / {max_generations} ----------"
        )
        self.population_logger.info(
            f"---------- Epoch {epoch} / {max_generations} ----------"
        )
        # this will evaluate the fitness of each chromosome
        stop = self.population.train_population(
            progress_handler=self.progress_handler,
            stop_event=self.stop_event,
        )

        self.evolve_population()

        # Log the best chromosome
        best_chromosome = self.population.get_best_chromosome()
        self.logger.debug(
            f"Current Best Chromosome Fitness: {best_chromosome.get_fitness()}"
        )
        if best_chromosome.get_fitness() > self._all_time_best_fitness:
            self.logger.debug("Better Chromosome!")

            self._best_chromosome = deepcopy(best_chromosome)
            self._all_time_best_fitness = best_chromosome.get_fitness()
            image_name_base = "all_time_best_chromosome"
            image_full_path = os.path.abspath(
                os.path.join(self.results_path, f"{image_name_base}.png")
            )
            plot_chromosome(
                best_chromosome,
                self.config.gene.spec_height,
                self.config.model.metric,
                self.results_path,
                name=image_name_base,
            )

            if self.progress_handler:
                self.progress_handler.notify_best_chromosome_image_updated(image_full_path)

            if self.results_path is not None:
                self._save_results()

        # Log to Tensorboard
        log_tensorboard(
            best_chromosome=best_chromosome,
            epoch=epoch,
            writer=self.writer,
            tensorboard_log_dir=self.tensorboard_log_dir,
            image_height=self.config.gene.spec_height,
            metric_name=self.config.model.metric,
            results_path=self.results_path,
        )
        if stop:
            self.logger.debug("Stopped Training...")
            break
    # Stop the thread
    if self.stop_event is not None:
        self.stop_event.set()
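
The loop above stores a deepcopy of the best chromosome rather than a reference. The page does not state the reason. A plausible one is that chromosomes in the population may be changed or replaced in later generations. The sketch below shows the same tracking pattern on plain dictionaries. The function track_best and all fitness values are made up for illustration.

```python
from copy import deepcopy


def track_best(generations):
    """Return a snapshot of the highest-fitness item seen across generations."""
    best = None
    best_fitness = float("-inf")  # same starting point as -np.inf in __init__
    for population in generations:
        current = max(population, key=lambda c: c["fitness"])
        if current["fitness"] > best_fitness:
            best = deepcopy(current)  # snapshot, unaffected by later changes
            best_fitness = current["fitness"]
    return best


gen0 = [{"id": "a", "fitness": 0.6}, {"id": "b", "fitness": 0.7}]
gen1 = [{"id": "c", "fitness": 0.65}]
best = track_best([gen0, gen1])
gen0[1]["fitness"] = 0.0  # a later change to the original object
print(best)  # {'id': 'b', 'fitness': 0.7}
```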

evolve_population

evolve_population()

Evolve the population using the genetic operator and selection operator

Creates new chromosomes using the genetic operator and replaces the old population with the new one.

Raises:

Type | Description
--- | ---
ValueError | If the sum of mutation_rate, crossover_rate and reproduction_rate is not 1.
ValueError | If the number of new chromosomes does not match the population size.
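
The split of one generation into mutation, crossover, and reproduction follows directly from the three rates. The sketch below repeats the arithmetic from the source listed below. The helper operator_counts is hypothetical and the rates are example values.

```python
def operator_counts(population_size, mutation_rate, crossover_rate, reproduction_rate):
    """Return (mutations, crossover pairs, reproductions) for one generation."""
    if round(mutation_rate + crossover_rate + reproduction_rate, 2) != 1:
        raise ValueError(
            "The sum of mutation_rate, crossover_rate and reproduction_rate must be 1"
        )
    mutation_size = int(population_size * mutation_rate)
    # Each crossover yields two offspring, so count pairs
    crossover_size = int(population_size * crossover_rate) // 2
    # Reproduction fills whatever is left, absorbing the truncation above
    reproduction_size = population_size - mutation_size - 2 * crossover_size
    return mutation_size, crossover_size, reproduction_size


print(operator_counts(20, 0.25, 0.5, 0.25))  # (5, 5, 5)
print(operator_counts(10, 0.25, 0.5, 0.25))  # (2, 2, 4)
```

In the second call the truncated shares are absorbed by reproduction, so the population size stays at 10 even though reproduction_rate is 0.25.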

Source code in eso/eso.py
def evolve_population(self):
    """Evolve the population using the genetic operator and selection operator

    Creates new chromosomes using the genetic operator and replaces the old population with the new one.

    Raises
    ------
    ValueError
        If the sum of mutation_rate, crossover_rate and reproduction_rate is not 1
    ValueError
        If the number of new chromosomes does not match the population size
    """
    mutation_rate = self.config.genetic_operator.mutation_rate
    crossover_rate = self.config.genetic_operator.crossover_rate
    reproduction_rate = self.config.genetic_operator.reproduction_rate

    # Check if they add up to 1
    if round (mutation_rate + crossover_rate + reproduction_rate, 2) != 1:
        raise ValueError(
            "The sum of mutation_rate, crossover_rate and reproduction_rate must be 1"
        )
    population_size = len(self.population)
    self.evolution_logger.debug(f"Population size before: {population_size}")
    mutation_size = int(population_size * mutation_rate)
    # because crossover creates 2 offspring
    crossover_size = int((population_size * crossover_rate)) // 2
    # reproduction_size = population_size - mutation_size - crossover_size

    self.evolution_logger.debug(f"Mutation size: {mutation_size}")
    self.evolution_logger.debug(f"Crossover size: {crossover_size}")
    # TODO MAYBE MOVE THIS TO GENETIC OPERATOR CLASS
    new_chromosomes = []
    for _ in range(mutation_size):
        self.evolution_logger.info("....")
        self.evolution_logger.debug("Mutating...")
        parent = self.parent_selector.select_one_parent(self.population)
        self.evolution_logger.debug(f"Parent Mutation: {str(parent)}")
        offspring = self.genetic_operator.mutate(parent)
        self.evolution_logger.debug(f"Offspring Mutation: {str(offspring)}")
        new_chromosomes.append(offspring)

    for _ in range(crossover_size):
        self.evolution_logger.info("....")
        self.evolution_logger.debug("Crossover...")

        # Crossover can fail, e.g. when several genes are possible and the
        # height of an offspring is below the minimum gene height, so retry
        # with newly selected parents a bounded number of times.
        max_retries = 5  # to avoid an infinite loop
        for retry_count in range(max_retries):
            self.evolution_logger.debug(f"Retry count: {retry_count}")
            try:
                self.evolution_logger.debug("Selecting parents for crossover...")
                parent1, parent2 = self.parent_selector.select_parents(self.population)
                self.evolution_logger.debug(f"Parent1:{str(parent1)}")
                self.evolution_logger.debug(f"Parent2:{str(parent2)}")

                offspring1, offspring2 = self.genetic_operator.crossover(parent1, parent2)
                self.evolution_logger.debug(f"Offspring1:{str(offspring1)}")
                self.evolution_logger.debug(f"Offspring2:{str(offspring2)}")
                break  # Exit the loop if successful

            except Exception as e:  # offspring too small: redo the process
                self.evolution_logger.error(f"Error during crossover: {e}")
                print(f"Error occurred: {e}. Retrying... ({retry_count + 1}/{max_retries})")
        else:
            # Every attempt failed: skip this pair instead of appending undefined
            # or stale offspring; the reproduction step below fills the gap.
            print("Max retries reached. Skipping this crossover.")
            continue

        new_chromosomes.append(offspring1)
        new_chromosomes.append(offspring2)

    reproduction_size = population_size - len(new_chromosomes)
    self.evolution_logger.debug(f"Reproduction size: {reproduction_size}")
    for _ in range(reproduction_size):
        self.evolution_logger.info("....")
        self.evolution_logger.debug("Reproduction...")
        parent = self.parent_selector.select_one_parent(self.population)
        # Keep the parent
        new_chromosomes.append(parent)

    # Replace the old population with the new one
    if len(new_chromosomes) != population_size:
        raise ValueError(
            "The number of new chromosomes does not match the population size"
        )
    self.population.replace_chromosomes(new_chromosomes)
    del new_chromosomes
    self.evolution_logger.debug(
        f"Population evolved: population size: {len(self.population)}"
    )
    self.population_logger.info("new population : ")
    self.population_logger.info(self.population)
    self.evolution_logger.info("--------------------------------")
    self.population_logger.info("--------------------------------")
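
The crossover step above retries with freshly selected parents when the operator raises. A bounded retry of this kind can be written with Python's for/else, where the else branch runs only if the loop never reached break. The sketch below is generic: retry and flaky are hypothetical names, and returning None on failure is one possible policy, not necessarily the one eso uses.

```python
def retry(operation, max_retries=5):
    """Call `operation` until it succeeds, at most `max_retries` times.

    Returns the result, or None if every attempt raised.
    """
    for attempt in range(max_retries):
        try:
            result = operation()
            break  # success: the else branch is skipped
        except Exception as exc:
            print(f"Attempt {attempt + 1}/{max_retries} failed: {exc}")
    else:
        # Reached only when no attempt succeeded
        return None
    return result


calls = {"count": 0}


def flaky():
    # Fails twice, then succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise ValueError("offspring too small")
    return ("offspring1", "offspring2")


print(retry(flaky))  # ('offspring1', 'offspring2')
```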

save

save()
Source code in eso/eso.py
def save(self):
    self._save_results()

evaluate

evaluate(
    test_type="simple",
    overlap=0.25,
    nb_to_group=2,
    threshold=0.8,
    save_name=None,
    force_calc_spectrograms=False,
)
Source code in eso/eso.py
def evaluate(
    self,
    test_type="simple",
    overlap=0.25,
    nb_to_group=2,
    threshold=0.8,
    save_name=None,
    force_calc_spectrograms=False,
):
    # Evaluate the baseline and the chromosome model with identical settings
    eval_args = dict(
        test_type=test_type,
        overlap=overlap,
        nb_to_group=nb_to_group,
        force_calc_spectrograms=force_calc_spectrograms,
        threshold=threshold,
    )
    (
        f_baseline,
        confusion_matrix_baseline,
        baseline_params,
        baseline_image_shape,
        baseline_pixels,
        baseline_execution_time,
    ) = self._evaluate_model(model_type="baseline", **eval_args)
    (
        f_chromosome,
        confusion_matrix_chromosome,
        chromosome_params,
        chromosome_image_shape,
        chromosome_pixels,
        chromosome_execution_time,
    ) = self._evaluate_model(model_type="chromosome", **eval_args)


    # Make confusion matrix into 1d string
    confusion_matrix_baseline_str = " ".join([" ".join(map(str, row)) for row in confusion_matrix_baseline])
    confusion_matrix_chromosome_str = " ".join([" ".join(map(str, row)) for row in confusion_matrix_chromosome])

    # Creates pandas dataframe
    df = pd.DataFrame(columns=["F1", "CONFUSION", "TIME", "PARAMS", "Image Shape", "Image Size"])
    df.loc["baseline"] = [
        f_baseline,
        confusion_matrix_baseline_str,
        baseline_execution_time,
        baseline_params,
        baseline_image_shape,
        baseline_pixels,
    ]
    df.loc["chromosome"] = [
        f_chromosome,
        confusion_matrix_chromosome_str,
        chromosome_execution_time,
        chromosome_params,
        chromosome_image_shape,
        chromosome_pixels,
    ]

    # Calculate improvement of chromosome model over baseline
    df["F1_improvement"] = (df["F1"] - df["F1"].shift(1)) / df["F1"].shift(1)
    df["TIME_improvement"] = (df["TIME"] - df["TIME"].shift(1)) / df["TIME"].shift(1)
    df["PARAMS_improvement"] = (df["PARAMS"] - df["PARAMS"].shift(1)) / df["PARAMS"].shift(1)
    df["Image Size Improvement"] = (df["Image Size"] - df["Image Size"].shift(1)) / df["Image Size"].shift(1)


    # Save to csv, with a timestamp in the file name
    now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    prefix = f"{save_name}_" if save_name is not None else ""
    path = os.path.join(self.results_path, f"{prefix}evaluation_{now}.csv")
    df.to_csv(path)
    self.logger.info(f"Evaluation saved to: {path}")
    print("------------------")
    print("RESULTS")
    print(df)
    return df
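
The improvement columns in evaluate() hold relative changes. With two rows, shift(1) pairs the chromosome row with the baseline row, so the chromosome row gets (chromosome - baseline) / baseline and the baseline row gets NaN. The sketch below reproduces that arithmetic and the confusion-matrix flattening in plain Python. The function names and all numbers are invented for illustration.

```python
def relative_change(baseline, chromosome):
    """Relative change of the chromosome value with respect to the baseline."""
    return (chromosome - baseline) / baseline


def flatten_confusion_matrix(matrix):
    """Join all cells row by row into one space-separated string."""
    return " ".join(" ".join(map(str, row)) for row in matrix)


# Invented numbers, for illustration only
print(flatten_confusion_matrix([[50, 2], [3, 45]]))  # 50 2 3 45
print(relative_change(1000, 600))  # -0.4
```

A negative value for PARAMS, TIME, or Image Size therefore means the chromosome model is smaller or faster than the baseline, while a positive F1_improvement means a higher F1.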