Created
February 27, 2024 03:45
-
-
Save CurtHagenlocher/306865d4b4202906470f4f18fd410c4e to your computer and use it in GitHub Desktop.
Load SqlDataReader into Arrow RecordBatch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Apache.Arrow; | |
using Apache.Arrow.Types; | |
using Microsoft.Data.SqlClient; | |
using System.Data.Common; | |
using System.Data.SqlTypes; | |
namespace Loader | |
{ | |
/// <summary>
/// Demo program that loads the rows of the <c>customer</c> table into a single Apache Arrow
/// <see cref="RecordBatch"/>.
/// </summary>
internal class Program
{
    /// <summary>
    /// Connects to the local <c>test</c> database, runs <c>select * from customer</c>, copies every
    /// row into Arrow column builders, and assembles the columns into one record batch.
    /// </summary>
    /// <param name="args">Unused.</param>
    /// <exception cref="System.InvalidOperationException">
    /// The query returns a different number of columns than the Arrow schema declares.
    /// </exception>
    static void Main(string[] args)
    {
        using var connection = new SqlConnection("server=.;database=test;trusted_connection=true;encrypt=false");
        connection.Open();
        using var command = connection.CreateCommand();
        command.CommandText = "select * from customer";
        using var reader = command.ExecuteReader();

        // One instance shared by the schema field and its builder so precision/scale cannot drift apart.
        var acctBalType = new Decimal128Type(20, 6);

        // Columns are read by ordinal below, so this order must match the column order that
        // "select *" returns for the customer table.
        Field[] fields =
        {
            new Field("CUSTKEY", Int64Type.Default, false),
            new Field("NAME", StringType.Default, false),
            new Field("ADDRESS", StringType.Default, true),
            new Field("NATIONKEY", Int64Type.Default, true),
            new Field("PHONE", StringType.Default, true),
            new Field("ACCTBAL", acctBalType, true),
            new Field("MKTSEGMENT", StringType.Default, true),
            new Field("COMMENT", StringType.Default, true),
        };
        var schema = new Schema(fields, null);

        // Fail fast with a clear message rather than an out-of-range ordinal (or silently ignored
        // extra columns) when the table shape does not match the schema above.
        if (reader.FieldCount != fields.Length)
        {
            throw new System.InvalidOperationException(
                $"Expected {fields.Length} columns from the customer query but got {reader.FieldCount}.");
        }

        var builder0 = new Int64Array.Builder();
        var builder1 = new StringArray.Builder();
        var builder2 = new StringArray.Builder();
        var builder3 = new Int64Array.Builder();
        var builder4 = new StringArray.Builder();
        var builder5 = new Decimal128Array.Builder(acctBalType);
        var builder6 = new StringArray.Builder();
        var builder7 = new StringArray.Builder();

        int currentRow = 0;
        int currentMax = 0;
        const int pageSize = 2048;
        while (reader.Read())
        {
            if (currentRow == currentMax)
            {
                // Reserve() takes the *additional* capacity beyond the builder's current length,
                // so request one more page rather than the cumulative total (which over-allocates).
                currentMax += pageSize;
                builder0.Reserve(pageSize);
                builder1.Reserve(pageSize);
                builder2.Reserve(pageSize);
                builder3.Reserve(pageSize);
                builder4.Reserve(pageSize);
                builder5.Reserve(pageSize);
                builder6.Reserve(pageSize);
                builder7.Reserve(pageSize);
            }

            builder0.Append(reader.Int64(0));
            builder1.Append(reader.String(1));
            builder2.Append(reader.String(2));
            builder3.Append(reader.Int64(3));
            builder4.Append(reader.String(4));

            SqlDecimal? column5 = reader.Decimal(5);
            if (column5.HasValue)
            {
                builder5.Append(column5.Value);
            }
            else
            {
                builder5.AppendNull();
            }

            builder6.Append(reader.String(6));
            builder7.Append(reader.String(7));
            currentRow++;
        }

        // RecordBatch owns the built arrays (whose buffers come from the Arrow memory allocator),
        // so dispose it to release that memory deterministically.
        using RecordBatch batch = new RecordBatch(schema, new IArrowArray[]
        {
            builder0.Build(),
            builder1.Build(),
            builder2.Build(),
            builder3.Build(),
            builder4.Build(),
            builder5.Build(),
            builder6.Build(),
            builder7.Build(),
        }, builder0.Length);

        System.Console.WriteLine("done");
    }
}
/// <summary>
/// Null-aware column accessors: each returns C# <c>null</c> for a database NULL instead of
/// letting the typed getter throw.
/// </summary>
static class Extensions
{
    /// <summary>Reads column <paramref name="position"/> as a 64-bit integer.</summary>
    /// <returns>The value, or <c>null</c> when the column is DBNull.</returns>
    public static long? Int64(this DbDataReader reader, int position) =>
        reader.IsDBNull(position) ? (long?)null : reader.GetInt64(position);

    /// <summary>Reads column <paramref name="position"/> as a string.</summary>
    /// <returns>The value, or <c>null</c> when the column is DBNull.</returns>
    public static string String(this DbDataReader reader, int position) =>
        reader.IsDBNull(position) ? null : reader.GetString(position);

    /// <summary>Reads column <paramref name="position"/> as a <see cref="SqlDecimal"/>.</summary>
    /// <returns>The value, or <c>null</c> when the column is DBNull.</returns>
    public static SqlDecimal? Decimal(this SqlDataReader reader, int position) =>
        reader.IsDBNull(position) ? (SqlDecimal?)null : reader.GetSqlDecimal(position);
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment