Created
November 16, 2018 21:06
-
-
Save boristyukin/8703d2c6ec55d6787843aa133920bf01 to your computer and use it in GitHub Desktop.
kudu getPendingErrors issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kudu.ColumnSchema | |
import org.apache.kudu.Schema | |
import org.apache.kudu.Type | |
import org.apache.kudu.client.AlterTableOptions | |
import org.apache.kudu.client.CreateTableOptions | |
import org.apache.kudu.client.Insert | |
import org.apache.kudu.client.KuduClient | |
import org.apache.kudu.client.KuduException | |
import org.apache.kudu.client.KuduPredicate | |
import org.apache.kudu.client.KuduPredicate.ComparisonOp | |
import org.apache.kudu.client.KuduScanner | |
import org.apache.kudu.client.KuduSession | |
import org.apache.kudu.client.KuduTable | |
import org.apache.kudu.client.PartialRow | |
import org.apache.kudu.client.RowResult | |
import org.apache.kudu.client.RowResultIterator | |
import org.apache.kudu.client.OperationResponse | |
import org.apache.kudu.client.SessionConfiguration | |
import java.time.Instant | |
// Name of the scratch table this example drops, re-creates and fills.
String tableName = "kudu_groovy_example"
// Address (host:port) of the Kudu master(s) the client connects to.
String KUDU_MASTERS = "localhost:7051"
/**
 * Creates the example table with three columns: an INT32 primary key "key",
 * a nullable STRING "value" and a non-nullable UNIXTIME_MICROS "dt_tm".
 * Rows are hash-partitioned on "key" into 8 buckets.
 *
 * @param client    Kudu client used to issue the create-table request
 * @param tableName name of the table to create
 */
void createExampleTable(KuduClient client, String tableName) {
    // Column definitions: the key first, then the two payload columns.
    ColumnSchema keyColumn =
            new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build()
    ColumnSchema valueColumn =
            new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).nullable(true).build()
    ColumnSchema timestampColumn =
            new ColumnSchema.ColumnSchemaBuilder("dt_tm", Type.UNIXTIME_MICROS).nullable(false).build()
    Schema tableSchema = new Schema([keyColumn, valueColumn, timestampColumn])

    // Partition schema: distribute rows across tablets by hashing the key.
    // Kudu also supports range partitioning, and the two can be combined;
    // see http://kudu.apache.org/docs/schema_design.html.
    int bucketCount = 8
    CreateTableOptions tableOptions = new CreateTableOptions()
    tableOptions.addHashPartitions(["key"], bucketCount)

    client.createTable(tableName, tableSchema, tableOptions)
    println("Created table ${tableName}")
}
/**
 * Inserts rows with keys 1..numRows into the given table using a session in
 * AUTO_FLUSH_BACKGROUND mode, then reports any pending row errors.
 *
 * Even keys get a null "value"; odd keys get "value <key>". The row with
 * key 2 is deliberately left without its non-nullable "dt_tm" column so that
 * an insert error is provoked.
 *
 * @param client    Kudu client used to open the table and create the session
 * @param tableName table to insert into
 * @param numRows   number of rows to insert; zero or a negative value inserts nothing
 * @throws KuduException    declared for failures raised by the Kudu client calls
 * @throws RuntimeException if the session reports pending row errors after closing
 */
void insertRows(KuduClient client, String tableName, int numRows) throws KuduException {
    // All state is declared locally: undeclared assignments in a Groovy script
    // method would otherwise land in the shared script binding.
    long startMillis = Instant.now().toEpochMilli()
    KuduTable table = client.openTable(tableName)
    KuduSession session = client.newSession()
    try {
        session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND)
        println("Inserting $numRows rows in ${session.getFlushMode()} flush mode ...")

        // A counted loop rather than the range 1..numRows: for numRows <= 0 that
        // range runs backwards (1, 0, ...) and would insert unwanted rows.
        for (int i = 1; i <= numRows; i++) {
            Insert insert = table.newInsert()
            PartialRow row = insert.getRow()
            row.addInt("key", i)
            // Make even-keyed rows have a null 'value'.
            if (i % 2 == 0) {
                row.setNull("value")
            } else {
                row.addString("value", "value " + i)
            }
            // Fake an error row: key 2 never gets its required 'dt_tm' value.
            if (i != 2) {
                // dt_tm is UNIXTIME_MICROS, so convert milliseconds to microseconds.
                row.addLong("dt_tm", System.currentTimeMillis() * 1000)
            }
            println(row.toString())
            // The original author notes the response is null in AUTO_FLUSH_BACKGROUND
            // mode, so it is not inspected here; errors are collected after close().
            session.apply(insert)
        }
    } finally {
        // Closing the session flushes buffered rows. Doing it in finally also
        // releases the session when building or applying a row fails.
        session.close()
    }

    // In AUTO_FLUSH_BACKGROUND mode (the default mode recommended for most
    // workloads), writes are flushed by background threads, so failures only
    // show up as pending errors on the session.
    if (session.countPendingErrors() != 0) {
        println("${session.countPendingErrors()} errors inserting rows")
        def pendingErrors = session.getPendingErrors()
        def rowErrors = pendingErrors.getRowErrors()
        int shownErrors = Math.min(rowErrors.length, 10)
        println("there were errors inserting rows to Kudu")
        println("the first few errors follow:")
        for (int i = 0; i < shownErrors; i++) {
            println(rowErrors[i])
        }
        if (pendingErrors.isOverflowed()) {
            println("error buffer overflowed: some errors were discarded")
        }
        throw new RuntimeException("error inserting rows to Kudu")
    }

    long elapsedMillis = Instant.now().toEpochMilli() - startMillis
    println("$numRows rows inserted in $elapsedMillis ms")
}
/**
 * Counts the rows of a table by scanning it with only the first column
 * projected, and prints how long the scan took.
 *
 * @param client    Kudu client used to open and scan the table
 * @param tableName table whose rows are counted
 * @return the number of rows returned by the scan
 * @throws KuduException declared for failures raised by the Kudu client calls
 */
long countRows(KuduClient client, String tableName) throws KuduException {
    // Locals are declared explicitly so nothing leaks into the script binding.
    long startMillis = Instant.now().toEpochMilli()
    KuduTable table = client.openTable(tableName)

    // Project only column index 0: the values are not needed, just the row counts.
    def scanBuilder = client.newScannerBuilder(table)
    scanBuilder.setProjectedColumnIndexes([0])
    KuduScanner scanner = scanBuilder.build()

    // Accumulate in a long to match the declared return type.
    long rowCount = 0
    try {
        while (scanner.hasMoreRows()) {
            rowCount += scanner.nextRows().getNumRows()
        }
    } finally {
        // Release the scanner even when the scan is interrupted by an error.
        scanner.close()
    }

    long elapsedMillis = Instant.now().toEpochMilli() - startMillis
    println("Rows counted in $elapsedMillis ms")
    return rowCount
}
/**
 * Scans every row of a table, renders each one with rowToString() and prints
 * the rendered rows to standard output, one per line.
 *
 * Despite its name, the method returns nothing. Rows are printed only after
 * the whole scan has completed.
 *
 * @param client    Kudu client used to open and scan the table
 * @param tableName table to scan
 * @throws Exception declared for failures raised while opening or scanning the table
 */
void scanTableToStrings(KuduClient client, String tableName) throws Exception {
    // Locals are declared explicitly so nothing leaks into the script binding.
    KuduTable table = client.openTable(tableName)
    KuduScanner scanner = client.newScannerBuilder(table).build()

    List<String> rowStrings = []
    try {
        while (scanner.hasMoreRows()) {
            RowResultIterator batch = scanner.nextRows()
            for (RowResult result : batch) {
                rowStrings.add(result.rowToString())
            }
        }
    } finally {
        // Release the scanner even when the scan is interrupted by an error.
        scanner.close()
    }

    // Rows are printed in scan order; no sorting is applied.
    for (String rowString : rowStrings) {
        println(rowString)
    }
}
// Script driver: connect, rebuild the example table, attempt the inserts
// (one of which is expected to fail), then count and print the stored rows.

// Init Kudu client
long connectStartMillis = Instant.now().toEpochMilli()
KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTERS).build()
long connectMillis = Instant.now().toEpochMilli() - connectStartMillis
println("Client connected in $connectMillis ms")

try {
    // Start from a clean slate so repeated runs behave the same.
    if (client.tableExists(tableName)) {
        client.deleteTable(tableName)
    }
    createExampleTable(client, tableName)

    try {
        insertRows(client, tableName, 10)
    } catch (Exception e) {
        // The insert is expected to fail because of the deliberately broken row.
        // Report the failure instead of hiding it, then carry on so the rows
        // that did reach the table can still be inspected.
        System.err.println("Insert step failed: ${e}")
    }

    println("Table has ${countRows(client, tableName)} rows")
    scanTableToStrings(client, tableName)
} finally {
    // Always release the client's resources, even when a step above throws.
    client.shutdown()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment