Created
May 31, 2014 18:32
-
-
Save dalegaspi/e672a8e053675347bd16 to your computer and use it in GitHub Desktop.
A simple loader from SQL Server to Cassandra using DataStax C# Drivers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using Cassandra.Data.Linq; | |
using Cassandra.Data; | |
using Cassandra; | |
using System.Data; | |
using System.Data.SqlClient; | |
using System.Configuration; | |
using System.Xml.Linq; | |
using Cassandra.Data.Linq; | |
namespace APPLDocLoader | |
{ | |
/// <summary>
/// Maps one (item id, record sequence number) pair. Program.Main inserts one of
/// these for every row it reads from SQL Server, so this entity holds the full
/// version history of each item.
/// </summary>
/// <remarks>
/// FIX: the table name was previously "latest_documents", which was crossed with
/// <see cref="LatestAPPLDocVersion"/>; every version was being written to the
/// "latest" table. NOTE(review): if a keyspace was already populated with the
/// crossed names, its existing data must be migrated or reloaded.
/// </remarks>
[AllowFiltering]
[Table("document_versions")]
public class APPLDocVersion
{
    /// <summary>Item identifier; partition key.</summary>
    [PartitionKey(0)]
    [Column("item_id")]
    public Guid ItemId { get; set; }

    /// <summary>Record sequence number of this version; clustering key.</summary>
    [ClusteringKey(0)]
    [Column("record_seqno")]
    public int RecordSequenceNumber { get; set; }
}

/// <summary>
/// Maps the (item id, record sequence number) pair of a row flagged as latest.
/// Program.Main inserts one of these only when the source row's IsLatest column
/// is true.
/// </summary>
/// <remarks>
/// FIX: the table name was previously "document_versions", which was crossed with
/// <see cref="APPLDocVersion"/>.
/// </remarks>
[AllowFiltering]
[Table("latest_documents")]
public class LatestAPPLDocVersion
{
    /// <summary>Item identifier; partition key.</summary>
    [PartitionKey(0)]
    [Column("item_id")]
    public Guid ItemId { get; set; }

    /// <summary>Record sequence number; clustering key declared with "DESC" ordering.</summary>
    [ClusteringKey(0, "DESC")]
    [Column("record_seqno")]
    public int RecordSequenceNumber { get; set; }
}
/// <summary>
/// Row mapping for the "documents" table: one stored APPL payload together with
/// its status and timestamps.
/// </summary>
[AllowFiltering, Table("documents")]
public class APPLDoc
{
    /// <summary>Partition key. Program.Main fills it with the item id, an underscore and the record sequence number.</summary>
    [PartitionKey(0), Column("item_id_rsn")]
    public string ItemIdRSN { get; set; }

    /// <summary>Status text stored in the "status" column.</summary>
    [Column("status")]
    public string Status { get; set; }

    /// <summary>The APPL payload stored in the "appl" column.</summary>
    [Column("appl")]
    public string APPL { get; set; }

    /// <summary>Timestamp stored in the "update_time" column.</summary>
    [Column("update_time")]
    public DateTime UpdateTime { get; set; }

    /// <summary>Timestamp stored in the "arrival_time" column.</summary>
    [Column("arrival_time")]
    public DateTime ArrivalTime { get; set; }
}
/// <summary>
/// Console loader that pages through FAST.IndexDocuments in SQL Server and writes
/// each row into the Cassandra keyspace "ci_appl" (documents, document versions
/// and latest-version tables), one Cassandra batch per SQL page.
/// </summary>
class Program
{
    /// <summary>Number of SQL rows fetched (and batched into Cassandra) per page.</summary>
    private const int PageSize = 500;

    /// <summary>Hard upper bound on the number of pages processed in one run.</summary>
    private const int MaxPages = 50000;

    /// <summary>SQL command timeout, in seconds.</summary>
    private const int CommandTimeoutSeconds = 240;

    /// <summary>Pause between pages, in milliseconds.</summary>
    private const int PauseBetweenPagesMs = 50;

    /// <summary>XML namespace of the APPL payload, used to locate the ItemId element.</summary>
    private static readonly XNamespace ApplNamespace = "http://ap.org/schemas/03/2005/appl";

    /// <summary>
    /// Page query. The row bounds are SQL parameters rather than formatted into the
    /// text, so the command text is constant across pages.
    /// </summary>
    private const string PageQuery = @"WITH OrderedAPPL AS (
SELECT ROW_NUMBER() OVER(ORDER BY ItemIdentifier, RecordSequenceNumber) AS RowNum, RecordSequenceNumber, APPL ,ArrivalDateTime ,IsLatest
FROM FAST.IndexDocuments with (nolock)) SELECT * FROM OrderedAPPL WHERE RowNum BETWEEN @firstRow AND @lastRow";

    /// <summary>
    /// Entry point. Connects to Cassandra at 127.0.0.1 and to the SQL Server
    /// connection string named "CI_API", creates the target tables if needed, then
    /// copies rows page by page until a page comes back empty or MaxPages is reached.
    /// Waits for Enter before exiting.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var cluster = Cluster.Builder()
            .AddContactPoint("127.0.0.1")
            .Build();
        var session = cluster.Connect("ci_appl");

        using (SqlConnection connection = new SqlConnection(ConfigurationManager.ConnectionStrings["CI_API"].ConnectionString))
        {
            connection.Open();

            Table<APPLDoc> docsTable = session.GetTable<APPLDoc>();
            Table<APPLDocVersion> docVersionsTable = session.GetTable<APPLDocVersion>();
            Table<LatestAPPLDocVersion> latestDocsTable = session.GetTable<LatestAPPLDocVersion>();
            docsTable.CreateIfNotExists();
            docVersionsTable.CreateIfNotExists();
            latestDocsTable.CreateIfNotExists();

            var stopwatch = new System.Diagnostics.Stopwatch();
            stopwatch.Start();

            for (var page = 1; page <= MaxPages; page++)
            {
                // Inclusive ROW_NUMBER bounds of this page.
                int firstRow = ((page - 1) * PageSize) + 1;
                int lastRow = page * PageSize;

                Console.Out.WriteLine("{0} in page {1} elapsed {2}", DateTime.Now, page, stopwatch.Elapsed.ToString());
                System.Console.WriteLine("{0} ...RowNum BETWEEN {1} AND {2}", DateTime.Now, firstRow, lastRow);

                var batch = session.CreateBatch();
                int appended = 0;
                bool pageHasRows;

                using (SqlCommand cmd = new SqlCommand(PageQuery, connection))
                {
                    cmd.CommandTimeout = CommandTimeoutSeconds;
                    cmd.Parameters.Add("@firstRow", SqlDbType.Int).Value = firstRow;
                    cmd.Parameters.Add("@lastRow", SqlDbType.Int).Value = lastRow;

                    using (var reader = cmd.ExecuteReader())
                    {
                        pageHasRows = reader.HasRows;

                        while (reader.Read())
                        {
                            var rsn = (Int32)reader["RecordSequenceNumber"];

                            // "as" yields null for DBNull instead of throwing on the cast.
                            var appl = reader["APPL"] as string;
                            if (appl == null)
                            {
                                Console.Out.WriteLine("page {0}: skipping rsn {1}, APPL is NULL", page, rsn);
                                continue;
                            }

                            var arrival = (DateTime)reader["ArrivalDateTime"];
                            var isLatest = (bool)reader["IsLatest"];

                            // Pull the first ItemId element out of the APPL XML.
                            string itemIdText = XElement.Parse(appl)
                                .Descendants(ApplNamespace + "ItemId")
                                .Select(e => e.Value)
                                .FirstOrDefault();

                            // A missing or malformed ItemId used to abort the whole
                            // load from the Guid constructor; skip just that row.
                            Guid itemId;
                            if (!Guid.TryParse(itemIdText, out itemId))
                            {
                                Console.Out.WriteLine("page {0}: skipping rsn {1}, missing or invalid ItemId", page, rsn);
                                continue;
                            }

                            // The key keeps the ItemId text exactly as it appears in the XML.
                            batch.Append(docsTable.Insert(new APPLDoc
                            {
                                ItemIdRSN = itemIdText + "_" + rsn.ToString(),
                                ArrivalTime = arrival,
                                UpdateTime = DateTime.UtcNow,
                                APPL = appl,
                                Status = "Usable"
                            }));
                            appended++;

                            if (isLatest)
                            {
                                batch.Append(latestDocsTable.Insert(new LatestAPPLDocVersion
                                {
                                    ItemId = itemId,
                                    RecordSequenceNumber = rsn
                                }));
                                appended++;
                            }

                            batch.Append(docVersionsTable.Insert(new APPLDocVersion
                            {
                                ItemId = itemId,
                                RecordSequenceNumber = rsn
                            }));
                            appended++;
                        }
                    }
                }

                if (!pageHasRows)
                {
                    Console.Out.WriteLine("page {0} returns 0 rows!", page);
                    break;
                }

                // Every row of the page may have been skipped; don't send an empty batch.
                if (appended > 0)
                {
                    batch.Execute();
                }

                // Throttle between pages, now that the SQL reader has been released.
                System.Threading.Thread.Sleep(PauseBetweenPagesMs);
            }
        }

        // Keep the console window open; the SQL connection is already closed here.
        System.Console.ReadLine();
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment