package com.tele2;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

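// Writes Avro GenericRecords to Parquet from a Flink batch job, using Hadoop's
// AvroParquetOutputFormat wrapped in Flink's HadoopOutputFormat adapter.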
public class MyParquetTest implements Serializable {
    public static void main(String[] args) throws Exception {
        new MyParquetTest().start();
    }

    private void start() throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

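        // Build 99 sample strings ("Entry 1" .. "Entry 99") as the batch input.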
        Stream<String> stringStream = IntStream.range(1, 100).mapToObj(n -> String.format("Entry %d", n));
        DataSet<String> text = env.fromCollection(stringStream.collect(Collectors.toCollection(ArrayList::new)));

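        // Configure a Hadoop MapReduce Job to carry the Parquet/Avro output settings,
        // and wrap AvroParquetOutputFormat so Flink can drive it.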
        Job job = Job.getInstance();
        HadoopOutputFormat<Void, MyRecord> hadoopOutputFormat = new HadoopOutputFormat<>(new AvroParquetOutputFormat<MyRecord>(), job);
        FileOutputFormat.setCompressOutput(job, false);
        FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path("./my-parquet"));

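        // Load the Avro schema from the classpath and register it with the output format.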
        final Schema schema = new Schema.Parser().parse(MyRecord.class.getClassLoader().getResourceAsStream("schema.avsc"));
        AvroParquetOutputFormat.setSchema(job, schema);

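        // AvroParquetOutputFormat consumes (key, value) pairs and writes only the value,
        // so pair each record with a null Void key.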
        DataSet<Tuple2<Void, MyRecord>> text2 = text.map(new MapFunction<String, Tuple2<Void, MyRecord>>() {
            @Override
            public Tuple2<Void, MyRecord> map(String value) throws Exception {
                return Tuple2.of(null, new MyRecord(value));
            }
        });

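        // Emit the dataset through the Hadoop output format and run the job.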
        text2.output(hadoopOutputFormat);
        env.execute("Flink Batch Java API Skeleton");
    }

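    // Minimal read-only GenericRecord wrapping a single string: every field access,
    // by name or by position, returns the wrapped value, and writes are rejected.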
    public static class MyRecord implements GenericRecord {
        private static Schema schema;

        static {
            try {
                schema = new Schema.Parser().parse(MyRecord.class.getClassLoader().getResourceAsStream("schema.avsc"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private final String value;

        public MyRecord(String value) {
            this.value = value;
        }

        @Override
        public void put(String key, Object v) {
            throw new NotImplementedException("You can't update this GenericRecord");
        }

        @Override
        public Object get(String key) {
            return this.value;
        }

        @Override
        public void put(int i, Object v) {
            throw new NotImplementedException("You can't update this GenericRecord");
        }

        @Override
        public Object get(int i) {
            return this.value;
        }

        @Override
        public Schema getSchema() {
            return schema;
        }
    }
}
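
// schema.avsc is loaded from the classpath but is not included in the gist.
// A minimal sketch of a schema consistent with MyRecord's single-string behavior
// follows; the record name, namespace, and field name are assumptions, not the
// original file:
//
// {
//   "type": "record",
//   "name": "MyRecord",
//   "namespace": "com.tele2",
//   "fields": [
//     {"name": "value", "type": "string"}
//   ]
// }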