Mix.install([
{:kino, "~> 0.12.0"},
{:kino_explorer, "~> 0.1.11"},
{:vega_lite, "~> 0.1.6"},
{:kino_vega_lite, "~> 0.1.10"},
{:statistics, "~> 0.6.3"}
])
defmodule GeneticString do
@target_phrase "The solution is yet to emerge"
def possible_characters() do
Enum.to_list(?a..?z) ++
Enum.to_list(?A..?Z) ++
[?_, ?\s]
end
# Generate a random chromosome (potential solution)
def random_chromosome() do
Enum.map(
1..String.length(@target_phrase),
fn _ -> possible_characters() |> Enum.random() end
)
|> List.to_string()
end
def fitness(chromosome, memo \\ %{}) do
case Map.get(memo, chromosome) do
nil ->
fitness = fitness_calc(chromosome)
updated_memo = Map.put(memo, chromosome, fitness)
{fitness, updated_memo}
fitness ->
{fitness, memo}
end
end
def fitness_calc(chromosome) do
chromosome
|> String.graphemes()
|> Enum.zip(String.graphemes(@target_phrase))
|> Enum.count(fn {char1, char2} -> char1 == char2 end)
end
def memoize_fitness([], memo), do: memo
def memoize_fitness([chromosome | tail], memo) do
{_, new_memo} = fitness(chromosome, memo)
memoize_fitness(tail, new_memo)
end
def select_parents([]), do: []
def select_parents(possible_parents) do
possible_parents
|> Enum.shuffle()
|> Enum.take(2)
end
def crossover(chromosome1, chromosome2) do
crossover_point = Enum.random(1..(String.length(chromosome1) - 1))
String.slice(chromosome1, 0, crossover_point) <>
String.slice(chromosome2, crossover_point, String.length(chromosome2) - crossover_point)
end
def mutation(chromosome) do
mutation_point = Enum.random(0..(String.length(chromosome) - 1))
String.slice(chromosome, 0, mutation_point) <>
List.to_string([possible_characters() |> Enum.random()]) <>
String.slice(
chromosome,
mutation_point + 1,
String.length(chromosome) - (mutation_point + 1)
)
end
def evolve(population_size, generation_limit \\ 100) do
population = Enum.map(1..population_size, fn _ -> random_chromosome() end)
elitism_rate = 0.01
evolve_mechanism(
%{i: 0, limit: generation_limit},
%{population: population, size: population_size},
%{rate: elitism_rate, count: floor(elitism_rate * population_size)},
%{}
)
end
def evolve_mechanism(generation, %{population: [best_match | _]}, _elitism, _fitness_memo)
when generation.limit == generation.i,
do: %{chromosome: best_match, generation: generation.i}
def evolve_mechanism(generation, population_data, elitism, fitness_memo) do
memoized_fitness = memoize_fitness(population_data.population, fitness_memo)
sorted_population =
population_data.population
|> Enum.sort_by(fn chromosome -> Map.get(memoized_fitness, chromosome) end, :desc)
elite_population =
sorted_population
|> Enum.take(elitism.count)
rest_population = Enum.drop(sorted_population, elitism.count) |> Enum.shuffle()
possible_parents =
case length(elite_population) do
x when x < 2 -> elite_population ++ Enum.take(sorted_population, 2)
_ -> elite_population
end
[parent1, parent2] = select_parents(possible_parents)
offspring = crossover(parent1, parent2) |> mutation()
# Keep an elite-num of chromosome, and drop less fortunate chromomse before appending the offspring
new_population =
(elite_population ++ rest_population)
|> Enum.drop(-1)
|> Kernel.++([offspring])
[elite | _] = new_population
fitness_score = Map.get(memoized_fitness, elite)
max_score = String.length(@target_phrase)
case fitness_score do
x when x == max_score ->
%{chromosome: elite, generation: generation.i}
_ ->
evolve_mechanism(
%{i: generation.i + 1, limit: generation.limit},
%{population: new_population, size: population_data.size},
adjust_elitism(elitism, generation, population_data.size),
# elitism,
memoized_fitness
)
end
end
def adjust_elitism(elitism, generation, population_size) do
progress = generation.i / generation.limit
rate =
case progress do
# Less of population considered for elite (aka, in this case, available for reproduction)
x when x < 0.2 ->
max(elitism.rate - 0.05, 0.01)
# In later generations, a larger proportion of population will be included in elite population
_ ->
min(elitism.rate + 0.01, 0.25)
end
%{rate: rate, count: floor(rate * population_size)}
end
end
With the GeneticString module defined, we will run n
number of evolutions to get an idea of how well the algorithm performs. We will make use of tasks
so multiple evolutions can be run at the same time. (But this is limited by how many processes your virtual machine can handle.)
After running the simulations (will take between 10 to 30 seconds), you can review its performance further down. The lower the median
for number of generations required to arrive at the solution, the better.
The distribution chart is also a helpful tool as it will give an idea if there is anything obviously wrong (later generations for example may be stuck with too limited pool of "genes" to produce new chromosomes and be heavily right-tailed).
n = 1000
tasks =
Enum.map(1..n, fn _ ->
Task.async(fn -> GeneticString.evolve(50, 20000) end)
end)
# Wait for a maximum of 300 seconds.
results = Task.await_many(tasks, 300_000)
Running the code below will produce a density chart. Try making the @target_phrase
longer for instance and you will see how the chart will move further to the right. If you modify the adjust_elitism
's generation-progress case
you might see a "hump" being shifted in the chart as well.
VegaLite.new(width: 400, height: 300)
|> VegaLite.data_from_values(results)
|> VegaLite.transform(density: "generation")
|> VegaLite.mark(:area)
|> VegaLite.encode_field(:x, "value", type: :quantitative, title: "Number of generations")
|> VegaLite.encode_field(:y, "density", type: :quantitative)
require Explorer.DataFrame
results
|> Explorer.DataFrame.new(lazy: true)
|> Explorer.DataFrame.summarise(
generation_median: median(generation),
generation_mean: mean(generation),
generation_min: min(generation),
generation_max: max(generation),
generation_standard_deviation: standard_deviation(generation)
)
# Here we will just prepare the data for making a QQ-plot
# A QQ-plot is useful when we want to determine if our result follows a Normal distribution
sorted_series =
results
|> Explorer.DataFrame.new(lazy: true)
|> Explorer.DataFrame.sort_by(asc: generation)
|> Explorer.DataFrame.collect()
values_list =
sorted_series
|> Explorer.DataFrame.select(["generation"])
|> Explorer.DataFrame.to_series()
|> Map.get("generation")
|> Explorer.Series.to_list()
sample_size = length(values_list)
# Define an anonymous function to calculate quantiles
calculate_theoretical_quantiles = fn data ->
mean = Statistics.mean(data)
stdev = Statistics.stdev(data)
probabilities = Enum.to_list(1..(length(data) + 1)) |> Enum.map(&(&1 / (length(data) + 1)))
quantiles =
probabilities
|> Enum.drop(-1)
|> Enum.map(fn prob ->
theoretic = Statistics.Distributions.Normal.ppf(mean, stdev).(prob)
(theoretic - mean) / stdev
end)
quantiles
end
theoretical_quantiles = calculate_theoretical_quantiles.(values_list)
# Match each data-point with the theoretical point
qq =
Enum.zip(values_list, theoretical_quantiles)
|> Enum.map(fn {data, theoretic_std_dev} ->
%{data: data, theoretic: theoretic_std_dev}
end)
# Render the QQ-plot
VegaLite.new()
|> VegaLite.data_from_values(qq, only: ["theoretic", "data"])
|> VegaLite.mark(:point)
|> VegaLite.encode_field(:x, "theoretic", type: :quantitative)
|> VegaLite.encode_field(:y, "data", type: :quantitative)