Created
February 1, 2017 00:28
-
-
Save meziantou/174e2791dec966be837746750b87d069 to your computer and use it in GitHub Desktop.
ObjectDataReader (BulkInsert)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ObjectDataReader<T> : DbDataReader | |
{ | |
private IEnumerator<T> _iterator; | |
private IDictionary<string, int> _propertyNameToOrdinal = new Dictionary<string, int>(); | |
private IDictionary<int, string> _ordinalToPropertyName = new Dictionary<int, string>(); | |
private Func<T, object>[] _getPropertyValueFuncs; | |
public ObjectDataReader(IEnumerator<T> items) | |
{ | |
_iterator = items ?? throw new ArgumentNullException(nameof(items)); | |
Initialize(); | |
} | |
private void Initialize() | |
{ | |
int ordinal = 0; | |
var properties = typeof(T).GetProperties(); | |
_getPropertyValueFuncs = new Func<T, object>[properties.Length]; | |
foreach (var property in properties) | |
{ | |
string propertyName = property.Name; | |
_propertyNameToOrdinal.Add(propertyName, ordinal); | |
_ordinalToPropertyName.Add(ordinal, propertyName); | |
var parameterExpression = Expression.Parameter(typeof(T), "x"); | |
var func = (Func<T, object>)Expression.Lambda(Expression.Convert(Expression.Property(parameterExpression, propertyName), typeof(object)), parameterExpression).Compile(); | |
_getPropertyValueFuncs[ordinal] = func; | |
ordinal++; | |
} | |
} | |
public override object this[int ordinal] => GetValue(ordinal); | |
public override object this[string name] => GetValue(GetOrdinal(name)); | |
public override int Depth => 0; | |
public override int FieldCount => _ordinalToPropertyName.Count; | |
public override bool HasRows => true; | |
public override bool IsClosed => _iterator != null; | |
public override int RecordsAffected => throw new NotImplementedException(); | |
public override bool GetBoolean(int ordinal) | |
{ | |
return (bool)GetValue(ordinal); | |
} | |
public override byte GetByte(int ordinal) | |
{ | |
return (byte)GetValue(ordinal); | |
} | |
public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length) | |
{ | |
throw new NotImplementedException(); | |
} | |
public override char GetChar(int ordinal) | |
{ | |
return (char)GetValue(ordinal); | |
} | |
public override long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length) | |
{ | |
throw new NotImplementedException(); | |
} | |
public override string GetDataTypeName(int ordinal) | |
{ | |
throw new NotImplementedException(); | |
} | |
public override DateTime GetDateTime(int ordinal) | |
{ | |
return (DateTime)GetValue(ordinal); | |
} | |
public override decimal GetDecimal(int ordinal) | |
{ | |
return (decimal)GetValue(ordinal); | |
} | |
public override double GetDouble(int ordinal) | |
{ | |
return (double)GetValue(ordinal); | |
} | |
public override IEnumerator GetEnumerator() | |
{ | |
throw new NotImplementedException(); | |
} | |
public override Type GetFieldType(int ordinal) | |
{ | |
var value = GetValue(ordinal); | |
if (value == null) | |
return typeof(object); | |
return value.GetType(); | |
} | |
public override float GetFloat(int ordinal) | |
{ | |
return (float)GetValue(ordinal); | |
} | |
public override Guid GetGuid(int ordinal) | |
{ | |
return (Guid)GetValue(ordinal); | |
} | |
public override short GetInt16(int ordinal) | |
{ | |
return (short)GetValue(ordinal); | |
} | |
public override int GetInt32(int ordinal) | |
{ | |
return (int)GetValue(ordinal); | |
} | |
public override long GetInt64(int ordinal) | |
{ | |
return (long)GetValue(ordinal); | |
} | |
public override string GetName(int ordinal) | |
{ | |
if (_ordinalToPropertyName.TryGetValue(ordinal, out var name)) | |
return name; | |
return null; | |
} | |
public override int GetOrdinal(string name) | |
{ | |
if (_propertyNameToOrdinal.TryGetValue(name, out var ordinal)) | |
return ordinal; | |
return -1; | |
} | |
public override string GetString(int ordinal) | |
{ | |
return (string)GetValue(ordinal); | |
} | |
public override object GetValue(int ordinal) | |
{ | |
var func = _getPropertyValueFuncs[ordinal]; | |
return func(_iterator.Current); | |
} | |
public override int GetValues(object[] values) | |
{ | |
int max = Math.Min(values.Length, FieldCount); | |
for(var i = 0; i < max; i++) | |
{ | |
values[i] = IsDBNull(i) ? DBNull.Value : GetValue(i); | |
} | |
return max; | |
} | |
public override bool IsDBNull(int ordinal) | |
{ | |
return GetValue(ordinal) == null; | |
} | |
public override bool NextResult() | |
{ | |
return false; | |
} | |
public override bool Read() | |
{ | |
return _iterator.MoveNext(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
InsertAsync().Wait(); | |
} | |
public static async Task InsertAsync(CancellationToken ct = default(CancellationToken)) | |
{ | |
using (var connection = new SqlConnection()) | |
{ | |
connection.ConnectionString = "Server=(local);Database=Sample;Trusted_Connection=True;"; | |
await connection.OpenAsync(ct); | |
using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, null)) | |
{ | |
var customers = Customer.Generate(1000000); | |
using (var enumerator = customers.GetEnumerator()) | |
using (var customerReader = new ObjectDataReader<Customer>(enumerator)) | |
{ | |
bulk.DestinationTableName = "Customer"; | |
bulk.ColumnMappings.Add(nameof(Customer.Id), "Id"); | |
bulk.ColumnMappings.Add(nameof(Customer.FirstName), "FirstName"); | |
bulk.ColumnMappings.Add(nameof(Customer.LastName), "LastName"); | |
bulk.ColumnMappings.Add(nameof(Customer.DateOfBirth), "DateOfBirth"); | |
bulk.EnableStreaming = true; | |
bulk.BatchSize = 10000; | |
bulk.NotifyAfter = 1000; | |
bulk.SqlRowsCopied += (sender, e) => Console.WriteLine("RowsCopied: " + e.RowsCopied); | |
await bulk.WriteToServerAsync(customerReader, ct); | |
} | |
} | |
} | |
} | |
} | |
public class Customer | |
{ | |
public Guid Id { get; set; } | |
public string FirstName { get; set; } | |
public string LastName { get; set; } | |
public DateTime DateOfBirth { get; set; } | |
public static IEnumerable<Customer> Generate(int count) | |
{ | |
for (int i = 0; i < count; i++) | |
{ | |
yield return new Customer | |
{ | |
Id = Guid.NewGuid(), | |
FirstName = "FirstName" + i, | |
LastName = "LastName" + i, | |
DateOfBirth = DateTime.UtcNow | |
}; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I have one objection, when I set
bulk.EnableStreaming = true;
then my code instantly allocates memory before sending to sql.
leave it to default value (false) I observe that during sending to sql memory is slowly increasing.
Can u reproduce this behavior ?