Last active
November 17, 2024 03:04
-
-
Save mldisibio/3ff948ceb95fd8ac2a9cf34403287f13 to your computer and use it in GitHub Desktop.
Demo of using the legacy APM (Begin/End) pattern with DuckDB.NET to invoke an async method from a table-valued function
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Script entry point: opens an in-memory DuckDB connection, seeds the <c>Players</c> table,
/// registers the <c>get_user_credit</c> table function, then prints the function's output
/// on its own and joined to <c>Players</c>.
/// </summary>
[Experimental("DuckDBNET001")]
async Task Main()
{
    using var duckDb = new DuckDBConnection("Data Source=:memory:");
    duckDb.Open();

    // create and fill a table to demonstrate a TVF joined to a table
    LoadPlayers(duckDb, "Player");
    // register the function
    RegisterTVH(duckDb);

    using var cmd = duckDb.CreateCommand();

    // show just the TVF
    cmd.CommandText = "SELECT * FROM get_user_credit('Player', 25);";
    // Each reader is disposed before the command is reused, so no reader is left open.
    using (var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false))
    {
        SimplePrint((DuckDBDataReader)reader);
    }

    // demo the TVF joined to a table
    cmd.CommandText = """
        SELECT p.UserId, p.Avatar, c.Credits
        FROM get_user_credit('Player', 25) c
        LEFT OUTER JOIN Players p ON c.UserName = p.UserId;
        """;
    using (var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false))
    {
        SimplePrint((DuckDBDataReader)reader);
    }
}
/// <summary>
/// Creates the <c>Players(UserId, Avatar)</c> table and appends ten rows to it.
/// Row <c>n</c> (1-based) gets the user id <paramref name="userCategory"/> followed by
/// <c>n</c> as two digits, paired with the n-th avatar name.
/// </summary>
/// <param name="duckDb">Connection on which the table is created and filled.</param>
/// <param name="userCategory">Prefix used to build each user id.</param>
/// <returns>The same <paramref name="duckDb"/> instance that was passed in.</returns>
public DuckDBConnection LoadPlayers(DuckDBConnection duckDb, string userCategory)
{
    const int PlayerCount = 10;

    using var createTable = duckDb.CreateCommand();
    createTable.CommandText = "CREATE TABLE Players(UserId text, Avatar text);";
    createTable.ExecuteNonQuery();

    string[] avatars = ["Happy", "Joyful", "Cheerful", "Bouncy", "Silly", "Witty", "Zany", "Quirky", "Jolly", "Playful"];

    using (var appender = duckDb.CreateAppender("Players"))
    {
        int index = 0;
        while (index < PlayerCount)
        {
            appender.CreateRow()
                    .AppendValue($"{userCategory}{index + 1:D2}")
                    .AppendValue(avatars[index])
                    .EndRow();
            index++;
        }
    }

    return duckDb;
}
/// <summary>
/// Registers the table function <c>get_user_credit(string, int)</c> on the connection.
/// The function exposes two columns, <c>UserName</c> (string) and <c>Credits</c> (int),
/// filled from the list produced by <c>GetUserCreditsAsync</c> via the Begin/End (APM) wrappers.
/// </summary>
/// <param name="duckDb">Connection on which the table function is registered.</param>
/// <returns>The same <paramref name="duckDb"/> instance that was passed in.</returns>
[Experimental("DuckDBNET001")]
public DuckDBConnection RegisterTVH(DuckDBConnection duckDb)
{
    duckDb.RegisterTableFunction<string, int>("get_user_credit",
        (parameters) =>
        {
            string userCategory = parameters[0].GetValue<string>();
            int startingCredit = parameters[1].GetValue<int>();

            var iar = BeginGetUserCredits(userCategory, startingCredit, null, null);
            var waitHandle = iar.AsyncWaitHandle;
            List<UserCredit> results;
            try
            {
                // This callback is synchronous, so it cannot await: block the calling
                // thread on the wait handle until the async operation has completed.
                waitHandle.WaitOne();
                results = EndGetUserCredits(iar);
            }
            finally
            {
                // Release the handle even when EndGetUserCredits throws.
                waitHandle.Close();
            }

            return new TableFunction([new ColumnInfo("UserName", typeof(string)), new ColumnInfo("Credits", typeof(int))], results);
        },
        (item, writers, rowIndex) =>
        {
            // An item that is not a UserCredit results in null values being passed to the writers.
            var person = item as UserCredit;
            writers[0].WriteValue(person?.Name, rowIndex);
            writers[1].WriteValue(person?.Credits, rowIndex);
        });
    return duckDb;
}
/// <summary>
/// One row returned by the table-valued function.
/// </summary>
/// <param name="Name">User name; written to the <c>UserName</c> column.</param>
/// <param name="Credits">Credit amount; written to the <c>Credits</c> column.</param>
public record UserCredit(string Name, int Credits);
/// <summary>
/// Sample async method: builds a list of ten <see cref="UserCredit"/> rows, pausing
/// 100 ms after each one to simulate asynchronous work.
/// </summary>
/// <param name="userCategory">Prefix for each generated user name (followed by a two-digit index).</param>
/// <param name="startingCredit">Base value; each row's credits are this plus a random amount from 1 to 49.</param>
/// <returns>The ten generated rows, in index order.</returns>
public static async Task<List<UserCredit>> GetUserCreditsAsync(string userCategory, int startingCredit)
{
    const int RowCount = 10;
    var data = new List<UserCredit>(RowCount);

    for (int i = 0; i < RowCount; i++)
    {
        string name = $"{userCategory}{i + 1:D2}";
        // Random.Shared avoids allocating a new generator on every call.
        int credits = startingCredit + Random.Shared.Next(1, 50);
        data.Add(new UserCredit(name, credits));

        // Simulate asynchronous operation. Callers block on this task, so do not
        // resume on a captured context that the blocked thread might own.
        await Task.Delay(100).ConfigureAwait(false);
    }

    return data;
}
/// <summary>
/// Writes every remaining row of <paramref name="reader"/> to the console, one row per line,
/// with each value left-aligned and padded to at least 12 characters, framed by dashed rules.
/// </summary>
/// <param name="reader">Reader whose rows are consumed and printed.</param>
void SimplePrint(DuckDBDataReader reader)
{
    const string rule = "--------------------------------------------";

    Console.WriteLine(rule);
    while (reader.Read())
    {
        int column = 0;
        while (column < reader.FieldCount)
        {
            Console.Write($"{reader.GetValue(column),-12}");
            column++;
        }
        Console.WriteLine();
    }
    Console.WriteLine(rule);
}
/// <summary>
/// "Begin" half of the Begin/End (APM) wrapper around the task-based <c>GetUserCreditsAsync</c>:
/// starts the async call and exposes it as an <see cref="IAsyncResult"/>.
/// </summary>
/// <param name="userCategory">Forwarded to <c>GetUserCreditsAsync</c>.</param>
/// <param name="startingCredit">Forwarded to <c>GetUserCreditsAsync</c>.</param>
/// <param name="callback">Optional completion callback, forwarded to <c>AsApm</c>.</param>
/// <param name="state">Optional caller state, forwarded to <c>AsApm</c>.</param>
/// <returns>The handle to pass to <c>EndGetUserCredits</c>.</returns>
public IAsyncResult BeginGetUserCredits(string userCategory, int startingCredit, AsyncCallback? callback, object? state)
{
    Task<List<UserCredit>> pending = GetUserCreditsAsync(userCategory, startingCredit);
    return pending.AsApm(callback, state);
}
/// <summary>
/// "End" half of the Begin/End (APM) wrapper: casts <paramref name="asyncResult"/> back to
/// the underlying task and returns its <c>Result</c>.
/// </summary>
/// <param name="asyncResult">The value returned by <c>BeginGetUserCredits</c>.</param>
/// <returns>The list produced by the async operation.</returns>
public List<UserCredit> EndGetUserCredits(IAsyncResult asyncResult)
{
    var completed = (Task<List<UserCredit>>)asyncResult;
    return completed.Result;
}
// see https://devblogs.microsoft.com/pfxteam/using-tasks-to-implement-the-apm-pattern/
// https://learn.microsoft.com/en-us/dotnet/api/system.iasyncresult?view=net-9.0
// https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/interop-with-other-asynchronous-patterns-and-types#from-tap-to-apm
/// <summary>
/// Helpers for exposing a task-based (TAP) operation through the Begin/End (APM) pattern.
/// </summary>
public static class ApmExtensions
{
    /// <summary>
    /// Wraps <paramref name="task"/> in a new task whose <see cref="IAsyncResult.AsyncState"/>
    /// is <paramref name="state"/> and which completes with the same result, exception(s) or
    /// cancellation as the source task. <paramref name="callback"/>, if supplied, is invoked
    /// with the returned task once the outcome has been transferred.
    /// </summary>
    /// <typeparam name="T">Result type of the task.</typeparam>
    /// <param name="task">The task to expose as an <see cref="IAsyncResult"/>.</param>
    /// <param name="callback">Optional APM completion callback.</param>
    /// <param name="state">Optional caller state surfaced as <see cref="IAsyncResult.AsyncState"/>.</param>
    /// <returns>A <see cref="Task{TResult}"/> (as <see cref="IAsyncResult"/>) mirroring <paramref name="task"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public static IAsyncResult AsApm<T>(this Task<T> task, AsyncCallback? callback, object? state)
    {
        ArgumentNullException.ThrowIfNull(task);

        // RunContinuationsAsynchronously keeps continuations attached to the returned task
        // from running inline inside TrySet*, where they would delay the callback below.
        var tcs = new TaskCompletionSource<T>(state, TaskCreationOptions.RunContinuationsAsynchronously);

        task.ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                // Exception is non-null whenever IsFaulted is true.
                tcs.TrySetException(t.Exception!.InnerExceptions);
            }
            else if (t.IsCanceled)
            {
                tcs.TrySetCanceled();
            }
            else
            {
                tcs.TrySetResult(t.Result);
            }

            callback?.Invoke(tcs.Task);
        }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);

        return tcs.Task;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Should print something like: