-- Top hashtags per hour.
--
-- Reads tab-separated tweets from ../sample.txt, keeps the tweets that contain
-- a hashtag, counts hashtag occurrences per hourly bucket, and dumps up to 10
-- of the most frequent hashtags for each bucket.

-- Load the Amazon piggybank.jar and the standard piggybank.jar.
register ../piggybank-0.3-amzn.jar
register ../piggybank.jar

-- Function definitions (short aliases for the piggybank UDFs).
-- NOTE(review): EXTRACT is defined but not referenced anywhere in this script.
DEFINE DATE_TIME org.apache.pig.piggybank.evaluation.datetime.DATE_TIME();
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();
DEFINE FORMAT_DT org.apache.pig.piggybank.evaluation.datetime.FORMAT_DT();
DEFINE LOWER org.apache.pig.piggybank.evaluation.string.LOWER();

-- Load all of the parsed tweets as id, timestamp, screenname, tweet.
raw = LOAD '../sample.txt' USING PigStorage('\t')
      AS (id:chararray, timestamp:chararray, screenname:chararray, tweet:chararray);

-- Keep only tweets containing '#' immediately followed by a letter.
fltr = FILTER raw BY tweet matches '.*\\#\\p{Alpha}.*?';

-- Reduce each tweet to (hour bucket, lower-cased hashtag).
--  * The timestamp is parsed with 'EEE MMM dd HH:mm:ss Z yyyy' and re-emitted
--    as yyyyMMddHH, i.e. truncated to the hour. Lowercase 'yyyy' is used in
--    the output pattern so it agrees with the parse pattern; uppercase 'Y'
--    is not the plain calendar year in every Java date-pattern dialect.
--  * NOTE(review): 'UTC' is presumably the target time zone of DATE_TIME --
--    confirm against the amzn piggybank documentation.
--  * NOTE(review): the leading '.*' in the extraction regex is greedy, so the
--    capture group appears to bind to the LAST hashtag of a tweet; earlier
--    hashtags in the same tweet would then not be counted. Confirm against
--    RegexExtract's matching semantics before relying on the counts.
extrctd = FOREACH fltr GENERATE
    FORMAT_DT('yyyyMMddHH',
              DATE_TIME(timestamp, 'EEE MMM dd HH:mm:ss Z yyyy', 'UTC')) AS timestamp,
    FLATTEN(org.apache.pig.piggybank.evaluation.string.RegexExtract(
              LOWER(tweet), '.*(\\#\\p{Alpha}.*?\\b)', 1)) AS (hashtag:chararray);

-- Count occurrences of each (hour bucket, hashtag) pair.
grpd = GROUP extrctd BY (timestamp, hashtag);

cntd = FOREACH grpd GENERATE
    FLATTEN(group) AS (timestamp, hashtag),
    COUNT(extrctd) AS cnt;

-- Group the grouped and counted results by timestamp, so that we can limit
-- to the top n terms per period.
regrpd = GROUP cntd BY timestamp;

-- Within each hour bucket, order by count (descending) and keep at most 10 rows.
srtd = FOREACH regrpd {
    ordrd = ORDER cntd BY cnt DESC;
    limitd = LIMIT ordrd 10;
    GENERATE FLATTEN(limitd);
}

dump srtd;