@pixie79
Created June 28, 2013 16:03
-- Register the jars that provide the UDFs used below.
register /home/hadoop/lib/pig/piggybank.jar;
register jar/datafu-0.0.10.jar;
register jar/guava-14.0.1.jar;
define Sessionize datafu.pig.sessions.Sessionize('30m');
define UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
define Max org.apache.pig.piggybank.evaluation.math.Max();
define Median datafu.pig.stats.Median();
define Quantile datafu.pig.stats.StreamingQuantile('0.75','0.90','0.95');
define VAR datafu.pig.stats.VAR();
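-- Load page views: event time (Unix timestamp), segment id, and user id.
pv = load 'pixel.tsv' as (time:long, segment:int, user:long);
-- Sessionize reads the timestamp from the first field, so the ISO 8601 time goes first.
pv = foreach pv
     generate UnixToISO(time) as isoTime,
              time,
              user,
              segment;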
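-- For each user, order page views by time and append a sessionId;
-- a new session starts after a 30-minute gap (see the Sessionize define above).
pv_sessionized = foreach (group pv by user) {
    ordered = order pv by isoTime;
    generate flatten(Sessionize(ordered)) as (isoTime, time, user, segment, sessionId);
};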
-- Find the timestamp of the last page view in each session.
gpv_sessionized = GROUP pv_sessionized by sessionId;
-- The group key is sessionId alone, so FLATTEN(group) yields a single field.
last_segment_per_sessionId = FOREACH gpv_sessionized GENERATE
    FLATTEN(group) AS sessionId,
    MAX(pv_sessionized.time) AS max_time;
-- Attach each session's max_time to every page view in that session.
pv_join = join pv_sessionized by sessionId, last_segment_per_sessionId by sessionId;