Skip to content

Instantly share code, notes, and snippets.

@linroex
Last active July 15, 2016 08:36
Show Gist options
  • Save linroex/8e38f9cfb5c24f28dee35fbe44f93116 to your computer and use it in GitHub Desktop.
Save linroex/8e38f9cfb5c24f28dee35fbe44f93116 to your computer and use it in GitHub Desktop.
import java.util.List;
import java.util.ArrayList;
import java.sql.Timestamp;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TableFieldSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
 * Dataflow pipeline that reads JSON log records from Cloud Storage, copies a
 * fixed set of fields out of each record, assigns each record an event
 * timestamp, windows the records into one-minute fixed windows, and writes
 * them to a BigQuery table.
 */
public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  /**
   * Input field used as the element's event timestamp.
   *
   * <p>NOTE(review): the original code looked up "system_epoch_time" on the
   * freshly built output row, which never contains that key, so every element
   * failed with a NullPointerException. The "@" prefix is assumed here because
   * every other input field uses it -- confirm against the actual log format.
   */
  private static final String EPOCH_TIME_FIELD = "@system_epoch_time";

  /**
   * Builds and runs the pipeline.
   *
   * @param args command-line arguments, parsed into pipeline options
   */
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    p.apply(TextIO.Read
            .from("gs://****/2016-5-*-*/*")
            .withCoder(TableRowJsonCoder.of())
            .named("Read Files"))
        .apply(ParDo.of(new DoFn<TableRow, TableRow>() {
          @Override
          public void processElement(ProcessContext c) {
            TableRow input = c.element();

            TableRow log = new TableRow();
            log.put("system_time", requiredString(input, "@system_time"));
            log.put("client_timezone", requiredString(input, "@client_timezone"));
            log.put("client_time", requiredString(input, "@client_time"));
            log.put("event", requiredString(input, "@event"));

            // The timestamp must come from the INPUT record; the output row
            // only holds the four fields written above.
            // Joda's Instant(long) interprets its argument as milliseconds
            // since the epoch. NOTE(review): confirm the field is in
            // milliseconds rather than seconds.
            long epochMillis = Long.parseLong(requiredString(input, EPOCH_TIME_FIELD));
            c.outputWithTimestamp(log, new Instant(epochMillis));
          }
        }))
        .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(BigQueryIO.Write
            .to("****")
            .withSchema(buildSchema())
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run();
  }

  /** Returns the schema of the destination BigQuery table. */
  private static TableSchema buildSchema() {
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("system_time").setType("STRING"));
    fields.add(new TableFieldSchema().setName("client_timezone").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("client_time").setType("STRING"));
    fields.add(new TableFieldSchema().setName("event").setType("STRING"));
    return new TableSchema().setFields(fields);
  }

  /**
   * Returns the string form of a field that must be present in {@code row}.
   *
   * @param row the input record
   * @param key the field name to read
   * @return {@code toString()} of the field's value
   * @throws NullPointerException if the field is absent or null; the message
   *     names the missing field so the bad record can be diagnosed
   */
  private static String requiredString(TableRow row, String key) {
    Object value = row.get(key);
    if (value == null) {
      throw new NullPointerException("Input record is missing required field '" + key + "'");
    }
    return value.toString();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment