Last active
August 16, 2017 15:22
-
-
Save aritzg/c7269030520ef8e6a88eb4b6fdeca25a to your computer and use it in GitHub Desktop.
new data challenge
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ''' | |
| def to_list(a): | |
| return [a] | |
| def append(a, b): | |
| a.append(b) | |
| return a | |
| def extend(a, b): | |
| a.extend(b) | |
| return a | |
| ''' | |
def to_dict(a):
    """Create a new dictionary holding a single ``(key, value)`` pair.

    Used as the first (combiner-creating) function of the ``combineByKey``
    call in this script.

    Args:
        a: A two-item iterable ``(key, value)``.

    Returns:
        A new ``dict`` containing only ``{key: value}``.

    Raises:
        ValueError: If ``a`` does not contain exactly two items.
    """
    key, value = a
    return {key: value}
def append_to_dict(a, b):
    """Insert one ``(key, value)`` pair into dictionary ``a`` in place.

    Used as the second (value-merging) function of the ``combineByKey``
    call in this script, so it runs once per merged record. For that reason
    it does no debug printing.

    Args:
        a: The dictionary accumulated so far; it is mutated.
        b: A two-item iterable ``(key, value)`` to add. An existing entry
            with the same key is overwritten.

    Returns:
        The same dictionary object ``a``, after the insertion.

    Raises:
        ValueError: If ``b`` does not contain exactly two items.
    """
    key, value = b
    a[key] = value
    return a
def extend_dict(a, b):
    """Merge dictionary ``b`` into dictionary ``a`` in place.

    Used as the third (combiner-merging) function of the ``combineByKey``
    call in this script.

    Args:
        a: The dictionary to update; it is mutated.
        b: The dictionary whose entries are copied into ``a``. On a key
            collision the value from ``b`` wins.

    Returns:
        The same dictionary object ``a``, after the merge.
    """
    a.update(b)
    return a
def func(a, b):
    """Join ``a`` and ``b`` into one string, truncated to a ten-minute slot.

    The last four characters of ``b`` are dropped and replaced by ``"0:00"``.
    For a time written as ``HH:MM:SS`` this zeroes the unit-minute digit and
    the seconds, e.g. ``"15:22:37"`` becomes ``"15:20:00"``.

    Args:
        a: Text placed before the space (presumably the date column; verify
            against the dataset).
        b: Time text. The slicing is purely positional, so it assumes the
            final four characters are ``M:SS``.

    Returns:
        ``a``, a single space, then the truncated time.
    """
    # Everything up to (not including) the unit-minute digit, e.g. "15:2".
    slot_prefix = b[:-4]
    return a + " " + slot_prefix + "0:00"
def getValFromDict(dict, code):
    """Return the value stored under ``code``, or ``0.0`` when there is none.

    Args:
        dict: Mapping to look in. The name shadows the builtin ``dict``; it
            is kept unchanged so existing keyword callers keep working.
        code: Key to look up.

    Returns:
        The stored value, or ``0.0`` if the key is absent or its value is
        ``None`` or the empty string.
    """
    val = dict.get(code)
    if val is None or val == '':
        # The dictionaries built by this script hold float means, so the
        # fallback is a float too. An int 0 here would mix int and float
        # inside one column of the tuples later handed to createDataFrame,
        # whose inferred schema allows a single type per column.
        return 0.0
    return val
# Location of the input CSV, hoisted out of the call so it is easy to change.
DATASET_PATH = "file:///home/aritz/data/personal/proiektuak/nemDataChallenge/data/dataset.csv"

# NOTE: `sc` is not defined in this file; it is assumed to be the SparkContext
# supplied by the PySpark shell or notebook this script is run in.
text_file = sc.textFile(DATASET_PATH)

# The first line is treated as the header; every line equal to it is dropped.
header = text_file.first()
text_file = text_file.filter(lambda line: line != header)
print("Read and skip header")
# Split every CSV line into its fields (plain comma split, no quoting support).
step1 = text_file.map(lambda x: x.split(','))

# Key each reading by (ten-minute timestamp, "<field0>#<field3>") and pair it
# with (value, 1) so sums and counts can be reduced together. A tuple key is
# used instead of one concatenated string, so the two parts can be read back
# without relying on fixed character offsets.
step2 = step1.map(lambda x: ((func(x[1], x[2]), x[0] + '#' + x[3]), (float(x[4]), 1)))

# Per key: (sum of values, number of readings).
step3 = step2.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Re-key by timestamp only: (timestamp, (code, mean value)).
step4 = step3.map(lambda x: (x[0][0], (x[0][1], x[1][0] / x[1][1])))
print("Ten-minutal means calculated")
print(step4.first())
# Collect all (code, mean) pairs of one timestamp into a single dictionary:
# (timestamp, {code: mean, ...}).
# The sort is applied AFTER combineByKey: combineByKey shuffles records by
# key, so an ordering established before it does not survive. The order of
# codes inside each dictionary is irrelevant because values are fetched by key.
step5 = step4.combineByKey(to_dict, append_to_dict, extend_dict).sortByKey()
print("Sort by time and code")
# Print a single record as a sanity check; collecting the whole RDD to the
# driver just to print it does not scale with the dataset.
print(step5.first())
# Codes ("<field0>#<field3>") emitted as output columns, in output order.
# 5 prefixes x 28 variables = 140 value columns after the timestamp.
COLUMN_CODES = (
    'A001#VAR4508', 'A001#VAR4068', 'A001#VAR4017', 'A001#VAR4352',
    'A001#VAR4001', 'A001#VAR4552', 'A001#VAR1329', 'A001#VAR306',
    'A001#VAR3734', 'A001#VAR3923', 'A001#VAR142', 'A001#VAR3741',
    'A001#VAR3567', 'A001#VAR131', 'A001#VAR4953', 'A001#VAR1492',
    'A001#VAR3572', 'A001#VAR2693', 'A001#VAR222', 'A001#VAR2343',
    'A001#VAR1624', 'A001#VAR2940', 'A001#VAR4406', 'A001#VAR2197',
    'A001#VAR948', 'A001#VAR2762', 'A001#VAR3931', 'A001#VAR1534',
    'A002#VAR1957', 'A002#VAR1212', 'A002#VAR1346', 'A002#VAR2914',
    'A002#VAR4021', 'A002#VAR1746', 'A002#VAR4349', 'A002#VAR1437',
    'A002#VAR2560', 'A002#VAR1119', 'A002#VAR589', 'A002#VAR3325',
    'A002#VAR4847', 'A002#VAR3634', 'A002#VAR2063', 'A002#VAR2207',
    'A002#VAR3959', 'A002#VAR1583', 'A002#VAR2708', 'A002#VAR3937',
    'A002#VAR099', 'A002#VAR4566', 'A002#VAR3331', 'A002#VAR4293',
    'A002#VAR1028', 'A002#VAR1570', 'A002#VAR4310', 'A002#VAR4132',
    'A003#VAR4681', 'A003#VAR561', 'A003#VAR3579', 'A003#VAR4226',
    'A003#VAR1160', 'A003#VAR538', 'A003#VAR410', 'A003#VAR1340',
    'A003#VAR4224', 'A003#VAR1099', 'A003#VAR3857', 'A003#VAR3642',
    'A003#VAR2947', 'A003#VAR3757', 'A003#VAR4683', 'A003#VAR2191',
    'A003#VAR3830', 'A003#VAR3258', 'A003#VAR4105', 'A003#VAR587',
    'A003#VAR809', 'A003#VAR666', 'A003#VAR4518', 'A003#VAR2753',
    'A003#VAR1506', 'A003#VAR2795', 'A003#VAR3299', 'A003#VAR3260',
    'A004#VAR2192', 'A004#VAR451', 'A004#VAR4141', 'A004#VAR1262',
    'A004#VAR1086', 'A004#VAR4456', 'A004#VAR2936', 'A004#VAR2919',
    'A004#VAR313', 'A004#VAR595', 'A004#VAR235', 'A004#VAR2945',
    'A004#VAR806', 'A004#VAR343', 'A004#VAR1227', 'A004#VAR3371',
    'A004#VAR1493', 'A004#VAR016', 'A004#VAR810', 'A004#VAR2870',
    'A004#VAR4594', 'A004#VAR1214', 'A004#VAR2085', 'A004#VAR1386',
    'A004#VAR2631', 'A004#VAR027', 'A004#VAR4281', 'A004#VAR3887',
    'A005#VAR4804', 'A005#VAR1859', 'A005#VAR3913', 'A005#VAR3034',
    'A005#VAR412', 'A005#VAR2310', 'A005#VAR2004', 'A005#VAR3060',
    'A005#VAR1110', 'A005#VAR3093', 'A005#VAR4490', 'A005#VAR2660',
    'A005#VAR059', 'A005#VAR4571', 'A005#VAR3688', 'A005#VAR1250',
    'A005#VAR1554', 'A005#VAR4004', 'A005#VAR4570', 'A005#VAR2038',
    'A005#VAR3027', 'A005#VAR578', 'A005#VAR1486', 'A005#VAR1684',
    'A005#VAR3059', 'A005#VAR4367', 'A005#VAR4161', 'A005#VAR3840',
)

# Flatten each (timestamp, {code: mean}) record into one wide tuple:
# (timestamp, value for COLUMN_CODES[0], value for COLUMN_CODES[1], ...).
# Codes missing for a timestamp get getValFromDict's fallback value.
step7 = step5.map(
    lambda x: (x[0],) + tuple(getValFromDict(x[1], code) for code in COLUMN_CODES)
)
# NOTE: `spark` is not defined in this file; it is assumed to be the
# SparkSession supplied by the PySpark shell or notebook this script is run in.
# No schema is passed, so column types are inferred from the tuples in step7.
df = spark.createDataFrame(step7)

# Write the wide table as header-less CSV. The path is relative, so the
# destination depends on the working directory / default filesystem in use.
(df.write
   .format("com.databricks.spark.csv")
   .option("header", "false")
   .save("data/full3.csv"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment