Skip to content

Instantly share code, notes, and snippets.

@evanthebouncy
Created April 15, 2022 00:33
Show Gist options
  • Save evanthebouncy/7aa32c09c601e1107eea6bb7085a3f3f to your computer and use it in GitHub Desktop.
Save evanthebouncy/7aa32c09c601e1107eea6bb7085a3f3f to your computer and use it in GitHub Desktop.
plotting queue size upon successful termination as a function of temperature
import numpy as np
import random
from queue import PriorityQueue
# here is the task
# we want to construct a goal number, starting from a template of expression
# i.e. (E * E) + E = 11
# we can expand the expression E node further following the grammar
# E -> E + E | E * E | -3 | -2 | -1 | 1 | 2 | 3
# so you can end up with something like: (2 * 3) + (2 + 3) = 11
# and as you can see, it obeys the top-level template given by the user of (E * E) + E = 11
# in addition, we can have additional constraint like so:
# any time you have E1 + E2 or E1 * E2, E1 an E2 cannot be too different from each other
# so let's say 1/4 < |E1| / |E2| < 4, i.e. one is no more than 4 times as large as another in magenetude
# in addition, we can have a distribution that prefers diversity of + and *
# for instance, if we have current context of [+ + + + *] on the call stack
# we will prefer * 4 times as much as +
# so put it altogether, it simulates our setting quite well
# natural program can be other natural program or primitive
# there is an "end goal" of the goal number
# and intermediate constraints needing to be satisfied
# and that we have probability distribution based on a context of call stack
# so hopefully everything of importance is modeled in this simple example
# so here we go
# gives the grammar as S expressions (lisp like)
# instead of E * E I have a more fancy expression to simulate a more free-form np
E = "E"
GRAMMAR = [("+", E, E), ("*", ("+", E, E), ("+", E, 2)), -3, -2, -1, 1, 2, 3]
# propose
def propose(context):
context = str(context)
# count number of * in context, 1 more for prior
num_star = context.count("*") + 1
# count number of + in context, 1 more for prior
num_plus = context.count("+") + 1
count_total = num_star + num_plus
# this will cause a more even distribution of star and plus (hopefully)
relative_plus_ratio = num_star / count_total
relative_star_ratio = num_plus / count_total
# make the propose so that it always prefers numbers over * and +
operation_preference = 0.01
number_preference = 1 - operation_preference
relative_weights = [relative_plus_ratio * operation_preference,
relative_star_ratio * operation_preference] +\
[1/6 * number_preference for _ in range(6)]
# convert to numpy array
relative_weights = np.array(relative_weights)
# add a small noise to break ties
relative_weights += np.random.rand(len(relative_weights)) * 0.001
# normalize
relative_weights = np.array(relative_weights) / np.sum(relative_weights)
return relative_weights
def constraint(E1, E2):
try:
return 1/4 < abs(E1) / abs(E2) < 4
except ZeroDivisionError:
return False
# execution
def execute(program):
# check if program is a number
if isinstance(program, int):
return program
# check if program is E
elif program == E:
return "undefined"
else:
op, arg1, arg2 = program
arg1_eval = execute(arg1)
arg2_eval = execute(arg2)
if arg1_eval == "crash" or arg2_eval == "crash":
return "crash"
if arg1_eval == "undefined" or arg2_eval == "undefined":
return "undefined"
else:
# check constraint
if not constraint(arg1_eval, arg2_eval):
return "crash"
if op == "+":
return arg1_eval + arg2_eval
elif op == "*":
return arg1_eval * arg2_eval
def logsumexp(x):
c = x.max()
return c + np.log(np.sum(np.exp(x - c)))
# the search algorithm
def best_first_search(user_template, goal_number, beam_width = 1000, max_length = 100, temperature = 1.0):
# make a priority queue
pqueue = PriorityQueue()
# make a set of visited nodes
visited = set()
# put user template onto the priority queue
pqueue.put((0, user_template))
# push the user template to visited
visited.add(user_template)
# while queue is not empty
while not pqueue.empty():
# pop the most likely expr off the queue (queue stores priority in reversed order)
neg_logpr_score, expr = pqueue.get()
logpr = -1 * neg_logpr_score
# attempt to execute the expr
result = execute(expr)
# print ("current_logpr: ", str(logpr)[:10], "current_expr: ", expr, "evals_to: ", result)
if result == goal_number:
print ("it work!")
return expr, pqueue
elif result == "crash":
continue
elif len(str(expr)) > max_length:
continue
elif result == "undefined":
# locate first the unknown E within expr (this part is kinda janky)
# first turn expr into a string
expr_str = str(expr)
# locate the location of the substring E
E_loc = expr_str.find("E")
# print (expr_str, E_loc)
# a very naive version of "call stack context" of just the prefix string
context = expr_str[:E_loc]
# get the proposal distribution
relative_weights = propose(context)
# convert to log probability
log_relative_weights = np.log(relative_weights)
# use temperature T to adjust the log_relative_weights
log_relative_weights = log_relative_weights / temperature
# renormalize using logsumexp trick
log_relative_weights = log_relative_weights - logsumexp(log_relative_weights)
# select the top beam_width candidates sorted by log probability
top_candidates = np.argsort(log_relative_weights)[-beam_width:]
# recover the logprob of the top candidates
top_candidates_logpr = log_relative_weights[top_candidates]
# perform the replacement of E with each of the top candidates
for i in range(len(top_candidates)):
candidate = GRAMMAR[top_candidates[i]]
# make a new expr, kinda jank, but works
new_expr_str = expr_str[:E_loc-1] + str(candidate) + expr_str[E_loc + 2:]
new_expr = eval(new_expr_str)
# print (new_expr_str)
# print (execute(new_expr))
# check if new_expr is already visited
if new_expr not in visited:
# push the new expr onto the queue, with updated log probability
new_logpr = logpr + top_candidates_logpr[i]
# the pqueue score is ranked in opposite way, so we store the negative of the log probability
pqueue.put((-new_logpr, new_expr))
# push the new expr to visited
visited.add(new_expr)
def run_once(T):
# the user template
user_template = ("+", E, E)
print (execute(user_template))
print (propose("E * E + + + + E = 11"))
# the goal number
goal_number = 11
# the search algorithm
result, queue_elements = best_first_search(user_template, goal_number, max_length=100, temperature=T)
# print the result
print ("result is")
print (result)
print ("result has length")
print (len(str(result)))
print ("number of queue elements")
print (len(queue_elements.queue))
print ("average number of non-terminals")
# for each element in the queue
n_nonterminals = 0
for _, expr in queue_elements.queue:
# count the number of non-terminals
n_nonterminals += len(str(expr).split("E")) - 1
print (n_nonterminals / len(queue_elements.queue))
return T, len(queue_elements.queue), n_nonterminals / len(queue_elements.queue)
if __name__ == "__main__":
results = []
for _ in range(20):
for T in [0.001, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]:
results.append(run_once(T))
for x in results:
print (f"{x[0]}, {x[1]}, {x[2]}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment