Skip to content

Instantly share code, notes, and snippets.

@dalegaspi
Created May 31, 2014 18:32
Show Gist options
  • Save dalegaspi/e672a8e053675347bd16 to your computer and use it in GitHub Desktop.
Save dalegaspi/e672a8e053675347bd16 to your computer and use it in GitHub Desktop.
A simple loader from SQL Server to Cassandra using DataStax C# Drivers
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Cassandra.Data.Linq;
using Cassandra.Data;
using Cassandra;
using System.Data;
using System.Data.SqlClient;
using System.Configuration;
using System.Xml.Linq;
using Cassandra.Data.Linq;
namespace APPLDocLoader
{
/// <summary>
/// One row per stored version of a document, keyed by item id and ordered by
/// record sequence number.
/// </summary>
/// <remarks>
/// Mapped to <c>document_versions</c>. The loader inserts a row of this type for
/// every source record, so it must not share a table name with
/// <see cref="LatestAPPLDocVersion"/>.
/// </remarks>
[AllowFiltering]
[Table("document_versions")]
public class APPLDocVersion
{
    /// <summary>Item identifier; partition key (<c>item_id</c>).</summary>
    [PartitionKey(0)]
    [Column("item_id")]
    public Guid ItemId { get; set; }

    /// <summary>Record sequence number; clustering key (<c>record_seqno</c>).</summary>
    [ClusteringKey(0)]
    [Column("record_seqno")]
    public int RecordSequenceNumber { get; set; }
}

/// <summary>
/// Pointer to the version of a document that the source flags as the latest one.
/// </summary>
/// <remarks>
/// Mapped to <c>latest_documents</c>. The loader inserts a row of this type only
/// for source records whose <c>IsLatest</c> flag is set.
/// </remarks>
[AllowFiltering]
[Table("latest_documents")]
public class LatestAPPLDocVersion
{
    /// <summary>Item identifier; partition key (<c>item_id</c>).</summary>
    [PartitionKey(0)]
    [Column("item_id")]
    public Guid ItemId { get; set; }

    /// <summary>
    /// Record sequence number; clustering key (<c>record_seqno</c>) declared with
    /// descending order.
    /// </summary>
    [ClusteringKey(0, "DESC")]
    [Column("record_seqno")]
    public int RecordSequenceNumber { get; set; }
}
/// <summary>
/// A single APPL document version as stored in the <c>documents</c> table.
/// </summary>
[AllowFiltering, Table("documents")]
public class APPLDoc
{
    /// <summary>
    /// Partition key (<c>item_id_rsn</c>). The loader fills it with the item id
    /// and the record sequence number joined by an underscore.
    /// </summary>
    [PartitionKey(0), Column("item_id_rsn")]
    public string ItemIdRSN { get; set; }

    /// <summary>Document status text (<c>status</c>).</summary>
    [Column("status")]
    public string Status { get; set; }

    /// <summary>The APPL XML payload (<c>appl</c>).</summary>
    [Column("appl")]
    public string APPL { get; set; }

    /// <summary>Time the row was written (<c>update_time</c>).</summary>
    [Column("update_time")]
    public DateTime UpdateTime { get; set; }

    /// <summary>Arrival time taken from the source record (<c>arrival_time</c>).</summary>
    [Column("arrival_time")]
    public DateTime ArrivalTime { get; set; }
}
/// <summary>
/// Console entry point that pages through <c>FAST.IndexDocuments</c> in SQL Server
/// and writes each row into the Cassandra keyspace <c>ci_appl</c>.
/// </summary>
class Program
{
    // Number of source rows read per page; each page is written as one Cassandra batch.
    private const int PageSize = 500;

    // Hard upper bound on the number of pages processed in a single run.
    private const int MaxPages = 50000;

    // Command timeout, in seconds, for each paging query.
    private const int QueryTimeoutSeconds = 240;

    // Name of the connection string entry that points at the source database.
    private const string ConnectionStringName = "CI_API";

    // Paging query. The row window is supplied through parameters so the command
    // text stays constant from page to page.
    private const string PageQuery = @"WITH OrderedAPPL AS (
        SELECT ROW_NUMBER() OVER(ORDER BY ItemIdentifier, RecordSequenceNumber) AS RowNum, RecordSequenceNumber, APPL ,ArrivalDateTime ,IsLatest
        FROM FAST.IndexDocuments with (nolock)) SELECT * FROM OrderedAPPL WHERE RowNum BETWEEN @firstRow AND @lastRow";

    // XML namespace of the APPL schema; used to locate the ItemId element.
    private static readonly XNamespace ApplNamespace = "http://ap.org/schemas/03/2005/appl";

    /// <summary>
    /// Runs the load: creates the Cassandra tables if needed, then copies the
    /// source rows page by page until a page comes back empty or
    /// <see cref="MaxPages"/> is reached. Waits for Enter before exiting.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    /// <exception cref="ConfigurationErrorsException">
    /// The <c>CI_API</c> connection string is not configured.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// A source document contains no <c>ItemId</c> element.
    /// </exception>
    static void Main(string[] args)
    {
        var connectionSettings = ConfigurationManager.ConnectionStrings[ConnectionStringName];
        if (connectionSettings == null)
        {
            throw new ConfigurationErrorsException(
                string.Format("Connection string '{0}' is not configured.", ConnectionStringName));
        }

        // Dispose the cluster and session so driver connections are released on
        // both normal exit and failure.
        using (var cluster = Cluster.Builder().AddContactPoint("127.0.0.1").Build())
        using (var session = cluster.Connect("ci_appl"))
        using (var connection = new SqlConnection(connectionSettings.ConnectionString))
        {
            connection.Open();

            Table<APPLDoc> docsTable = session.GetTable<APPLDoc>();
            Table<APPLDocVersion> docVersionsTable = session.GetTable<APPLDocVersion>();
            Table<LatestAPPLDocVersion> latestDocsTable = session.GetTable<LatestAPPLDocVersion>();
            docsTable.CreateIfNotExists();
            docVersionsTable.CreateIfNotExists();
            latestDocsTable.CreateIfNotExists();

            var stopwatch = System.Diagnostics.Stopwatch.StartNew();

            for (var page = 1; page <= MaxPages; page++)
            {
                int firstRow = ((page - 1) * PageSize) + 1;
                int lastRow = page * PageSize;

                var batch = session.CreateBatch();
                Console.Out.WriteLine("{0} in page {1} elapsed {2}", DateTime.Now, page, stopwatch.Elapsed.ToString());
                System.Console.WriteLine("{0} ...RowNum BETWEEN {1} AND {2}", DateTime.Now, firstRow, lastRow);

                using (var command = new SqlCommand(PageQuery, connection))
                {
                    command.CommandTimeout = QueryTimeoutSeconds;
                    command.Parameters.Add("@firstRow", SqlDbType.Int).Value = firstRow;
                    command.Parameters.Add("@lastRow", SqlDbType.Int).Value = lastRow;

                    using (var reader = command.ExecuteReader())
                    {
                        if (!reader.HasRows)
                        {
                            // An empty page means the source has been exhausted.
                            Console.Out.WriteLine("page {0} returns 0 rows!", page);
                            break;
                        }

                        while (reader.Read())
                        {
                            var appl = (string)reader["APPL"];
                            var rsn = (Int32)reader["RecordSequenceNumber"];
                            var arrival = (DateTime)reader["ArrivalDateTime"];
                            var isLatest = (bool)reader["IsLatest"];

                            var itemId = ReadItemId(appl, rsn);
                            // Parse once; both version tables use the same Guid.
                            var itemGuid = new Guid(itemId);

                            batch.Append(docsTable.Insert(new APPLDoc
                            {
                                ItemIdRSN = itemId + "_" + rsn.ToString(),
                                ArrivalTime = arrival,
                                UpdateTime = DateTime.UtcNow,
                                APPL = appl,
                                Status = "Usable"
                            }));

                            if (isLatest)
                            {
                                batch.Append(latestDocsTable.Insert(new LatestAPPLDocVersion
                                {
                                    ItemId = itemGuid,
                                    RecordSequenceNumber = rsn
                                }));
                            }

                            batch.Append(docVersionsTable.Insert(new APPLDocVersion
                            {
                                ItemId = itemGuid,
                                RecordSequenceNumber = rsn
                            }));
                        }
                    }
                }

                // Short pause kept from the original loop (presumably a throttle
                // between pages), then write the whole page to Cassandra.
                System.Threading.Thread.Sleep(50);
                batch.Execute();
            }
        }

        // All connections are released before waiting for the operator.
        System.Console.ReadLine();
    }

    // Returns the text of the first ItemId element in the APPL document, failing
    // with a descriptive error when the element is absent.
    private static string ReadItemId(string appl, int rsn)
    {
        var itemId = XElement.Parse(appl)
            .Descendants(ApplNamespace + "ItemId")
            .Select(e => e.Value)
            .FirstOrDefault();

        if (itemId == null)
        {
            throw new InvalidOperationException(
                string.Format("APPL document with record sequence number {0} has no ItemId element.", rsn));
        }

        return itemId;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment