Skip to content

Instantly share code, notes, and snippets.

@brachi-wernick
Last active February 10, 2019 15:10
Show Gist options
  • Save brachi-wernick/90494be6f54f44d19a75cdc008c8e100 to your computer and use it in GitHub Desktop.
Save brachi-wernick/90494be6f54f44d19a75cdc008c8e100 to your computer and use it in GitHub Desktop.
ParDO with a tag
@ProcessElement
public void processElement(@Element JsonNode> element, OutputReceiver<TableRowWithEvent> out) {
public static final TupleTag<JsonNode> MAIN_OUT = new TupleTag<JsonNode>() {};
public static final TupleTag<BigQueryProcessError> DEADLETTER_OUT = new TupleTag<BigQueryProcessError>() {};
TableRow convertedRow = new TableRow();
try {
insertLong(element.get("server_time"), "server_time", convertedRow);
insertFloat(element.get("screen_dpi"), "screen_dpi", convertedRow);
// more transformation to come
context.output(output);
} catch (Exception e) {
logger.error("Failed transform "+e.getMessage(),e);
context.output(DEADLETTER_OUT, new BigQueryProcessError(convertedRow.toString(), e.getMessage(), ERROR_TYPE.BQ_PROCESS, originEvent));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment