Module see.GeneticSearch

Using the specified search space and fitness function defined in 'Algorithm'. This runs the genetic algorithm over that space. Best individuals are stored in the hall of fame (hof).

Expand source code
"""Using the specified search space and fitness function defined in 'Algorithm'. 
This runs
the genetic algorithm over that space. Best individuals are stored in the hall of fame (hof).
"""

import random
import copy

import json
import logging
from shutil import copyfile
from pathlib import Path

import deap
from deap import base
from deap import tools
from deap import creator
from scoop import futures

from see import base_classes


# TODO Rename algorithm and algo_instance to be clearer. Use
# consistent naming.


def twoPointCopy(np1, np2, seed=False):
    """Swap one contiguous segment between two equal-length sequences, in place.

    Keyword arguments:
    np1, np2 -- sequences of the same length whose slices provide
                ``.copy()`` (e.g. numpy arrays or lists).
    seed -- when truthy, reseed ``random`` with 0 first so the cut
            points are reproducible.

    Outputs:
    (np1, np2) -- the same two objects after the swap.

    """
    if seed:
        random.seed(0)
    length = len(np1)
    assert length == len(np2)

    first = random.randint(1, length)
    second = random.randint(1, length - 1)
    if second < first:
        # The draws came out in descending order: use them reversed.
        start, stop = second, first
    else:
        # Push the second cut past the first so the segment is never empty.
        start, stop = first, second + 1

    segment = slice(start, stop)
    # Copy both sides before assigning: slices of numpy arrays are views.
    taken_from_np1 = np1[segment].copy()
    taken_from_np2 = np2[segment].copy()
    np1[segment] = taken_from_np2
    np2[segment] = taken_from_np1
    return np1, np2


def skimageCrossRandom(np1, np2, seed=False):
    """Execute a crossover by swapping values at randomly chosen positions.

    A random number of positions (between 0 and ``len(np1) - 1``) is drawn,
    and at each of them the two sequences exchange their values in place.

    Keyword arguments:
    np1, np2 -- mutable sequences of the same length.
    seed -- when truthy, reseed ``random`` with 0 first so the result
            is reproducible.

    Outputs:
    (np1, np2) -- the same two objects after the swaps.

    """
    if seed:
        random.seed(0)
    # DO: Only change values associated with algorithm
    assert len(np1) == len(np2)
    size = len(np1)
    if size == 0:
        # random.randrange(0) raises ValueError and there is nothing to swap.
        return np1, np2

    # How many positions get crossed, then which ones.
    crosses = random.randrange(size)
    for i in random.sample(range(size), crosses):
        np1[i], np2[i] = np2[i], np1[i]

    return np1, np2


def mutate(copy_child, pos_vals, flip_prob=0.5, seed=False):
    """Return a mutated copy of an individual.

    Each position is redrawn from its list of possible values when a fresh
    random number falls below flip_prob. Position 0 gets one extra,
    independent chance before the per-position pass.

    Keyword arguments:
    copy_child -- the individual to mutate (it is not modified).
    pos_vals -- list of lists where each list holds the possible
                values for that particular parameter.
    flip_prob -- how likely it is that we will mutate each value.
                It is computed separately for each value.
    seed -- when truthy, reseed ``random`` with 0 first so the result
            is reproducible.

    Outputs:
    child -- New, possibly mutated, individual.

    """
    if seed:
        random.seed(0)

    # Work on a copy so the caller's individual is left untouched.
    child = copy.deepcopy(copy_child)

    # Extra chance to change the first entry (the algorithm choice).
    if random.random() < flip_prob:
        child[0] = random.choice(pos_vals[0])

    # Independent chance for every parameter, index 0 included.
    for index, choices in enumerate(pos_vals):
        if random.random() < flip_prob:
            child[index] = random.choice(choices)
    return child


# DO: Make a toolbox from a list of individuals
# DO: Save a population as a list of individuals (with fitness functions?)

# TODO: change algo_instance to an algorithm class.
def makeToolbox(pop_size, algo_constructor):
    """Build the DEAP toolbox used by the genetic search.

    Registers crossover, mutation, evaluation, selection and map
    operators, one random generator per search-space parameter, and the
    "individual" / "population" factories.

    Keyword arguments:
    pop_size -- The size of our population, or how many individuals we have
    algo_constructor -- algorithm class; its ``runAlgo`` attribute is used
        for evaluation and an instance's ``params`` (``pkeys`` and
        ``ranges``) defines the search space.

    Outputs:
    toolbox -- the configured ``base.Toolbox``.

    """
    # Single-objective fitness with a negative weight (minimizing).
    creator.create("FitnessMin", base.Fitness, weights=(-0.000001,))
    creator.create("Individual", list, fitness=creator.FitnessMin)

    toolbox = base.Toolbox()

    # (alias, function, fixed keyword arguments) for the genetic operators.
    operators = (
        ("mate", skimageCrossRandom, {}),            # crossover
        ("mutate", base_classes.mutateAlgo, {}),     # mutation
        ("evaluate", algo_constructor.runAlgo, {}),  # fitness
        ("select", tools.selTournament, {"tournsize": 5}),  # selection
        ("map", futures.map, {}),                    # so that we can use scoop
    )
    for alias, function, fixed_kwargs in operators:
        toolbox.register(alias, function, **fixed_kwargs)

    # DO: May want to later do a different selection process

    # Each parameter gets a generator drawing uniformly from its range.
    search_space = algo_constructor().params
    for name in search_space.pkeys:
        toolbox.register(name, random.choice, search_space.ranges[name])
    gene_makers = [getattr(toolbox, name) for name in search_space.pkeys]

    # An individual is one pass over all parameter generators ...
    toolbox.register("individual", tools.initCycle,
                     creator.Individual, gene_makers, n=1)

    # ... and a population is pop_size such individuals.
    toolbox.register("population", tools.initRepeat,
                     list, toolbox.individual, n=pop_size)

    return toolbox


def initIndividual(icls, content):
    """Wrap ``content`` in the individual class ``icls``, logging the content first."""
    root_logger = logging.getLogger()
    root_logger.info(f"In initIndividual={content}")
    individual = icls(content)
    return individual


def initPopulation(pcls, ind_init, filename):
    """Load a population from a JSON file.

    Keyword arguments:
    pcls -- container type for the population; called with an iterable
        of individuals.
    ind_init -- callable applied to every entry read from the file.
    filename -- path of the JSON file (iterated entry by entry).

    Outputs:
    The ``pcls`` container holding the converted individuals.

    """
    with open(filename, "r") as source:
        stored_individuals = json.load(source)
    return pcls(ind_init(entry) for entry in stored_individuals)


##### FILE I/O #####
# TODO Think about moving this to another file?

# TODO make it so we can read from json, pickle or text.
def write_algo_vector(fpop_file, outstring):
    """Append ``outstring`` as one text line to ``fpop_file``.

    The file is opened in append mode, so earlier lines are kept.
    """
    print(f"Writing in {fpop_file}")
    line = f'{outstring}\n'
    with open(fpop_file, 'a') as handle:
        handle.write(line)


def read_algo_vector(fpop_file):
    """Read a text file holding one Python expression per line.

    Blank lines (for example a trailing newline at the end of the file)
    are skipped.

    Keyword arguments:
    fpop_file -- path of the text file to read.

    Outputs:
    inlist -- list with the evaluated value of every non-blank line.

    """
    print(f"Reading in {fpop_file}")
    inlist = []
    with open(fpop_file, 'r') as myfile:
        for line in myfile:
            if not line.strip():
                # eval() raises SyntaxError on an empty expression.
                continue
            # SECURITY: eval() executes arbitrary Python found in the file.
            # Only read files from trusted sources.
            # TODO: consider ast.literal_eval if every stored value is a literal.
            inlist.append(eval(line))
    return inlist


class Evolver(object):
    """Perform the genetic algorithm by initializing a population and evolving it over a
     specified number of generations to find the optimal algorithm and parameters for the problem.

    Functions:
    newpopulation -- Initialize a new population.
    writepop -- Records our population in the file "filename".
    readpop -- Reads in existing population from "filename".
    popfitness -- Calculates the fitness values for our population.
    mutate -- Performs mutation and crossover on population.
    nextgen -- Generates the next generation of our population.
    run -- Runs the genetic algorithm.

    """

    def __init__(self, algo_constructor, data, pop_size=10):
        """Set default values for the variables.

        Keyword arguments:
        algo_constructor -- algorithm class; called with no arguments to
            discover the search space and with ``paramlist=...`` to build
            the algorithm for one individual.
        data -- object handed (as a fresh deep copy) to every fitness
            evaluation.
        pop_size -- Integer value denoting size of our population,
            or how many individuals there are (default 10)

        """
        self.data = data
        self.algo_constructor = algo_constructor
        self.tool = makeToolbox(pop_size, algo_constructor)
        # Keeps the 10 best individuals seen so far.
        self.hof = deap.tools.HallOfFame(10)
        # Mean fitness of every evaluated generation.
        self.best_avgs = []
        # Number of generations evaluated so far.
        self.gen = 0
        # Crossover, mutation and per-value flip probabilities.
        self.cxpb, self.mutpb, self.flip_prob = 0.9, 0.9, 0.9

    # TODO add some checking to make sure lists are the right size and type
    # TODO think about how we want to add in fitness to these?
    def copy_individual(self, fromlist):
        """Return a toolbox individual filled with the leading values of fromlist."""
        new_individual = self.tool.individual()
        # Index explicitly so a too-short fromlist still raises IndexError.
        new_individual[:] = [fromlist[index]
                             for index in range(len(new_individual))]
        return new_individual

    # TODO add some checking (see previous comment)
    def copy_pop_list(self, tpop):
        """Convert a list of plain value lists into a list of individuals."""
        return [self.copy_individual(individual) for individual in tpop]

    def newpopulation(self):
        """Initialize a new population."""
        return self.tool.population()

    def writepop(self, tpop, filename='test.json'):
        """Record the population in the file "filename".

        Keyword arguments:
        tpop -- The population to be recorded.
        filename -- string denoting file in which to record
            the population. (default 'test.json')

        """
        logging.getLogger().info(f"Writting population to {filename}")
        with open(filename, 'w') as outfile:
            json.dump(tpop, outfile)

    def readpop(self, filename='test.json'):
        """Read in existing population from "filename".

        Files ending in ".txt" are read with read_algo_vector; any other
        file is read as JSON through initPopulation.
        """
        filen = Path(filename)

        if filen.suffix == ".txt":
            return self.copy_pop_list(read_algo_vector(filen))

        logging.getLogger().info(f"Reading population from {filename}")
        self.tool.register("population_read", initPopulation,
                           list, creator.Individual, filename)

        # These two aliases are not used by this method; they are left
        # registered on the toolbox as in the original code.
        self.tool.register("individual_guess",
                           initIndividual, creator.Individual)
        self.tool.register("population_guess", initPopulation,
                           list, self.tool.individual_guess, "my_guess.json")

        return self.tool.population_read()

    def popfitness(self, tpop):
        """Calculate the fitness values for the population.

        Also logs general statistics about these values, updates the
        hall of fame (hof) and advances the generation counter by one.

        Keyword arguments:
        tpop -- current population

        Outputs:
        extract_fits -- Fitness values for our population
        tpop -- current population

        """
        log = logging.getLogger()

        # Every evaluation gets its own copy of the data.
        data_references = [copy.deepcopy(self.data) for _ in tpop]
        algos = [self.algo_constructor(paramlist=list(ind)) for ind in tpop]

        # NOTE(review): this is the builtin (serial) map, not the "map"
        # alias registered on the toolbox -- confirm that is intended.
        outdata = map(self.tool.evaluate, algos, data_references)

        # Store each result on its individual so we have a complete record.
        for ind, data in zip(tpop, outdata):
            print(f"fitness={data.fitness}\n")
            ind.fitness.values = [data.fitness]
        extract_fits = [ind.fitness.values[0] for ind in tpop]

        self.hof.update(tpop)

        # Population statistics (population standard deviation).
        leng = len(tpop)
        mean = sum(extract_fits) / leng
        self.best_avgs.append(mean)
        sum1 = sum(i * i for i in extract_fits)
        stdev = abs(sum1 / leng - mean ** 2) ** 0.5
        log.info(f"Generation: {self.gen}")
        log.info(f" Min: {min(extract_fits)}")
        log.info(f" Max: {max(extract_fits)}")
        log.info(f" Avg: {mean}")
        log.info(f" Std: {stdev}")
        log.info(f" Size: {leng}")
        log.info(f"Best Fitness: {self.hof[0].fitness.values}")
        log.info(f"{self.hof[0]}")

        # Fixed: "self.gen += self.gen" kept the counter at 0 forever.
        self.gen += 1

        return extract_fits, tpop

    def mutate(self, tpop, keep_prob=0.1, mutate_prob=0.4):
        """Return new population with mutated individuals. Perform both mutation and crossover.

        The result is made of three parts: fresh random individuals, the
        best hall-of-fame members kept unchanged, and crossed/mutated
        copies of hall-of-fame members.

        Keyword arguments:
        tpop -- current population (only its length is used)
        keep_prob -- fraction of the population taken unchanged from
            the hall of fame (at most 10 individuals).
        mutate_prob -- fraction of the population filled with varied
            hall-of-fame copies.

        Output:
        final -- new population with mutated individuals.

        """
        my_sz = len(tpop)  # Length of current population
        hof_sz = len(self.hof)
        # Elite and varied counts, capped by the hall-of-fame size and by
        # the room left, so that "ran" can never become negative (a
        # negative slice bound would silently return the wrong size).
        top = min(10, max(1, round(keep_prob * my_sz)), hof_sz, my_sz)
        var = min(max(1, round(mutate_prob * my_sz)), hof_sz, my_sz - top)
        ran = my_sz - top - var

        # Vary copies so the hall of fame itself stays intact.
        offspring = copy.deepcopy(list(self.hof))

        # crossover on neighbouring pairs
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < self.cxpb:
                self.tool.mate(child1, child2)
                # Changed individuals need a fresh evaluation.
                del child1.fitness.values
                del child2.fitness.values

        # mutation
        for mutant in offspring:
            if random.random() < self.mutpb:
                # NOTE(review): the return value is discarded; if the
                # registered mutate operator returns a new individual
                # instead of changing "mutant" in place, the mutation is
                # lost -- confirm against base_classes.mutateAlgo.
                self.tool.mutate(self.algo_constructor, mutant, self.flip_prob)
                del mutant.fitness.values

        pop = self.tool.population()

        final = pop[0:ran]
        final += self.hof[0:top]
        final += offspring[0:var]

        # Replacing the old population
        return final

    def nextgen(self, tpop):
        """Generate the next generation of the population.

        Keyword arguments:
        tpop -- current population

        """
        _, evaluated = self.popfitness(tpop)
        return self.mutate(evaluated)

    def _write_checkpoint(self, population, checkpoint):
        """Back up the existing checkpoint file (if any) and write a new one."""
        print(f"Writing Checkpoint file - {checkpoint}")
        # Fixed: copyfile raised FileNotFoundError when no checkpoint had
        # been written yet (e.g. when starting from a given population).
        if Path(f"{checkpoint}").exists():
            copyfile(f"{checkpoint}", f"{checkpoint}.prev")
        self.writepop(population, filename=f"{checkpoint}")
        log = logging.getLogger()
        for individual in population:
            log.info(individual)

    def run(
            self,
            ngen=10,
            population=None,
            startfile=None,
            checkpoint=None,
            cp_freq=1):
        """Run the genetic algorithm, updating the population over ngen number of generations.

        Keyword arguments:
        ngen -- number of generations to run the genetic algorithm.
        population -- existing population to start from (default None)
        startfile -- File containing existing population (default None)
        checkpoint -- File in which checkpoints are written (default None)
        cp_freq -- write a checkpoint every cp_freq generations (default 1)

        Output:
        population -- Resulting population after ngen generations.
        """
        if startfile:
            try:
                print(f"Reading in {startfile}")
                population = self.readpop(startfile)
            except FileNotFoundError:
                print("WARNING: Start file not found")

        if not population:
            print("Initializing a new random population")
            population = self.newpopulation()
            if checkpoint:
                self.writepop(population, filename=f"{checkpoint}")
        else:
            print("Using existing population")

        for cur_g in range(0, ngen + 1):
            print(f"Generation {cur_g}/{ngen} of population size {len(population)}")

            _, population = self.popfitness(population)

            bestsofar = self.hof[0]
            fitness = bestsofar.fitness.values[0]
            print(f"#BEST [{fitness},  {bestsofar}]")

            if checkpoint and cur_g % cp_freq == 0:
                self._write_checkpoint(population, checkpoint)

            # NOTE(review): the original guarded this step with
            # "cur_g < ngen+1", which is always true inside this loop, so
            # the population is also varied after the last evaluation.
            # Behaviour is kept; confirm whether "cur_g < ngen" was meant.
            if fitness >= 0.95:
                # Best fitness at or above the 0.95 threshold: discard
                # the population and start from a new random one.
                print("Bestsofar not good enough (>=0.95) restarting population")
                population = self.newpopulation()
            else:
                # Below the threshold: proceed as normal.
                print("Mutating Population")
                population = self.mutate(population)

        if checkpoint:
            self._write_checkpoint(population, checkpoint)
        return population

Functions

def initIndividual(icls, content)

Create a new individual.

Expand source code
def initIndividual(icls, content):
    """Wrap ``content`` in the individual class ``icls``, logging the content first."""
    root_logger = logging.getLogger()
    root_logger.info(f"In initIndividual={content}")
    individual = icls(content)
    return individual
def initPopulation(pcls, ind_init, filename)

Create a population from the individuals stored in a JSON file, converting each stored entry with ind_init.

Expand source code
def initPopulation(pcls, ind_init, filename):
    """Load a population from a JSON file.

    Keyword arguments:
    pcls -- container type for the population; called with an iterable
        of individuals.
    ind_init -- callable applied to every entry read from the file.
    filename -- path of the JSON file (iterated entry by entry).

    Outputs:
    The ``pcls`` container holding the converted individuals.

    """
    with open(filename, "r") as source:
        stored_individuals = json.load(source)
    return pcls(ind_init(entry) for entry in stored_individuals)
def makeToolbox(pop_size, algo_constructor)

Make a genetic algorithm toolbox using DEAP. The toolbox uses premade functions for crossover, mutation, evaluation and fitness.

Keyword arguments: pop_size – The size of our population, or how many individuals we have

Expand source code
def makeToolbox(pop_size, algo_constructor):
    """Build the DEAP toolbox used by the genetic search.

    Registers crossover, mutation, evaluation, selection and map
    operators, one random generator per search-space parameter, and the
    "individual" / "population" factories.

    Keyword arguments:
    pop_size -- The size of our population, or how many individuals we have
    algo_constructor -- algorithm class; its ``runAlgo`` attribute is used
        for evaluation and an instance's ``params`` (``pkeys`` and
        ``ranges``) defines the search space.

    Outputs:
    toolbox -- the configured ``base.Toolbox``.

    """
    # Single-objective fitness with a negative weight (minimizing).
    creator.create("FitnessMin", base.Fitness, weights=(-0.000001,))
    creator.create("Individual", list, fitness=creator.FitnessMin)

    toolbox = base.Toolbox()

    # (alias, function, fixed keyword arguments) for the genetic operators.
    operators = (
        ("mate", skimageCrossRandom, {}),            # crossover
        ("mutate", base_classes.mutateAlgo, {}),     # mutation
        ("evaluate", algo_constructor.runAlgo, {}),  # fitness
        ("select", tools.selTournament, {"tournsize": 5}),  # selection
        ("map", futures.map, {}),                    # so that we can use scoop
    )
    for alias, function, fixed_kwargs in operators:
        toolbox.register(alias, function, **fixed_kwargs)

    # DO: May want to later do a different selection process

    # Each parameter gets a generator drawing uniformly from its range.
    search_space = algo_constructor().params
    for name in search_space.pkeys:
        toolbox.register(name, random.choice, search_space.ranges[name])
    gene_makers = [getattr(toolbox, name) for name in search_space.pkeys]

    # An individual is one pass over all parameter generators ...
    toolbox.register("individual", tools.initCycle,
                     creator.Individual, gene_makers, n=1)

    # ... and a population is pop_size such individuals.
    toolbox.register("population", tools.initRepeat,
                     list, toolbox.individual, n=pop_size)

    return toolbox
def mutate(copy_child, pos_vals, flip_prob=0.5, seed=False)

Mutate some of an individual's parameters: each value is redrawn when a fresh random number falls below flip_prob.

Keyword arguments: copy_child – the individual to mutate. pos_vals – list of lists where each list holds the possible values for that particular parameter. flip_prob – how likely it is that we will mutate each value. It is computed separately for each value.

Outputs: child – New, possibly mutated, individual.

Expand source code
def mutate(copy_child, pos_vals, flip_prob=0.5, seed=False):
    """Return a mutated copy of an individual.

    Each position is redrawn from its list of possible values when a fresh
    random number falls below flip_prob. Position 0 gets one extra,
    independent chance before the per-position pass.

    Keyword arguments:
    copy_child -- the individual to mutate (it is not modified).
    pos_vals -- list of lists where each list holds the possible
                values for that particular parameter.
    flip_prob -- how likely it is that we will mutate each value.
                It is computed separately for each value.
    seed -- when truthy, reseed ``random`` with 0 first so the result
            is reproducible.

    Outputs:
    child -- New, possibly mutated, individual.

    """
    if seed:
        random.seed(0)

    # Work on a copy so the caller's individual is left untouched.
    child = copy.deepcopy(copy_child)

    # Extra chance to change the first entry (the algorithm choice).
    if random.random() < flip_prob:
        child[0] = random.choice(pos_vals[0])

    # Independent chance for every parameter, index 0 included.
    for index, choices in enumerate(pos_vals):
        if random.random() < flip_prob:
            child[index] = random.choice(choices)
    return child
def read_algo_vector(fpop_file)

Read Text output

Expand source code
def read_algo_vector(fpop_file):
    """Read a text file holding one Python expression per line.

    Blank lines (for example a trailing newline at the end of the file)
    are skipped.

    Keyword arguments:
    fpop_file -- path of the text file to read.

    Outputs:
    inlist -- list with the evaluated value of every non-blank line.

    """
    print(f"Reading in {fpop_file}")
    inlist = []
    with open(fpop_file, 'r') as myfile:
        for line in myfile:
            if not line.strip():
                # eval() raises SyntaxError on an empty expression.
                continue
            # SECURITY: eval() executes arbitrary Python found in the file.
            # Only read files from trusted sources.
            # TODO: consider ast.literal_eval if every stored value is a literal.
            inlist.append(eval(line))
    return inlist
def skimageCrossRandom(np1, np2, seed=False)

Execute a crossover.

Between two arrays (np1 and np2) picking a random amount of indexes to change between the two.

Expand source code
def skimageCrossRandom(np1, np2, seed=False):
    """Execute a crossover by swapping values at randomly chosen positions.

    A random number of positions (between 0 and ``len(np1) - 1``) is drawn,
    and at each of them the two sequences exchange their values in place.

    Keyword arguments:
    np1, np2 -- mutable sequences of the same length.
    seed -- when truthy, reseed ``random`` with 0 first so the result
            is reproducible.

    Outputs:
    (np1, np2) -- the same two objects after the swaps.

    """
    if seed:
        random.seed(0)
    # DO: Only change values associated with algorithm
    assert len(np1) == len(np2)
    size = len(np1)
    if size == 0:
        # random.randrange(0) raises ValueError and there is nothing to swap.
        return np1, np2

    # How many positions get crossed, then which ones.
    crosses = random.randrange(size)
    for i in random.sample(range(size), crosses):
        np1[i], np2[i] = np2[i], np1[i]

    return np1, np2
def twoPointCopy(np1, np2, seed=False)

Execute a crossover between two numpy arrays of the same length.

Expand source code
def twoPointCopy(np1, np2, seed=False):
    """Swap one contiguous segment between two equal-length sequences, in place.

    Keyword arguments:
    np1, np2 -- sequences of the same length whose slices provide
                ``.copy()`` (e.g. numpy arrays or lists).
    seed -- when truthy, reseed ``random`` with 0 first so the cut
            points are reproducible.

    Outputs:
    (np1, np2) -- the same two objects after the swap.

    """
    if seed:
        random.seed(0)
    length = len(np1)
    assert length == len(np2)

    first = random.randint(1, length)
    second = random.randint(1, length - 1)
    if second < first:
        # The draws came out in descending order: use them reversed.
        start, stop = second, first
    else:
        # Push the second cut past the first so the segment is never empty.
        start, stop = first, second + 1

    segment = slice(start, stop)
    # Copy both sides before assigning: slices of numpy arrays are views.
    taken_from_np1 = np1[segment].copy()
    taken_from_np2 = np2[segment].copy()
    np1[segment] = taken_from_np2
    np2[segment] = taken_from_np1
    return np1, np2
def write_algo_vector(fpop_file, outstring)

Write Text output

Expand source code
def write_algo_vector(fpop_file, outstring):
    """Append ``outstring`` as one text line to ``fpop_file``.

    The file is opened in append mode, so earlier lines are kept.
    """
    print(f"Writing in {fpop_file}")
    line = f'{outstring}\n'
    with open(fpop_file, 'a') as handle:
        handle.write(line)

Classes

class Evolver (algo_constructor, data, pop_size=10)

Perform the genetic algorithm by initializing a population and evolving it over a specified number of generations to find the optimal algorithm and parameters for the problem.

Functions: newpopulation – Initialize a new population. writepop – Records our population in the file "filename". readpop – Reads in existing population from "filename". popfitness – Calculates the fitness values for our population. mutate – Performs mutation and crossover on population. nextgen – Generates the next generation of our population. run – Runs the genetic algorithm.

Set default values for the variables.

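Keyword arguments: algo_constructor – The algorithm class; it is called with no arguments to discover the search space and with paramlist=... to build the algorithm for one individual. data – The data object handed, as a fresh deep copy, to every fitness evaluation. pop_size – Integer value denoting size of our population, or how many individuals there are (default 10)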

Expand source code
class Evolver(object):
    """Perform the genetic algorithm by initializing a population and evolving it over a
     specified number of generations to find the optimal algorithm and parameters for the problem.

    Functions:
    newpopulation -- Initialize a new population.
    writepop -- Records our population in the file "filename".
    readpop -- Reads in existing population from "filename".
    popfitness -- Calculates the fitness values for our population.
    mutate -- Performs mutation and crossover on population.
    nextgen -- Generates the next generation of our population.
    run -- Runs the genetic algorithm.

    """

#     AllVals = []
# #     my_p=param_space
#     for key in my_p.pkeys:
#         AllVals.append(my_p.ranges[key])

    def __init__(self, algo_constructor, data, pop_size=10):
        """Set default values for the variables.

        Keyword arguments:
        algo_constructor -- algorithm class; called with no arguments to
            discover the search space and with ``paramlist=...`` to build
            the algorithm for one individual.
        data -- object handed (as a fresh deep copy) to every fitness
            evaluation.
        pop_size -- Integer value denoting size of our population,
            or how many individuals there are (default 10)

        """
        self.data = data
        self.algo_constructor = algo_constructor
        # Toolbox holding the genetic operators and population factories.
        self.tool = makeToolbox(pop_size, algo_constructor)
        # Keeps the 10 best individuals seen so far.
        self.hof = deap.tools.HallOfFame(10)
        # Mean fitness of every evaluated generation.
        self.best_avgs = []
        self.gen = 0
        # Crossover, mutation and per-value flip probabilities.
        self.cxpb = 0.9
        self.mutpb = 0.9
        self.flip_prob = 0.9
        
    #TODO add some checking to make sure lists are the right size and type
    #TODO think about how we want to add in fitness to these?
    def copy_individual(self,fromlist):
        """Return individual from list of individuals"""
        new_individual = self.tool.individual()
        for index in range(len(new_individual)):
            new_individual[index] = fromlist[index]
        return new_individual

    # TODO add some checking (see next comment)
    def copy_pop_list(self, tpop):
        """Copy population list to new list"""
        new_tpop = []
        for individual in tpop:
            new_tpop.append(self.copy_individual(individual))
        return new_tpop

    def newpopulation(self):
        """Initialize a new population."""
        return self.tool.population()

    def writepop(self, tpop, filename='test.json'):
        """Record the population in the file "filename".

        Keyword arguments:
        tpop -- The population to be recorded.
        filename -- string denoting file in which to record
            the population. (default 'test.json')

        """
        logging.getLogger().info(f"Writting population to {filename}")
        with open(filename, 'w') as outfile:
            json.dump(tpop, outfile)

    def readpop(self, filename='test.json'):
        """Read in existing population from "filename".

        Files ending in ".txt" are read with read_algo_vector and turned
        into individuals; any other file is read as JSON through
        initPopulation.
        """
        source = Path(filename)

        # Text format: one stored vector per line.
        if source.suffix == ".txt":
            return self.copy_pop_list(read_algo_vector(source))

        logging.getLogger().info(f"Reading population from {filename}")
        toolbox = self.tool
        toolbox.register("population_read", initPopulation,
                         list, creator.Individual, filename)

        # These two aliases are not used by this method; they are left
        # registered on the toolbox exactly as before.
        toolbox.register("individual_guess",
                         initIndividual, creator.Individual)
        toolbox.register("population_guess", initPopulation,
                         list, toolbox.individual_guess, "my_guess.json")

        return toolbox.population_read()

    def popfitness(self, tpop):
        """Calculate the fitness values for the population.
        
        Also,log general statistics about these
        values. Uses hall of fame (hof) to keep track of top 10 individuals.

        Keyword arguments:
        tpop -- current population

        Outputs:
        extract_fits -- Fitness values for our population
        tpop -- current population

        """
        # make copies of self.data
        data_references = [copy.deepcopy(self.data)
                           for i in range(0, len(tpop))]
        algos = [self.algo_constructor(paramlist=list(ind)) for ind in tpop]

        # Map the evaluation command to reference data and then to population
        # list
        outdata = map(self.tool.evaluate, algos, data_references)

        # Loop though outputs and add them to ind.fitness so we have a complete
        # record.
        for ind, data in zip(tpop, outdata):
            print(f"fitness={data.fitness}\n")
            ind.fitness.values = [data.fitness]
        extract_fits = [ind.fitness.values[0] for ind in tpop]

        self.hof.update(tpop)

        #Algo = AlgorithmSpace(AlgoParams)

        # Evaluating the new population
        leng = len(tpop)
        mean = sum(extract_fits) / leng
        self.best_avgs.append(mean)
        sum1 = sum(i * i for i in extract_fits)
        stdev = abs(sum1 / leng - mean ** 2) ** 0.5
        logging.getLogger().info(f"Generation: {self.gen}")
        logging.getLogger().info(f" Min: {min(extract_fits)}")
        logging.getLogger().info(f" Max: {max(extract_fits)}")
        logging.getLogger().info(f" Avg: {mean}")
        logging.getLogger().info(f" Std: {stdev}")
        logging.getLogger().info(f" Size: {leng}")
        #logging.info(" Time: ", time.time() - initTime)
        logging.getLogger().info(f"Best Fitness: {self.hof[0].fitness.values}")
        logging.getLogger().info(f"{self.hof[0]}")
        # Did we improve the population?
        # past_pop = tpop
        # past_min = min(extract_fits)
        # past_mean = mean

        self.gen += self.gen

        return extract_fits, tpop

    def mutate(self, tpop, keep_prob=0.1, mutate_prob=0.4):
        """Assemble the next population from the hall of fame and new individuals.

        Deep copies of the hall-of-fame entries are crossed over pairwise
        (probability self.cxpb) and then mutated (probability self.mutpb).
        The returned list is made of freshly generated individuals, followed
        by the leading hall-of-fame entries, followed by the leading varied
        copies.

        Keyword arguments:
        tpop -- current population; only its length is used here
        keep_prob -- fraction of the population size taken straight from
            the hall of fame (at least 1, at most 10 and at most len(hof))
        mutate_prob -- fraction of the population size taken from the
            varied copies (at least 1 and at most len(hof))

        Output:
        final -- new population with mutated individuals.

        """
        pop_size = len(tpop)

        # Number of individuals contributed by each source.
        n_elite = min(10, max(1, round(keep_prob * pop_size)), len(self.hof))
        n_varied = min(max(1, round(mutate_prob * pop_size)), len(self.hof))
        n_random = pop_size - n_elite - n_varied

        # Vary deep copies rather than the hall-of-fame entries themselves.
        candidates = copy.deepcopy(list(self.hof))

        # Crossover between neighbouring pairs (0, 1), (2, 3), ...
        for first, second in zip(candidates[::2], candidates[1::2]):
            if random.random() >= self.cxpb:
                continue
            self.tool.mate(first, second)
            # Drop the stored fitness of the individuals that were crossed.
            del first.fitness.values
            del second.fitness.values

        # Mutation of every copy, each with its own random draw.
        for candidate in candidates:
            if random.random() >= self.mutpb:
                continue
            self.tool.mutate(self.algo_constructor, candidate, self.flip_prob)
            del candidate.fitness.values

        # Freshly generated individuals fill the remaining slots.
        fresh = self.tool.population()

        final = fresh[0:n_random]
        final += self.hof[0:n_elite]
        final += candidates[0:n_varied]
        return final

    def nextgen(self, tpop):
        """Evaluate the population, then derive its successor.

        Keyword arguments:
        tpop -- current population

        Output:
        the population returned by mutate() for the evaluated population.

        """
        _fits, evaluated = self.popfitness(tpop)
        successor = self.mutate(evaluated)
        return successor

    def run(
            self,
            ngen=10,
            population=None,
            startfile=None,
            checkpoint=None,
            cp_freq=1):
        """Run the genetic algorithm, updating the population over ngen number of generations.

        Keyword arguments:
        ngen -- number of generations to run the genetic algorithm.
        population -- existing population to start from (default None)
        startfile -- File containing existing population (default None)
        checkpoint -- File the population is written to; the previous
            contents are copied to "<checkpoint>.prev" first (default None)
        cp_freq -- a checkpoint is written on every generation whose index
            is a multiple of cp_freq (default 1)

        Output:
        population -- the population produced after the last evaluated
            generation (it has not been evaluated itself).
        """
        def save_checkpoint(current):
            """Back up the existing checkpoint (if any), then write current."""
            print(f"Writing Checkpoint file - {checkpoint}")
            # On the first write there may be nothing to back up yet, e.g.
            # when the caller supplied the starting population.
            if Path(f"{checkpoint}").exists():
                copyfile(f"{checkpoint}", f"{checkpoint}.prev")
            self.writepop(current, filename=f"{checkpoint}")
            for individual in current:
                logging.getLogger().info(individual)

        if startfile:
            try:
                print(f"Reading in {startfile}")
                population = self.readpop(startfile)
            except FileNotFoundError:
                # Fall back to the supplied or a new random population.
                print("WARNING: Start file not found")

        if not population:
            print(f"Initializing a new random population")
            population = self.newpopulation()
            if checkpoint:
                self.writepop(population, filename=f"{checkpoint}")
        else:
            print(f"Using existing population")

        for cur_g in range(0, ngen + 1):
            print(f"Generation {cur_g}/{ngen} of population size {len(population)}")

            _, population = self.popfitness(population)

            bestsofar = self.hof[0]
            fitness = bestsofar.fitness.values[0]
            print(f"#BEST [{fitness},  {bestsofar}]")

            if checkpoint and cur_g % cp_freq == 0:
                save_checkpoint(population)

            # A next generation is produced after every evaluation,
            # including the last one.
            if fitness >= 0.95:
                # Best fitness is at or above the 0.95 threshold: discard
                # the whole population and start from a new random one.
                print("Bestsofar not good enough (>=0.95) restarting population")
                population = self.newpopulation()
            else:
                # Below the threshold: proceed as normal and mutate the
                # current population to get the next generation.
                print("Mutating Population")
                population = self.mutate(population)

        if checkpoint:
            save_checkpoint(population)
        return population

Methods

def copy_individual(self, fromlist)

Return individual from list of individuals

Expand source code
def copy_individual(self,fromlist):
    """Create a new individual and fill it, slot by slot, from fromlist."""
    clone = self.tool.individual()
    # Only as many values as the new individual has slots are copied.
    for slot, _ in enumerate(clone):
        clone[slot] = fromlist[slot]
    return clone
def copy_pop_list(self, tpop)

Copy population list to new list

Expand source code
def copy_pop_list(self, tpop):
    """Return a new list holding a copy_individual() copy of each entry of tpop."""
    return [self.copy_individual(member) for member in tpop]
def mutate(self, tpop, keep_prob=0.1, mutate_prob=0.4)

Return new population with mutated individuals. Perform both mutation and crossover.

Keyword arguments: tpop – current population

Output: final – new population with mutated individuals.

Expand source code
    def mutate(self, tpop, keep_prob=0.1, mutate_prob=0.4):
        """Assemble the next population from the hall of fame and new individuals.

        Deep copies of the hall-of-fame entries are crossed over pairwise
        (probability self.cxpb) and then mutated (probability self.mutpb).
        The returned list is made of freshly generated individuals, followed
        by the leading hall-of-fame entries, followed by the leading varied
        copies.

        Keyword arguments:
        tpop -- current population; only its length is used here
        keep_prob -- fraction of the population size taken straight from
            the hall of fame (at least 1, at most 10 and at most len(hof))
        mutate_prob -- fraction of the population size taken from the
            varied copies (at least 1 and at most len(hof))

        Output:
        final -- new population with mutated individuals.

        """
        pop_size = len(tpop)

        # Number of individuals contributed by each source.
        n_elite = min(10, max(1, round(keep_prob * pop_size)), len(self.hof))
        n_varied = min(max(1, round(mutate_prob * pop_size)), len(self.hof))
        n_random = pop_size - n_elite - n_varied

        # Vary deep copies rather than the hall-of-fame entries themselves.
        candidates = copy.deepcopy(list(self.hof))

        # Crossover between neighbouring pairs (0, 1), (2, 3), ...
        for first, second in zip(candidates[::2], candidates[1::2]):
            if random.random() >= self.cxpb:
                continue
            self.tool.mate(first, second)
            # Drop the stored fitness of the individuals that were crossed.
            del first.fitness.values
            del second.fitness.values

        # Mutation of every copy, each with its own random draw.
        for candidate in candidates:
            if random.random() >= self.mutpb:
                continue
            self.tool.mutate(self.algo_constructor, candidate, self.flip_prob)
            del candidate.fitness.values

        # Freshly generated individuals fill the remaining slots.
        fresh = self.tool.population()

        final = fresh[0:n_random]
        final += self.hof[0:n_elite]
        final += candidates[0:n_varied]
        return final
def newpopulation(self)

Initialize a new population.

Expand source code
def newpopulation(self):
    """Create and return a new population via the toolbox's population()."""
    fresh_population = self.tool.population()
    return fresh_population
def nextgen(self, tpop)

Generate the next generation of the population.

Keyword arguments: tpop – current population

Expand source code
def nextgen(self, tpop):
    """Evaluate the population, then derive its successor.

    Keyword arguments:
    tpop -- current population

    Output:
    the population returned by mutate() for the evaluated population.

    """
    _fits, evaluated = self.popfitness(tpop)
    successor = self.mutate(evaluated)
    return successor
def popfitness(self, tpop)

Calculate the fitness values for the population.

Also, log general statistics about these values. Uses hall of fame (hof) to keep track of top 10 individuals.

Keyword arguments: tpop – current population

Outputs: extract_fits – Fitness values for our population tpop – current population

Expand source code
def popfitness(self, tpop):
    """Calculate the fitness values for the population.

    Also, log general statistics about these values and update the hall
    of fame (hof) with the evaluated population.

    Keyword arguments:
    tpop -- current population; must not be empty

    Outputs:
    extract_fits -- Fitness values for our population
    tpop -- current population (the same list, with fitness values set)

    Side effects:
    appends the mean fitness to self.best_avgs and advances self.gen by one.

    """
    log = logging.getLogger()

    # Every evaluation receives its own deep copy of self.data.
    data_references = [copy.deepcopy(self.data) for _ in tpop]
    algos = [self.algo_constructor(paramlist=list(ind)) for ind in tpop]

    # Map the evaluation command to reference data and then to population
    # list
    outdata = map(self.tool.evaluate, algos, data_references)

    # Loop though outputs and add them to ind.fitness so we have a complete
    # record.
    for ind, data in zip(tpop, outdata):
        print(f"fitness={data.fitness}\n")
        ind.fitness.values = [data.fitness]
    extract_fits = [ind.fitness.values[0] for ind in tpop]

    self.hof.update(tpop)

    # Summary statistics of this generation's fitness values.
    leng = len(tpop)
    mean = sum(extract_fits) / leng
    self.best_avgs.append(mean)
    sum1 = sum(i * i for i in extract_fits)
    # abs() guards against a tiny negative variance from rounding.
    stdev = abs(sum1 / leng - mean ** 2) ** 0.5
    log.info(f"Generation: {self.gen}")
    log.info(f" Min: {min(extract_fits)}")
    log.info(f" Max: {max(extract_fits)}")
    log.info(f" Avg: {mean}")
    log.info(f" Std: {stdev}")
    log.info(f" Size: {leng}")
    log.info(f"Best Fitness: {self.hof[0].fitness.values}")
    log.info(f"{self.hof[0]}")

    # Advance the generation counter by exactly one per evaluation
    # (adding the counter to itself would double it, or leave it at 0).
    self.gen += 1

    return extract_fits, tpop
def readpop(self, filename='test.json')

Read in existing population from "filename".

Expand source code
def readpop(self, filename='test.json'):
    """Load a population from "filename".

    A ".txt" file is read with read_algo_vector and converted through
    copy_pop_list; any other file is read through a "population_read"
    entry registered on the toolbox.
    """
    source = Path(filename)

    if source.suffix != ".txt":
        logging.getLogger().info(f"Reading population from {filename}")

        # Register the readers on the toolbox, then use the file reader.
        self.tool.register(
            "population_read", initPopulation,
            list, creator.Individual, filename)
        self.tool.register(
            "individual_guess", initIndividual, creator.Individual)
        self.tool.register(
            "population_guess", initPopulation,
            list, self.tool.individual_guess, "my_guess.json")

        return self.tool.population_read()

    # Plain-text file: a list of parameter lists, one per individual.
    vectors = read_algo_vector(source)
    return self.copy_pop_list(vectors)
def run(self, ngen=10, population=None, startfile=None, checkpoint=None, cp_freq=1)

Run the genetic algorithm, updating the population over ngen number of generations.

Keyword arguments: ngen – number of generations to run the genetic algorithm. startfile – File containing existing population (default None) checkpoint – File containing existing checkpoint (default None)

Output: population – Resulting population after ngen generations.

Expand source code
def run(
        self,
        ngen=10,
        population=None,
        startfile=None,
        checkpoint=None,
        cp_freq=1):
    """Run the genetic algorithm, updating the population over ngen number of generations.

    Keyword arguments:
    ngen -- number of generations to run the genetic algorithm.
    population -- existing population to start from (default None)
    startfile -- File containing existing population (default None)
    checkpoint -- File the population is written to; the previous
        contents are copied to "<checkpoint>.prev" first (default None)
    cp_freq -- a checkpoint is written on every generation whose index
        is a multiple of cp_freq (default 1)

    Output:
    population -- the population produced after the last evaluated
        generation (it has not been evaluated itself).
    """
    def save_checkpoint(current):
        """Back up the existing checkpoint (if any), then write current."""
        print(f"Writing Checkpoint file - {checkpoint}")
        # On the first write there may be nothing to back up yet, e.g.
        # when the caller supplied the starting population.
        if Path(f"{checkpoint}").exists():
            copyfile(f"{checkpoint}", f"{checkpoint}.prev")
        self.writepop(current, filename=f"{checkpoint}")
        for individual in current:
            logging.getLogger().info(individual)

    if startfile:
        try:
            print(f"Reading in {startfile}")
            population = self.readpop(startfile)
        except FileNotFoundError:
            # Fall back to the supplied or a new random population.
            print("WARNING: Start file not found")

    if not population:
        print(f"Initializing a new random population")
        population = self.newpopulation()
        if checkpoint:
            self.writepop(population, filename=f"{checkpoint}")
    else:
        print(f"Using existing population")

    for cur_g in range(0, ngen + 1):
        print(f"Generation {cur_g}/{ngen} of population size {len(population)}")

        _, population = self.popfitness(population)

        bestsofar = self.hof[0]
        fitness = bestsofar.fitness.values[0]
        print(f"#BEST [{fitness},  {bestsofar}]")

        if checkpoint and cur_g % cp_freq == 0:
            save_checkpoint(population)

        # A next generation is produced after every evaluation,
        # including the last one.
        if fitness >= 0.95:
            # Best fitness is at or above the 0.95 threshold: discard
            # the whole population and start from a new random one.
            print("Bestsofar not good enough (>=0.95) restarting population")
            population = self.newpopulation()
        else:
            # Below the threshold: proceed as normal and mutate the
            # current population to get the next generation.
            print("Mutating Population")
            population = self.mutate(population)

    if checkpoint:
        save_checkpoint(population)
    return population
def writepop(self, tpop, filename='test.json')

Record the population in the file "filename".

Keyword arguments: tpop – The population to be recorded. filename – string denoting file in which to record the population. (default 'test.json')

Expand source code
def writepop(self, tpop, filename='test.json'):
    """Serialize the population as JSON into "filename".

    Keyword arguments:
    tpop -- The population to be recorded.
    filename -- string denoting the file to (over)write with the
        population. (default 'test.json')

    """
    logger = logging.getLogger()
    logger.info(f"Writting population to {filename}")
    # Opening with 'w' replaces any existing contents of the file.
    with open(filename, 'w') as destination:
        json.dump(tpop, destination)