Skip to content

Instantly share code, notes, and snippets.

@zelid
Last active August 3, 2023 17:23
Show Gist options
  • Save zelid/6965002 to your computer and use it in GitHub Desktop.
Save zelid/6965002 to your computer and use it in GitHub Desktop.
Examples of BulkInsert for PostgreSQL, MySQL and MS SQL using ServiceStack OrmLite. Work in progress...
/// <summary>
/// Bulk-inserts <paramref name="list"/> into the PostgreSQL table mapped to the records' model by
/// streaming tab-delimited text through <c>COPY ... FROM STDIN</c>.
/// </summary>
/// <typeparam name="T">Record type. The model definition is resolved from the runtime type of the first record.</typeparam>
/// <param name="dbConn">An <c>OrmLiteConnection</c> wrapping an <c>NpgsqlConnection</c>, or an <c>NpgsqlConnection</c> itself.</param>
/// <param name="list">Records to insert. Nothing happens when it is null or empty.</param>
/// <param name="insertFields">
/// Optional model field names (<c>FieldDefinition.Name</c>) to restrict the insert to.
/// Null or empty means every field that is neither computed nor auto-increment.
/// </param>
/// <exception cref="ArgumentException"><paramref name="dbConn"/> is not backed by an <c>NpgsqlConnection</c>.</exception>
public static void BulkInsertNpgsql<T>(this IDbConnection dbConn, IEnumerable<T> list, IEnumerable<string> insertFields = null)
{
    if (list == null)
    {
        return;
    }

    // Materialize once so a lazy sequence is not enumerated several times.
    var records = list as ICollection<T> ?? list.ToList();
    if (records.Count == 0)
    {
        return;
    }

    // Accept both the OrmLite wrapper and a raw Npgsql connection.
    var ormLiteConn = dbConn as OrmLiteConnection;
    var connection = (ormLiteConn != null ? ormLiteConn.DbConnection : dbConn) as NpgsqlConnection;
    if (connection == null)
    {
        throw new ArgumentException("Connection must be an NpgsqlConnection or an OrmLiteConnection wrapping one.", "dbConn");
    }

    var firstRecord = records.First();
    var modelDef = OrmLiteConfig.GetModelDefinition(firstRecord != null ? firstRecord.GetType() : typeof(T));
    var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);

    // insertFields contains the attribute "Name" of the fields to insert (that's how expressions work).
    var requestedFields = insertFields != null ? new HashSet<string>(insertFields) : new HashSet<string>();

    // Resolve the inserted fields once. Separators are then derived from this filtered list, so a
    // skipped trailing field (computed / auto-increment / not requested) cannot leave a dangling tab.
    var fields = modelDef.FieldDefinitions
        .Where(f => !f.IsComputed
                    && !f.AutoIncrement
                    && (requestedFields.Count == 0 || requestedFields.Contains(f.Name)))
        .ToList();

    var columnNames = string.Join(
        ",",
        fields.Select(f => OrmLiteConfig.DialectProvider.GetQuotedColumnName(f.FieldName)));

    var sbColumnValues = new StringBuilder();
    foreach (var record in records)
    {
        for (var i = 0; i < fields.Count; i++)
        {
            if (i > 0)
            {
                sbColumnValues.Append('\t');
            }
            sbColumnValues.Append(FormatNpgsqlCopyValue(fields[i].GetValue(record)));
        }
        sbColumnValues.Append("\r\n");
    }

    using (var msColumnValues = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(sbColumnValues.ToString())))
    using (var command = new NpgsqlCommand(string.Format("COPY {0} ({1}) FROM STDIN DELIMITER '\t' ", targetTableName, columnNames), connection))
    {
        var cin = new NpgsqlCopyIn(command, connection, msColumnValues);
        try
        {
            cin.Start();
        }
        catch (Exception e)
        {
            try
            {
                cin.Cancel("Undo copy");
            }
            catch (NpgsqlException e2)
            {
                // we should get an error in response to our cancel request:
                if (!("" + e2).Contains("Undo copy"))
                {
                    throw new Exception("Failed to cancel copy: " + e2 + " upon failure: " + e);
                }
            }
            // Rethrow without resetting the original stack trace.
            throw;
        }
    }
}

/// <summary>
/// Renders a single value in PostgreSQL COPY text format: null becomes <c>\N</c>, values are
/// formatted with the invariant culture, and backslash, tab, LF and CR are backslash-escaped so
/// they cannot be mistaken for column or row delimiters.
/// </summary>
private static string FormatNpgsqlCopyValue(object value)
{
    if (value == null)
    {
        return "\\N";
    }

    var invariant = System.Globalization.CultureInfo.InvariantCulture;
    string text;
    if (value is DateTime)
    {
        // Checking the boxed value (not the declared field type) also covers DateTime? fields.
        text = ((DateTime)value).ToString("yyyy-MM-dd HH:mm:ss.fff", invariant);
    }
    else
    {
        // Numbers must not pick up a culture-specific decimal separator.
        var formattable = value as IFormattable;
        text = (formattable != null ? formattable.ToString(null, invariant) : value.ToString()) ?? string.Empty;
    }

    // The backslash must be escaped first so the escapes added below are not doubled.
    return text
        .Replace("\\", "\\\\")
        .Replace("\t", "\\t")
        .Replace("\n", "\\n")
        .Replace("\r", "\\r");
}
/// <summary>
/// Renders the value of <paramref name="fieldDef"/> on <paramref name="record"/> as one field of a
/// tab-delimited MySQL bulk-load file.
/// </summary>
/// <param name="fieldDef">Field definition whose value is read from the record.</param>
/// <param name="record">Record to read the value from.</param>
/// <returns>
/// An empty string for null; otherwise the value formatted with the invariant culture, with
/// backslash, tab, LF and CR backslash-escaped.
/// </returns>
private static string ToMySqlBulkInsertValue<T>(this FieldDefinition fieldDef, T record)
{
    var value = fieldDef.GetValue(record);
    if (value == null)
    {
        // Kept as an empty field for backward compatibility.
        // NOTE(review): MySQL reads \N as NULL in LOAD DATA input - confirm whether NULL is wanted here.
        return "";
    }

    var invariant = System.Globalization.CultureInfo.InvariantCulture;
    string text;
    if (value is DateTime)
    {
        // Checking the boxed value (not the declared field type) also covers DateTime? fields.
        const string iso8601Format = "yyyy-MM-dd HH:mm:ss.fff";
        text = ((DateTime)value).ToString(iso8601Format, invariant);
    }
    else
    {
        // Numbers must not pick up a culture-specific decimal separator.
        var formattable = value as IFormattable;
        text = (formattable != null ? formattable.ToString(null, invariant) : value.ToString()) ?? string.Empty;
    }

    // Assumes the loader keeps MySQL's default escape character (backslash) - confirm if
    // EscapeCharacter is ever customised. Backslash is escaped first so later escapes are not doubled.
    return text
        .Replace("\\", "\\\\")
        .Replace("\t", "\\t")
        .Replace("\n", "\\n")
        .Replace("\r", "\\r");
}
// http://theonetechnologies.com/outsourcing/post/mysql-bulk-data-import-using-net-connector-mysqlbulkloader-class.aspx
/// <summary>
/// Bulk-inserts <paramref name="list"/> into the MySQL table mapped to the records' model by writing
/// a temporary tab-delimited file and loading it with <c>MySqlBulkLoader</c>.
/// </summary>
/// <typeparam name="T">Record type. The model definition is resolved from the runtime type of the first record.</typeparam>
/// <param name="myConnection">A <c>MySqlConnection</c>, or an <c>OrmLiteConnection</c> wrapping one.</param>
/// <param name="list">Records to insert. Nothing happens when it is null or empty.</param>
/// <exception cref="ArgumentException"><paramref name="myConnection"/> is not backed by a <c>MySqlConnection</c>.</exception>
/// <remarks>
/// The file is written under <c>~/TempFolder/</c> when an <c>HttpContext</c> is available, otherwise
/// under the system temp directory, and is deleted once loading finishes or fails.
/// NOTE(review): no column list is passed to the loader, so file fields presumably map to table
/// columns by position - verify that the model field order matches the table.
/// </remarks>
public static void BulkInsertMySql<T>(this IDbConnection myConnection, IEnumerable<T> list/*, IEnumerable<string> insertFields = null*/)
{
    if (list == null)
    {
        return;
    }

    // Materialize once so a lazy sequence is not enumerated several times.
    var records = list as ICollection<T> ?? list.ToList();
    if (records.Count == 0)
    {
        return;
    }

    // Accept both the OrmLite wrapper and a raw MySQL connection.
    var ormLiteConn = myConnection as OrmLiteConnection;
    var connection = (ormLiteConn != null ? ormLiteConn.DbConnection : myConnection) as MySqlConnection;
    if (connection == null)
    {
        throw new ArgumentException("Connection must be a MySqlConnection or an OrmLiteConnection wrapping one.", "myConnection");
    }

    var firstRecord = records.First();
    var modelDef = OrmLiteConfig.GetModelDefinition(firstRecord != null ? firstRecord.GetType() : typeof(T));
    var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);

    // Outside a web request HttpContext.Current is null; fall back to the system temp directory.
    var httpContext = HttpContext.Current;
    var tempFolder = httpContext != null
        ? httpContext.Server.MapPath("~/TempFolder/")
        : Path.Combine(Path.GetTempPath(), "TempFolder");

    // Create the directory if it does not exist (no-op otherwise). Make sure it has the required rights.
    Directory.CreateDirectory(tempFolder);

    // A GUID avoids collisions between concurrent calls and keeps quoting characters of the
    // table name out of the file name.
    var strFile = Path.Combine(tempFolder, Guid.NewGuid().ToString("N") + ".csv");
    var lineTerminator = Environment.NewLine;

    try
    {
        using (var sw = new StreamWriter(strFile, false))
        {
            sw.NewLine = lineTerminator;

            // Header line; skipped on load via NumberOfLinesToSkip.
            sw.WriteLine(string.Join("\t", modelDef.FieldDefinitions.Select(x => x.Name)));

            foreach (var record in records)
            {
                sw.WriteLine(string.Join("\t", modelDef.FieldDefinitions.Select(x => x.ToMySqlBulkInsertValue<T>(record))));
            }
        }

        var bl = new MySqlBulkLoader(connection);
        bl.TableName = targetTableName;
        bl.FieldTerminator = "\t";
        bl.LineTerminator = lineTerminator;
        bl.FileName = strFile;
        bl.NumberOfLinesToSkip = 1;
        bl.Load();
    }
    finally
    {
        // Best-effort cleanup: a failure to delete must not mask an exception from the load itself.
        try
        {
            File.Delete(strFile);
        }
        catch (IOException)
        {
        }
        catch (UnauthorizedAccessException)
        {
        }
    }
}
/// <summary>
/// Bulk-inserts <paramref name="list"/> into the SQL Server table mapped to the records' model by
/// filling a <c>DataTable</c> and sending it with <c>SqlBulkCopy</c>.
/// </summary>
/// <typeparam name="T">Record type. The model definition is resolved from the runtime type of the first record.</typeparam>
/// <param name="dbConn">A <c>SqlConnection</c>, or an <c>OrmLiteConnection</c> wrapping one.</param>
/// <param name="list">Records to insert. Nothing happens when it is null or empty.</param>
/// <exception cref="ArgumentException"><paramref name="dbConn"/> is not backed by a <c>SqlConnection</c>.</exception>
/// <remarks>Computed and auto-increment fields are not sent.</remarks>
public static void BulkInsertMsSql<T>(this IDbConnection dbConn, IEnumerable<T> list)
{
    if (list == null)
    {
        return;
    }

    // Materialize once so a lazy sequence is not enumerated several times.
    var records = list as ICollection<T> ?? list.ToList();
    if (records.Count == 0)
    {
        return;
    }

    // Accept both the OrmLite wrapper and a raw SQL Server connection.
    var ormLiteConn = dbConn as OrmLiteConnection;
    var connection = (ormLiteConn != null ? ormLiteConn.DbConnection : dbConn) as SqlConnection;
    if (connection == null)
    {
        throw new ArgumentException("Connection must be a SqlConnection or an OrmLiteConnection wrapping one.", "dbConn");
    }

    var firstRecord = records.First();
    var modelDef = OrmLiteConfig.GetModelDefinition(firstRecord != null ? firstRecord.GetType() : typeof(T));
    var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);

    var fields = modelDef.FieldDefinitions
        .Where(f => !f.IsComputed && !f.AutoIncrement)
        .ToList();

    using (var table = new DataTable())
    using (var bulkCopy = new SqlBulkCopy(connection))
    {
        bulkCopy.BatchSize = records.Count;
        bulkCopy.DestinationTableName = targetTableName;

        foreach (var fieldDef in fields)
        {
            // Source column = model property name; destination = database column name, so
            // aliased fields map to the right column.
            bulkCopy.ColumnMappings.Add(fieldDef.Name, fieldDef.FieldName);

            // DataTable columns cannot be Nullable<T>; use the underlying type instead.
            table.Columns.Add(fieldDef.Name, Nullable.GetUnderlyingType(fieldDef.FieldType) ?? fieldDef.FieldType);
        }

        foreach (var record in records)
        {
            DataRow row = table.NewRow();
            foreach (var fieldDef in fields)
            {
                var dbValue = SqlServerDialect.Provider.ConvertDbValue(fieldDef.GetValue(record), fieldDef.FieldType);

                // A DataRow rejects null for value-type columns; DBNull.Value is how it stores "no value".
                row[fieldDef.Name] = dbValue ?? (object)DBNull.Value;
            }
            table.Rows.Add(row);
        }

        bulkCopy.WriteToServer(table);
    }
}
@zelid
Copy link
Author

zelid commented Mar 2, 2021

Thanks for the examples, is this released under an OSS license?

Yep. OSS. @mythz Feel free to integrate it to OrmLite if needed.

Funny thing: I had completely forgotten about this Gist and found a link to it on https://forums.servicestack.net/t/how-to-insert-multiple-records-with-1-insert/6494/2 while I was googling how to do a bulk insert with OrmLite ;-)

@zelid
Copy link
Author

zelid commented Mar 2, 2021

@sharpe5 please also note that the Postgres bulk insert API has changed - http://www.npgsql.org/doc/copy.html
I will try to update the code with your changes and the new bulk insert API.

@mythz
Copy link

mythz commented Mar 3, 2021

@zelid Thanks for the update!

When you update the gist can you add the OSS license you're releasing it under e.g. // MIT or if you want to make it freely available you can release it to public domain with // Any copyright is dedicated to the Public Domain..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment