Created June 2, 2014 09:22
-
-
Save cotdp/b3512dd1328f10ee9257 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package spark.examples | |
import org.apache.spark._ | |
import org.apache.spark.SparkContext._ | |
import org.json4s.jackson.JsonMethods | |
import org.json4s.jackson.JsonMethods._ | |
import org.json4s.JsonAST._ | |
/**
 * Spark job that counts how often each Twitter user is mentioned in the
 * gzipped JSON tweet files under `wimbledon/`. It writes the counts to HDFS,
 * replacing any previous output, and prints the 100 most-mentioned users.
 */
object HDFSDeleteExample {

  /**
   * Parses one input line as JSON and returns the `screen_name` of every
   * element of its `entities.user_mentions` array.
   *
   * Lines that fail to parse (broken lines etc.) yield no names. Each name is
   * the compact JSON rendering of the field, so string values keep their
   * surrounding double quotes, e.g. `"Wimbledon"`.
   *
   * @param line one line of the input text files
   * @return the rendered screen names mentioned on that line, possibly empty
   */
  private def mentionedScreenNames(line: String): Seq[String] =
    // Try catches only non-fatal exceptions, so errors such as
    // OutOfMemoryError are not silently swallowed along with bad lines.
    scala.util.Try(parse(line)).toOption.toList.flatMap { json =>
      (json \ "entities" \ "user_mentions").children.map { mention =>
        compact(render(mention \ "screen_name"))
      }
    }

  /**
   * Runs the job on a local Spark master (8 threads) against the HDFS
   * namenode at `hdfs://localhost:9000`.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[8]", "HDFSDeleteExample")
    // Stop the context even when the job fails, so its resources are released.
    try {
      // This is our HDFS output path
      val output = "hdfs://localhost:9000/tmp/wimbledon_top_mentions"
      val outputPath = new org.apache.hadoop.fs.Path(output)

      // Put Hadoop overrides on sc.hadoopConfiguration. Spark builds the job
      // configuration for saveAsTextFile from it, so the settings reach the
      // files it writes.
      val hadoopConf = sc.hadoopConfiguration
      hadoopConf.setInt("dfs.block.size", 1073741824) // 1 GiB block size
      val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)

      // Produces a RDD[String] of JSON messages from Twitter
      val lines = sc.textFile("wimbledon/*.json.gz")

      // (screen_name, number of mentions) per mentioned user. Cached because
      // it is both saved and ranked below.
      val topMentions = lines
        .flatMap(line => mentionedScreenNames(line))
        .map(screenName => (screenName, 1))
        .reduceByKey(_ + _)
        .cache()

      // Remove the output of a previous run, because saveAsTextFile will not
      // write into an existing directory. Only a missing path is tolerated;
      // any other HDFS failure propagates.
      if (hdfs.exists(outputPath)) {
        hdfs.delete(outputPath, true) // recursive: the output is a directory
      }
      topMentions.saveAsTextFile(output)

      // Print the top-100 mentioned users, highest count first
      topMentions.top(100)(Ordering.by(_._2)).foreach(println)
      /* Sample result:
      ("Wimbledon",291562)
      ("serenawilliams",42305)
      ("andy_murray",28684)
      ("frankieboyle",24653)
      ("BBCSport",16664)
      ("FedererNews",12880)
      ("Georgia_Ford",12235)
      ("ATPWorldTour",12177)
      ("espn",11494)
      ("TenisExtra",11338)
      ("DjokerNole",10815)
      ("RafaelNadal",9338)
      ("piersmorgan",8721)
      ("100porcientenis",8106)
      ("TeamRFederer",6837)
      ("SuperSportBlitz",6620)
      ("Charles_HRH",6540)
      */
    } finally {
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.