Skip to content

Instantly share code, notes, and snippets.

@liamzebedee
Created November 5, 2018 16:45
Show Gist options
  • Save liamzebedee/1f5c56d656ceba808a2e99e78e9f6160 to your computer and use it in GitHub Desktop.
Save liamzebedee/1f5c56d656ceba808a2e99e78e9f6160 to your computer and use it in GitHub Desktop.
Evidence based subjective logic
import math
import operator
import string
import sys
from functools import reduce
from optparse import OptionParser
def load_data(filename):
    """Load evidence records from a space-separated text file.

    The first line of the file is treated as a header and skipped.  Every
    following line is split on single spaces; the first and last fields are
    discarded and the remaining ones are converted to ``int``.  For the
    assumed five-column layout each record is therefore
    ``[source, subject, amount]``, where a positive amount denotes positive
    evidence and a negative amount denotes negative evidence.

    Blank lines are ignored.

    Returns a list of integer lists, one per data line.
    Raises ``ValueError`` if a retained field cannot be parsed as an integer.
    """
    separator = " "
    table = []
    # The context manager closes the file even when a line fails to parse.
    with open(filename, "r") as handle:
        handle.readline()  # skip the header row
        for line in handle:
            if not line.strip():
                # A blank line would otherwise produce an empty record.
                continue
            fields = line.split(separator)
            table.append([int(field) for field in fields[1:-1]])
    return table
def get_max(data):
    """Return a list holding the maximum of each sequence in ``data``."""
    maxima = []
    for values in data:
        maxima.append(max(values))
    return maxima
def get_theta(data):
    """Return the largest value found at index 2 across the rows of ``data``.

    Raises ``ValueError`` when ``data`` is empty.
    """
    third_column = [row[2] for row in data]
    return max(third_column)
def get_evidence(data):
    """Accumulate positive and negative evidence per (source, subject) pair.

    ``data`` is a sequence of rows whose first two entries are the source and
    subject indices and whose last entry is the signed amount of evidence.

    Returns a square matrix ``flow`` in which ``flow[source][subject]`` is
    ``[positive_total, negative_total]``; negative amounts are stored as
    their absolute value.  An empty ``data`` yields an empty matrix.
    """
    print("get evidence...")
    # Size the matrix by the largest index in *either* column so that both
    # indices of every row are in range when addressing flow[source][subject].
    size = max(max(row[0], row[1]) for row in data) + 1 if data else 0
    flow = [[[0, 0] for _ in range(size)] for _ in range(size)]
    for row in data:
        amount = row[-1]
        cell = flow[row[0]][row[1]]
        if amount >= 0:
            cell[0] += amount
        else:
            cell[1] += -amount
    print("finished")
    return flow
def get_opinion(data, constant):
    """Build a matrix of evidence-based opinions from raw evidence rows.

    ``data`` is a sequence of rows whose first two entries are the source and
    subject indices and whose last entry is the signed amount of evidence.
    ``constant`` is the uncertainty constant placed in the third slot of
    every cell before normalisation.

    Each cell starts as ``[positive, negative, constant]`` and is then
    divided by the sum of its three entries, so every returned cell sums
    to one.  The result is a square matrix indexed ``[source][subject]``;
    an empty ``data`` yields an empty matrix.

    Raises ``ZeroDivisionError`` if a cell sums to zero (e.g. ``constant``
    is 0 and the cell received no evidence).
    """
    # First extract the evidence totals from the data set.
    print("get evidence...")
    # Size by the largest index in either column so that both indices of
    # every row are in range and the matrix is square.
    size = max(max(row[0], row[1]) for row in data) + 1 if data else 0
    flow = [[[0, 0, constant] for _ in range(size)] for _ in range(size)]
    for row in data:
        amount = row[-1]
        cell = flow[row[0]][row[1]]
        if amount >= 0:
            cell[0] += amount       # positive evidence
        else:
            cell[1] += -amount      # negative evidence
    print("finished")

    # Normalise each cell into an opinion using the uncertainty constant.
    print("extract opinions...")
    for matrix_row in flow:
        for cell in matrix_row:
            total = cell[0] + cell[1] + cell[2]
            cell[:] = [float(component) / total for component in cell]
    print("finished")
    return flow
def test_flow(flow):
    """Print every entry of ``flow`` whose first two components are both non-zero.

    Such entries carry positive as well as negative evidence.
    """
    print("test for both neg and pos evidence")
    for row in flow:
        for entry in row:
            if entry[0] == 0 or entry[1] == 0:
                continue
            print(entry)
    print("test finished")
def otimes(ox, oy):
    """Return the subjective-logic product ``ox otimes oy``.

    Both arguments are indexable opinions; the result is a new three-element
    list built from components 0..2 of the inputs.
    """
    first = ox[0] * oy[0]
    second = ox[0] * oy[1]
    third = ox[1] + ox[2] + ox[0] * oy[2]
    return [first, second, third]
def oplus(ox, oy):
    """Return the subjective-logic sum ``ox oplus oy`` as a list.

    The three numerators are each divided by
    ``ox[2] + oy[2] - ox[2] * oy[2]``.

    Raises ``ZeroDivisionError`` when that divisor is zero (for instance
    when both third components are zero).
    """
    div = ox[2] + oy[2] - ox[2] * oy[2]
    numerators = [
        ox[2] * oy[0] + oy[2] * ox[0],
        ox[2] * oy[1] + oy[2] * ox[1],
        ox[2] * oy[2],
    ]
    # A real list (not a lazy map object) so callers can index the result.
    return [value / div for value in numerators]
def scalartimes(scalar, ox):
    """Return the scalar multiple of opinion ``ox`` as a list.

    The first two components are scaled by ``scalar``, the third is kept,
    and all three are divided by ``scalar * (ox[0] + ox[1]) + ox[2]``.

    Raises ``ZeroDivisionError`` when that divisor is zero.
    """
    div = scalar * (ox[0] + ox[1]) + ox[2]
    numerators = [scalar * ox[0], scalar * ox[1], ox[2]]
    # A real list (not a lazy map object) so callers can index the result.
    return [value / div for value in numerators]
def boxtimes(ox, oy, func):
    """Return ``ox boxtimes oy`` for the weighting function ``g = func``.

    ``func`` is applied to ``ox`` and the value it returns is used as the
    scalar by which ``oy`` is multiplied via ``scalartimes``.
    """
    return scalartimes(func(ox), oy)
def odot(ox, oy, theta, constant):
    """Return ``oy`` scaled by a factor derived from ``ox``.

    The scalar is ``(constant / theta) * (ox[0] / ox[2])`` and is applied to
    ``oy`` via ``scalartimes``.

    Raises ``ZeroDivisionError`` when ``theta`` or ``ox[2]`` is zero.
    """
    weight = float(constant) / theta
    ratio = ox[0] / ox[2]
    return scalartimes(weight * ratio, oy)
def matrixtimes(A, B, plus=operator.add, times=operator.mul):
    """Return the matrix product ``A . B`` under custom ring operations.

    ``result[i][j]`` is the ``plus``-reduction over ``k`` of
    ``times(A[i][k], B[k][j])``.  ``plus`` defaults to ``+`` and ``times``
    to ``*``.  ``A`` may be n x m and ``B`` m x p; the result is n x p.
    An empty ``B`` yields empty result rows.
    """
    inner = len(B)                      # shared dimension (rows of B)
    columns = len(B[0]) if B else 0     # columns of B
    return [
        [
            reduce(plus, [times(row[k], B[k][j]) for k in range(inner)])
            for j in range(columns)
        ]
        for row in A
    ]
def matrixsquare(A, plus=operator.add, times=operator.mul):
    """Return ``A . A`` for a square matrix under custom ring operations.

    ``result[i][j]`` is the ``plus``-reduction over ``k`` of
    ``times(A[i][k], A[k][j])``.  ``plus`` defaults to ``+`` and ``times``
    to ``*``.
    """
    size = len(A)
    return [
        [
            # Row i of A against column j of A (not row j), so the result
            # is A squared rather than A times its transpose.
            reduce(plus, [times(A[i][k], A[k][j]) for k in range(size)])
            for j in range(size)
        ]
        for i in range(size)
    ]
def matrixplus(A, B, plus=operator.add):
    """Return the element-wise combination of ``A`` and ``B`` using ``plus``.

    Every row of the result is a real list, so callers may assign to
    individual cells afterwards.  Raises ``IndexError`` if ``B`` has fewer
    rows than ``A``.
    """
    return [
        [plus(left, right) for left, right in zip(row, B[i])]
        for i, row in enumerate(A)
    ]
def matrixgeo(A, pow, plus=operator.add, times=operator.mul):
    """Iterate ``R <- A plus (R times A)`` starting from ``R = A``.

    The update is applied ``pow - 1`` times, so for ordinary ``+`` and ``*``
    the result is the partial geometric series ``A + A^2 + ... + A^pow``.
    ``plus`` defaults to ``+`` and ``times`` to ``*``.  For ``pow <= 1`` the
    input ``A`` itself is returned.
    """
    result = A
    for _ in range(1, pow):
        product = matrixtimes(result, A, plus, times)
        result = matrixplus(A, product, plus)
    return result
def matrixgeonew(A, pow, plus=operator.add, times=operator.mul):
    """Iterate ``R <- A plus (R times A)`` while resetting the diagonal.

    Starts from ``R = A`` and applies the update ``pow - 1`` times; after
    each update every diagonal cell is overwritten with ``[0.0, 0.0, 1.0]``.
    ``plus`` defaults to ``+`` and ``times`` to ``*``.  For ``pow <= 1`` the
    input ``A`` itself is returned unchanged.
    """
    result = A
    for _ in range(1, pow):
        product = matrixtimes(result, A, plus, times)
        result = matrixplus(A, product, plus)
        # NOTE(review): the source had lost its indentation; the diagonal
        # reset is assumed to run after every step, mirroring
        # matrixgeoconvtest -- confirm against the original file.
        for idx in range(len(result)):
            result[idx][idx] = [0.0, 0.0, 1.0]
    return result
def matrixpow(A, pow, plus=operator.add, times=operator.mul):
    """Return ``A`` raised to ``pow`` under custom ring operations.

    Starts from ``A`` and right-multiplies by ``A`` a further ``pow - 1``
    times using ``matrixtimes``.  ``plus`` defaults to ``+`` and ``times``
    to ``*``.  For ``pow <= 1`` the input ``A`` itself is returned.
    """
    result = A
    for _ in range(1, pow):
        result = matrixtimes(result, A, plus, times)
    return result
def matrixpownew(A, pow, plus=operator.add, times=operator.mul):
    """Return ``A`` raised to ``pow`` while resetting the diagonal.

    Starts from ``A`` and right-multiplies by ``A`` a further ``pow - 1``
    times; after each multiplication every diagonal cell is overwritten
    with ``[0.0, 0.0, 1.0]``.  ``plus`` defaults to ``+`` and ``times`` to
    ``*``.  For ``pow <= 1`` the input ``A`` itself is returned unchanged.
    """
    result = A
    for _ in range(1, pow):
        result = matrixtimes(result, A, plus, times)
        # NOTE(review): the source had lost its indentation; the diagonal
        # reset is assumed to run after every step, mirroring
        # matrixgeoconvtest -- confirm against the original file.
        for idx in range(len(result)):
            result[idx][idx] = [0.0, 0.0, 1.0]
    return result
def func_belief(ox):
    """Weighting function ``g(x) = x_b``: return component 0 of ``ox``."""
    belief = ox[0]
    return belief
def func_belief_sqrt(ox):
    """Weighting function ``g(x) = sqrt(x_b)``: square root of component 0.

    Raises ``ValueError`` when that component is negative.
    """
    belief = ox[0]
    return math.sqrt(belief)
def distance(m1, m2):
    """Return the summed absolute difference between two opinion matrices.

    Both matrices are walked as ``len(m1)`` x ``len(m1)`` grids and the
    first three components of each pair of cells are compared.
    """
    size = len(m1)
    total = 0.0
    for i in range(size):
        row_a, row_b = m1[i], m2[i]
        for j in range(size):
            cell_a, cell_b = row_a[j], row_b[j]
            for k in (0, 1, 2):
                total += abs(cell_a[k] - cell_b[k])
    return total
def matrixgeoconvtest(A, threshold, plus=operator.add, times=operator.mul,
                      max_iterations=100):
    """Iterate ``R <- A plus (R times A)`` until successive results are close.

    Starting from ``R = A``, each pass computes the next ``R``, overwrites
    its diagonal cells with ``[0.0, 0.0, 1.0]`` and measures the
    ``distance`` to the previous ``R``.  Iteration stops once that distance
    is no longer greater than ``threshold`` or after ``max_iterations``
    passes, whichever comes first.

    ``plus`` defaults to ``+`` and ``times`` to ``*``.

    Returns a tuple ``(R, count, t)``: the final matrix, the number of
    passes performed and the last measured distance.  When the iteration
    cap is hit, ``t`` may still exceed ``threshold``.
    """
    # Start above the threshold so the loop is normally entered.
    delta = threshold + 1.0
    current = A
    count = 0
    while delta > threshold and count < max_iterations:
        count += 1
        previous = current
        current = matrixplus(A, matrixtimes(current, A, plus, times), plus)
        # Set the diagonal to the fixed cell [0.0, 0.0, 1.0] after each pass.
        for idx in range(len(current)):
            current[idx][idx] = [0.0, 0.0, 1.0]
        delta = distance(previous, current)
    return current, count, delta
def extract_evidence(M, constant):
    """Convert a matrix of opinions back into two-element evidence cells.

    For every cell ``o`` the result holds
    ``[constant * (o[0] / o[2]), constant * (o[1] / o[2])]``.

    Raises ``ZeroDivisionError`` when a cell's third component is zero.
    """
    evidence = []
    for row in M:
        converted = []
        for opinion in row:
            positive = constant * (opinion[0] / opinion[2])
            negative = constant * (opinion[1] / opinion[2])
            converted.append([positive, negative])
        evidence.append(converted)
    return evidence
def write_to_file(M, openfile):
    """Write matrix ``M`` to ``openfile`` as nested brace-delimited lists.

    Each cell is written as ``{v0,v1,...},``, each row is wrapped in
    ``{`` ... ``},`` followed by a newline, and the whole matrix is wrapped
    in one outer pair of braces.  ``openfile`` only needs a ``write`` method.
    """
    openfile.write("{")
    for row in M:
        rendered_cells = []
        for cell in row:
            # Index the last element explicitly so an empty cell still
            # raises IndexError instead of being written as "{}".
            values = [str(value) for value in cell[:-1]]
            values.append(str(cell[-1]))
            rendered_cells.append("{" + ",".join(values) + "},")
        openfile.write("{" + "".join(rendered_cells) + "},\n")
    openfile.write("}")
"V1" "V2" "V3" "V4"
"1" 0 1 75 22688
"2" 0 1 -75 22688
"3" 0 2 575 22688
"4" 0 2 -35 22688
"1" 0 3 5 22688
"2" 0 3 -25 22688
"3" 0 4 32 22688
"4" 0 4 -38 22688
"5" 1 0 1000 22688
"6" 1 0 -500 22688
"7" 1 2 225 22688
"8" 1 2 -25 22688
"5" 1 3 100 22688
"6" 1 3 -236 22688
"7" 1 4 2225 22688
"8" 1 4 -325 22688
"9" 2 0 -375 22688
"10" 2 0 -275 22688
"11" 2 1 -575 22688
"11" 2 1 1575 22688
"9" 2 3 -75 22688
"10" 2 3 275 22688
"11" 2 4 -57 22688
"11" 2 4 157 22688
"5" 3 0 100 22688
"6" 3 0 -50 22688
"7" 3 2 22 22688
"8" 3 2 -2 22688
"5" 3 1 10 22688
"6" 3 1 -26 22688
"7" 3 4 225 22688
"8" 3 4 -35 22688
"9" 4 0 -75 22688
"10" 4 0 275 22688
"11" 4 1 -55 22688
"11" 4 1 575 22688
"9" 4 3 -85 22688
"10" 4 3 25 22688
"11" 4 2 -57 22688
"11" 4 2 17 22688
import sys
import string
import operator
from optparse import OptionParser
from EBSL_lib import load_data, get_evidence, get_opinion, get_theta, test_flow, write_to_file, extract_evidence, matrixgeoconvtest
from EBSL_lib import otimes, oplus, odot, scalartimes, boxtimes, matrixtimes, matrixplus, matrixsquare, matrixpow, matrixgeo, matrixgeonew, func_belief, func_belief_sqrt
def roundopinion(ox, digits):
    """Return the components of ``ox`` rounded to ``digits`` decimal places.

    Each component is scaled by ``10 ** digits``, rounded to the nearest
    whole number and scaled back.  The result is a real list so callers
    can index it.
    """
    scale = 10 ** digits  # hoisted: identical for every component
    return [round(value * scale) / scale for value in ox]
def distance(m1, m2):
    """Return the summed absolute difference between two opinion matrices.

    Both matrices are walked as ``len(m1)`` x ``len(m1)`` grids and the
    first three components of each pair of cells are compared.
    """
    side = len(m1)
    positions = [
        (i, j, k)
        for i in range(side)
        for j in range(side)
        for k in range(3)
    ]
    res = 0.0
    for i, j, k in positions:
        res += abs(m1[i][j][k] - m2[i][j][k])
    return res
# Driver script: load a data set, build the opinion matrix, iterate the
# geometric series until convergence and write the initial and final state
# to "<name>-init<ext>" and "<name>-conv<ext>".
import os

parser = OptionParser()
parser.add_option("-o", "--outputfile",
                  help="Filename for output data.")
parser.add_option("-d", "--data",
                  help="Filename for data set.")
parser.add_option("-c", "--constant", type="int",
                  help="Constant used in Uncertainty Calculation.")
parser.add_option("-m", "--mode", type="int",
                  help="Mode used for mul. mode = 0 --> boxtimes, mode=1--> odot.")
parser.add_option("-t", "--threshold", type="float",
                  help="Convergence threshold")
parser.set_defaults(outputfile="output.txt", data="exampleSet.txt",
                    constant=2, mode=0, threshold=1.0)
options, args = parser.parse_args()

# Insert the "-conv"/"-init" tag in front of the extension.  splitext copes
# with names that have no extension, several dots, or dots in a directory.
output_stem, output_ext = os.path.splitext(options.outputfile)
conv_filename = output_stem + "-conv" + output_ext
init_filename = output_stem + "-init" + output_ext

# The context manager closes the result file even if a later step fails.
with open(conv_filename, "w") as outputfile:
    outputfile.write("#Convergence Threshold = " + str(options.threshold) + ".\n")
    print("Dataset: " + options.data)
    print("Constant: " + str(options.constant))

    # Report the selected multiplication mode.
    if options.mode == 0:
        print("boxtimes is set as multiplication, g(x) = x_b")
        outputfile.write("#boxtimes is set as multiplication, g(x) = x_b\n")
    elif options.mode == 1:
        print("odot is set as multiplication")
        outputfile.write("#odot is set as multiplication.\n")
    elif options.mode == 2:
        print("boxtimes is set as multiplication, g(x) = sqrt(x_b)")
        outputfile.write("#boxtimes is set as multiplication, g(x) = sqrt(x_b)\n")
    elif options.mode == 4:
        print("otimes is set as multiplication")
        outputfile.write("#otimes is set as multiplication.\n")
    else:
        # Execution deliberately continues: the bit tests below still map
        # any integer onto one of the operators.
        print("No valid mode selected; mode can be either 0, 1, 2, or 4.")
        print("-----------------------------------------------------------------------------")
        print("Use -d FILENAME in command line to set dataset to FILENAME")
        print("Use -c CONSTANT in command line to set uncertianty constant to value CONSTANT")
        print("Use -m MODE in command line to set multiplication mode to value MODE")
        print("Use -o FILENAME in command line to set the output filenames to FILENAME")
        print("use -t FLOAT in command line to set the convergence threshold")
        print("-----------------------------------------------------------------------------")

    data = load_data(options.data)
    print("dataset loaded...")

    # Convert data into opinion matrix
    print("Convert dataset to Opinion Matrix...")
    P = get_opinion(data, options.constant)

    # Set plus, times operations with respect to EBSL mode
    plus = oplus
    if (options.mode & 4) == 4:
        print("otimes")
        times = otimes
    elif (options.mode & 1) == 0:
        if (options.mode & 2) == 0:
            print("boxtimes, x_b")
            f = func_belief
        else:
            print("boxtimes, sqrt(x_b)")
            f = func_belief_sqrt
        times = lambda x, y: boxtimes(x, y, f)
    else:
        print("odot")
        theta = get_theta(data)
        print("theta = " + str(theta))
        outputfile.write("#theta = " + str(theta) + ".\n")
        times = lambda x, y: odot(x, y, theta, options.constant)
    print("-----------------------------------------------------------------------------")

    # Dump the initial opinions and the evidence they correspond to.
    print("write initial state to file: " + init_filename)
    with open(init_filename, "w") as initfile:
        initfile.write("#Opinions initially\n")
        write_to_file(P, initfile)
        initfile.write("#Evidences initially\n")
        write_to_file(extract_evidence(P, options.constant), initfile)

    print("ready to test for convergence")
    print("theshold is set to: " + str(options.threshold))
    P2, count, t = matrixgeoconvtest(P, options.threshold, plus, times)
    print("Convergence reached at iteration " + str(count))

    print("write to file: " + conv_filename)
    outputfile.write("#Convergence after iteration: " + str(count) + "\n")
    outputfile.write("#Actual distance to next iteration: " + str(t) + "\n")
    outputfile.write("#Opinions: \n")
    write_to_file(P2, outputfile)
    print("done...")

    outputfile.write("#Underlying derived Evidence: \n")
    print("convert to Evidence matrix")
    E2 = extract_evidence(P2, options.constant)
    print("done...")
    print("write_to file")
    write_to_file(E2, outputfile)
print("exit")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment