Last active
February 27, 2018 20:14
-
-
Save dfdeshom/3865731e3fe36b79c7a1f516ddef9cc1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
import json | |
from mrjob.job import MRJob | |
import random | |
class TestStdIn(MRJob):
    """mrjob job that joins synthetic candidate and resume records in Spark.

    The job ignores its input and output paths.  It builds in-memory sample
    records and keeps the most recent version of each candidate and of each
    ``(candidate_id, resume_id)`` pair.  It then inner-joins the two on
    ``candidate_id`` and prints the first three merged rows.
    """

    @staticmethod
    def candidate_key(doc):
        """Return ``doc['candidate_id']``, or ``None`` if the key is absent."""
        return doc.get('candidate_id')

    @staticmethod
    def candidate_resume_key(doc):
        """Return the ``(candidate_id, resume_id)`` pair identifying a resume.

        Raises KeyError if either field is missing (unlike ``candidate_key``,
        which tolerates a missing id).
        """
        return doc['candidate_id'], doc['resume_id']

    @staticmethod
    def _sample_resumes(count=50):
        """Build *count* independent synthetic resume records.

        Each record is a separate dict with its own ``resume_id`` draw, so both
        resume ids 0 and 1 can appear for candidate 11.
        """
        return [{'xml': 'xml', 'time_created': 1.0,
                 'candidate_id': 11, 'resume_id': random.randint(0, 1)}
                for _ in range(count)]

    @staticmethod
    def _sample_candidates(count=50):
        """Build *count* independent synthetic candidate records."""
        return [{'time_created': 1.0, 'candidate_id': 11}
                for _ in range(count)]

    def most_recent_docs(self, rdd):
        """Keep only the newest doc for each key of a ``(key, doc)`` pair RDD.

        Docs are compared on their ``'time_created'`` value.  On a tie, the
        first argument ``reduceByKey`` hands to the combiner is kept.

        :param rdd: pair RDD of ``(key, doc)`` where each doc supports
            ``doc['time_created']``.
        :return: pair RDD with one ``(key, doc)`` per distinct key.
        """
        tc = 'time_created'
        return rdd.reduceByKey(lambda x, y: x if x[tc] >= y[tc] else y)

    def merge_candidate_resume(self, candidate_id, candidate, resume):
        """Combine one joined candidate/resume pair into an output record.

        Returns ``None`` when either side is missing or empty, so the caller
        can filter the row out.  Otherwise it returns *candidate* unchanged.
        """
        # NOTE(review): *resume* is only used as a presence check and
        # *candidate_id* is unused; presumably a real merge was intended.
        if not candidate or not resume:
            return None
        return candidate

    def spark(self, input_path, output_path):
        """Run the join on synthetic data and print up to three merged rows.

        *input_path* and *output_path* are accepted to match the mrjob Spark
        entry point but are not used.
        """
        from pyspark.sql import SparkSession

        session = (SparkSession.builder
                   .appName(type(self).__name__)
                   .config("spark.scheduler.mode", "FAIR")
                   .getOrCreate())
        sc = session.sparkContext
        # Stop the context even when a stage fails.
        try:
            # Newest doc per (candidate_id, resume_id), then re-keyed by
            # candidate_id so it can be joined with the candidates.
            resumes = (self.most_recent_docs(
                           sc.parallelize(self._sample_resumes())
                           .keyBy(self.candidate_resume_key))
                       .values()
                       .keyBy(self.candidate_key))
            candidates = self.most_recent_docs(
                sc.parallelize(self._sample_candidates())
                .keyBy(self.candidate_key))

            merge = self.merge_candidate_resume

            # join() yields (candidate_id, (candidate, resume)); unpack in
            # the body because tuple parameters are Python-2-only (PEP 3113).
            # NOTE(review): this closure captures the bound method, so Spark
            # must serialize the whole job object (including any stdin/stdout
            # handles it holds) -- confirm that is picklable.
            def merge_pair(pair):
                candidate_id, (candidate, resume) = pair
                return merge(candidate_id, candidate, resume)

            candidate_resume = (candidates.join(resumes)
                                .map(merge_pair)
                                .filter(lambda row: row))
            candidate_resume.cache()
            print(candidate_resume.take(3))
        finally:
            sc.stop()
if __name__ == "__main__":
    # Script entry point: hand control to the job class's mrjob ``run()``.
    job_cls = TestStdIn
    job_cls.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment