Last active
August 11, 2020 04:02
-
-
Save c58/8401a34794c2d04c75eb to your computer and use it in GitHub Desktop.
Very fast basic genetic algorithm implementation on CYthon
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cdef class GeneticAlgorithm(object): | |
cdef: | |
public BasicGeneticFunction genetics | |
Population* population | |
cpdef run(self) | |
cpdef iteration(self) | |
cpdef stat(self) | |
cdef Population* _make_population(self) | |
cdef public struct Population: | |
Chromosome** chromos | |
int size | |
int _alloc_size | |
cdef public struct Chromosome: | |
int size | |
double* genotype | |
double score | |
int age | |
bint fitted | |
cdef class BasicGeneticFunction(object): | |
cdef: | |
public int pop_size | |
public int chromo_size | |
Population* bank | |
public int banking_age | |
public double banking_score | |
public double eq_distance | |
public iter_count | |
# Generally configurable | |
cdef Population* initial(self) | |
cdef bint maybe_stop(self, Population* pop) | |
cdef double random_chromo_value(self) | |
cdef double fitness(self, Chromosome* chromo) | |
cdef Population* withhelds(self, Population* pop) | |
cdef Population* parents(self, Population* pop) | |
cdef Chromosome* crossover(self, Chromosome* parent1, Chromosome* parent2) | |
cdef Population* mutations(self, Population* pop) | |
cdef Chromosome* mutate(self, Chromosome* ch) | |
cdef on_iteration(self, Population* pop) | |
# Public | |
cdef Chromosome* best_chromosome(self, Population* pop) | |
cdef double avg_age(self, Population* pop) | |
cdef double avg_score(self, Population* pop) | |
cdef double max_score(self, Population* pop) | |
cdef Chromosome* generate_chromosome(self) | |
# Private | |
cdef _do_fitness(self, Chromosome* chromo) | |
cdef Population* _withhelds_all(self, Population* pop) | |
cdef Population* _withhelds_best(self, Population* pop, int mut_size) | |
cdef Population* _parents_random(self, Population* pop, int mut_size) | |
cdef Population* _parents_proportional(self, Population* pop, int mut_size) | |
cdef Chromosome* _crossover_mix(self, Chromosome* parent1, Chromosome* parent2) | |
cdef Chromosome* _crossover_exchange(self, Chromosome* parent1, Chromosome* parent2) | |
cdef Population* _mutations_random(self, Population* pop, int mut_size) | |
cdef Chromosome* _mutate_by_element(self, Chromosome* ch) | |
cdef double _mutate_by_element_func(self, double value, Chromosome* orig_ch) | |
cdef Chromosome* _mutate_switch(self, Chromosome* ch) | |
cdef Population* selection(self, Population* pop) | |
cdef Population* _selection_threashold(self, Population* pop) | |
cdef bint _banking_filter(self, Chromosome* ch) | |
cdef double _distance(self, Chromosome* first, Chromosome* second) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8 | |
# cython: profile=False | |
# cython: wraparound=False | |
# cython: boundscheck=False | |
# cython: infer_types=True | |
# filename: fastgen.pyx | |
from cython.parallel import parallel, prange | |
from libc.stdlib cimport malloc, realloc, free, rand, qsort | |
cimport cython | |
from cython.view cimport array as cvarray | |
cimport libc.math as cmath | |
import math | |
import numpy as np | |
cimport numpy as np | |
import sys | |
cdef class GeneticAlgorithm(object): | |
def __init__(self, BasicGeneticFunction genetics): | |
self.genetics = genetics | |
self.population = self.genetics.initial() | |
cpdef run(self): | |
while not self.genetics.maybe_stop(self.population): | |
self.iteration() | |
cpdef iteration(self): | |
cdef Population* new_population = self._make_population() | |
_destroy_population(self.population, deep=True) | |
self.population = new_population | |
self.genetics.on_iteration(self.population) | |
cpdef stat(self): | |
return self.genetics.avg_age(self.population), self.genetics.avg_score(self.population), \ | |
self.genetics.max_score(self.population), \ | |
np.array(<double[:self.genetics.chromo_size]>self.genetics.best_chromosome(self.population).genotype) | |
cdef Population* _make_population(self): | |
cdef: | |
Population* raw_pop | |
Population* selected_pop | |
Population* proc_pop | |
Population* parents1 | |
Population* parents2 | |
int i = 0, j | |
# Create new population array | |
raw_pop = _new_population(self.genetics.pop_size * 4) | |
# Save some chromosomes from previous population | |
proc_pop = self.genetics.withhelds(self.population) | |
for j in range(proc_pop.size): | |
raw_pop.chromos[i] = _clone_chromosome(proc_pop.chromos[j]) | |
i += 1 | |
_destroy_population(proc_pop) | |
# Mutate some chromosomes | |
proc_pop = self.genetics.mutations(self.population) | |
for j in range(proc_pop.size): | |
raw_pop.chromos[i] = _clone_chromosome(self.genetics.mutate(proc_pop.chromos[j])) | |
i += 1 | |
_destroy_population(proc_pop) | |
# Crossover families | |
parents1 = self.genetics.parents(self.population) | |
parents2 = self.genetics.parents(self.population) | |
for j in range(parents1.size): | |
raw_pop.chromos[i] = _clone_chromosome(self.genetics.crossover(parents1.chromos[j], parents2.chromos[j])) | |
i += 1 | |
_destroy_population(parents1) | |
_destroy_population(parents2) | |
# Fitness | |
for j in range(i): | |
if not raw_pop.chromos[j].fitted: | |
self.genetics._do_fitness(raw_pop.chromos[j]) | |
# Selection | |
raw_pop.size = i | |
selected_pop = self.genetics.selection(raw_pop) | |
_destroy_population(raw_pop, True) | |
# Create random chromosomes | |
while selected_pop.size < self.genetics.pop_size: | |
_extend_population(selected_pop, self.genetics.generate_chromosome()) | |
return selected_pop | |
def __dealloc__(GeneticAlgorithm self): | |
_destroy_population(self.population, deep=True) | |
cdef class BasicGeneticFunction(object): | |
def __init__(self, int pop_size=30, int chromo_size=10, double eq_distance=0.0001, | |
int banking_age=1000, double banking_score=1000): | |
self.pop_size = pop_size | |
self.chromo_size = chromo_size | |
self.bank = _new_population(0) | |
self.eq_distance = eq_distance | |
self.banking_age = banking_age | |
self.banking_score = banking_score | |
self.iter_count = 0 | |
# Stop evolution condition | |
cdef bint maybe_stop(self, Population* pop): | |
return False | |
## | |
# STATISTICS | |
# | |
cdef Chromosome* best_chromosome(self, Population* pop): | |
cdef int i | |
cdef Chromosome* best = pop.chromos[0] | |
for i in range(pop.size): | |
if pop.chromos[i].score > best.score: | |
best = pop.chromos[i] | |
for i in range(self.bank.size): | |
if self.bank.chromos[i].score > best.score: | |
best = self.bank.chromos[i] | |
return best | |
cdef double avg_age(self, Population* pop): | |
cdef int i | |
return sum([pop.chromos[i].age for i in range(pop.size)]) / <double>pop.size | |
cdef double avg_score(self, Population* pop): | |
cdef int i | |
return sum([pop.chromos[i].score for i in range(pop.size)]) / <double>pop.size | |
cdef double max_score(self, Population* pop): | |
cdef int i | |
return max([pop.chromos[i].score for i in range(pop.size)]) | |
## | |
# INITIAL POPULATION | |
# | |
cdef Population* initial(self): | |
return self._initial_random() | |
# implementations | |
cdef Population* _initial_random(self): | |
cdef Population* population = _new_population(self.pop_size) | |
cdef int i | |
for i in range(population.size): | |
population.chromos[i] = self.generate_chromosome() | |
return population | |
cdef Population* _initial_with_value(self, double value): | |
cdef Population* population = _new_population(self.pop_size) | |
cdef int i, j | |
for i in range(population.size): | |
population.chromos[i] = _new_chromosome(self.chromo_size) | |
for j in range(self.chromo_size): | |
population.chromos[i].genotype[j] = value | |
return population | |
## | |
# RANDOM CHROMOSOME GENERATOR | |
# | |
cdef Chromosome* generate_chromosome(self): | |
cdef Chromosome* chromosome = _new_chromosome(self.chromo_size) | |
cdef i | |
for i in range(chromosome.size): | |
chromosome.genotype[i] = self.random_chromo_value() | |
self._do_fitness(chromosome) | |
return chromosome | |
cdef double random_chromo_value(self): | |
return np.random.uniform(0, 1) | |
## | |
# SCORE OF THE CHROMOSOME calculation | |
# | |
cdef double fitness(self, Chromosome* chromo): | |
return 1 | |
cdef void _do_fitness(self, Chromosome* chromo): | |
chromo.fitted = True | |
chromo.score = self.fitness(chromo) | |
## | |
# return BEST CHROMOSOMES for holding in the population | |
# | |
cdef Population* withhelds(self, Population* pop): | |
return self._withhelds_all(pop) | |
# implementations | |
cdef Population* _withhelds_all(self, Population* pop): | |
cdef i | |
cdef Population* new_pop = _new_population(pop.size) | |
for i in range(pop.size): | |
new_pop.chromos[i] = pop.chromos[i] | |
return new_pop | |
cdef Population* _withhelds_best(self, Population* pop, int mut_size): | |
qsort(&pop.chromos[0], pop.size, sizeof(Chromosome*), &_chromosome_compare) | |
cdef i | |
cdef Population* new_pop = _new_population(mut_size) | |
for i in range(mut_size): | |
new_pop.chromos[i] = pop.chromos[i] | |
return new_pop | |
## | |
# CROSSOVER | |
# | |
cdef Population* parents(self, Population* pop): | |
return self._parents_proportional(pop, 10) | |
cdef Chromosome* crossover(self, Chromosome* parent1, Chromosome* parent2): | |
return self._crossover_exchange(parent1, parent2) | |
# implementations | |
cdef Population* _parents_random(self, Population* pop, int mut_size): | |
cdef int i | |
cdef Population* muts = _new_population(mut_size) | |
for i in range(mut_size): | |
muts.chromos[i] = pop.chromos[np.random.randint(0, pop.size)] | |
return muts | |
cdef Population* _parents_proportional(self, Population* pop, int mut_size): | |
# Create result population | |
cdef Population* muts = _new_population(mut_size) | |
# Calc total score (weights) | |
cdef int i | |
cdef double total = 0 | |
for i in range(pop.size): | |
total += pop.chromos[i].score | |
# Alghoritm from: | |
# http://stackoverflow.com/questions/2140787/select-random-k-elements-from-a-list-whose-elements-have-weights | |
i = 0 | |
cdef double x, w = pop.chromos[0].score | |
cdef Chromosome* v = pop.chromos[0] | |
while mut_size: | |
x = total * (1 - np.random.random() ** (1.0 / mut_size)) | |
total -= x | |
while x > w: | |
x -= w | |
i += 1 | |
w = pop.chromos[i].score | |
v = pop.chromos[i] | |
w -= x | |
muts.chromos[mut_size - 1] = v | |
mut_size -= 1 | |
return muts | |
cdef Chromosome* _crossover_mix(self, Chromosome* parent1, Chromosome* parent2): | |
cdef int i | |
cdef Chromosome* child = _new_chromosome(parent1.size) | |
for i in range(child.size): | |
child.genotype[i] = parent1.genotype[i] if np.random.random() > 0.5 else parent2.genotype[i] | |
return child | |
cdef Chromosome* _crossover_exchange(self, Chromosome* parent1, Chromosome* parent2): | |
cdef int i | |
cdef int position = np.random.randint(0, parent1.size) | |
cdef Chromosome* child = _new_chromosome(parent1.size) | |
for i in range(child.size): | |
child.genotype[i] = parent1.genotype[i] if i > position else parent2.genotype[i] | |
return child | |
## | |
# MUTATION | |
# | |
cdef Population* mutations(self, Population* pop): | |
return self._mutations_random(pop, 10) | |
cdef Chromosome* mutate(self, Chromosome* ch): | |
return self._mutate_by_element(ch) | |
# implementations | |
cdef Population* _mutations_random(self, Population* pop, int mut_size): | |
cdef int i | |
cdef Population* muts = _new_population(mut_size) | |
for i in range(mut_size): | |
muts.chromos[i] = pop.chromos[np.random.randint(0, pop.size)] | |
return muts | |
cdef Chromosome* _mutate_by_element(self, Chromosome* ch): | |
cdef int position = np.random.randint(0, ch.size) | |
cdef Chromosome* mut = _clone_chromosome(ch, empty=True) | |
mut.genotype[position] = self._mutate_by_element_func(mut.genotype[position], ch) | |
return mut | |
cdef double _mutate_by_element_func(self, double value, Chromosome* orig_ch): | |
return self.random_chromo_value() | |
cdef Chromosome* _mutate_switch(self, Chromosome* ch): | |
cdef Chromosome* mut = _clone_chromosome(ch, empty=True) | |
cdef int position1 = np.random.randint(0, ch.size) | |
cdef int position2 = np.random.randint(0, ch.size) | |
mut.genotype[position1] = ch.genotype[position2] | |
mut.genotype[position2] = ch.genotype[position1] | |
return mut | |
## | |
# SELECTION | |
# MUST CLONE each CHROMOSOME to result population | |
# | |
cdef Population* selection(self, Population* pop): | |
return self._selection_threashold(pop) | |
# implementations | |
cdef Population* _selection_threashold(self, Population* pop): | |
qsort(&pop.chromos[0], pop.size, sizeof(Chromosome*), &_chromosome_compare) | |
cdef int i, j, k = 0 | |
cdef bint skip_ch = False | |
cdef Population* muts = _new_population(self.pop_size) | |
for i in range(pop.size): | |
# Filter by size of population | |
if k >= muts.size: | |
break | |
# Filter equal to banked chromosomes | |
if not self._banking_filter(pop.chromos[i]): | |
continue | |
# If filter passed, add chromosome to result | |
muts.chromos[k] = _clone_chromosome(pop.chromos[i]) | |
k += 1 | |
# Update size manually, because filters may decrease max size | |
# that generally greater then `self.pop_size` | |
muts.size = k | |
return muts | |
cdef bint _banking_filter(self, Chromosome* ch): | |
cdef int j | |
for j in range(self.bank.size): | |
if self._distance(self.bank.chromos[j], ch) <= self.eq_distance: | |
return False | |
if ch.age >= self.banking_age and ch.score >= self.banking_score: | |
_extend_population(self.bank, _clone_chromosome(ch)) | |
return True | |
cdef double _distance(self, Chromosome* first, Chromosome* second): | |
return 1 | |
# Iteration callback, invoke at the end of each iteration | |
cdef void on_iteration(self, Population* pop): | |
cdef int i | |
self.iter_count += 1 | |
for i in range(pop.size): | |
pop.chromos[i].age += 1 | |
print '-------------------------------------' | |
print 'Iteration: '+str(self.iter_count) | |
print 'Bank size: '+str(self.bank.size) | |
for i in range(self.bank.size): | |
print '\tCh '+str(i)+': '+str(self.bank.chromos[i].score)+' ('+str(self.bank.chromos[i].age)+')' | |
print 'Avg. age: '+str(self.avg_age(pop)) | |
print 'Avg. score: '+str(self.avg_score(pop)) | |
for i in range(pop.size): | |
print '\tCh '+str(i)+': '+str(pop.chromos[i].score)+' ('+str(pop.chromos[i].age)+')' | |
print '-------------------------------------' | |
## | |
## Helpers | |
## | |
cdef Population* _new_population(size): | |
cdef Population* population = <Population*> malloc(sizeof(Population)) | |
population._alloc_size = size * 2 | |
population.size = size | |
population.chromos = <Chromosome**> malloc(sizeof(Chromosome*) * population._alloc_size) | |
return population | |
cdef _extend_population(Population* pop, Chromosome* ch): | |
cdef Chromosome** ext_chromos | |
if pop.size + 1 >= pop._alloc_size: | |
pop._alloc_size += 10 | |
ext_chromos = <Chromosome**> realloc(pop.chromos, sizeof(Chromosome*)*pop._alloc_size) | |
if not ext_chromos: | |
raise MemoryError() | |
pop.chromos = ext_chromos | |
pop.chromos[pop.size] = ch | |
pop.size += 1 | |
cdef _destroy_population(Population* population, deep=False): | |
cdef int i | |
if deep: | |
for i in range(population.size): | |
free(population.chromos[i].genotype) | |
free(population.chromos[i]) | |
free(population.chromos) | |
free(population) # destroy array | |
cdef Chromosome* _new_chromosome(size): | |
cdef int i | |
cdef Chromosome* chromosome = <Chromosome*> malloc(sizeof(Chromosome)) | |
chromosome.fitted = False | |
chromosome.age = 0 | |
chromosome.size = size | |
chromosome.genotype = <double *> malloc( sizeof(double) * size) | |
return chromosome | |
cdef Chromosome* _clone_chromosome(Chromosome* ch, bint empty=False): | |
cdef Chromosome* chromosome = _new_chromosome(ch.size) | |
if not empty: | |
chromosome.fitted = ch.fitted | |
chromosome.age = ch.age | |
chromosome.score = ch.score | |
cdef int i | |
for i in range(ch.size): | |
chromosome.genotype[i] = ch.genotype[i] | |
return chromosome | |
cdef int _chromosome_compare(const void *a, const void *b) nogil: | |
cdef double v = (<Chromosome*>((<Chromosome**>a)[0])).score - (<Chromosome*>((<Chromosome**>b)[0])).score | |
return -1 if v > 0 else 1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@suprafun MIT, feel free to do anything you want :) no guarantee that it works though, it was 6 years ago (damn, so many years!)