// Source: GitHub gist foyzulkarim/adfa38f1f2c0e859f48c4cbe3c3400af
// Author: @foyzulkarim, created September 8, 2018 03:51
// Title: Data Migration using Threading in C#
using System;
using System.Collections.Generic;
using System.Data.Entity;
using System.Linq;
using System.Linq.Dynamic;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Core = BizBookCoreDbModelLibrary;
using BizBookDbModelLibrary;
using Newtonsoft.Json;
using Transaction = BizBookDbModelLibrary.Transaction;
namespace B2BDataMigratorApp
{
class Program
{
/// <summary>
/// Loads every row of <typeparamref name="T"/> from the source (inventory) database into memory.
/// </summary>
/// <typeparam name="T">Entity type exposed by <c>BizBookInventoryEntities</c>.</typeparam>
/// <returns>A fully materialized list that stays usable after the context is disposed.</returns>
private static List<T> GetList<T>() where T : class
{
    // The context is only needed while the query runs; dispose it once the rows are
    // materialized instead of leaving it for the finalizer.
    using (var db = new BizBookInventoryEntities()
    {
        // Lazy loading and proxies are off, so the returned entities are plain objects
        // that never call back into the (disposed) context.
        Configuration = { LazyLoadingEnabled = false, ProxyCreationEnabled = false, AutoDetectChangesEnabled = false }
    })
    {
        return db.Set<T>().ToList();
    }
}
/// <summary>
/// Loads every row of <typeparamref name="T"/> from the target (core) database into memory.
/// </summary>
/// <typeparam name="T">Entity type exposed by <c>Core.BizBookCoreDbEntities</c>.</typeparam>
/// <returns>A fully materialized list that stays usable after the context is disposed.</returns>
private static List<T> GetCoreList<T>() where T : class
{
    // Dispose the context as soon as the rows are materialized; the original never released it.
    using (var db = new Core.BizBookCoreDbEntities()
    {
        // Lazy loading and proxies are off, so the returned entities are plain objects
        // that never call back into the (disposed) context.
        Configuration = { LazyLoadingEnabled = false, ProxyCreationEnabled = false, AutoDetectChangesEnabled = false }
    })
    {
        return db.Set<T>().ToList();
    }
}
// Entry point. Runs whichever migration steps are currently uncommented, then prints
// "DONE" and waits for a key press.
//
// Every commented-out line below is one migration step for a single entity type
// (source type first, core type second). As written, only the Transaction step is
// enabled; steps are switched on and off by editing this method.
// NOTE(review): the order of the list presumably follows foreign-key dependencies
// between the tables - confirm before re-enabling steps in a different order.
static void Main(string[] args)
{
//BizBookInventoryEntities db = new BizBookInventoryEntities()
//{
// Configuration = { LazyLoadingEnabled = false, ProxyCreationEnabled = false, AutoDetectChangesEnabled = false }
//};
//Insert<Shop, Core.Shop>();
//Insert<District, Core.District>();
//Insert<Warehouse, Core.Warehouse>();
//Insert<Zone, Core.Zone>();
//Insert<WarehouseZone, Core.WarehouseZone>();
//Insert<Courier, Core.Courier>();
//Insert<AccountHead, Core.AccountHead>();
//Insert<AccountInfo, Core.AccountInfo>();
//Insert<Brand, Core.Brand>();
//Insert<EmployeeInfo, Core.EmployeeInfo>();
//InsertInThreads<Customer, Core.Customer>();
//Insert<ProductGroup, Core.ProductGroup>();
//Insert<ProductCategory, Core.ProductCategory>();
//Insert<ProductDetail, Core.ProductDetail>();
//Insert<ProductImage, Core.ProductImage>();
//Insert<ProductSerial, Core.ProductSerial>();
//Insert<WarehouseProduct, Core.WarehouseProduct>();
//Insert<Supplier, Core.Supplier>();
//Insert<SupplierProduct, Core.SupplierProduct>();
//Insert<SupplierProductTransaction, Core.SupplierProductTransaction>();
//Insert<Dealer, Core.Dealer>();
//Insert<Damage, Core.Damage>();
//Insert<DealerProduct, Core.DealerProduct>();
//InsertInThreads<Address, Core.Address>(3000);
//Insert<Purchase, Core.Purchase>();
//Insert<PurchaseDetail, Core.PurchaseDetail>();
//InsertSaleInThreads<Sale, Core.Sale>(3000);
//InsertInThreads<SaleDetail, Core.SaleDetail>(3000);
//InsertInThreads<SaleState, Core.SaleState>(3000);
//Insert<StockTransfer, Core.StockTransfer>();
//Insert<StockTransferDetail, Core.StockTransferDetail>();
//InsertInThreads<Installment, Core.Installment>();
//InsertInThreads<InstallmentDetail, Core.InstallmentDetail>();
//Insert<HookDetail, Core.HookDetail>();
//InsertInThreads<Sm, Core.Sm>();
//Insert<SmsHook, Core.SmsHook>();
//InsertInThreads<SmsHistory, Core.SmsHistory>();
// Currently active step: migrate Transaction rows using the threaded path with its default chunk size.
InsertInThreads<Transaction, Core.Transaction>();
//Insert<DealerProductTransaction, Core.DealerProductTransaction>();
//Insert<SupplierProductTransaction, Core.SupplierProductTransaction>();
//InsertIdentity<AspNetResource, Core.AspNetResource>();
//InsertIdentity<AspNetRole, Core.AspNetRole>();
//InsertIdentity<AspNetPermission, Core.AspNetPermission>();
//InsertIdentity<AspNetUser, Core.AspNetUser>();
//ExecuteUserRole();
//FixSale();
Console.WriteLine("DONE");
// Block until a key is pressed before the process exits.
Console.ReadKey();
}
/// <summary>
/// Back-fills <c>Sale.InstallmentId</c> in the core database: for every installment that
/// carries a non-blank <c>SaleId</c>, the referenced sale is pointed back at that installment.
/// </summary>
/// <remarks>
/// Installments whose sale cannot be found are reported on the console and skipped
/// (the original dereferenced the null result of <c>Find</c> and crashed).
/// All updates are written with a single <c>SaveChanges</c> call after the loop.
/// </remarks>
private static void FixSale()
{
    using (var coreDb = new Core.BizBookCoreDbEntities())
    {
        var installments = coreDb.Installments.ToList();
        bool anyChange = false;

        foreach (Core.Installment installment in installments)
        {
            if (string.IsNullOrWhiteSpace(installment.SaleId))
            {
                continue;
            }

            // Find returns null when no row has the given key.
            Core.Sale sale = coreDb.Sales.Find(installment.SaleId);
            if (sale == null)
            {
                Console.WriteLine($"sale {installment.SaleId} not found for installment {installment.Id}");
                continue;
            }

            sale.InstallmentId = installment.Id;
            anyChange = true;
        }

        // One save for the whole batch avoids re-running change detection over every
        // tracked entity once per installment.
        if (anyChange)
        {
            coreDb.SaveChanges();
        }
    }
}
/// <summary>
/// Migrates sales from the source database to the core database, splitting the rows into
/// chunks of <paramref name="take"/> items and inserting each chunk on its own thread.
/// </summary>
/// <typeparam name="T">Source sale type.</typeparam>
/// <typeparam name="TC">Core sale type.</typeparam>
/// <param name="take">Maximum number of sales handled by one thread; must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="take"/> is zero or negative.</exception>
/// <remarks>
/// Returns only after every worker thread has finished, matching <c>InsertInThreads</c>.
/// The original started the threads and returned immediately, so the caller reported
/// completion while inserts were still running.
/// </remarks>
private static void InsertSaleInThreads<T, TC>(int take = 5000)
    where T : Sale where TC : Core.Sale
{
    if (take <= 0)
    {
        // A non-positive chunk size would never advance the chunking loop.
        throw new ArgumentOutOfRangeException(nameof(take), take, "Chunk size must be positive.");
    }

    List<T> items = GetList<T>();
    foreach (T item in items)
    {
        // The installment link is cleared before inserting.
        // NOTE(review): presumably because installments are migrated after sales and
        // FixSale restores the link afterwards - confirm.
        item.InstallmentId = null;
    }

    var threads = new List<Thread>();
    for (int i = 0; i < items.Count; i = i + take)
    {
        int chunkStart = i;
        // GetRange copies the slice directly instead of re-walking the list with Skip/Take.
        List<T> chunkedList = items.GetRange(chunkStart, Math.Min(take, items.Count - chunkStart));
        Console.WriteLine($"\n\nData migration started for {typeof(T)}");

        threads.Add(new Thread(() =>
        {
            // Each worker owns its context (DbContext is not thread-safe) and disposes it when done.
            using (var coreDb = new Core.BizBookCoreDbEntities()
            {
                Configuration = { LazyLoadingEnabled = false, ProxyCreationEnabled = false }
            })
            {
                Insert<T, TC>(chunkedList, coreDb, chunkStart.ToString());
            }
        }));
    }

    foreach (Thread thread in threads)
    {
        thread.Start();
    }

    // Wait for all workers so the caller only continues once the migration is complete.
    foreach (Thread thread in threads)
    {
        thread.Join();
    }
}
/// <summary>
/// Copies user-role assignments from the source identity database to the core identity
/// database, skipping every (UserId, RoleId) pair that already exists in the target.
/// </summary>
/// <remarks>
/// Each new row gets the current tick count as its <c>Discriminator</c> and is saved
/// immediately, so an interrupted run can be restarted without duplicating rows.
/// </remarks>
private static void ExecuteUserRole()
{
    Console.Clear();

    List<AspNetUserRole> list;
    // The source context is only needed to read the rows; release it right after.
    using (var db = new BizBookIdentityEntities()
    {
        Configuration = { LazyLoadingEnabled = false, ProxyCreationEnabled = false, }
    })
    {
        Console.WriteLine($"\n\nData migration started for {typeof(AspNetUserRole)}");
        list = db.Set<AspNetUserRole>().ToList();
    }

    using (var coreDb = new Core.BizBookCoreIdentityDbEntities()
    {
        Configuration = { LazyLoadingEnabled = false, ProxyCreationEnabled = false }
    })
    {
        var total = list.Count;
        int i = 0;
        foreach (var sourceRole in list)
        {
            Console.WriteLine($"\nIterating {i++} of {total} - \t\n");

            var userId = sourceRole.UserId;
            var roleId = sourceRole.RoleId;
            if (coreDb.AspNetUserRoles.Any(x => x.UserId == userId && x.RoleId == roleId))
            {
                continue;
            }

            // A JSON round-trip maps the source entity onto the core entity by property name.
            string s = JsonConvert.SerializeObject(sourceRole);
            Core.AspNetUserRole userRole = JsonConvert.DeserializeObject<Core.AspNetUserRole>(s);
            userRole.Discriminator = DateTime.Now.Ticks.ToString();
            coreDb.AspNetUserRoles.Add(userRole);
            coreDb.SaveChanges();
        }
    }
}
/// <summary>
/// Reads all <typeparamref name="T"/> rows from the source database and hands them to the
/// list-based <c>Insert</c> overload for migration into <typeparamref name="TC"/>.
/// </summary>
/// <typeparam name="T">Source entity type.</typeparam>
/// <typeparam name="TC">Core (target) entity type.</typeparam>
private static void Insert<T, TC>() where T : class where TC : class
    => Insert<T, TC>(GetList<T>());
/// <summary>
/// Migrates the given source rows into the core database using a context that lives
/// only for the duration of this call.
/// </summary>
/// <typeparam name="T">Source entity type.</typeparam>
/// <typeparam name="TC">Core (target) entity type.</typeparam>
/// <param name="list">Source rows to migrate.</param>
private static void Insert<T, TC>(List<T> list) where T : class where TC : class
{
    // This method creates the context, so it is also responsible for disposing it;
    // the original left it undisposed.
    using (var coreDbEntities = new Core.BizBookCoreDbEntities())
    {
        Insert<T, TC>(list, coreDbEntities);
    }
}
/// <summary>
/// Copies each source row into the target set of <paramref name="coreDb"/>, skipping rows
/// whose <c>Id</c> already exists there. Every new row is saved immediately.
/// </summary>
/// <typeparam name="T">Source entity type.</typeparam>
/// <typeparam name="TC">Target entity type; must expose an <c>Id</c> property.</typeparam>
/// <param name="list">Source rows to migrate, processed in list order.</param>
/// <param name="coreDb">Target context; owned and disposed by the caller.</param>
/// <param name="thread">Label printed with each progress line to tell workers apart.</param>
/// <exception cref="ArgumentNullException"><paramref name="list"/> or <paramref name="coreDb"/> is null.</exception>
private static void Insert<T, TC>(List<T> list, DbContext coreDb, string thread = "")
    where T : class where TC : class
{
    if (list == null)
    {
        throw new ArgumentNullException(nameof(list));
    }

    if (coreDb == null)
    {
        throw new ArgumentNullException(nameof(coreDb));
    }

    var total = list.Count;
    int i = 0;
    DbSet<TC> cDbSet = coreDb.Set<TC>();

    // Plain sequential enumeration: the loop body uses a single DbContext, which is not
    // thread-safe, so AsParallel() gave no parallelism here and only scrambled the order.
    foreach (T item in list)
    {
        Console.WriteLine($"\nIterating {i++} of {total} - \t {thread} -\n");

        // A JSON round-trip maps the source entity onto the target entity by property name.
        string json = JsonConvert.SerializeObject(item);
        TC target = JsonConvert.DeserializeObject<TC>(json);

        // TC has no common base type exposing Id, so read it late-bound once.
        object id = ((dynamic)target).Id;

        // Pass the id as a Dynamic LINQ parameter instead of splicing it into the predicate
        // text: ids containing quotes no longer break the expression, and non-string keys work.
        bool exists = cDbSet.Where("Id = @0", id).Any();
        if (!exists)
        {
            Console.WriteLine($"saving {typeof(T)} - {id}");
            cDbSet.Add(target);
            // Saved per row so an interrupted run keeps everything inserted so far.
            coreDb.SaveChanges();
        }
        else
        {
            Console.WriteLine($"exists {typeof(T)} - {id}");
        }
    }

    Console.WriteLine($"\n\nData migration ended for {typeof(T)}");
}
/// <summary>
/// Returns the items whose <c>Id</c> is not already present in <paramref name="coreids"/>.
/// </summary>
/// <typeparam name="T">Item type; must expose an <c>Id</c> property (read late-bound).</typeparam>
/// <param name="items">Candidate items; this list is not modified.</param>
/// <param name="coreids">Ids that already exist in the target; compared by their string form.</param>
/// <param name="from">Chunk offset, used only in the console output.</param>
/// <returns>A new list holding the surviving items in their original order.</returns>
private static List<T> Clean<T>(List<T> items, List<dynamic> coreids, int @from) where T: class
{
    // Index the existing ids once so each membership test is O(1) instead of a linear
    // scan with a dynamic '==' per element.
    var existingIds = new HashSet<string>(StringComparer.Ordinal);
    foreach (object coreId in coreids)
    {
        existingIds.Add(coreId?.ToString());
    }

    var newItems = new List<T>(items.Count);
    foreach (T item in items)
    {
        object id = ((dynamic)item).Id;

        // Compare string forms: the dynamic '==' between a string id and a non-string Id
        // would throw at runtime.
        if (existingIds.Contains(id?.ToString()))
        {
            Console.WriteLine($"removing {typeof(T)} {id} \t {from}");
        }
        else
        {
            newItems.Add(item);
        }
    }

    return newItems;
}
/// <summary>
/// Migrates all <typeparamref name="T"/> rows into the core database in two threaded phases:
/// first each chunk is filtered down to rows whose id is not yet in the target, then the
/// remaining rows are inserted chunk by chunk. Returns after every worker has finished.
/// </summary>
/// <typeparam name="T">Source entity type.</typeparam>
/// <typeparam name="TC">Core (target) entity type; must expose an <c>Id</c> property.</typeparam>
/// <param name="take">Maximum number of rows handled by one thread; must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="take"/> is zero or negative.</exception>
private static void InsertInThreads<T, TC>(int take = 5000)
    where T : class where TC : class
{
    if (take <= 0)
    {
        // A non-positive chunk size would never advance the chunking loops.
        throw new ArgumentOutOfRangeException(nameof(take), take, "Chunk size must be positive.");
    }

    List<T> items = GetList<T>();
    List<TC> coreList = GetCoreList<TC>();
    // Ids already present in the target, in string form; only read by the workers below.
    List<dynamic> coreids = coreList.Select(x => (((dynamic) x).Id).ToString()).ToList();

    // Phase 1: drop rows that already exist in the target.
    List<T> newItems = new List<T>();
    object newItemsGate = new object();
    List<Thread> cleanThreads = new List<Thread>();
    for (int i = 0; i < items.Count; i = i + take)
    {
        int chunkStart = i;
        // GetRange copies the slice directly instead of re-walking the list with Skip/Take.
        List<T> chunkedList = items.GetRange(chunkStart, Math.Min(take, items.Count - chunkStart));
        Console.WriteLine($"\n\nData clean started for {typeof(T)}");
        cleanThreads.Add(new Thread(() =>
        {
            List<T> cleanedItems = Clean(chunkedList, coreids, chunkStart);
            // List<T> is not safe for concurrent writers; unsynchronized AddRange calls
            // from several threads can lose items or corrupt the list.
            lock (newItemsGate)
            {
                newItems.AddRange(cleanedItems);
            }
        }));
    }

    foreach (Thread thread in cleanThreads)
    {
        thread.Start();
    }

    foreach (Thread thread in cleanThreads)
    {
        thread.Join();
    }

    // Phase 2: insert the remaining rows, one context per worker.
    List<Thread> threads = new List<Thread>();
    for (int i = 0; i < newItems.Count; i = i + take)
    {
        int chunkStart = i;
        List<T> chunkedList = newItems.GetRange(chunkStart, Math.Min(take, newItems.Count - chunkStart));
        Console.WriteLine($"\n\nData migration started for {typeof(T)}");
        threads.Add(new Thread(() =>
        {
            // Each worker owns its context (DbContext is not thread-safe) and disposes it when done.
            using (var coreDb = new Core.BizBookCoreDbEntities()
            {
                Configuration = { LazyLoadingEnabled = false, ProxyCreationEnabled = false }
            })
            {
                Insert<T, TC>(chunkedList, coreDb, chunkStart.ToString());
            }
        }));
    }

    foreach (Thread thread in threads)
    {
        thread.Start();
    }

    foreach (Thread thread in threads)
    {
        thread.Join();
    }
}
/// <summary>
/// Migrates all <typeparamref name="T"/> rows from the source identity database into the
/// core identity database as <typeparamref name="TC"/>.
/// </summary>
/// <typeparam name="T">Source identity entity type.</typeparam>
/// <typeparam name="TC">Core identity entity type; must expose an <c>Id</c> property.</typeparam>
private static void InsertIdentity<T, TC>() where T : class where TC : class
{
    Console.Clear();

    List<T> list;
    // The source context is only needed to read the rows; release it right after.
    using (var db = new BizBookIdentityEntities()
    {
        Configuration = { LazyLoadingEnabled = false, ProxyCreationEnabled = false, }
    })
    {
        Console.WriteLine($"\n\nData migration started for {typeof(T)}");
        list = db.Set<T>().ToList();
    }

    // The target context lives for the whole insert and is disposed afterwards;
    // the original never disposed either context.
    using (var coreDb = new Core.BizBookCoreIdentityDbEntities()
    {
        Configuration = { LazyLoadingEnabled = false, ProxyCreationEnabled = false }
    })
    {
        Insert<T, TC>(list, coreDb);
    }
}
}
}
// (GitHub gist page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")