Skip to content

Instantly share code, notes, and snippets.

@aritzg
Last active August 16, 2017 15:22
Show Gist options
  • Save aritzg/c7269030520ef8e6a88eb4b6fdeca25a to your computer and use it in GitHub Desktop.
Save aritzg/c7269030520ef8e6a88eb4b6fdeca25a to your computer and use it in GitHub Desktop.
new data challenge
# Disabled list-based combiner helpers (to_list / append / extend).
# They live inside a string literal, so none of them is ever defined or run.
'''
def to_list(a):
return [a]
def append(a, b):
a.append(b)
return a
def extend(a, b):
a.extend(b)
return a
'''
def to_dict(a):
    """Build a one-entry dict from a single ``(key, value)`` pair.

    Args:
        a: A two-item iterable ``(key, value)``.

    Returns:
        A new dict ``{key: value}``.

    Raises:
        ValueError: If ``a`` does not contain exactly two items.
        TypeError: If ``a`` is not iterable.
    """
    key, value = a
    return {key: value}
def append_to_dict(a, b):
    """Insert one ``(key, value)`` pair into an existing dict, in place.

    An existing entry with the same key is overwritten.

    Args:
        a: The dict accumulated so far; it is mutated.
        b: A two-item iterable ``(key, value)`` to add.

    Returns:
        The same dict object ``a``, after the insertion.
    """
    # No per-call debug printing here: stringifying the growing dict on
    # every merged record is pure overhead.
    key, value = b
    a[key] = value
    return a
def extend_dict(a, b):
    """Merge dict ``b`` into dict ``a`` in place.

    On key collisions the value from ``b`` wins.

    Args:
        a: The dict to update; it is mutated.
        b: The dict (or anything ``dict.update`` accepts) to merge in.

    Returns:
        The same dict object ``a``, after the merge.
    """
    a.update(b)
    return a
def func(a, b):
    """Join ``a`` and ``b`` into one string, replacing the tail of ``b``.

    The last four characters of ``b`` are dropped and ``"0:00"`` is appended
    in their place. For a ``b`` shaped like ``"HH:MM:SS"`` this rounds the
    time down to the start of its ten-minute slot, e.g. ``"12:34:56"``
    becomes ``"12:30:00"``.

    Args:
        a: Leading string (presumably a date -- confirm against the data).
        b: Trailing string whose last four characters are replaced.

    Returns:
        ``a``, a single space, the truncated ``b``, then ``"0:00"``.
    """
    truncated = b[:-4]
    return a + " " + truncated + "0:00"
def getValFromDict(dict, code):
    """Look up ``code`` in a mapping, substituting 0 for a missing value.

    Args:
        dict: Mapping to read from.
        code: Key to look up.

    Returns:
        The stored value, or ``0`` when the key is absent or its value is
        ``None`` or the empty string.
    """
    # NOTE: the parameter name shadows the built-in ``dict``; it is kept
    # unchanged so existing callers (including keyword callers) still work.
    val = dict.get(code)
    if val is None or val == '':
        return 0
    return val
# ---------------------------------------------------------------------------
# Pipeline: raw CSV -> mean value per (ten-minute timestamp, code) -> one wide
# row per timestamp -> CSV output.
#
# `sc` and `spark` are not defined in this file; they are expected to be
# supplied by the runtime (presumably the PySpark shell or a notebook).
# ---------------------------------------------------------------------------

# Output columns, in order, that follow the leading timestamp column.
# Each code is "<x[0]>#<x[3]>" as built from the input rows below.
COLUMN_CODES = [
    'A001#VAR4508', 'A001#VAR4068', 'A001#VAR4017', 'A001#VAR4352',
    'A001#VAR4001', 'A001#VAR4552', 'A001#VAR1329', 'A001#VAR306',
    'A001#VAR3734', 'A001#VAR3923', 'A001#VAR142', 'A001#VAR3741',
    'A001#VAR3567', 'A001#VAR131', 'A001#VAR4953', 'A001#VAR1492',
    'A001#VAR3572', 'A001#VAR2693', 'A001#VAR222', 'A001#VAR2343',
    'A001#VAR1624', 'A001#VAR2940', 'A001#VAR4406', 'A001#VAR2197',
    'A001#VAR948', 'A001#VAR2762', 'A001#VAR3931', 'A001#VAR1534',
    'A002#VAR1957', 'A002#VAR1212', 'A002#VAR1346', 'A002#VAR2914',
    'A002#VAR4021', 'A002#VAR1746', 'A002#VAR4349', 'A002#VAR1437',
    'A002#VAR2560', 'A002#VAR1119', 'A002#VAR589', 'A002#VAR3325',
    'A002#VAR4847', 'A002#VAR3634', 'A002#VAR2063', 'A002#VAR2207',
    'A002#VAR3959', 'A002#VAR1583', 'A002#VAR2708', 'A002#VAR3937',
    'A002#VAR099', 'A002#VAR4566', 'A002#VAR3331', 'A002#VAR4293',
    'A002#VAR1028', 'A002#VAR1570', 'A002#VAR4310', 'A002#VAR4132',
    'A003#VAR4681', 'A003#VAR561', 'A003#VAR3579', 'A003#VAR4226',
    'A003#VAR1160', 'A003#VAR538', 'A003#VAR410', 'A003#VAR1340',
    'A003#VAR4224', 'A003#VAR1099', 'A003#VAR3857', 'A003#VAR3642',
    'A003#VAR2947', 'A003#VAR3757', 'A003#VAR4683', 'A003#VAR2191',
    'A003#VAR3830', 'A003#VAR3258', 'A003#VAR4105', 'A003#VAR587',
    'A003#VAR809', 'A003#VAR666', 'A003#VAR4518', 'A003#VAR2753',
    'A003#VAR1506', 'A003#VAR2795', 'A003#VAR3299', 'A003#VAR3260',
    'A004#VAR2192', 'A004#VAR451', 'A004#VAR4141', 'A004#VAR1262',
    'A004#VAR1086', 'A004#VAR4456', 'A004#VAR2936', 'A004#VAR2919',
    'A004#VAR313', 'A004#VAR595', 'A004#VAR235', 'A004#VAR2945',
    'A004#VAR806', 'A004#VAR343', 'A004#VAR1227', 'A004#VAR3371',
    'A004#VAR1493', 'A004#VAR016', 'A004#VAR810', 'A004#VAR2870',
    'A004#VAR4594', 'A004#VAR1214', 'A004#VAR2085', 'A004#VAR1386',
    'A004#VAR2631', 'A004#VAR027', 'A004#VAR4281', 'A004#VAR3887',
    'A005#VAR4804', 'A005#VAR1859', 'A005#VAR3913', 'A005#VAR3034',
    'A005#VAR412', 'A005#VAR2310', 'A005#VAR2004', 'A005#VAR3060',
    'A005#VAR1110', 'A005#VAR3093', 'A005#VAR4490', 'A005#VAR2660',
    'A005#VAR059', 'A005#VAR4571', 'A005#VAR3688', 'A005#VAR1250',
    'A005#VAR1554', 'A005#VAR4004', 'A005#VAR4570', 'A005#VAR2038',
    'A005#VAR3027', 'A005#VAR578', 'A005#VAR1486', 'A005#VAR1684',
    'A005#VAR3059', 'A005#VAR4367', 'A005#VAR4161', 'A005#VAR3840',
]

# Load the raw file and drop every line equal to the header line.
text_file = sc.textFile("file:///home/aritz/data/personal/proiektuak/nemDataChallenge/data/dataset.csv")
header = text_file.first()
text_file = text_file.filter(lambda line: line != header)
print("Read and skip header")

# Plain comma split: assumes no field contains a quoted comma -- TODO confirm.
step1 = text_file.map(lambda x: x.split(','))

# Key each reading by (truncated timestamp, code) and carry (value, count) so
# the mean can be derived after the reduce. A tuple key is used instead of a
# concatenated string so that no fixed-width slicing is needed to split it
# apart again afterwards.
step2 = step1.map(lambda x: ((func(x[1], x[2]), x[0] + '#' + x[3]), (float(x[4]), 1)))
step3 = step2.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Re-key by timestamp only: (timestamp, (code, mean)).
step4 = step3.map(lambda x: (x[0][0], (x[0][1], x[1][0] / x[1][1])))
print("Ten-minutal means calculated")
print(step4.first())

# Gather all (code, mean) pairs of one timestamp into a single dict.
# NOTE(review): combineByKey redistributes records by key, so the preceding
# sort presumably does not determine the final row order -- confirm whether
# it is still needed.
step5 = step4.sortBy(lambda a: (a[0], a[1])).combineByKey(to_dict, append_to_dict, extend_dict)
print("Sort by time and code")
# NOTE(review): collect() brings the entire RDD to the driver just to print it.
print(step5.collect())

# Flatten each dict into a fixed-width row: timestamp first, then one value
# per entry of COLUMN_CODES (0 where the code has no value for that timestamp).
# NOTE(review): missing codes yield int 0 while present values are floats;
# confirm the inferred DataFrame schema handles that mix as intended.
step7 = step5.map(lambda x: (x[0],) + tuple(getValFromDict(x[1], code) for code in COLUMN_CODES))

df = spark.createDataFrame(step7)
df.write.format("com.databricks.spark.csv").option("header", "false").save("data/full3.csv")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment