Last active
February 5, 2020 12:07
-
-
Save brachi-wernick/f7c4e4f860b4a73655e48caa5105909c to your computer and use it in GitHub Desktop.
error handling BQ insert
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Writes the input rows to the main BigQuery table with extended error reporting enabled,
// then routes every row whose insert failed (together with its error) to an error table.
WriteResult writeResult = tableRowToInsertCollection
    // writeTableRows() supplies the TableRow format function that a bare BigQueryIO.write()
    // requires via withFormatFunction(...).
    // NOTE(review): assumes the input is a PCollection<TableRow>, as its name suggests - confirm.
    .apply("BQ-write", BigQueryIO.writeTableRows()
        // Specify that failed rows will be returned with their error.
        // NOTE(review): per the Beam docs this is only supported with streaming inserts -
        // confirm the input is unbounded or set the write method explicitly.
        .withExtendedErrorInfo()
        .to(tableSpec)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        // Specifies a policy for handling failed inserts.
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

// Write failed rows with their error to the error table.
writeResult
    .getFailedInsertsWithErr()
    // Prepare an error-table row from each failed insert. Types are fully qualified so this
    // fragment does not depend on imports that are not visible here.
    .apply(ParDo.of(
        new org.apache.beam.sdk.transforms.DoFn<
            org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError,
            com.google.api.services.bigquery.model.TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            // NOTE(review): the column names below are placeholders - they must match the
            // fields declared in errSchema, which is defined outside this fragment.
            context.output(
                new com.google.api.services.bigquery.model.TableRow()
                    .set("error", context.element().getError().toString())
                    .set("failed_row", context.element().getRow().toString()));
          }
        }))
    .apply("BQ-insert-error-write", BigQueryIO.writeTableRows()
        .to(errTableSpec)
        .withJsonSchema(errSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment