-
-
Save sharpe5/890a378ef8278a81792039201d4d690b to your computer and use it in GitHub Desktop.
Examples of BulkInsert for PostgreSQL, MySQL and MS SQL using ServiceStack OrmLite. Work in progress...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Bulk-inserts <paramref name="list"/> into the table OrmLite maps to the runtime type of its
/// first element, by sending tab-delimited text through PostgreSQL <c>COPY ... FROM STDIN</c>.
/// </summary>
/// <typeparam name="T">Element type of the records being inserted.</typeparam>
/// <param name="dbConn">
/// Either an <c>NpgsqlConnection</c> or an <c>OrmLiteConnection</c> whose <c>DbConnection</c> is one.
/// </param>
/// <param name="list">Records to insert. <c>null</c> or an empty sequence is a no-op.</param>
/// <param name="insertFields">
/// Optional field names (matched against <c>FieldDefinition.Name</c>) restricting which columns are
/// written. <c>null</c> or empty means "all fields that are neither computed nor auto-increment".
/// </param>
/// <exception cref="ArgumentException">
/// <paramref name="dbConn"/> is not backed by an <c>NpgsqlConnection</c>, or no field is left to insert.
/// </exception>
public static void BulkInsertNpgsql<T>(this IDbConnection dbConn, IEnumerable<T> list, IEnumerable<string> insertFields = null)
{
    if (list == null) return;

    // Materialize once: the sequence may be lazy, and it is needed for both the type probe and the rows.
    var records = list.ToList();
    if (records.Count < 1) return;

    // Accept a raw Npgsql connection as well as the OrmLite wrapper.
    var connection = dbConn as NpgsqlConnection;
    if (connection == null)
    {
        var ormliteConn = dbConn as OrmLiteConnection;
        if (ormliteConn != null)
        {
            connection = ormliteConn.DbConnection as NpgsqlConnection;
        }
    }
    if (connection == null)
    {
        throw new ArgumentException("The connection must be (or wrap) an NpgsqlConnection.", "dbConn");
    }

    var modelDef = OrmLiteConfig.GetModelDefinition(records[0].GetType());
    var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);

    // insertFields holds the "Name" of each field to insert (that is how the expressions work).
    var requestedFields = new HashSet<string>(insertFields ?? Enumerable.Empty<string>());

    // Decide the column set once, so the header and every row agree on column count and order.
    var columns = new List<FieldDefinition>();
    foreach (var fieldDef in modelDef.FieldDefinitions)
    {
        if (fieldDef.IsComputed || fieldDef.AutoIncrement) continue;
        if (requestedFields.Count > 0 && !requestedFields.Contains(fieldDef.Name)) continue;
        columns.Add(fieldDef);
    }
    if (columns.Count < 1)
    {
        throw new ArgumentException("No insertable fields remain after filtering.", "insertFields");
    }

    var columnNames = string.Join(
        ",",
        columns.Select(c => OrmLiteConfig.DialectProvider.GetQuotedColumnName(c.FieldName)).ToArray());

    var sbColumnValues = new StringBuilder();
    foreach (var record in records)
    {
        for (var i = 0; i < columns.Count; i++)
        {
            // Separator goes *between* written values; keying it on the position in the full field
            // list left a trailing tab whenever the last field(s) were skipped.
            if (i > 0) sbColumnValues.Append('\t');
            sbColumnValues.Append(ToNpgsqlCopyText(columns[i].GetValue(record)));
        }
        sbColumnValues.Append("\r\n");
    }

    byte[] byteArrayColumnValues = System.Text.Encoding.UTF8.GetBytes(sbColumnValues.ToString());

    using (var msColumnValues = new MemoryStream(byteArrayColumnValues))
    using (var command = new NpgsqlCommand(string.Format("COPY {0} ({1}) FROM STDIN DELIMITER '\t' ", targetTableName, columnNames), connection))
    {
        var cin = new NpgsqlCopyIn(command, connection, msColumnValues);
        try
        {
            cin.Start();
        }
        catch (Exception e)
        {
            try
            {
                cin.Cancel("Undo copy");
            }
            catch (NpgsqlException e2)
            {
                // we should get an error in response to our cancel request:
                if (!("" + e2).Contains("Undo copy"))
                {
                    throw new Exception("Failed to cancel copy: " + e2 + " upon failure: " + e, e);
                }
            }
            // Rethrow the original failure without resetting its stack trace.
            throw;
        }
    }
}

/// <summary>
/// Renders one field value as a PostgreSQL COPY text-format token: <c>\N</c> for null, otherwise
/// culture-invariant text with backslash, tab, LF and CR backslash-escaped.
/// </summary>
private static string ToNpgsqlCopyText(object value)
{
    if (value == null) return "\\N";

    string text;
    if (value is DateTime)
    {
        // Checking the boxed value (not the declared field type) also covers DateTime? fields.
        text = ((DateTime)value).ToString("yyyy-MM-dd HH:mm:ss.fff", System.Globalization.CultureInfo.InvariantCulture);
    }
    else
    {
        // Numbers etc. must not pick up the current culture's decimal/group separators.
        var formattable = value as IFormattable;
        text = formattable != null
            ? formattable.ToString(null, System.Globalization.CultureInfo.InvariantCulture)
            : value.ToString();
    }

    // Backslash first, so the escapes added afterwards are not themselves re-escaped.
    return (text ?? "")
        .Replace("\\", "\\\\")
        .Replace("\t", "\\t")
        .Replace("\n", "\\n")
        .Replace("\r", "\\r");
}
/// <summary>
/// Renders the value of <paramref name="fieldDef"/> on <paramref name="record"/> as one field of a
/// tab-delimited file intended for MySQL <c>LOAD DATA</c> (as issued by <c>MySqlBulkLoader</c>).
/// </summary>
/// <typeparam name="T">Type of the record the value is read from.</typeparam>
/// <param name="fieldDef">OrmLite field definition used to read the value.</param>
/// <param name="record">Record to read the value from.</param>
/// <returns>
/// <c>\N</c> for a null value; otherwise culture-invariant text with backslash, tab, LF and CR
/// backslash-escaped.
/// </returns>
/// <remarks>
/// NOTE(review): <c>\N</c> and the backslash escapes rely on the loader using MySQL's default
/// escape character (<c>\</c>) and no field enclosure - confirm if the loader is configured otherwise.
/// </remarks>
private static string ToMySqlBulkInsertValue<T>(this FieldDefinition fieldDef, T record)
{
    var value = fieldDef.GetValue(record);

    // An empty field would load as ''/0 rather than NULL.
    if (value == null) return "\\N";

    string stringValue;
    if (value is DateTime)
    {
        // Checking the boxed value (not the declared field type) also covers DateTime? fields.
        const string iso8601Format = "yyyy-MM-dd HH:mm:ss.fff";
        stringValue = ((DateTime)value).ToString(iso8601Format, System.Globalization.CultureInfo.InvariantCulture);
    }
    else
    {
        // Guid and other types use their default text form; numbers must not pick up the current
        // culture's decimal separator.
        var formattable = value as IFormattable;
        stringValue = formattable != null
            ? formattable.ToString(null, System.Globalization.CultureInfo.InvariantCulture)
            : value.ToString();
    }

    // Escape the escape character first, then the field/line delimiters, so embedded tabs and line
    // breaks cannot split a field or a row.
    return (stringValue ?? "")
        .Replace("\\", "\\\\")
        .Replace("\t", "\\t")
        .Replace("\n", "\\n")
        .Replace("\r", "\\r");
}
// http://theonetechnologies.com/outsourcing/post/mysql-bulk-data-import-using-net-connector-mysqlbulkloader-class.aspx | |
/// <summary>
/// Bulk-inserts <paramref name="list"/> into the table OrmLite maps to the runtime type of its
/// first element: the records are written to a temporary tab-delimited file (with one header line)
/// which is then loaded with <c>MySqlBulkLoader</c>.
/// </summary>
/// <typeparam name="T">Element type of the records being inserted.</typeparam>
/// <param name="myConnection">
/// Either a <c>MySqlConnection</c> or an <c>OrmLiteConnection</c> whose <c>DbConnection</c> is one.
/// </param>
/// <param name="list">Records to insert. <c>null</c> or an empty sequence is a no-op.</param>
/// <exception cref="ArgumentException">
/// <paramref name="myConnection"/> is not backed by a <c>MySqlConnection</c>.
/// </exception>
/// <remarks>
/// Every field definition is written, in model order, and no column list is given to the loader.
/// NOTE(review): this presumably requires the table's column order to match the model - confirm.
/// </remarks>
public static void BulkInsertMySql<T>(this IDbConnection myConnection, IEnumerable<T> list/*, IEnumerable<string> insertFields = null*/)
{
    if (list == null) return;

    // Materialize once: the sequence may be lazy and is needed for both the type probe and the rows.
    var records = list.ToList();
    if (records.Count < 1) return;

    // Accept a raw MySQL connection as well as the OrmLite wrapper (same as the Npgsql variant).
    var connection = myConnection as MySqlConnection;
    if (connection == null)
    {
        var ormliteConn = myConnection as OrmLiteConnection;
        if (ormliteConn != null)
        {
            connection = ormliteConn.DbConnection as MySqlConnection;
        }
    }
    if (connection == null)
    {
        throw new ArgumentException("The connection must be (or wrap) a MySqlConnection.", "myConnection");
    }

    var modelDef = OrmLiteConfig.GetModelDefinition(records[0].GetType());
    var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);

    // Inside a web request keep using ~/TempFolder/; outside one fall back to the system temp
    // directory instead of failing on a null HttpContext.
    var httpContext = HttpContext.Current;
    var tempFolder = httpContext != null
        ? httpContext.Server.MapPath("~/TempFolder/")
        : Path.GetTempPath();

    // Create the directory if it does not exist (no-op otherwise). It must be writable.
    Directory.CreateDirectory(tempFolder);

    // Ticks alone can collide between concurrent calls; the GUID makes the name unique.
    var strFile = Path.Combine(
        tempFolder,
        targetTableName + "_" + DateTime.Now.Ticks.ToString() + "_" + Guid.NewGuid().ToString("N") + ".csv");

    var lineTerminator = Environment.NewLine;
    try
    {
        // StreamWriter creates (or truncates) the file itself; no separate pre-creation is needed.
        using (var sw = new StreamWriter(strFile, false))
        {
            // Header line; skipped by the loader via NumberOfLinesToSkip below.
            sw.Write(string.Join("\t", modelDef.FieldDefinitions.Select(x => x.Name)));
            sw.Write(lineTerminator);

            foreach (var record in records)
            {
                sw.Write(string.Join("\t", modelDef.FieldDefinitions.Select(x => x.ToMySqlBulkInsertValue<T>(record))));
                sw.Write(lineTerminator);
            }
        }

        var bl = new MySqlBulkLoader(connection);
        bl.TableName = targetTableName;
        bl.FieldTerminator = "\t";
        bl.LineTerminator = lineTerminator;
        bl.FileName = strFile;
        bl.NumberOfLinesToSkip = 1;
        bl.Load();
    }
    finally
    {
        // Always remove the temporary file, whether or not the load succeeded.
        if (File.Exists(strFile))
        {
            File.Delete(strFile);
        }
    }
}
/// <summary>
/// Bulk-inserts <paramref name="list"/> into the table OrmLite maps to the runtime type of its
/// first element, by filling a <see cref="DataTable"/> and sending it with <c>SqlBulkCopy</c>.
/// Computed and auto-increment fields are left out.
/// </summary>
/// <typeparam name="T">Element type of the records being inserted.</typeparam>
/// <param name="dbConn">
/// Either a <c>SqlConnection</c> or an <c>OrmLiteConnection</c> whose <c>DbConnection</c> is one.
/// </param>
/// <param name="list">Records to insert. <c>null</c> or an empty sequence is a no-op.</param>
/// <exception cref="ArgumentException">
/// <paramref name="dbConn"/> is not backed by a <c>SqlConnection</c>.
/// </exception>
public static void BulkInsertMsSql<T>(this IDbConnection dbConn, IEnumerable<T> list)
{
    if (list == null) return;

    // Materialize once: the sequence may be lazy and is needed for the type probe, batch size and rows.
    var records = list.ToList();
    if (records.Count < 1) return;

    // Accept a raw SQL Server connection as well as the OrmLite wrapper (same as the Npgsql variant).
    var connection = dbConn as SqlConnection;
    if (connection == null)
    {
        var ormliteConn = dbConn as OrmLiteConnection;
        if (ormliteConn != null)
        {
            connection = ormliteConn.DbConnection as SqlConnection;
        }
    }
    if (connection == null)
    {
        throw new ArgumentException("The connection must be (or wrap) a SqlConnection.", "dbConn");
    }

    var modelDef = OrmLiteConfig.GetModelDefinition(records[0].GetType());
    var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);

    // One filtered column list shared by the schema and the row loops, so they cannot drift apart.
    var columns = modelDef.FieldDefinitions
        .Where(f => !f.IsComputed && !f.AutoIncrement)
        .ToList();

    using (var table = new DataTable())
    using (var bulkCopy = new SqlBulkCopy(connection))
    {
        // Whole payload in a single batch.
        bulkCopy.BatchSize = records.Count;
        bulkCopy.DestinationTableName = targetTableName;

        foreach (var fieldDef in columns)
        {
            // Source is the DataTable column (property name); destination is the database column.
            // NOTE(review): FieldName is assumed to be the aliased column name, as in the Npgsql variant - confirm.
            bulkCopy.ColumnMappings.Add(fieldDef.Name, fieldDef.FieldName);

            // DataTable columns cannot be typed Nullable<T>; use the underlying type.
            table.Columns.Add(fieldDef.Name, Nullable.GetUnderlyingType(fieldDef.FieldType) ?? fieldDef.FieldType);
        }

        foreach (var record in records)
        {
            DataRow row = table.NewRow();
            foreach (var fieldDef in columns)
            {
                var dbValue = SqlServerDialect.Provider.ConvertDbValue(fieldDef.GetValue(record), fieldDef.FieldType);

                // A DataRow rejects a plain null for value-typed columns; DBNull is the NULL marker.
                row[fieldDef.Name] = dbValue ?? DBNull.Value;
            }
            table.Rows.Add(row);
        }

        bulkCopy.WriteToServer(table);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
To get this compiling with ServiceStack v4.5.4, update `GetModelDefinition` to `GetModelMetadata`, and update `ConvertDbValue` to `FromDbValue`. Not sure if this works, but at least it compiles!