Skip to content

Instantly share code, notes, and snippets.

@maowug
Last active December 19, 2015 09:09
Show Gist options
  • Save maowug/5931311 to your computer and use it in GitHub Desktop.
Save maowug/5931311 to your computer and use it in GitHub Desktop.
rule-generation-v1-without-ORdescriptor
#!/usr/bin/env python
#encoding: utf-8
from _dataCleanning import plCleaning
from _utilities import calCriteriaFromDC
# Number of particle lists kept per descriptor:
# 2 condition cases (exact match / strict superset) x 3 decision cases.
_NUM_PARTICLES = 6


def _classifyObjects(OB, numOB, C, D, conValue, decValue):
    """Sort the objects into six particle lists for one descriptor pair.

    Each object's value for condition attribute ``C`` and decision attribute
    ``D`` is turned into a set and compared with ``set(conValue)`` and
    ``set(decValue)``:

    * lists 0-2: the object's condition set equals the descriptor value;
    * lists 3-5: the descriptor value is a strict subset of the object's
      condition set;
    * within each group, by offset: 0 = decision set equals ``decValue``,
      1 = ``decValue`` is a strict subset of it, 2 = anything else.

    Objects whose condition set does not contain the descriptor value are
    left out. Stored object numbers are 1-based.
    """
    particles = [[] for _ in range(_NUM_PARTICLES)]
    sATV = set(conValue)
    sDECV = set(decValue)
    for ob in range(numOB):
        sCON = set(OB[ob][C])
        sDEC = set(OB[ob][D])
        if sCON == sATV:
            base = 0
        elif sATV.issubset(sCON):
            base = 3
        else:
            continue
        if sDEC == sDECV:
            offset = 0
        elif sDECV.issubset(sDEC):
            offset = 1
        else:
            offset = 2
        particles[base + offset].append(ob + 1)
    return particles


def _mergeParticles(first, second):
    """Combine the particle lists of two descriptors into those of their conjunction.

    Lists 0-2 keep the objects that are an exact match for both descriptors.
    Lists 3-5 keep the objects that match both descriptors where at least one
    of the two matches is only a superset match.
    """
    merged = [list(set(first[i]) & set(second[i])) for i in range(3)]
    for i in range(3):
        exact1, loose1 = set(first[i]), set(first[i + 3])
        exact2, loose2 = set(second[i]), set(second[i + 3])
        merged.append(list((exact1 & loose2) | (loose1 & exact2) | (loose1 & loose2)))
    return merged


def _gradeCriteria(criteria, support, accuracy):
    """Return 'Lower', 'Upper', 'flag4merge' or None for one set of criteria values."""
    minsupp, minacc, maxsupp, maxacc = criteria
    if minsupp >= support and minacc >= accuracy:
        return 'Lower'
    if maxsupp >= support and maxacc >= accuracy:
        return 'Upper'
    if maxsupp >= support:
        # Not accurate enough alone, but still a candidate for merging.
        return 'flag4merge'
    return None


def ruleGeneration(argWrapper, orDescriptors=None, dec=0):
    """Generate one- and two-descriptor rules and store them in ``argWrapper``.

    Args:
        argWrapper: dict providing at least 'numOB', 'numAT', 'support',
            'accuracy', 'decision' (1-based index of the decision attribute),
            'attNameList', 'AT' (per-attribute list of values, each value
            itself a list) and 'OB' (per-object list of attribute values).
        orDescriptors: optional list of ``[attName, [v1, v2, ...], ...]``
            entries. For every value group the single-value entries
            ``[v1]``, ``[v2]``... are removed from that attribute's ``AT``
            list and the group is appended as one combined value.
            NOTE: this edits ``argWrapper['AT']`` in place.
        dec: 0 to process every decision value, otherwise the 1-based
            position of the only decision value to process.

    Returns:
        The same ``argWrapper`` object with these keys added:

        * 'gStep': 2, the largest rule size that was attempted;
        * 'D': 0-based index of the decision attribute;
        * 'CF': ``CF[i][s]`` lists the rules with ``s + 1`` descriptors for
          the i-th *processed* decision value. A rule is a list of records
          ``[C, k, d, minsupp, minacc, maxsupp, maxacc, label]`` where
          ``label`` is 'Lower', 'Upper' or 'flag4merge';
        * 'DCStep1': ``DCStep1[i][C][k]`` holds the six particle lists of
          descriptor ``(C, k)`` for the i-th processed decision value.

    Raises:
        ValueError: if an OR-descriptor names an unknown attribute or value.
    """
    if not orDescriptors:
        orDescriptors = []
    dataWrapper = argWrapper
    numOB = dataWrapper['numOB']
    numAT = dataWrapper['numAT']
    support = dataWrapper['support']
    accuracy = dataWrapper['accuracy']
    decision = dataWrapper['decision']
    attNameList = dataWrapper['attNameList']
    AT = dataWrapper['AT']
    OB = dataWrapper['OB']

    # Rewrite AT according to the OR-descriptors, e.g.
    # [[u'temperature', [u'normal', u'high']]] turns
    # [['normal'], ['high'], ['very_high']] into [['very_high'], ['normal', 'high']].
    for od in orDescriptors:
        attValues = AT[attNameList.index(od[0])]
        for cb in od[1:]:
            for av in cb:
                attValues.remove([av])
            attValues.append(cb)

    D = decision - 1
    conIndexes = [i for i in range(numAT) if i != D]
    CF = []
    DCStep1 = []
    gStep = 2

    # Step 1: rules with ONE condition descriptor.
    for d in range(len(AT[D])):
        if dec and d != dec - 1:
            continue
        rulesBySize = [[] for _ in conIndexes]
        particlesByAtt = [[] for _ in range(numAT)]
        for C in conIndexes:
            for k in range(len(AT[C])):
                thisDC = _classifyObjects(OB, numOB, C, D, AT[C][k], AT[D][d])
                minsupp, minacc, maxsupp, maxacc = calCriteriaFromDC(thisDC, numOB)
                label = _gradeCriteria((minsupp, minacc, maxsupp, maxacc),
                                       support, accuracy)
                # Only descriptors at least as promising as 'flag4merge' are kept.
                if label is not None:
                    rulesBySize[0].append(
                        [[C, k, d, minsupp, minacc, maxsupp, maxacc, label]])
                # The particles are stored for every descriptor.
                particlesByAtt[C].append(thisDC)
        CF.append(rulesBySize)
        DCStep1.append(particlesByAtt)

    # Step 2: pair up 'flag4merge' descriptors of different attributes.
    if gStep == 2:
        cf = 1
        # CF and DCStep1 have one entry per processed decision value, so both
        # are addressed by the position d and not by the decision index stored
        # inside the rule records (the two differ whenever dec > 1).
        for d, rulesBySize in enumerate(CF):
            if len(rulesBySize) <= cf:
                # Fewer than two condition attributes: nothing can be merged.
                continue
            singles = rulesBySize[cf - 1]
            for k, firstRule in enumerate(singles):
                # A flag4merge descriptor may take part in several merged rules.
                if firstRule[0][7] != 'flag4merge':
                    continue
                iC, iK, iD = firstRule[0][0:3]
                for secondRule in singles[k + 1:]:
                    if secondRule[0][7] != 'flag4merge':
                        continue
                    iCm, iKm, iDm = secondRule[0][0:3]
                    # Same attribute or different decision value: not mergeable.
                    if iC == iCm or iD != iDm:
                        continue
                    mDC = _mergeParticles(DCStep1[d][iC][iK],
                                          DCStep1[d][iCm][iKm])
                    minsupp, minacc, maxsupp, maxacc = calCriteriaFromDC(mDC, numOB)
                    label = _gradeCriteria((minsupp, minacc, maxsupp, maxacc),
                                           support, accuracy)
                    # A merged rule that is still only 'flag4merge' is dropped.
                    if label in ('Lower', 'Upper'):
                        rulesBySize[cf].append(
                            [[iC, iK, iD, minsupp, minacc, maxsupp, maxacc, label],
                             [iCm, iKm, iDm, minsupp, minacc, maxsupp, maxacc, label]])

    dataWrapper['gStep'] = gStep
    dataWrapper['D'] = D
    dataWrapper['CF'] = CF
    dataWrapper['DCStep1'] = DCStep1
    return dataWrapper
#eof: def ruleGeneration
def _dTCF2list(argWrapper,dec=0):
    """Turn the rules stored in ``argWrapper['CF']`` into a list of records.

    Args:
        argWrapper: dict holding at least 'gStep', 'D', 'CF', 'AT' and
            'attNameList'. ``CF[i][s]`` is read as the list of rules with
            ``s + 1`` descriptors, each descriptor being a record
            ``[C, k, d, minsupp, minacc, maxsupp, maxacc, label]``.
        dec: 0 to report every decision value, otherwise the 1-based
            position of the only decision value to report. With a non-zero
            ``dec`` the rules are read from ``CF[0]``, so ``CF`` is expected
            to have been built for that same ``dec``.

    Returns:
        A list of ``OrderedDict`` records with the keys 'Rules' (running
        1-based number), 'CON,p -> DEC,q' (text of the rule), 'minsupp',
        'minacc', 'maxsupp', 'maxacc' and 'Lower/Upper'. Rules whose first
        descriptor is labelled 'flag4merge' are left out.
    """
    from collections import OrderedDict

    gStep = argWrapper['gStep']
    D = argWrapper['D']
    CF = argWrapper['CF']
    AT = argWrapper['AT']
    attNameList = argWrapper['attNameList']

    def _showValue(value):
        # A single-element value is shown bare, a combined value as its list repr.
        return value[0] if len(value) == 1 else str(value)

    data = []
    for _d in range(len(AT[D])):
        if dec and _d != dec - 1:
            continue
        d = _d if not dec else 0
        rulesBySize = CF[d]
        # CF[d] can hold fewer than gStep size slots (one slot per condition
        # attribute), so never index past what is actually there.
        for icf in range(min(gStep, len(rulesBySize))):
            for rule in rulesBySize[icf]:
                if rule[0][7] == 'flag4merge':
                    continue
                thisOb = OrderedDict({'Rules': len(data) + 1})
                conditions = []
                for j in range(icf + 1):
                    indC, indk = rule[j][0:2]
                    conditions.append(attNameList[indC] + ':' + _showValue(AT[indC][indk]))
                # The decision value index is taken from the last descriptor.
                indd = rule[icf][2]
                cpdq = '& '.join(conditions) + '->' + attNameList[D] + ':' + _showValue(AT[D][indd])
                thisOb['CON,p -> DEC,q'] = cpdq
                # The four criteria values and the label come from the first descriptor.
                names = ['minsupp', 'minacc', 'maxsupp', 'maxacc', 'Lower/Upper']
                for name, value in zip(names, rule[0][3:8]):
                    thisOb[name] = value
                data.append(thisOb)
    return data
# Read and clean the data set; plCleaning is handed the open binary file.
with open('mammo.pl', 'rb') as plFile:
    argWrapper = plCleaning(plFile)

# OR-descriptor examples:
#   ['temperature', ['normal', 'high']]
#   ['AttName2', [0, 1, 2], [8, 9, 10]]
# orDescriptors=[[u'temperature', [u'normal', u'high']]]

# Example: generate rules for a single decision value only. dec is 1-based,
# so dec=2 keeps the second value of the decision attribute.
dataWrapper = ruleGeneration(argWrapper, dec=2)
# dataWrapper=ruleGeneration(argWrapper,orDescriptors=orDescriptors)

# Print every reported rule as one space-separated line.
rules = _dTCF2list(dataWrapper, dec=2)
for rule in rules:
    print(u' '.join(str(v) for v in rule.values()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment