@brachi-wernick
Last active February 5, 2020 12:07
error handling BQ insert
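import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Assumes tableRowToInsertCollection is a PCollection<TableRow>.
WriteResult writeResult = tableRowToInsertCollection
    .apply("BQ-write", BigQueryIO.writeTableRows()
        // Return failed rows together with their error details.
        .withExtendedErrorInfo()
        .to(tableSpec)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        // Specifies the policy for handling failed inserts: retry only transient errors.
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

// Write the failed rows, with their errors, to an error table.
writeResult
    .getFailedInsertsWithErr()
    .apply("BQ-insert-error-to-row", ParDo.of(new DoFn<BigQueryInsertError, TableRow>() {
        @ProcessElement
        public void processElement(@Element BigQueryInsertError err, OutputReceiver<TableRow> out) {
            // Column names are placeholders; they must match errSchema.
            out.output(new TableRow()
                .set("failed_row", err.getRow().toString())
                .set("error", err.getError().toString()));
        }
    }))
    .apply("BQ-insert-error-write", BigQueryIO.writeTableRows()
        .to(errTableSpec)
        .withJsonSchema(errSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));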