Created
February 26, 2017 18:45
-
-
Save danibrear/c1ec3a6301420bff5f2ade88d74c0b8a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from probability_solution import ( | |
make_power_plant_net, | |
set_probability, | |
get_alarm_prob, | |
get_gauge_prob, | |
get_temperature_prob, | |
get_marginal_temp_prob, | |
get_game_network, | |
calculate_posterior, | |
Gibbs_sampler, | |
MH_sampler, | |
compare_sampling, | |
sampling_question, | |
) | |
import unittest | |
from numpy import random | |
"""
Contains various local tests for Assignment 3.
"""
# Part 1a
def network_setup_test(power_plant):
    """Test that the power plant network has the proper number of nodes and edges.

    Args:
        power_plant: network object exposing a ``nodes`` collection whose
            items each have ``children`` and ``parents`` collections.

    Returns:
        True when the network has exactly 5 nodes and the child and parent
        links summed over all nodes total 10; False otherwise. A progress
        message is printed for each check.
    """
    nodes = power_plant.nodes
    if len(nodes) != 5:
        print('incorrect number of nodes')
        return False
    print('correct number of nodes')

    # Links are counted from both ends (children and parents), so the
    # expected total of 10 presumably corresponds to 5 directed edges.
    total_links = sum(len(n.children) + len(n.parents) for n in nodes)
    if total_links != 10:
        print('incorrect number of edges between nodes')
        return False
    print('correct number of edges between nodes')
    return True
# Part 1b
def probability_setup_test(power_plant):
    """Test that all nodes in the power plant network have proper probability distributions.

    Note that all nodes have to be named predictably for tests to run
    correctly: 'temperature', 'faulty gauge', 'faulty alarm', 'gauge' and
    'alarm'. A node that cannot be found by name is reported and fails the
    test instead of being silently skipped.

    Args:
        power_plant: network object providing ``get_node_by_name(name)``;
            each returned node exposes its table as ``node.dist.table``.

    Returns:
        True when every checked distribution has the expected size (and,
        for the three root nodes, the expected probabilities); False on the
        first failed check. A progress message is printed for each check.
    """

    def percent(prob):
        # Round rather than truncate: a correct value such as 1 - 0.8
        # evaluates to 19.999999999999996 percent and must count as 20.
        return int(round(prob * 100))

    def fetch_table(name):
        # Look up a node's distribution table; None means "node missing".
        node = power_plant.get_node_by_name(name)
        if node is None:
            print('No node with the name {} exists.'.format(name))
            return None
        return node.dist.table

    # first test temperature distribution
    print('checking probability distribution for Temperature node...')
    T_dist = fetch_table('temperature')
    if T_dist is None:
        return False
    if len(T_dist) != 2:
        print('incorrect temperature distribution size')
        return False
    print('correct temperature distribution size')
    if percent(T_dist[0]) != 80:
        print('incorrect temperature distribution')
        return False
    print('correct temperature distribution')

    # then faulty gauge distribution
    print('checking probability distribution for Faulty Gauge node...')
    F_G_dist = fetch_table('faulty gauge')
    if F_G_dist is None:
        return False
    # Compare the whole shape so a table of the wrong rank is reported as a
    # size error instead of raising on tuple unpacking.
    if tuple(F_G_dist.shape) != (2, 2):
        print('incorrect faulty gauge distribution size')
        return False
    print('correct faulty gauge distribution size')
    if percent(F_G_dist[0][1]) != 5 or percent(F_G_dist[1][0]) != 20:
        print('incorrect faulty gauge distribution')
        return False
    print('correct faulty gauge distribution')

    # faulty alarm distribution
    print('checking probability distribution for Faulty Alarm node...')
    F_A_dist = fetch_table('faulty alarm')
    if F_A_dist is None:
        return False
    if len(F_A_dist) != 2:
        print('incorrect faulty alarm distribution size')
        return False
    print('correct faulty alarm distribution size')
    if percent(F_A_dist[0]) != 85:
        print('incorrect faulty alarm distribution')
        return False
    print('correct faulty alarm distribution')

    # gauge distribution
    # can't test exact probabilities because
    # order of probabilities is not guaranteed
    print('checking only the probability distribution size for Gauge node...')
    G_dist = fetch_table('gauge')
    if G_dist is None:
        return False
    if tuple(G_dist.shape) != (2, 2, 2):
        print('incorrect gauge distribution size')
        return False
    print('correct gauge distribution size')

    # alarm distribution
    print('checking only the probability distribution size for Alarm node...')
    A_dist = fetch_table('alarm')
    if A_dist is None:
        return False
    if tuple(A_dist.shape) != (2, 2, 2):
        print('incorrect alarm distribution size')
        return False
    print('correct alarm distribution size')

    return True
# Part 2a Test
def games_network_test(games_net):
    """Test that the games network has the proper structure and distributions.

    Checks, in order: the node count (6), the total number of child and
    parent links (12), the skill distribution of team 'A', and the outcome
    distribution of match 'AvB'. Note that all nodes have to be named
    predictably for tests to run correctly. The same team/match checks can
    be applied to 'B', 'C', 'BvC' and 'CvA' by changing the looked-up name.

    Args:
        games_net: network object exposing ``nodes`` (each with
            ``children`` and ``parents``) and ``get_node_by_name(name)``;
            each node exposes its table as ``node.dist.table``.

    Returns:
        True when every check passes; False on the first failed check.
        A progress message is printed for each check.
    """
    print('checking the total number of edges and nodes in the network...')
    nodes = games_net.nodes
    if len(nodes) != 6:
        print('incorrect number of nodes')
        return False
    print('correct number of nodes')

    total_links = sum(len(n.children) + len(n.parents) for n in nodes)
    if total_links != 12:
        print('incorrect number of edges')
        return False
    print('correct number of edges')

    # First testing team distributions.
    print('checking probability distribution for Team A...')
    A_node = games_net.get_node_by_name('A')
    if A_node is None:
        print('No node with the name A exists.')
        return False
    A_dist = A_node.dist.table
    if len(A_dist) != 4:
        print('incorrect team distribution size')
        return False
    print('correct team distribution size')
    # Round rather than truncate so that float noise (e.g. a value stored as
    # 0.29999999999999993) does not reject a correct distribution.
    if int(round(A_dist[0] * 100)) != 15 or int(round(A_dist[2] * 100)) != 30:
        print('incorrect team distributions')
        return False
    print('correct team distribution')

    # Now testing match distributions.
    print('checking probability distribution for match AvB...')
    AvB_node = games_net.get_node_by_name('AvB')
    if AvB_node is None:
        print('No node with the name AvB exists.')
        return False
    AvB_dist = AvB_node.dist.table
    # Compare the whole shape so a table of the wrong rank is reported as a
    # size error instead of raising on tuple unpacking.
    if tuple(AvB_dist.shape) != (4, 4, 3):
        print('incorrect match distribution size')
        return False
    print('correct match distribution size')

    # i and j index the first and second axes of the table (presumably the
    # skill levels of teams A and B); x holds the three outcome
    # probabilities for that pairing.
    equal_skill_ok = True
    second_stronger_ok = True
    first_stronger_ok = True
    for i in range(4):
        for j in range(4):
            x = AvB_dist[i, j]
            if i == j:
                # Equal indices: outcomes 0 and 1 must be equally likely.
                if x[0] != x[1]:
                    equal_skill_ok = False
            elif j > i:
                # Outcome 1 must be strictly the most likely.
                if not (x[1] > x[0] and x[1] > x[2]):
                    second_stronger_ok = False
            else:
                # Outcome 0 must be strictly the most likely.
                if not (x[0] > x[1] and x[0] > x[2]):
                    first_stronger_ok = False

    if not equal_skill_ok:
        print('incorrect match distribution for equal skill levels')
        return False
    if not (second_stronger_ok and first_stronger_ok):
        print('incorrect match distribution: team with higher skill should have higher probability of winning')
        return False
    print('correct match distribution')
    return True
# Part 2b Test
def posterior_test(posterior):
    """Check a three-element posterior against the expected values.

    Args:
        posterior: indexable whose first three entries are compared with
            0.25, 0.42 and 0.31 respectively.

    Returns:
        True when each of the three entries is within 0.01 (exclusive) of
        its expected value; False otherwise. The outcome is printed, and on
        failure the three absolute differences are included in the message.
    """
    expected = (0.25, 0.42, 0.31)
    # Compute the differences once; they drive both the check and the report.
    diffs = [abs(posterior[i] - target) for i, target in enumerate(expected)]
    if all(diff < 0.01 for diff in diffs):
        print('correct posterior {}'.format(posterior))
        return True
    print('incorrect posterior calculated {}, diff({},{},{})'.format(posterior, *diffs))
    return False
class HW3Tests(unittest.TestCase):
    """Local unit tests for the Assignment 3 probability solution.

    Covers the power plant network (structure, distributions and queries),
    the games network (structure and posterior), single steps of the Gibbs
    and Metropolis-Hastings samplers, and the sampler comparison.
    """

    @staticmethod
    def _configured_power_plant():
        """Build the power plant network and set its probabilities."""
        power_plant = make_power_plant_net()
        set_probability(power_plant)
        return power_plant

    def _report_compare_sampling(self, initial_state, delta=0.001):
        """Run compare_sampling on the games network and print its results.

        The posterior checks are printed rather than asserted, so this
        helper only fails when compare_sampling (or the reporting) raises.
        """
        games_network = get_game_network()
        G_conv, MH_conv, G_count, MH_count, MH_rej_count = compare_sampling(
            games_network, initial_state, delta)
        print(
            '\n'
            'Gibbs Convergence: {}\n'
            'MH Convergence: {}\n'
            'Gibbs Count: {}\n'
            'MH Count: {}\n'
            'MH Rejection Count:{}'.format(G_conv,
                                           MH_conv,
                                           G_count,
                                           MH_count,
                                           MH_rej_count))
        print('MH posterior')
        print(posterior_test(MH_conv))
        print('Gibbs posterior')
        print(posterior_test(G_conv))

    def test_network_setup_test(self):
        """The freshly built power plant network has the expected structure."""
        power_plant = make_power_plant_net()
        self.assertTrue(network_setup_test(power_plant))

    def test_probability_setup(self):
        """set_probability installs the expected distributions."""
        power_plant = self._configured_power_plant()
        self.assertTrue(probability_setup_test(power_plant))

    def test_get_alarm_prob(self):
        """get_alarm_prob(..., True) is 0.25 to 3 decimal places."""
        power_plant = self._configured_power_plant()
        alarm_prob = get_alarm_prob(power_plant, True)
        self.assertAlmostEqual(alarm_prob, 0.25, 3)

    def test_get_gauge_prob(self):
        """get_gauge_prob(..., True) is 0.14 to 3 decimal places."""
        power_plant = self._configured_power_plant()
        gauge_prob = get_gauge_prob(power_plant, True)
        self.assertAlmostEqual(gauge_prob, 0.14, 3)

    def test_get_temperature_prob(self):
        """get_temperature_prob(..., True) exceeds 0.2."""
        power_plant = self._configured_power_plant()
        temp_prob = get_temperature_prob(power_plant, True)
        print('temp probability given the alarm is going off and nothing is faulty: {}'.format(temp_prob))
        self.assertGreater(temp_prob, 0.2)

    def test_get_marginal_temperature_prob(self):
        """get_marginal_temp_prob(..., True) is 0.2 to 4 decimal places."""
        power_plant = self._configured_power_plant()
        temp_prob = get_marginal_temp_prob(power_plant, True)
        self.assertAlmostEqual(temp_prob, 0.2, 4)

    def test_games_network(self):
        """The games network has the expected structure and distributions."""
        game_network = get_game_network()
        self.assertTrue(games_network_test(game_network))

    def test_posterior(self):
        """calculate_posterior matches the expected posterior."""
        game_network = get_game_network()
        posterior = calculate_posterior(game_network)
        self.assertTrue(posterior_test(posterior))

    def test_gibbs_sampler(self):
        """With a fixed seed, one Gibbs step changes exactly one position."""
        initial_state = [0, 0, 0, 0, 0, 0]
        games_network = get_game_network()
        random.seed(12345)
        sample = Gibbs_sampler(games_network, initial_state)
        diffs = [x != y for (x, y) in zip(sample, initial_state)]
        self.assertEqual(sum(diffs), 1)

    def test_gibbs_sampler_none(self):
        """Gibbs_sampler accepts None as the initial state without raising."""
        games_network = get_game_network()
        random.seed(12345)
        Gibbs_sampler(games_network, None)

    def test_gibbs_sampler_blank(self):
        """Gibbs_sampler accepts an empty initial state without raising."""
        games_network = get_game_network()
        random.seed(12345)
        Gibbs_sampler(games_network, [])

    def test_mh_sampler(self):
        """With a fixed seed, one MH step changes more than one position."""
        initial_state = [0, 0, 0, 0, 0, 2]
        games_network = get_game_network()
        random.seed(12345)
        sample = MH_sampler(games_network, initial_state)
        diffs = [x != y for (x, y) in zip(sample, initial_state)]
        self.assertGreater(sum(diffs), 1)

    def test_mh_sampler_none(self):
        """MH_sampler accepts None as the initial state without raising."""
        games_network = get_game_network()
        random.seed(12345)
        MH_sampler(games_network, None)

    # Previously a second method named test_mh_sampler_none; the duplicate
    # name replaced the None case above, so that case never ran.
    def test_mh_sampler_blank(self):
        """MH_sampler accepts an empty initial state without raising."""
        games_network = get_game_network()
        random.seed(12345)
        MH_sampler(games_network, [])

    def test_sampling_question(self):
        """sampling_question() can be called and its answer printed."""
        print(sampling_question())

    def test_compare_sampling(self):
        """compare_sampling runs from an explicit initial state."""
        self._report_compare_sampling([0, 0, 0, 0, 0, 2])

    def test_compare_sampling_None(self):
        """compare_sampling runs when the initial state is None."""
        self._report_compare_sampling(None)

    def test_compare_sampling_blank_state(self):
        """compare_sampling runs when the initial state is empty."""
        self._report_compare_sampling([])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment