Created
November 5, 2018 16:45
-
-
Save liamzebedee/1f5c56d656ceba808a2e99e78e9f6160 to your computer and use it in GitHub Desktop.
Evidence based subjective logic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import math | |
import string | |
import operator | |
from optparse import OptionParser | |
def load_data(filename):
    """Load the evidence table from a space-separated text file.

    The first line of the file is treated as a header and skipped.  Every
    other non-blank line is split on single spaces; its first and last
    fields are dropped and the fields in between are converted to ``int``.
    For the five-column data sets this module was written for, each returned
    row is therefore ``[source, subject, amount]``, where a positive amount
    is positive evidence and a negative amount is negative evidence.

    The file is closed before returning, also when parsing fails.

    Raises:
        IOError: if the file cannot be opened.
        ValueError: if one of the kept fields is not an integer.
    """
    separator = " "
    table = []
    with open(filename, "r") as handle:
        handle.readline()  # skip the header line
        for line in handle:
            # A blank line would otherwise produce an empty row that breaks
            # every later ``row[0]`` / ``row[2]`` lookup.
            if not line.strip():
                continue
            fields = line.split(separator)
            table.append([int(field) for field in fields[1:-1]])
    return table
def get_max(data):
    """Return a list holding the largest element of each sequence in ``data``."""
    maxima = []
    for sequence in data:
        maxima.append(max(sequence))
    return maxima
def get_theta(data):
    """Return the largest value stored at index 2 across the rows of ``data``."""
    amounts = [row[2] for row in data]
    return max(amounts)
def get_evidence(data):
    """Accumulate positive and negative evidence per (source, subject) pair.

    ``data`` is a list of rows whose first element is the source index,
    second element the subject index and last element the signed amount of
    evidence.

    Returns a square matrix ``flow`` with ``flow[source][subject]`` equal to
    ``[positive, negative]``: the sum of the non-negative amounts and the
    sum of the absolute values of the negative amounts for that pair.  The
    matrix has one row and one column for every index from 0 up to the
    largest source or subject index, so a source index larger than every
    subject index (or vice versa) no longer runs off the matrix.  Empty
    ``data`` yields an empty matrix.
    """
    print("get evidence...")
    if not data:
        print("finished")
        return []
    size = max(max(row[0], row[1]) for row in data) + 1
    flow = [[[0, 0] for _ in range(size)] for _ in range(size)]
    for row in data:
        amount = row[-1]
        cell = flow[row[0]][row[1]]
        if amount >= 0:
            cell[0] += amount
        else:
            # negative evidence is stored as a positive magnitude
            cell[1] += -amount
    print("finished")
    return flow
def get_opinion(data, constant):
    """Build the matrix of evidence-based opinions from the evidence rows.

    ``data`` is a list of rows whose first element is the source index,
    second element the subject index and last element the signed amount of
    evidence.  For every (source, subject) pair the positive evidence ``p``
    and the magnitude of the negative evidence ``n`` are summed, and the
    pair's opinion is ``[p / s, n / s, constant / s]`` with
    ``s = p + n + constant``.  Pairs without any evidence therefore get
    ``[0.0, 0.0, 1.0]`` for a non-zero ``constant``.

    Returns a square matrix indexed ``[source][subject]`` with one row and
    one column for every index from 0 up to the largest source or subject
    index.  Empty ``data`` yields an empty matrix.

    Raises:
        ZeroDivisionError: if ``constant`` is 0 and some pair has no
            evidence at all.
    """
    # first extract evidences from dataset
    print("get evidence...")
    size = (max(max(row[0], row[1]) for row in data) + 1) if data else 0
    flow = [[[0, 0, constant] for _ in range(size)] for _ in range(size)]
    for row in data:
        amount = row[-1]
        cell = flow[row[0]][row[1]]
        if amount >= 0:
            # add positive evidence
            cell[0] += amount
        else:
            # add negative evidence (stored as a positive magnitude)
            cell[1] += -amount
    print("finished")

    # Given evidence compute Evidence Based Opinions using uncertainty constant
    print("extract opinions...")
    for flow_row in flow:
        for cell in flow_row:
            total = cell[0] + cell[1] + cell[2]
            cell[:] = [float(part) / total for part in cell]
    print("finished")
    return flow
def test_flow(flow):
    """Print every entry of ``flow`` that has both positive and negative evidence.

    An entry qualifies when its first two components are both non-zero.
    """
    print("test for both neg and pos evidence")
    mixed = (
        entry
        for row in flow
        for entry in row
        if entry[0] != 0 and entry[1] != 0
    )
    for entry in mixed:
        print(entry)
    print("test finished")
def otimes(ox, oy):
    """Return the SL opinion ``ox otimes oy`` as a three-element list.

    With ``ox = (xb, xd, xu)`` and ``oy = (yb, yd, yu)`` the result is
    ``[xb*yb, xb*yd, xd + xu + xb*yu]``.
    """
    x_belief = ox[0]
    belief = x_belief * oy[0]
    disbelief = x_belief * oy[1]
    uncertainty = ox[1] + ox[2] + x_belief * oy[2]
    return [belief, disbelief, uncertainty]
def oplus(ox, oy):
    """Return the SL opinion ``ox oplus oy`` as a three-element list.

    With uncertainties ``xu = ox[2]`` and ``yu = oy[2]`` each component of
    ``[xu*yb + yu*xb, xu*yd + yu*xd, xu*yu]`` is divided by
    ``xu + yu - xu*yu``.
    """
    x_unc = ox[2]
    y_unc = oy[2]
    norm = x_unc + y_unc - x_unc * y_unc
    numerators = [
        x_unc * oy[0] + y_unc * ox[0],
        x_unc * oy[1] + y_unc * ox[1],
        x_unc * y_unc,
    ]
    return [value / norm for value in numerators]
def scalartimes(scalar, ox):
    """Return the scalar multiple of opinion ``ox`` as a three-element list.

    Belief and disbelief are multiplied by ``scalar``, the uncertainty is
    kept, and all three are divided by ``scalar*(ox[0]+ox[1]) + ox[2]``.
    """
    norm = scalar * (ox[0] + ox[1]) + ox[2]
    scaled = [scalar * ox[0], scalar * ox[1], ox[2]]
    return [component / norm for component in scaled]
def boxtimes(ox, oy, func):
    """Return ``ox boxtimes oy``: ``oy`` scalar-multiplied by ``func(ox)``.

    ``func`` plays the role of g(x); it receives the opinion ``ox`` and
    returns the scalar handed to ``scalartimes``.
    """
    return scalartimes(func(ox), oy)
def odot(ox, oy, theta, constant):
    """Return ``ox odot oy``: ``oy`` scalar-multiplied by a weight from ``ox``.

    The weight is ``(constant / theta) * (ox[0] / ox[2])``, with
    ``constant`` converted to ``float`` first.
    """
    ratio = float(constant) / theta
    weight = ratio * (ox[0] / ox[2])
    return scalartimes(weight, oy)
def matrixtimes(A, B, plus=operator.add, times=operator.mul):
    """Return the matrix product ``A . B`` under custom ``plus``/``times``.

    Entry ``[i][j]`` of the result is the left-to-right ``plus``-fold of
    ``times(A[i][k], B[k][j])`` over ``k``.  ``A`` may be ``n x m`` and ``B``
    ``m x p``; the result is ``n x p``.  (The inner index runs over the rows
    of ``B`` and the column index over the columns of ``B``, so rectangular
    operands work, not only square ones.)

    Defaults: ``plus`` is ``+`` and ``times`` is ``*``.

    Raises:
        ValueError: if a row of ``A`` does not have ``len(B)`` elements.
    """
    from functools import reduce  # ``reduce`` is a builtin only on Python 2

    if not A:
        return []
    inner = len(B)
    # Column j of B, built once instead of once per row of A.
    columns = [[B[k][j] for k in range(inner)] for j in range(len(B[0]))]
    product = []
    for row in A:
        if len(row) != inner:
            raise ValueError("matrixtimes: row length of A must equal the number of rows of B")
        product.append(
            [reduce(plus, [times(a, b) for a, b in zip(row, column)]) for column in columns]
        )
    return product
def matrixsquare(A, plus=operator.add, times=operator.mul):
    """Return ``A . A`` for a square matrix under custom ``plus``/``times``.

    Entry ``[i][j]`` of the result is the left-to-right ``plus``-fold of
    ``times(A[i][k], A[k][j])`` over ``k``.  (Pairing row ``i`` with
    *column* ``j`` is what makes this A squared; pairing it with row ``j``
    would give A times its transpose instead.)

    Defaults: ``plus`` is ``+`` and ``times`` is ``*``.
    """
    from functools import reduce  # ``reduce`` is a builtin only on Python 2

    size = len(A)
    return [
        [
            reduce(plus, [times(A[i][k], A[k][j]) for k in range(size)])
            for j in range(size)
        ]
        for i in range(size)
    ]
def matrixplus(A, B, plus=operator.add):
    """Return the element-wise combination of ``A`` and ``B`` under ``plus``.

    Row ``i`` of the result holds ``plus(A[i][j], B[i][j])`` for every
    column ``j``; the result has as many rows as ``A``.
    """
    combined = []
    for index, row in enumerate(A):
        combined.append([plus(left, right) for left, right in zip(row, B[index])])
    return combined
def matrixgeo(A, pow, plus=operator.add, times=operator.mul):
    """Iterate ``R <- A plus (R . A)`` starting from ``R = A``.

    The update is applied ``pow - 1`` times using the supplied ``plus`` and
    ``times``; with ordinary arithmetic that is the geometric sum
    ``A + A^2 + ... + A^pow``.  For ``pow <= 1`` the input ``A`` itself is
    returned.

    Defaults: ``plus`` is ``+`` and ``times`` is ``*``.
    """
    series = A
    for _ in range(pow - 1):
        step = matrixtimes(series, A, plus, times)
        series = matrixplus(A, step, plus)
    return series
def matrixgeonew(A, pow, plus=operator.add, times=operator.mul):
    """Iterate ``R <- A plus (R . A)`` with the diagonal reset every step.

    Starting from ``R = A`` the update is applied ``pow - 1`` times using
    the supplied ``plus`` and ``times``.  After each update every diagonal
    entry ``R[j][j]`` is overwritten with ``[0.0, 0.0, 1.0]``.  For
    ``pow <= 1`` the input ``A`` itself is returned untouched.

    Defaults: ``plus`` is ``+`` and ``times`` is ``*``.
    """
    # NOTE(review): the diagonal reset is assumed to run inside the loop,
    # as it does in matrixgeoconvtest -- confirm against the original file.
    series = A
    for _ in range(pow - 1):
        step = matrixtimes(series, A, plus, times)
        series = matrixplus(A, step, plus)
        for diag in range(len(series)):
            series[diag][diag] = [0.0, 0.0, 1.0]
    return series
def matrixpow(A, pow, plus=operator.add, times=operator.mul):
    """Return ``A`` raised to ``pow`` under custom ``plus``/``times``.

    Starting from ``A`` the running product is right-multiplied by ``A``
    ``pow - 1`` times.  For ``pow <= 1`` the input ``A`` itself is returned.

    Defaults: ``plus`` is ``+`` and ``times`` is ``*``.
    """
    power = A
    for _ in range(pow - 1):
        power = matrixtimes(power, A, plus, times)
    return power
def matrixpownew(A, pow, plus=operator.add, times=operator.mul):
    """Return ``A`` raised to ``pow`` with the diagonal reset every step.

    Starting from ``A`` the running product is right-multiplied by ``A``
    ``pow - 1`` times; after each multiplication every diagonal entry is
    overwritten with ``[0.0, 0.0, 1.0]``.  For ``pow <= 1`` the input ``A``
    itself is returned untouched.

    Defaults: ``plus`` is ``+`` and ``times`` is ``*``.
    """
    # NOTE(review): the diagonal reset is assumed to run inside the loop,
    # as it does in matrixgeoconvtest -- confirm against the original file.
    power = A
    for _ in range(pow - 1):
        power = matrixtimes(power, A, plus, times)
        for diag in range(len(power)):
            power[diag][diag] = [0.0, 0.0, 1.0]
    return power
def func_belief(ox):
    """Weight function g(x) = x_b: return the first component of ``ox``."""
    belief = ox[0]
    return belief
def func_belief_sqrt(ox):
    """Weight function g(x) = sqrt(x_b): square root of the first component."""
    belief = ox[0]
    return math.sqrt(belief)
def distance(m1, m2):
    """Return the summed absolute difference between two opinion matrices.

    For every entry of ``m1`` the absolute differences of its first three
    components to the entry at the same position in ``m2`` are added up.
    Each row is walked over its own length, so the matrices need not be
    square.

    Raises:
        IndexError: if ``m2`` lacks a row or entry that ``m1`` has.
    """
    total = 0.0
    for i, row in enumerate(m1):
        for j, entry in enumerate(row):
            other = m2[i][j]
            for k in range(3):
                total += abs(entry[k] - other[k])
    return total
def matrixgeoconvtest(A, threshold, plus=operator.add, times=operator.mul, max_iter=100):
    """Iterate ``R <- A plus (R . A)`` until successive results stop moving.

    Starting from ``R = A`` each pass computes the update with the supplied
    ``plus`` and ``times``, overwrites every diagonal entry with
    ``[0.0, 0.0, 1.0]`` and measures ``distance(previous, R)``.  Iteration
    stops as soon as that distance is no longer greater than ``threshold``
    or after ``max_iter`` passes, whichever comes first.

    Args:
        A: square matrix of opinions.
        threshold: distance at or below which the iteration counts as
            converged.
        plus: addition used in the update (default ``+``).
        times: multiplication used in the update (default ``*``).
        max_iter: upper bound on the number of passes (default 100).

    Returns:
        A tuple ``(R, count, t)``: the last matrix, the number of passes
        performed and the last measured distance.  When the bound stopped
        the loop, ``t`` is still greater than ``threshold``.
    """
    # Start above the threshold so the first pass always runs.
    delta = threshold + 1.0
    current = A
    count = 0
    while delta > threshold and count < max_iter:
        count += 1
        previous = current
        current = matrixplus(A, matrixtimes(previous, A, plus, times), plus)
        # Set diagonal to zero (the vacuous opinion).
        for diag in range(len(current)):
            current[diag][diag] = [0.0, 0.0, 1.0]
        delta = distance(previous, current)
    return current, count, delta
def extract_evidence(M, constant):
    """Convert a matrix of opinions back into a matrix of evidence pairs.

    Every opinion ``(b, d, u)`` in ``M`` becomes
    ``[constant * (b / u), constant * (d / u)]``; the result has the same
    row and column layout as ``M``.
    """
    evidence = []
    for row in M:
        evidence_row = []
        for opinion in row:
            positive = constant * (opinion[0] / opinion[2])
            negative = constant * (opinion[1] / opinion[2])
            evidence_row.append([positive, negative])
        evidence.append(evidence_row)
    return evidence
def write_to_file(M, openfile):
    """Write matrix ``M`` to ``openfile`` as nested brace-delimited lists.

    The output is ``{`` followed by one ``{...},`` line per row and a final
    ``}``.  Inside a row each entry is written as ``{v0,v1,...},`` with the
    values converted by ``str``.  No newline follows the closing brace.
    """
    openfile.write("{")
    for row in M:
        pieces = ["{"]
        for entry in row:
            # every value but the last is followed by a comma
            leading = "".join(str(value) + "," for value in entry[:-1])
            pieces.append("{" + leading + str(entry[-1]) + "},")
        pieces.append("},\n")
        openfile.write("".join(pieces))
    openfile.write("}")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"V1" "V2" "V3" "V4" | |
"1" 0 1 75 22688 | |
"2" 0 1 -75 22688 | |
"3" 0 2 575 22688 | |
"4" 0 2 -35 22688 | |
"1" 0 3 5 22688 | |
"2" 0 3 -25 22688 | |
"3" 0 4 32 22688 | |
"4" 0 4 -38 22688 | |
"5" 1 0 1000 22688 | |
"6" 1 0 -500 22688 | |
"7" 1 2 225 22688 | |
"8" 1 2 -25 22688 | |
"5" 1 3 100 22688 | |
"6" 1 3 -236 22688 | |
"7" 1 4 2225 22688 | |
"8" 1 4 -325 22688 | |
"9" 2 0 -375 22688 | |
"10" 2 0 -275 22688 | |
"11" 2 1 -575 22688 | |
"11" 2 1 1575 22688 | |
"9" 2 3 -75 22688 | |
"10" 2 3 275 22688 | |
"11" 2 4 -57 22688 | |
"11" 2 4 157 22688 | |
"5" 3 0 100 22688 | |
"6" 3 0 -50 22688 | |
"7" 3 2 22 22688 | |
"8" 3 2 -2 22688 | |
"5" 3 1 10 22688 | |
"6" 3 1 -26 22688 | |
"7" 3 4 225 22688 | |
"8" 3 4 -35 22688 | |
"9" 4 0 -75 22688 | |
"10" 4 0 275 22688 | |
"11" 4 1 -55 22688 | |
"11" 4 1 575 22688 | |
"9" 4 3 -85 22688 | |
"10" 4 3 25 22688 | |
"11" 4 2 -57 22688 | |
"11" 4 2 17 22688 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import string | |
import operator | |
from optparse import OptionParser | |
from EBSL_lib import load_data, get_evidence, get_opinion, get_theta, test_flow, write_to_file, extract_evidence, matrixgeoconvtest | |
from EBSL_lib import otimes, oplus, odot, scalartimes, boxtimes, matrixtimes, matrixplus, matrixsquare, matrixpow, matrixgeo, matrixgeonew, func_belief, func_belief_sqrt | |
def roundopinion(ox, digits):
    """Return the components of ``ox`` rounded to ``digits`` decimal places.

    Each component is scaled by ``10**digits``, rounded to the nearest
    whole number and scaled back.
    """
    scale = 10 ** digits
    return [round(component * scale) / scale for component in ox]
def distance(m1, m2):
    """Return the summed absolute difference between two opinion matrices.

    For every entry of ``m1`` the absolute differences of its first three
    components to the entry at the same position in ``m2`` are added up.
    Each row is walked over its own length, so the matrices need not be
    square.

    Raises:
        IndexError: if ``m2`` lacks a row or entry that ``m1`` has.
    """
    total = 0.0
    for i, row in enumerate(m1):
        for j, entry in enumerate(row):
            other = m2[i][j]
            for k in range(3):
                total += abs(entry[k] - other[k])
    return total
# Command-line driver: load a data set, build the opinion matrix, iterate the
# geometric series until it converges and write the initial and final state
# to "<base>-init<ext>" and "<base>-conv<ext>" derived from the -o option.
import os

parser = OptionParser()
parser.add_option("-o", "--outputfile",
                  help="Filename for output data.")
parser.add_option("-d", "--data",
                  help="Filename for data set.")
parser.add_option("-c", "--constant", type="int",
                  help="Constant used in Uncertainty Calculation.")
parser.add_option("-m", "--mode", type="int",
                  help="Mode used for mul. mode=0 --> boxtimes with g(x) = x_b, "
                       "mode=1 --> odot, mode=2 --> boxtimes with g(x) = sqrt(x_b), "
                       "mode=4 --> otimes.")
parser.add_option("-t", "--threshold", type="float",
                  help="Convergence threshold")
parser.set_defaults(outputfile="output.txt", data="exampleSet.txt", constant=2, mode=0, threshold=1.0)
options, args = parser.parse_args()

# splitext keeps directory parts and inner dots intact and copes with names
# that have no extension; splitting on "." and taking fields 0 and 1 did not.
base_name, extension = os.path.splitext(options.outputfile)
conv_name = base_name + "-conv" + extension
init_name = base_name + "-init" + extension

outputfile = open(conv_name, "w")
try:
    outputfile.write("#Convergence Threshold = " + str(options.threshold) + ".\n")
    print("Dataset: " + options.data)
    print("Constant: " + str(options.constant))
    if options.mode == 0:
        print("boxtimes is set as multiplication, g(x) = x_b")
        outputfile.write("#boxtimes is set as multiplication, g(x) = x_b\n")
    elif options.mode == 1:
        print("odot is set as multiplication")
        outputfile.write("#odot is set as multiplication.\n")
    elif options.mode == 2:
        print("boxtimes is set as multiplication, g(x) = sqrt(x_b)")
        outputfile.write("#boxtimes is set as multiplication, g(x) = sqrt(x_b)\n")
    elif options.mode == 4:
        print("otimes is set as multiplication")
        outputfile.write("#otimes is set as multiplication.\n")
    else:
        # The run still continues; the bit tests further down decide which
        # multiplication an unlisted mode value ends up using.
        print("No valid mode selected; mode can be either 0, 1, 2, or 4.")
    # NOTE(review): the usage banner is assumed to be printed on every run
    # (the original indentation was lost) -- confirm against the source file.
    print("-----------------------------------------------------------------------------")
    print("Use -d FILENAME in command line to set dataset to FILENAME")
    print("Use -c CONSTANT in command line to set uncertianty constant to value CONSTANT")
    print("Use -m MODE in command line to set multiplication mode to value MODE")
    print("Use -o FILENAME in command line to set the output filenames to FILENAME")
    print("use -t FLOAT in command line to set the convergence threshold")
    print("-----------------------------------------------------------------------------")

    data = load_data(options.data)
    print("dataset loaded...")

    # Convert data into opinion matrix
    print("Convert dataset to Opinion Matrix...")
    P = get_opinion(data, options.constant)

    # Set plus, times operations with respect to EBSL mode
    plus = oplus
    if (options.mode & 4) == 4:
        print("otimes")
        times = otimes
    elif (options.mode & 1) == 0:
        if (options.mode & 2) == 0:
            print("boxtimes, x_b")
            f = func_belief
        else:
            print("boxtimes, sqrt(x_b)")
            f = func_belief_sqrt
        times = lambda x, y: boxtimes(x, y, f)
    else:
        print("odot")
        theta = get_theta(data)
        print("theta = " + str(theta))
        outputfile.write("#theta = " + str(theta) + ".\n")
        times = lambda x, y: odot(x, y, theta, options.constant)
    print("-----------------------------------------------------------------------------")

    print("write initial state to file: " + init_name)
    with open(init_name, "w") as initfile:
        initfile.write("#Opinions initially\n")
        write_to_file(P, initfile)
        initfile.write("#Evidences initially\n")
        write_to_file(extract_evidence(P, options.constant), initfile)

    print("ready to test for convergence")
    print("theshold is set to: " + str(options.threshold))
    P2, count, t = matrixgeoconvtest(P, options.threshold, plus, times)
    if t > options.threshold:
        # the iteration bound stopped the loop, not the threshold
        print("No convergence within " + str(count) + " iterations")
    else:
        print("Convergence reached at iteration " + str(count))
    print("write to file: " + conv_name)
    outputfile.write("#Convergence after iteration: " + str(count) + "\n")
    outputfile.write("#Actual distance to next iteration: " + str(t) + "\n")
    outputfile.write("#Opinions: \n")
    write_to_file(P2, outputfile)
    print("done...")
    outputfile.write("#Underlying derived Evidence: \n")
    print("convert to Evidence matrix")
    E2 = extract_evidence(P2, options.constant)
    print("done...")
    print("write_to file")
    write_to_file(E2, outputfile)
finally:
    # close the result file even when a step above fails
    outputfile.close()
print("exit")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment