Created
May 30, 2014 13:09
-
-
Save stdray/fb3b490602e6d59ee851 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Xml.Linq; | |
using System.Xml.XPath; | |
using Akka.Actor; | |
using Akka.Event; | |
using CsvHelper; | |
namespace ExtDoc2Parser | |
{ | |
using Settings = Properties.Settings; | |
using Data = Tuple<string, string, string>; | |
/// <summary>
/// Actor that turns one raw XML document (received as bytes) into zero or more
/// <see cref="Data"/> rows and forwards each row to the writer actor.
/// </summary>
/// <remarks>
/// Every <c>Settings.Default.LogPeriod</c> documents it logs the average time spent
/// parsing one document during that period.
/// </remarks>
public class Parser : TypedActor, IHandle<byte[]>
{
    private readonly ActorRef _writer;
    private readonly LoggingAdapter _logger = Logging.GetLogger(Context);
    // Accumulates parse time over the current logging period; reset after each log line.
    private readonly Stopwatch _timer = new Stopwatch();
    private int _parsedCount;

    /// <param name="writer">Actor that receives every parsed row.</param>
    public Parser(ActorRef writer)
    {
        _writer = writer;
    }

    /// <summary>
    /// Parses <paramref name="message"/>, sends each resulting row to the writer and,
    /// at the end of each logging period, logs the average parse time per document.
    /// </summary>
    /// <param name="message">Raw bytes of one XML document.</param>
    public void Handle(byte[] message)
    {
        _timer.Start();
        // Parse returns a deferred query; materialize it here so the XPath selection and
        // value extraction are actually measured by the timer instead of running lazily
        // inside the Tell loop below.
        var rows = Parse(message).ToList();
        _timer.Stop();

        foreach (var row in rows)
            _writer.Tell(row);

        if (++_parsedCount % Settings.Default.LogPeriod != 0)
            return;
        var avr = ((double)_timer.ElapsedMilliseconds) / Settings.Default.LogPeriod;
        _logger.Log(LogLevel.InfoLevel, "avr time : {0}ms", avr);
        _timer.Reset();
    }

    /// <summary>
    /// Builds one row per <c>anyType</c> element of <paramref name="doc"/> whose amounts
    /// column (selected by the <c>Settings.Default.LastColumnSelector</c> XPath) is non-empty.
    /// </summary>
    /// <param name="doc">Raw bytes of one XML document.</param>
    /// <returns>
    /// A deferred sequence of (notification numbers, lot ordinal numbers, amounts); each
    /// field is the space-joined text of all matching elements.
    /// </returns>
    private static IEnumerable<Data> Parse(byte[] doc)
    {
        XElement root;
        using (var stream = new MemoryStream(doc))
            root = XElement.Load(stream);

        var amountsSelector = Settings.Default.LastColumnSelector;
        return from el in root.Descendants("anyType")
               let amountsValue = Value(el.XPathSelectElements(amountsSelector))
               where !string.IsNullOrEmpty(amountsValue)
               let notificationNumbers = el.Descendants().Where(d => d.Name.LocalName == "notificationNumber")
               let ordinalNumbers =
                   el.XPathSelectElements(".//*[local-name() = 'lot']/*[local-name() = 'ordinalNumber']")
               // Reuse the already-joined amounts instead of re-running the XPath query.
               select new Data(Value(notificationNumbers), Value(ordinalNumbers), amountsValue);
    }

    /// <summary>Joins the text values of <paramref name="xElements"/> with single spaces.</summary>
    private static string Value(IEnumerable<XElement> xElements)
    {
        return string.Join(" ", xElements.Select(x => x.Value));
    }
}
/// <summary>
/// Actor that appends each received <see cref="Data"/> row to a CSV stream and, every
/// <c>Settings.Default.LogPeriod</c> rows, logs the running total of rows written.
/// </summary>
/// <remarks>
/// NOTE(review): the CSV header is written in the constructor; if a supervisor restarts this
/// actor the constructor runs again and a second header would land mid-file — confirm the
/// supervision strategy in use.
/// </remarks>
public class Writer : TypedActor, IHandle<Data>
{
    private readonly CsvWriter _csv;
    private readonly LoggingAdapter _logger = Logging.GetLogger(Context);
    private int _recordsWritten;

    /// <param name="writer">Destination for the CSV text; this actor never disposes it.</param>
    public Writer(TextWriter writer)
    {
        var csv = new CsvWriter(writer);
        csv.Configuration.RegisterClassMap<AccumulationDataMap>();
        csv.Configuration.Delimiter = Settings.Default.Delimiter;
        csv.WriteHeader<Data>();
        _csv = csv;
    }

    /// <summary>Writes one CSV record for <paramref name="message"/>.</summary>
    public void Handle(Data message)
    {
        _csv.WriteRecord(message);
        _recordsWritten++;
        var isLogPoint = _recordsWritten % Settings.Default.LogPeriod == 0;
        if (!isLogPoint)
            return;
        _logger.Log(LogLevel.InfoLevel, "write records: {0}", _recordsWritten);
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using Akka.Actor; | |
using Akka.Routing; | |
using ExtDoc2Parser.Model; | |
namespace ExtDoc2Parser | |
{ | |
using Settings = Properties.Settings; | |
/// <summary>
/// Console entry point: streams matching EXTDOC2 contents from the database to a set of
/// <see cref="Parser"/> actors (round-robin) that forward parsed rows to a single CSV
/// <see cref="Writer"/> actor.
/// </summary>
internal class Program
{
    private static void Main()
    {
        Console.WriteLine("start");
        using (var system = ActorSystem.Create("extDoc2"))
        using (var streamWriter = new StreamWriter(Settings.Default.FileName))
        using (var db = new marketDB { CommandTimeout = Settings.Default.CommandTimeout })
        {
            var writer = system.ActorOf(Props.Create(() => new Writer(streamWriter)), "writer");
            var parsers = Enumerable
                .Range(0, Settings.Default.DegreeOfParallelism)
                .Select(i => system.ActorOf(Props.Create(() => new Parser(writer)), "parser_" + i))
                .ToList();
            var router = system.ActorOf(new Props().WithRouter(new RoundRobinGroup(parsers)));

            var data = db.EXTDOC2
                .Where(d => d.EXTERNALSOURCEID == Settings.Default.ExternalSourceId)
                .Where(d => d.EXTERNALDATAKIND == Settings.Default.DataKindId)
                .Select(d => d.CONTENT);

            var rowNumber = 0;
            var timer = Stopwatch.StartNew();
            foreach (var d in data)
            {
                router.Tell(d);
                if (++rowNumber % Settings.Default.LogPeriod == 0)
                    Console.WriteLine("[{2}] fetch items: {0}, avr time: {1}ms",
                        rowNumber,
                        // Floating-point average; integer division truncated sub-ms rates to 0.
                        (double)timer.ElapsedMilliseconds / rowNumber,
                        DateTime.Now);
            }

            // Shut down in pipeline order. Parsers may still hold unparsed documents when the
            // fetch loop ends; stopping only the writer could terminate it before their rows
            // arrive, silently dropping them. Stop all parsers concurrently first, wait until
            // every one has terminated (so all of their rows have been sent to the writer),
            // then stop the writer.
            // NOTE(review): relies on GracefulStop's stop message queueing behind documents the
            // router already delivered to each parser — confirm for this Akka.NET version.
            var stopTimeout = TimeSpan.FromSeconds(200);
            var parserStops = parsers.Select(p => p.GracefulStop(stopTimeout)).ToList();
            foreach (var stop in parserStops)
                stop.Wait();
            writer.GracefulStop(stopTimeout).Wait();
        }
        Console.WriteLine("finish");
        Console.ReadLine();
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment