Skip to content

Instantly share code, notes, and snippets.

@costin
Last active June 30, 2016 04:56
Show Gist options
  • Save costin/8025827 to your computer and use it in GitHub Desktop.
Save costin/8025827 to your computer and use it in GitHub Desktop.
download the zip from: http://download.elasticsearch.org/hadoop/short-video-1/apache.zip
ADD JAR file:///demo/code/lib/elasticsearch-hadoop-1.3.0.M1.jar;
CREATE TABLE logs (type STRING, time STRING, ext STRING, ip STRING, req STRING, res INT, bytes INT, phpmem INT, agent STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
LOAD DATA INPATH '/demo/apache.log' OVERWRITE INTO TABLE logs;
CREATE EXTERNAL TABLE eslogs (time STRING, extension STRING, clientip STRING, request STRING, response INT, agent STRING)
STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
TBLPROPERTIES('es.resource' = 'demo/hive',
'es.mapping.names' = 'time:@timestamp');
INSERT OVERWRITE TABLE eslogs SELECT s.time, s.ext, s.ip, s.req, s.res, s.agent FROM logs s;
A = LOAD 'hdfs:///demo/apache.log' USING PigStorage() AS
(type:chararray, time:chararray, extension:chararray, clientip:chararray, request:chararray, response:long, bytes:long, phpmemory:long, agent:chararray);
B = FOREACH A GENERATE time, type, extension, clientip, request, response, bytes, agent;
STORE B INTO 'demo/pig' USING org.elasticsearch.hadoop.pig.ESStorage('es.mapping.names=time:@timestamp');
package demo;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.ESOutputFormat;
public class SimpleJob extends Configured implements Tool {
public static class Tokenizer extends MapReduceBase implements
Mapper<LongWritable, Text, LongWritable, MapWritable> {
private final MapWritable map = new MapWritable();
private final Text[] fields = new Text[] {
new Text("type"), new Text("@timestamp"), new Text("extension"),
new Text("clientip"), new Text("request"), new Text("response"),
new Text("bytes"), new Text("phpmemory"), new Text("agent") };
@Override
public void map(LongWritable key, Text value, OutputCollector<LongWritable, MapWritable> output, Reporter reporter)
throws IOException {
map.clear();
StringTokenizer st = new StringTokenizer(value.toString(), "\t");
for (Text field : fields) {
map.put(field, new Text(st.nextToken()));
}
output.collect(key, map);
}
}
@Override
public int run(String[] args) throws Exception {
JobConf job = new JobConf(getConf(), SimpleJob.class);
job.setJobName("demo-mapreduce");
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(ESOutputFormat.class);
job.setMapperClass(Tokenizer.class);
job.setMapOutputValueClass(MapWritable.class);
job.setSpeculativeExecution(false);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.set("es.resource", args[1]);
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new SimpleJob(), args);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment