Skip to content

Instantly share code, notes, and snippets.

@Eibwen
Last active October 25, 2018 19:22
Show Gist options
  • Save Eibwen/d3ffc958a6c2d1cb33c422bf5f2036fb to your computer and use it in GitHub Desktop.
Save Eibwen/d3ffc958a6c2d1cb33c422bf5f2036fb to your computer and use it in GitHub Desktop.
Use IEnumerable<T> to feed SqlBulkCopy
// LINQPad demo: bulk-inserts two sample MyClass rows into gwalker_test_table by
// streaming them through ObjectDataReader<MyClass> into SqlBulkCopy, dumping the
// table before and after so the effect is visible.
// NOTE(review): gwalker_test_table and this.Connection are presumably members of the
// LINQPad-generated data context for the selected connection — confirm.
void Main()
{
    // Table contents before the insert.
    gwalker_test_table.Dump();

    // Uncomment to empty the table instead of inserting:
    // gwalker_test_table.DeleteAllOnSubmit(gwalker_test_table);
    // SubmitChanges();
    // return;

    var rand = new Random();
    var myData = new List<MyClass>
    {
        new MyClass
        {
            DateAdded = DateTime.UtcNow,
            Identifier = rand.Next(10000),
            ForeginKey = null,
            Value = "Hi mom"
        },
        new MyClass
        {
            DateAdded = DateTime.UtcNow,
            Identifier = rand.Next(10000),
            ForeginKey = 7,
            Value = "Hi dude"
        }
    };

    // Property name -> destination column name. Properties not listed here are
    // mapped by ObjectDataReader to a column with the same name as the property.
    var mapping = new Dictionary<string, string>
    {
        { nameof(MyClass.Identifier), "Id" }
    };

    // The reader owns the enumerator over myData, so dispose it together with the bulk copy.
    using (var reader = new ObjectDataReader<MyClass>(myData, mapping))
    using (var bulkCopy = new SqlBulkCopy(this.Connection.ConnectionString))
    {
        bulkCopy.DestinationTableName = nameof(gwalker_test_table);
        reader.AddMappings(bulkCopy.ColumnMappings);
        bulkCopy.WriteToServer(reader);
    }

    // Table contents after the insert.
    gwalker_test_table.Dump();
}
/// <summary>
/// Sample row type used by <c>Main</c> to demonstrate bulk-inserting objects.
/// <c>ObjectDataReader&lt;T&gt;</c> exposes each readable property as one source column.
/// </summary>
public class MyClass
{
/// <summary>Row identifier; <c>Main</c> maps this property to the destination column "Id".</summary>
public int Identifier { get; set; }
/// <summary>Timestamp of the row; <c>Main</c> fills it with <c>DateTime.UtcNow</c>.</summary>
public DateTime DateAdded { get; set; }
/// <summary>Optional key value; <c>null</c> when the row has none. (The spelling is part of the property name.)</summary>
public int? ForeginKey { get; set; }
/// <summary>Not set by the sample data in <c>Main</c>, so it is <c>null</c> for those rows.</summary>
public string Sku { get; set; }
/// <summary>Free-text payload of the row.</summary>
public string Value { get; set; }
}
/// <summary>
/// Wraps an <c>IEnumerable&lt;T&gt;</c> in a forward-only <see cref="IDataReader"/> so the sequence
/// can be streamed into <c>SqlBulkCopy</c> one object at a time.
///
/// Every public, readable, non-indexer property of <typeparamref name="T"/> becomes one column.
/// Column names are the property names and are matched case-insensitively.
///
/// Only the members needed for that single use-case are implemented; every other
/// <see cref="IDataReader"/> / <see cref="IDataRecord"/> member throws <see cref="NotImplementedException"/>.
/// </summary>
/// <typeparam name="T">Type of the objects being read; its properties define the columns.</typeparam>
public class ObjectDataReader<T> : IDataReader
{
    // One comparer for both the mapping and the ordinal lookup, so a name accepted by one
    // is always resolvable by the other.
    private static readonly StringComparer NameComparer = StringComparer.InvariantCultureIgnoreCase;

    private readonly IEnumerator<T> _dataEnumerator;
    private readonly Dictionary<string, string> _columnMapping;
    private readonly PropertyInfo[] _propertyArray;
    private readonly Dictionary<string, int> _columnIdLookup;

    /// <summary>
    /// Creates a reader over <paramref name="data"/>.
    /// </summary>
    /// <param name="data">Objects to expose as rows. Enumerated lazily, once, as <c>Read</c> is called.</param>
    /// <param name="columnMapping">
    /// Optional map of property name to destination column name. Properties without an entry
    /// are mapped to a column with the same name as the property. May be <c>null</c>.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
    public ObjectDataReader(IEnumerable<T> data, IDictionary<string, string> columnMapping)
    {
        if (data == null)
        {
            throw new ArgumentNullException(nameof(data));
        }

        // Indexers are readable properties too, but cannot be read without arguments.
        _propertyArray = typeof(T).GetProperties()
            .Where(p => p.CanRead && p.GetIndexParameters().Length == 0)
            .ToArray();

        _columnMapping = columnMapping != null
            ? new Dictionary<string, string>(columnMapping, NameComparer)
            : new Dictionary<string, string>(NameComparer);

        _columnIdLookup = new Dictionary<string, int>(_propertyArray.Length, NameComparer);
        for (var ordinal = 0; ordinal < _propertyArray.Length; ordinal++)
        {
            var propertyName = _propertyArray[ordinal].Name;

            // First property wins if two names differ only by case.
            if (!_columnIdLookup.ContainsKey(propertyName))
            {
                _columnIdLookup.Add(propertyName, ordinal);
            }

            // Default mapping: column name == property name, unless the caller supplied one.
            if (!_columnMapping.ContainsKey(propertyName))
            {
                _columnMapping.Add(propertyName, propertyName);
            }
        }

        // Acquire the enumerator last so nothing is left undisposed if the setup above throws.
        _dataEnumerator = data.GetEnumerator();
    }

    /// <summary>
    /// Adds one source-property to destination-column mapping per known column to
    /// <paramref name="columnMappings"/>, so the bulk copy matches columns by name rather than position.
    /// </summary>
    /// <param name="columnMappings">Typically <c>SqlBulkCopy.ColumnMappings</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="columnMappings"/> is <c>null</c>.</exception>
    public void AddMappings(SqlBulkCopyColumnMappingCollection columnMappings)
    {
        if (columnMappings == null)
        {
            throw new ArgumentNullException(nameof(columnMappings));
        }

        foreach (var mapping in _columnMapping)
        {
            columnMappings.Add(mapping.Key, mapping.Value);
        }
    }

    /// <summary>Reads the property behind column <paramref name="i"/> from the current object, as a database value.</summary>
    private object ReadColumn(int i)
    {
        // IDataRecord consumers expect DBNull.Value, not a CLR null, for missing values.
        return _propertyArray[i].GetValue(_dataEnumerator.Current) ?? DBNull.Value;
    }

    /// <summary>Resolves a property name (case-insensitive) to its column ordinal.</summary>
    private int FindOrdinal(string name)
    {
        if (name != null && _columnIdLookup.TryGetValue(name, out var ordinal))
        {
            return ordinal;
        }

        // IndexOutOfRangeException is what ADO.NET readers such as SqlDataReader raise
        // for an unknown column name, so callers written against that convention keep working.
        throw new IndexOutOfRangeException(
            "'" + name + "' is not a readable property of " + typeof(T).Name + ".");
    }

    void IDataReader.Close() => throw new NotImplementedException();

    DataTable IDataReader.GetSchemaTable() => throw new NotImplementedException();

    bool IDataReader.NextResult() => throw new NotImplementedException();

    /// <summary>Advances to the next object in the sequence; <c>false</c> once it is exhausted.</summary>
    bool IDataReader.Read() => _dataEnumerator.MoveNext();

    int IDataReader.Depth => throw new NotImplementedException();

    bool IDataReader.IsClosed => throw new NotImplementedException();

    int IDataReader.RecordsAffected => throw new NotImplementedException();

    bool IDataRecord.GetBoolean(int i) => throw new NotImplementedException();

    byte IDataRecord.GetByte(int i) => throw new NotImplementedException();

    long IDataRecord.GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        => throw new NotImplementedException();

    char IDataRecord.GetChar(int i) => throw new NotImplementedException();

    long IDataRecord.GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        => throw new NotImplementedException();

    IDataReader IDataRecord.GetData(int i) => throw new NotImplementedException();

    string IDataRecord.GetDataTypeName(int i) => throw new NotImplementedException();

    DateTime IDataRecord.GetDateTime(int i) => throw new NotImplementedException();

    decimal IDataRecord.GetDecimal(int i) => throw new NotImplementedException();

    double IDataRecord.GetDouble(int i) => throw new NotImplementedException();

    /// <summary>CLR type of column <paramref name="i"/>; <c>Nullable&lt;X&gt;</c> properties report <c>X</c>.</summary>
    Type IDataRecord.GetFieldType(int i)
    {
        var propertyType = _propertyArray[i].PropertyType;
        return Nullable.GetUnderlyingType(propertyType) ?? propertyType;
    }

    float IDataRecord.GetFloat(int i) => throw new NotImplementedException();

    Guid IDataRecord.GetGuid(int i) => throw new NotImplementedException();

    short IDataRecord.GetInt16(int i) => throw new NotImplementedException();

    int IDataRecord.GetInt32(int i) => throw new NotImplementedException();

    long IDataRecord.GetInt64(int i) => throw new NotImplementedException();

    /// <summary>Name of column <paramref name="i"/>, i.e. the name of the underlying property.</summary>
    string IDataRecord.GetName(int i) => _propertyArray[i].Name;

    /// <summary>Ordinal of the column backed by the property called <paramref name="name"/>.</summary>
    /// <exception cref="IndexOutOfRangeException">No readable property has that name.</exception>
    int IDataRecord.GetOrdinal(string name) => FindOrdinal(name);

    string IDataRecord.GetString(int i) => throw new NotImplementedException();

    /// <summary>Value of column <paramref name="i"/> for the current object; <c>DBNull.Value</c> when the property is <c>null</c>.</summary>
    object IDataRecord.GetValue(int i) => ReadColumn(i);

    int IDataRecord.GetValues(object[] values) => throw new NotImplementedException();

    /// <summary><c>true</c> when the property behind column <paramref name="i"/> is <c>null</c> for the current object.</summary>
    bool IDataRecord.IsDBNull(int i) => ReadColumn(i) is DBNull;

    /// <summary>Number of columns, i.e. readable non-indexer properties of <typeparamref name="T"/>.</summary>
    int IDataRecord.FieldCount => _propertyArray.Length;

    object IDataRecord.this[int i] => ReadColumn(i);

    object IDataRecord.this[string name] => ReadColumn(FindOrdinal(name));

    /// <summary>Releases the enumerator over the source sequence.</summary>
    void IDisposable.Dispose()
    {
        _dataEnumerator.Dispose();
    }
}
/// <summary>
/// Bulk-insert helpers for <see cref="SqlConnection"/>.
/// </summary>
public static class SqlConnectionExtensions
{
    /// <summary>
    /// Inserts <paramref name="data"/> into <paramref name="insertIntoTable"/> with <c>SqlBulkCopy</c>.
    ///
    /// The objects are wrapped in an <c>ObjectDataReader&lt;T&gt;</c>, which uses reflection to expose
    /// each readable property as a source column, and are passed to <c>SqlBulkCopy.WriteToServer</c>.
    /// Columns are matched by name, so property order does not matter.
    /// </summary>
    /// <typeparam name="T">Type of the objects to insert.</typeparam>
    /// <param name="connection">Connection handed to <c>SqlBulkCopy</c>.</param>
    /// <param name="insertIntoTable">Name of the destination table.</param>
    /// <param name="data">
    /// Objects to insert. Property names must match the destination column names unless
    /// <paramref name="columnMapping"/> says otherwise.
    /// </param>
    /// <param name="batchSize">Assigned to <c>SqlBulkCopy.BatchSize</c>.</param>
    /// <param name="notifyCallback">
    /// Optional progress callback, subscribed to <c>SqlRowsCopied</c>. When supplied,
    /// <c>NotifyAfter</c> is set to 100,000 rows.
    /// </param>
    /// <param name="columnMapping">Manual mapping of object property names to database column names.</param>
    /// <param name="timeoutSeconds">Assigned to <c>SqlBulkCopy.BulkCopyTimeout</c>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connection"/> or <paramref name="data"/> is <c>null</c>.
    /// </exception>
    public static void BulkInsert<T>(
        this SqlConnection connection,
        string insertIntoTable,
        IEnumerable<T> data,
        int batchSize = 1000,
        Action<object, SqlRowsCopiedEventArgs> notifyCallback = null,
        Dictionary<string, string> columnMapping = null,
        int timeoutSeconds = 30)
    {
        if (connection == null)
        {
            throw new ArgumentNullException(nameof(connection));
        }

        if (data == null)
        {
            throw new ArgumentNullException(nameof(data));
        }

        // Pass the caller's mapping through (it used to be dropped), and dispose the reader
        // so the enumerator over data is released even if the copy fails.
        using (var reader = new ObjectDataReader<T>(data, columnMapping))
        using (var bulkCopy = new SqlBulkCopy(connection))
        {
            bulkCopy.DestinationTableName = insertIntoTable;
            bulkCopy.BatchSize = batchSize;
            bulkCopy.BulkCopyTimeout = timeoutSeconds;

            // This is to allow arbitrary order of properties and/or the columnMapping dictionary
            reader.AddMappings(bulkCopy.ColumnMappings);

            if (notifyCallback != null)
            {
                bulkCopy.SqlRowsCopied += new SqlRowsCopiedEventHandler(notifyCallback);
                bulkCopy.NotifyAfter = 100 * 1000;
            }

            bulkCopy.WriteToServer(reader);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment