# Spark Python examples: hdfs_wordcount.py (streaming word count) and sort.py (distributed sort)
""" Counts words in new text files created in the given directory Usage: hdfs_wordcount.py <directory> <directory> is the directory that Spark Streaming will use to find and read new text files. To run this on your local machine on directory `localdir`, run this example $ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py localdir Then create a text file in `localdir` and the words in the file will get counted."""importsysfrompysparkimportSparkContextfrompyspark.streamingimportStreamingContextif__name__=="__main__":
iflen(sys.argv) !=2:
print>>sys.stderr, "Usage: hdfs_wordcount.py <directory>"exit(-1)
sc=SparkContext(appName="PythonStreamingHDFSWordCount")
ssc=StreamingContext(sc, 1)
lines=ssc.textFileStream(sys.argv[1])
counts=lines.flatMap(lambdaline: line.split(" "))\
.map(lambdax: (x, 1))\
.reduceByKey(lambdaa, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
# --- sort.py: distributed sort example ---
"""Sorts the integers in the given text file using Spark.

Usage: sort <file>
  <file> is a text file whose space-separated tokens are all parseable
  as integers (int() will raise ValueError otherwise).
"""
import sys

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: sort <file>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonSort")
    lines = sc.textFile(sys.argv[1], 1)
    # Pair each integer with a dummy count so sortByKey can order by value.
    # NOTE: sortByKey's first parameter is `ascending` (bool), not a key
    # function — the original passed a lambda there by mistake.
    sortedCount = lines.flatMap(lambda x: x.split(' ')) \
        .map(lambda x: (int(x), 1)) \
        .sortByKey()
    # This is just a demo on how to bring all the sorted data back to a single node.
    # In reality, we wouldn't want to collect all the data to the driver node.
    output = sortedCount.collect()
    for (num, unitcount) in output:
        print(num)

    sc.stop()