Created
April 15, 2022 00:33
-
-
Save evanthebouncy/7aa32c09c601e1107eea6bb7085a3f3f to your computer and use it in GitHub Desktop.
plotting queue size upon successful termination as a function of temperature
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import random | |
from queue import PriorityQueue | |
# here is the task | |
# we want to construct a goal number, starting from a template of expression | |
# i.e. (E * E) + E = 11 | |
# we can expand the expression E node further following the grammar | |
# E -> E + E | E * E | -3 | -2 | -1 | 1 | 2 | 3 | |
# so you can end up with something like: (2 * 3) + (2 + 3) = 11 | |
# and as you can see, it obeys the top-level template given by the user of (E * E) + E = 11 | |
# in addition, we can have additional constraint like so: | |
# any time you have E1 + E2 or E1 * E2, E1 and E2 cannot be too different from each other
# so let's say 1/4 < |E1| / |E2| < 4, i.e. one is less than 4 times as large as the other in magnitude
# in addition, we can have a distribution that prefers diversity of + and * | |
# for instance, if we have current context of [+ + + + *] on the call stack | |
# we will prefer * 4 times as much as + | |
# so, putting it all together, it simulates our setting quite well
# natural program can be other natural program or primitive | |
# there is an "end goal" of the goal number | |
# and intermediate constraints needing to be satisfied | |
# and that we have probability distribution based on a context of call stack | |
# so hopefully everything of importance is modeled in this simple example | |
# so here we go | |
# gives the grammar as S expressions (lisp like) | |
# instead of E * E I have a more fancy expression to simulate a more free-form np | |
# Marker for a non-terminal that has not been expanded yet.
E = "E"

# Productions available for expanding an E node, in the order that
# propose() weights them: the two operator templates first, then the six
# integer literals.
GRAMMAR = [
    ("+", E, E),
    ("*", ("+", E, E), ("+", E, 2)),
    -3,
    -2,
    -1,
    1,
    2,
    3,
]
# propose | |
def propose(context):
    """Return a normalized weight vector over the 8 grammar productions.

    The first two entries are the "+" and "*" operator productions and the
    remaining six are the integer literals.  Operator weights are balanced
    against how often each operator already appears in ``context`` (the
    operator seen less often gets the larger share), literals are always
    strongly preferred over operators, and a small random jitter is added
    so that no two weights tie exactly.
    """
    text = str(context)
    # Add-one smoothing: each operator starts with a count of 1.
    stars = text.count("*") + 1
    pluses = text.count("+") + 1
    total = stars + pluses

    # Share of probability mass given to operators vs. integer literals.
    op_mass = 0.01
    literal_mass = 1 - op_mass

    # "+" is weighted by how many "*" were seen and vice versa, which
    # pushes the operator mix toward an even split.
    weights = [stars / total * op_mass, pluses / total * op_mass]
    weights += [1/6 * literal_mass] * 6

    # Jitter to break ties, then renormalize so the weights sum to 1.
    jittered = np.array(weights) + np.random.rand(len(weights)) * 0.001
    return jittered / jittered.sum()
def constraint(E1, E2):
    """Return True when |E1| / |E2| lies strictly between 1/4 and 4.

    A zero ``E2`` (which would raise ZeroDivisionError) counts as a
    violation and yields False.
    """
    try:
        ratio = abs(E1) / abs(E2)
    except ZeroDivisionError:
        return False
    return 1/4 < ratio < 4
# execution | |
def execute(program):
    """Evaluate an S-expression program.

    Returns:
        * the integer itself when ``program`` is an int,
        * "undefined" when the program still contains an unexpanded E,
        * "crash" when any sub-expression crashed or an operator's two
          operand values violate ``constraint``,
        * otherwise the integer value of the expression.
    """
    # Leaf: integer literal.
    if isinstance(program, int):
        return program
    # Leaf: unexpanded non-terminal.
    if program == E:
        return "undefined"

    operator, left, right = program
    left_value = execute(left)
    right_value = execute(right)
    outcomes = (left_value, right_value)

    # A crash anywhere below takes precedence over a missing expansion.
    if "crash" in outcomes:
        return "crash"
    if "undefined" in outcomes:
        return "undefined"

    # Both operands are numbers; they must be of comparable magnitude.
    if not constraint(left_value, right_value):
        return "crash"

    if operator == "+":
        return left_value + right_value
    if operator == "*":
        return left_value * right_value
    # Any other operator falls through without a value.
    return None
def logsumexp(x):
    """Return log(sum(exp(x))) for array ``x``.

    The maximum is subtracted before exponentiating so that large entries
    do not overflow, and added back afterwards.
    """
    peak = x.max()
    shifted = x - peak
    return peak + np.log(np.exp(shifted).sum())
# the search algorithm | |
def _fill_first_hole(expr, replacement):
    """Replace the leftmost unexpanded E leaf of ``expr`` with ``replacement``.

    Returns ``(new_expr, True)`` when a hole was filled, or ``(expr, False)``
    when ``expr`` contains no E leaf.
    """
    if isinstance(expr, tuple):
        for position, child in enumerate(expr):
            new_child, filled = _fill_first_hole(child, replacement)
            if filled:
                return expr[:position] + (new_child,) + expr[position + 1:], True
        return expr, False
    if expr == E:
        return replacement, True
    return expr, False


def best_first_search(user_template, goal_number, beam_width = 1000, max_length = 100, temperature = 1.0):
    """Best-first search for an expansion of ``user_template`` equal to ``goal_number``.

    Partial expressions are kept in a priority queue ordered by accumulated
    log-probability.  The most likely expression is popped, executed, and,
    if it still contains an unexpanded E, its leftmost E is replaced by each
    of the top ``beam_width`` grammar productions proposed by ``propose``.

    Args:
        user_template: starting S-expression (nested tuples of operator
            strings, ints and E markers); must be hashable.
        goal_number: value the finished expression must evaluate to.
        beam_width: maximum number of productions tried per expansion.
        max_length: expressions whose ``str()`` is longer than this are
            not expanded further.
        temperature: divides the proposal log-weights before
            renormalization; must be positive.

    Returns:
        ``(expr, pqueue)`` for the first popped expression that evaluates
        to ``goal_number``, where ``pqueue`` is the queue of still-pending
        ``(negative_logprob, expr)`` entries; ``None`` if the queue runs
        dry without a solution.

    Raises:
        ValueError: if ``temperature`` is not positive or ``beam_width`` is
            less than 1.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    # A beam_width of 0 would make the [-beam_width:] slice below select
    # every candidate instead of none.
    if beam_width < 1:
        raise ValueError("beam_width must be at least 1")

    # NOTE(review): entries are (score, expr) tuples, so two entries with
    # exactly equal scores would fall back to comparing the expressions
    # themselves, which can raise TypeError for mixed str/int/tuple
    # members.  The random jitter in propose() makes exact ties unlikely.
    pqueue = PriorityQueue()
    pqueue.put((0, user_template))
    visited = {user_template}

    while not pqueue.empty():
        # The queue pops the smallest item first, so scores are stored
        # as negated log-probabilities.
        neg_logpr_score, expr = pqueue.get()
        logpr = -neg_logpr_score

        result = execute(expr)
        if result == goal_number:
            print ("it work!")
            return expr, pqueue
        # Crashed expressions and finished expressions with the wrong
        # value are dead ends.
        if result != "undefined":
            continue

        expr_str = str(expr)
        if len(expr_str) > max_length:
            continue

        # A very naive "call stack context": the printed expression up to
        # its first unexpanded E.
        context = expr_str[:expr_str.find("E")]

        # Temperature-scaled, renormalized log-probabilities of each production.
        log_weights = np.log(propose(context)) / temperature
        log_weights = log_weights - logsumexp(log_weights)

        # Indices of the beam_width most likely productions.
        top_candidates = np.argsort(log_weights)[-beam_width:]

        for index in top_candidates:
            # Substitute structurally rather than splicing the printed
            # string and eval()-ing it back into a tuple.
            new_expr, _ = _fill_first_hole(expr, GRAMMAR[index])
            if new_expr in visited:
                continue
            visited.add(new_expr)
            pqueue.put((-(logpr + log_weights[index]), new_expr))

    # Search space exhausted without reaching the goal.
    return None
def run_once(T):
    """Run one search at temperature ``T`` and report queue statistics.

    Searches for an expansion of ``("+", E, E)`` that evaluates to 11,
    prints the result and some diagnostics, and returns
    ``(T, queue_size, average_nonterminals)`` where the last two describe
    the entries still pending in the queue when the search succeeded.
    """
    user_template = ("+", E, E)
    goal_number = 11

    # Sanity output: the bare template and a sample proposal distribution.
    print(execute(user_template))
    print(propose("E * E + + + + E = 11"))

    result, queue_elements = best_first_search(
        user_template, goal_number, max_length=100, temperature=T
    )
    pending = queue_elements.queue
    queue_size = len(pending)

    print("result is")
    print(result)
    print("result has length")
    print(len(str(result)))
    print("number of queue elements")
    print(queue_size)
    print("average number of non-terminals")
    # Count the E characters in the printed form of every pending expression.
    total_nonterminals = sum(str(expr).count("E") for _, expr in pending)
    average_nonterminals = total_nonterminals / queue_size
    print(average_nonterminals)
    return T, queue_size, average_nonterminals
if __name__ == "__main__":
    # Sweep every temperature 20 times, then dump one CSV-style line per run:
    # temperature, queue size at success, average non-terminals per entry.
    temperatures = [0.001, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
    results = [run_once(T) for _ in range(20) for T in temperatures]
    for temperature, queue_size, avg_nonterminals in results:
        print(f"{temperature}, {queue_size}, {avg_nonterminals}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment