Skip to content

Instantly share code, notes, and snippets.

@maowug
Last active December 20, 2015 03:59
Show Gist options
  • Save maowug/6067481 to your computer and use it in GitHub Desktop.
Save maowug/6067481 to your computer and use it in GitHub Desktop.
[07.24a] changes in _dataCleanning.py and survey.py
#!/usr/bin/env python
#encoding: utf-8
#__author__ = 'actor2019'
import csv
import cStringIO
import itertools
# with open('test.csv', 'rb') as csvfile:
# spamreader = csv.reader(csvfile, delimiter=',', quotechar='|')
# for row in spamreader:
# print '-'.join(row)
# with open('test.csv', 'rb') as csvfile:
# EXAMPLE_SIZE=256
# sniffer = csv.Sniffer()
# _dialect='csv.Dialect()'
# _hasHeader=True
# OB=[]
# _file=csvfile.read()
# _example=_file[0:EXAMPLE_SIZE]
# _dialect,_hasHeader=sniffer.sniff(_example,delimiters=' ,\t'),sniffer.has_header(_example)
#
# #todo:note - the handle from "with open('test.csv', 'rb') as csvfile" can only be read once; re-open the file to read it again
# with open('test.csv', 'rb') as csvfile:
# print type(csvfile)
# iocsv=cStringIO.StringIO(csvfile.read())
# print type(iocsv)
# csvRows=csv.reader(iocsv,dialect=_dialect)
# # csvRows= _csvLines if _hasHeader else _csvLines
# for row in csvRows:
# OB.append(row)
# result=dict(
# OB=OB,
# # _csvLines='\n'.join([', '.join(row) for row in _csvLines]),
# # _example=_example,
# # _csvLines=str(_csvLines),
# support=0.3,
# accuracy=0.5,
# )
# print result
# Tokens that stand for "value unknown" in a data row.
_MISSING_MARKERS = ('nil', 'none', '*', '-')


def _parse_data_values(tokens, numAT, AT):
    """Group the comma-split tokens of one ``data`` row into per-attribute lists.

    tokens -- the row's bracketed payload, already split on ','
    numAT  -- number of attributes to read from the row
    AT     -- attribute value table parsed so far; a missing-value marker is
              expanded to every value listed there for that attribute

    Returns a list holding one list of values per attribute.  Raises
    IndexError when the row supplies fewer tokens than ``numAT`` attributes
    need, or when a missing marker refers to an attribute not yet in ``AT``.
    """
    values = [[] for _ in range(numAT)]
    pos = 0
    for col in range(numAT):
        token = tokens[pos]
        pos += 1
        if '[' in token:
            # Set-valued cell: its members were scattered over several tokens
            # by the split on ','.  Checking the opening token for ']' as well
            # makes a one-element set such as "[a]" parse correctly.
            token = token.replace('[', '')
            while ']' not in token:
                values[col].append(token)
                token = tokens[pos]
                pos += 1
            values[col].append(token.replace(']', ''))
        elif token in _MISSING_MARKERS:
            values[col] = [arg[0] for arg in AT[col]]
        else:
            values[col].append(token)
    return values


def plCleaning(plfile, DEC=None):
    """Parse a ``.pl`` data description into a plain dict.

    The parser recognises rows by their prefix: ``object(numOB,numAT)``,
    ``support(x)``, ``accuracy(x)``, ``decision(i)``,
    ``attrib_values(i,name,n,[v1,v2,...])`` and ``data(i,[...])``.  Rows
    starting with ``resolution`` or ``total_cases`` only set ``infoMsg``;
    anything else is ignored.  ``attrib_values`` rows must precede the
    ``data`` rows that use a missing-value marker.

    plfile -- object with ``readlines()`` yielding text lines
    DEC    -- optional list of decision values; when non-empty, only objects
              whose first decision value is in ``DEC`` are kept

    Returns a dict with the keys numOB, numAT, support, accuracy, decision,
    infoMsg, attNameList, AT and OB.  Every value is stored as a one-element
    list inside ``AT`` and as a list of values inside ``OB``.  A scalar whose
    row is absent from the input is returned as ``[]``.

    Also prints the number of objects kept.
    """
    stripped = [line.strip() for line in plfile.readlines()]
    cleanRows = [row for row in stripped if row]

    # Every field starts out as its own empty list, so a missing directive
    # still yields a key in the result.
    numOB, numAT, support, accuracy, decision, infoMsg = [], [], [], [], [], []
    attNameList, AT, OB = [], [], []

    for row in cleanRows:
        argList = row[row.find('(') + 1:row.rfind(')')].split(',')
        if row.startswith('object'):
            numOB = int(argList[0])
            numAT = int(argList[1])
        elif row.startswith('support'):
            support = float(argList[0])
        elif row.startswith('accuracy'):
            accuracy = float(argList[0])
        elif row.startswith('decision'):
            decision = int(argList[0])
        elif row.startswith('attrib_values'):
            # The attribute name sits between the first and second comma.
            attNameList.append(row.split(',', 2)[1])
            argListAtt = row[row.find('[') + 1:row.rfind(']')].split(',')
            # Each value is wrapped in a list so OR descriptors (several
            # values merged into one) can share the same representation.
            AT.append([[arg] for arg in argListAtt])
        elif row.startswith('data'):
            argListData = row[row.find('[') + 1:row.rfind(']')].split(',')
            OB.append(_parse_data_values(argListData, numAT, AT))
        elif row.startswith(('resolution', 'total_cases')):
            infoMsg = 'Lines begin with total_cases,resolution not used.'

    if DEC:
        OB = [ob for ob in OB if ob[decision - 1][0] in DEC]
        numOB = len(OB)
        # NOTE(review): this drops the entry equal to the whole DEC list (e.g.
        # ['yes']) from the decision attribute's values, i.e. the very value
        # the objects were just filtered on, and raises ValueError unless DEC
        # is exactly one listed value.  Kept as-is; confirm the intent.
        AT[decision - 1].remove(DEC)

    print(u'Number of objects: {new}'.format(new=len(OB)))
    return {
        'numOB': numOB,
        'numAT': numAT,
        'support': support,
        'accuracy': accuracy,
        'decision': decision,
        'infoMsg': infoMsg,
        'attNameList': attNameList,
        'AT': AT,
        'OB': OB,
    }
#!/usr/bin/env python
#encoding: utf-8
from _dataCleanning import plCleaning
from _utilities import calCriteriaFromDC
def _classifyParticle(sCON, sDEC, sATV, sDECV):
    """Return the particle index (0-5) for one object, or None if it is skipped.

    Indexes 0-2 are used when the object's condition values equal the
    descriptor ``sATV``; 3-5 when they strictly contain it.  Within each
    group the offset is 0 when the decision values equal ``sDECV``, 1 when
    they strictly contain it, and 2 otherwise.
    """
    if sCON == sATV:
        base = 0
    elif sATV.issubset(sCON):
        base = 3
    else:
        return None
    if sDEC == sDECV:
        return base
    if sDECV.issubset(sDEC):
        return base + 1
    return base + 2


def _gradeRule(minsupp, minacc, maxsupp, maxacc, support, accuracy):
    """Return 'Lower', 'Upper', 'flag4merge' or None for a set of criteria values."""
    if minsupp >= support and minacc >= accuracy:
        return 'Lower'
    if maxsupp >= support and maxacc >= accuracy:
        return 'Upper'
    if maxsupp >= support:
        return 'flag4merge'
    return None


def _mergeParticles(DCK, DCKm):
    """Combine the six particle lists of two single-condition descriptors."""
    merged = []
    for i in range(3):
        merged.append(list(set(DCK[i]) & set(DCKm[i])))
    for i in range(3):
        exact, wide = set(DCK[i]), set(DCK[i + 3])
        exactM, wideM = set(DCKm[i]), set(DCKm[i + 3])
        merged.append(list((exact & wideM) | (wide & exactM) | (wide & wideM)))
    return merged


def ruleGeneration(argWrapper, orDescriptors=None):
    """Generate candidate rules with one and two condition attributes.

    argWrapper    -- dict as returned by ``plCleaning``; the keys numOB, numAT,
                     support, accuracy, decision, attNameList, AT and OB are
                     read.  It is updated in place and also returned.
    orDescriptors -- optional list of ``[attName, [v1, v2, ...], ...]``
                     entries.  For each value group, the single-value entries
                     ``[v]`` are removed from that attribute's list in ``AT``
                     and the group itself is appended (``AT`` is modified in
                     place; ValueError if a value or name is not present).

    Keys added to the wrapper:
      gStep   -- number of steps performed (2)
      D       -- zero-based index of the decision attribute
      CF      -- CF[d][0] lists single-condition candidates and CF[d][1] the
                 merged pairs for decision value d.  Each candidate is a list
                 of ``[C, k, d, minsupp, minacc, maxsupp, maxacc, tag]``
                 entries, one per condition, where tag is 'Lower', 'Upper' or
                 (step 1 only) 'flag4merge'.
      DCStep1 -- DCStep1[d][C][k] is the six-list particle table computed for
                 condition attribute C, value index k and decision value d;
                 object numbers in it are 1-based.

    The criteria values come from ``calCriteriaFromDC(particles, numOB)``,
    which is defined in ``_utilities``.
    """
    if not orDescriptors:
        orDescriptors = []
    dataWrapper = argWrapper
    numOB = dataWrapper['numOB']
    numAT = dataWrapper['numAT']
    support = dataWrapper['support']
    accuracy = dataWrapper['accuracy']
    decision = dataWrapper['decision']
    attNameList = dataWrapper['attNameList']
    AT = dataWrapper['AT']
    OB = dataWrapper['OB']

    # Apply OR descriptors, e.g. [[u'temperature', [u'normal', u'high']]]
    for od in orDescriptors:
        attValues = AT[attNameList.index(od[0])]
        for cb in od[1:]:
            for av in cb:
                attValues.remove([av])
            attValues.append(cb)

    D = decision - 1
    # A real list: its length is taken and it is iterated once per decision
    # value, which a lazy filter object would not allow.
    conIndexes = [c for c in range(numAT) if c != D]
    CF = []
    DCStep1 = []
    gStep = 2
    NumOfParticles = 6

    # Step 1: rules with ONE condition attribute.
    for d in range(len(AT[D])):
        CF.append([[] for _ in conIndexes])
        lDCAll = [[] for _ in range(numAT)]
        sDECV = set(AT[D][d])
        for C in conIndexes:
            for k in range(len(AT[C])):
                thisDC = [[] for _ in range(NumOfParticles)]
                # AT[C][k] is already a list, so an OR descriptor needs no
                # special handling here.
                sATV = set(AT[C][k])
                for ob in range(numOB):
                    slot = _classifyParticle(set(OB[ob][C]), set(OB[ob][D]),
                                             sATV, sDECV)
                    if slot is not None:
                        thisDC[slot].append(ob + 1)
                minsupp, minacc, maxsupp, maxacc = calCriteriaFromDC(thisDC, numOB)
                tag = _gradeRule(minsupp, minacc, maxsupp, maxacc,
                                 support, accuracy)
                # Only candidates at least as promising as 'flag4merge' are kept.
                if tag is not None:
                    CF[d][0].append([[C, k, d, minsupp, minacc, maxsupp, maxacc, tag]])
                lDCAll[C].append(thisDC)
        DCStep1.append(lDCAll)

    # Step 2: merge pairs of 'flag4merge' candidates on different attributes.
    if gStep == 2:
        cf = 1
        for d in range(len(AT[D])):
            singles = CF[d][cf - 1]
            for k, first in enumerate(singles):
                # A flag4merge candidate may take part in several merged rules.
                if first[0][7] != 'flag4merge':
                    continue
                iC, iK, iD = first[0][0:3]
                for second in singles[k + 1:]:
                    if second[0][7] != 'flag4merge':
                        continue
                    iCm, iKm, iDm = second[0][0:3]
                    # Same attribute or different decision value: not mergeable.
                    if iC == iCm or iD != iDm:
                        continue
                    mDC = _mergeParticles(DCStep1[iD][iC][iK],
                                          DCStep1[iDm][iCm][iKm])
                    minsupp, minacc, maxsupp, maxacc = calCriteriaFromDC(mDC, numOB)
                    tag = _gradeRule(minsupp, minacc, maxsupp, maxacc,
                                     support, accuracy)
                    # A merged pair that is still only 'flag4merge' is dropped.
                    if tag in ('Lower', 'Upper'):
                        CF[d][cf].append(
                            [[iC, iK, iD, minsupp, minacc, maxsupp, maxacc, tag],
                             [iCm, iKm, iDm, minsupp, minacc, maxsupp, maxacc, tag]])

    dataWrapper['gStep'] = gStep
    dataWrapper['D'] = D
    dataWrapper['CF'] = CF
    dataWrapper['DCStep1'] = DCStep1
    return dataWrapper
def _dTCF2list(argWrapper):
localsList=['numOB', 'numAT', 'support', 'accuracy', 'decision',
'infoMsg', 'attNameList','AT', 'OB','gStep','D','CF','DCStep1']
numOB,numAT,support,accuracy,decision,infoMsg,attNameList,AT,OB,gStep,D,CF,DCStep1=\
tuple([argWrapper[k] for k in localsList])
data=[]
from collections import OrderedDict
# thisOb: {'CON,p -> DEC,q':,m4:, 'Lower/Upper':}
for d in xrange(0,len(AT[D])):
for icf in xrange(0,gStep):
for k in xrange(0,len(CF[d][icf])):
if CF[d][icf][k][0][7]=='flag4merge':
continue
thisOb=OrderedDict({'Rules':len(data)+1})
cpdq=''
indd=CF[d][icf][k][0][2]
for j in xrange(0,icf+1):
indC,indk,indd=CF[d][icf][k][j][0:3]
# '&' ->','
avCk=AT[indC][indk][0] if len(AT[indC][indk])==1 else str(AT[indC][indk])
cpdq=cpdq+attNameList[indC]+':'+avCk+'& '
avDd=AT[D][indd][0] if len(AT[D][indd])==1 else str(AT[D][indd])
cpdq=cpdq[0:-2]+'->'+attNameList[D]+':'+avDd
thisOb['CON,p -> DEC,q']=cpdq
m4p1=CF[d][icf][k][0][3:8]
for i,a in enumerate(['minsupp','minacc','maxsupp','maxacc','Lower/Upper']):
thisOb[a]=m4p1[i]
data.append(thisOb)
return data
# (Taken from a slightly older version of the script.)
# Text mode: plCleaning compares each row against str prefixes.
with open('flu.pl', 'r') as plFile:
    # Example 1: the decision attribute of "flu" takes the values yes / no.
    DEC = ['yes']
    argWrapper = plCleaning(plFile, DEC)
    # Example 2: the decision attribute of "mammo" takes the values 0 / 1.
    # DEC=['1']
    # argWrapper = plCleaning(plFile,DEC)

# orDescriptors example:
# ['temperature',['normal','high']]
# ['AttName2',[0,1,2],[8,9,10]]
# orDescriptors=[[u'temperature', [u'normal', u'high']]]
# dataWrapper=ruleGeneration(argWrapper,orDescriptors=orDescriptors)
dataWrapper = ruleGeneration(argWrapper)
rules = _dTCF2list(dataWrapper)
# One line per rule: the row's values joined by single spaces.
for rule in rules:
    print(u' '.join([str(v) for v in rule.values()]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment