-
-
Save zelid/6965002 to your computer and use it in GitHub Desktop.
public static void BulkInsertNpgsql<T>(this IDbConnection dbConn, IEnumerable<T> list, IEnumerable<string> insertFields = null) | |
{ | |
if (list == null) return; | |
if (list.Count() < 1) return; | |
var objWithAttributes = list.FirstOrDefault(); | |
var modelDef = OrmLiteConfig.GetModelDefinition(objWithAttributes.GetType()); | |
if (insertFields == null) insertFields = new List<string>(); | |
var sbColumnNames = new StringBuilder(); | |
var sbColumnValues = new StringBuilder(); | |
var targetTableName = ""; | |
bool isFirstRecord = true; | |
var insertFieldsCount = insertFields.Count(); | |
foreach (var record in list) | |
{ | |
if (isFirstRecord) targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef); | |
var defsCount = modelDef.FieldDefinitions.Count; | |
var counter = 0; | |
foreach (var fieldDef in modelDef.FieldDefinitions) | |
{ | |
counter += 1; | |
if (fieldDef.IsComputed) continue; | |
if (fieldDef.AutoIncrement) continue; | |
//insertFields contains Attribute "Name" of fields to insert ( that's how expressions work ) | |
if (insertFieldsCount > 0 && !insertFields.Contains(fieldDef.Name)) continue; | |
if (sbColumnNames.Length > 0 && isFirstRecord) sbColumnNames.Append(","); | |
try | |
{ | |
if (isFirstRecord) sbColumnNames.Append(OrmLiteConfig.DialectProvider.GetQuotedColumnName(fieldDef.FieldName)); | |
var stringValue = ""; | |
var value = fieldDef.GetValue(record); | |
if (value == null) stringValue = "\\N"; | |
else if (fieldDef.FieldType == typeof(DateTime)) | |
{ | |
var dateValue = (DateTime)value; | |
const string iso8601Format = "yyyy-MM-dd HH:mm:ss.fff"; | |
stringValue = dateValue.ToString(iso8601Format); | |
} | |
else if (fieldDef.FieldType == typeof(Guid)) | |
{ | |
var guidValue = (Guid)value; | |
stringValue = guidValue.ToString(); | |
} | |
else | |
{ | |
stringValue = value.ToString(); | |
} | |
sbColumnValues.Append(stringValue); | |
if (counter < defsCount) | |
{ | |
sbColumnValues.Append("\t"); | |
} | |
} | |
catch (Exception ex) | |
{ | |
throw; | |
} | |
} | |
sbColumnValues.Append("\r\n"); | |
isFirstRecord = false; | |
} | |
var strColumnValues = sbColumnValues.ToString(); | |
byte[] byteArrayColumnValues = System.Text.Encoding.UTF8.GetBytes(strColumnValues); | |
MemoryStream msColumnValues = new MemoryStream(byteArrayColumnValues); | |
//var connection = dbConn as NpgsqlConnection; | |
var ormliteConn = dbConn as OrmLiteConnection; | |
var connection = ormliteConn.DbConnection as NpgsqlConnection; | |
//var connection = (NpgsqlConnection)dbConn.Database.ToDbConnection(); | |
//var connection = dbConn.DbConnection; | |
using (var command = new NpgsqlCommand(string.Format("COPY {0} ({1}) FROM STDIN DELIMITER '\t' ", targetTableName, sbColumnNames), connection)) | |
{ | |
var cin = new NpgsqlCopyIn(command, connection, msColumnValues); | |
try | |
{ | |
cin.Start(); | |
} | |
catch (Exception e) | |
{ | |
try | |
{ | |
cin.Cancel("Undo copy"); | |
} | |
catch (NpgsqlException e2) | |
{ | |
// we should get an error in response to our cancel request: | |
if (!("" + e2).Contains("Undo copy")) | |
{ | |
throw new Exception("Failed to cancel copy: " + e2 + " upon failure: " + e); | |
} | |
} | |
throw e; | |
} | |
} | |
} | |
private static string ToMySqlBulkInsertValue<T>(this FieldDefinition fieldDef, T record) | |
{ | |
var stringValue = ""; | |
var value = fieldDef.GetValue(record); | |
if (value == null) stringValue = ""; | |
else if (fieldDef.FieldType == typeof(DateTime)) | |
{ | |
var dateValue = (DateTime)value; | |
const string iso8601Format = "yyyy-MM-dd HH:mm:ss.fff"; | |
stringValue = dateValue.ToString(iso8601Format); | |
} | |
else if (fieldDef.FieldType == typeof(Guid)) | |
{ | |
var guidValue = (Guid)value; | |
stringValue = guidValue.ToString(); | |
} | |
else | |
{ | |
stringValue = value.ToString(); | |
} | |
return stringValue; | |
} | |
// http://theonetechnologies.com/outsourcing/post/mysql-bulk-data-import-using-net-connector-mysqlbulkloader-class.aspx | |
public static void BulkInsertMySql<T>(this IDbConnection myConnection, IEnumerable<T> list/*, IEnumerable<string> insertFields = null*/) | |
{ | |
if (list == null) return; | |
if (list.Count() < 1) return; | |
var objWithAttributes = list.FirstOrDefault(); | |
var modelDef = OrmLiteConfig.GetModelDefinition(objWithAttributes.GetType()); | |
var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef); | |
var Server = HttpContext.Current.Server; | |
string strFile = targetTableName + "_" + DateTime.Now.Ticks.ToString() + ".csv"; | |
//Create directory if not exist... Make sure directory has required rights.. | |
if (!Directory.Exists(Server.MapPath("~/TempFolder/"))) | |
Directory.CreateDirectory(Server.MapPath("~/TempFolder/")); | |
strFile = Server.MapPath("~/TempFolder/") + strFile; | |
//If file does not exist then create it and right data into it.. | |
if (!File.Exists(strFile)) | |
{ | |
FileStream fs = new FileStream(strFile, FileMode.Create, FileAccess.Write); | |
fs.Close(); | |
fs.Dispose(); | |
} | |
bool isFirstRecord = true; | |
StreamWriter sw = new StreamWriter(strFile, false); | |
//var insertFieldsCount = insertFields.Count(); | |
var defsCount = modelDef.FieldDefinitions.Count; | |
foreach (var record in list) | |
{ | |
if (isFirstRecord) | |
{ | |
sw.Write(string.Join("\t", modelDef.FieldDefinitions.Select(x => x.Name))); | |
sw.Write(sw.NewLine); | |
isFirstRecord = false; | |
} | |
sw.Write(string.Join("\t", modelDef.FieldDefinitions.Select(x => x.ToMySqlBulkInsertValue<T>(record)))); | |
sw.Write(sw.NewLine); | |
} | |
sw.Close(); | |
sw.Dispose(); | |
var connection = myConnection as MySqlConnection; | |
var bl = new MySqlBulkLoader(connection); | |
bl.TableName = targetTableName; | |
bl.FieldTerminator = "\t"; | |
bl.LineTerminator = sw.NewLine; | |
bl.FileName = strFile; | |
bl.NumberOfLinesToSkip = 1; | |
var inserted = bl.Load(); | |
} | |
public static void BulkInsertMsSql<T>(this IDbConnection dbConn, IEnumerable<T> list) | |
{ | |
if (list == null) return; | |
if (list.Count() < 1) return; | |
var objWithAttributes = list.FirstOrDefault(); | |
var modelDef = OrmLiteConfig.GetModelDefinition(objWithAttributes.GetType()); | |
var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef); | |
using (var bulkCopy = new SqlBulkCopy(dbConn as SqlConnection)) | |
{ | |
bulkCopy.BatchSize = list.Count(); | |
bulkCopy.DestinationTableName = targetTableName; | |
var table = new DataTable(); | |
foreach (var fieldDef in modelDef.FieldDefinitions) | |
{ | |
if (fieldDef.IsComputed) continue; | |
if (fieldDef.AutoIncrement) continue; | |
bulkCopy.ColumnMappings.Add(fieldDef.Name, fieldDef.Name); | |
table.Columns.Add(fieldDef.Name, Nullable.GetUnderlyingType(fieldDef.FieldType) ?? fieldDef.FieldType); | |
} | |
foreach (var record in list) | |
{ | |
//var values = new List<object>(); | |
//var dataRows = new List<DataRow>(); | |
DataRow row = table.NewRow(); | |
foreach (var fieldDef in modelDef.FieldDefinitions) | |
{ | |
if (fieldDef.IsComputed) continue; | |
if (fieldDef.AutoIncrement) continue; | |
//values.Add(fieldDef.GetValue(record)); | |
//values.Add(SqlServerDialect.Provider.ConvertDbValue(fieldDef.GetValue(record), fieldDef.FieldType)); | |
//row. | |
row[fieldDef.Name] = SqlServerDialect.Provider.ConvertDbValue(fieldDef.GetValue(record), fieldDef.FieldType); //Nullable.GetUnderlyingType(fieldDef.FieldType) ?? fieldDef.FieldType; | |
} | |
table.Rows.Add(row); | |
//table.Rows.Add(values); | |
} | |
bulkCopy.WriteToServer(table); | |
} | |
} | |
Thanks for the examples, is this released under an OSS license?
Yep. OSS. @mythz Feel free to integrate it to OrmLite if needed.
Funny thing that I completely forgot about this Gist and found a link to it on https://forums.servicestack.net/t/how-to-insert-multiple-records-with-1-insert/6494/2 when was googling how to do a bulk insert with OrmLite ;-)
@sharpe5 please also note Postgres Bulk insert API has been changed - http://www.npgsql.org/doc/copy.html
I will try to update the code with your changes and new Bulk Insert API
@zelid Thanks for the update!
When you update the gist can you add the OSS license you're releasing it under e.g. // MIT
or if you want to make it freely available you can release it to public domain with // Any copyright is dedicated to the Public Domain.
.
Welcome to the persnippity world of SqlBulkCopy☹️