Skip to content

Instantly share code, notes, and snippets.

@oluies
Created October 16, 2011 08:46
Show Gist options
  • Save oluies/1290672 to your computer and use it in GitHub Desktop.
Save oluies/1290672 to your computer and use it in GitHub Desktop.
XMLToFileWriter
package cusin
import xmlcusin.bo._
import scalaxb.DataRecord
import scalax.io._
import Resource._
import java.util.concurrent.TimeUnit
import akka.actor.Actor.actorOf
import akka.actor.Actor
import akka.actor.ActorRef
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.Routing
import akka.dispatch.Future
import akka.dispatch.Futures
import akka.dispatch.Futures.future
import scala.collection.mutable.ArrayBuffer
import akka.actor.PoisonPill
import akka.actor.Actor.Timeout
import utils.Enum
import org.xml.sax.SAXParseException
import java.util.concurrent.TimeUnit
import com.yammer.metrics.Instrumented
object FileWriterBackend {
import Resource._
implicit val codec = scala.io.Codec.ISO8859
// The different actions available
object FileAction extends Enum {
sealed trait EnumVal extends Value
val INSERT = new EnumVal { val name = "INSERT" }
val DELETE = new EnumVal { val name = "DELETE" }
val UPDATE = new EnumVal { val name = "UPDATE" }
val REPORT = new EnumVal { val name = "REPORT" }
}
// The handled XML file parts are these
object FilePart extends Enum {
sealed trait EnumVal extends Value
val SUBSCRIPTION = new EnumVal { val name = "SUBSCRIPTION" }
val SERVICE = new EnumVal { val name = "SERVICE" }
val SERVICEATTRIBUTES = new EnumVal { val name = "SERVICEATTRIBUTES" }
}
// Some files has New and old parts, needs to be differentiated by the writers
object FilePartState extends Enum {
sealed trait EnumVal extends Value
val OLD = new EnumVal { val name = "OLD" }
val NEW = new EnumVal { val name = "NEW" }
}
// Actor messages
case class ParseLineRequest(id: Int, contents: String)
//case class ParseLineReply(id: Int, contents: String)
case class WriteRequest(id: Int, h: Header, fileAction: FileAction.EnumVal, contents: BodyOption)
// 8 threads in this pool
val backendDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("FileWriterBackend-dispatcher")
.setCorePoolSize(3)
.build
// actors 4 fileWriters and xmlparsers. One each of the writers (need to be singletins)
// dispatcher to the other filewriters (or could send to a db or wathever)
val writerService = actorOf[WriterService].start
// the xml parser
val xmlParserService = loadBalanced(8, actorOf[XMLParserService])
// filewriter actors
val fileWriterServiceInsert = actorOf[FileWriterServiceInsert].start
val fileWriterServiceDelete = actorOf[FileWriterServiceDelete].start
val fileWriterServiceUpdate = actorOf[FileWriterServiceUpdate].start
// factory for load balanced actors
private def loadBalanced(poolSize: Int, actor: ⇒ ActorRef): ActorRef = {
val workers = Vector.fill(poolSize)(actor.start())
Routing.loadBalancerActor(CyclicIterator(workers)).start()
}
// Parsers a string and sends it on to the filewriter
private class XMLParserService extends Actor with Instrumented {
self.dispatcher = backendDispatcher
lazy val newline = System.getProperty("line.separator")
lazy val xmlparserErrorsFile: Seekable = fromFile("C:\\temp\\XMLParserServiceErrors.txt")
private var counter = 0
private var failCounter = 0
private val xmlParseTimer = metrics.timer("xmlparse")
private val recordsMeter = metrics.meter("xmlrecords", "records")
private val errorRecordsMeter = metrics.meter("errorrecords", "records")
def receive = {
case ParseLineRequest(id, c) ⇒ {
counter = counter + 1
recordsMeter.mark(1)
try{
import scala.xml._
val theXML = XML.loadString(c) // string to XML
val x = xmlParseTimer.time { scalaxb.fromXML[MobileSubscriptionServicesAndCustomer](theXML) }
val h = x.Header
x.Body.bodyoption match {
//case DataRecord(a,b,c) => print("\na="+ a + "\nb= " + b + "\nc=" + c)
case DataRecord(_, _, i: Subscription_insert) ⇒ writerService ! WriteRequest(id, h, FileAction.INSERT, i)
case DataRecord(_, _, u: Subscription_update) ⇒ writerService ! WriteRequest(id, h, FileAction.UPDATE, u)
case DataRecord(_, _, d: Subscription_delete) ⇒ writerService ! WriteRequest(id, h, FileAction.DELETE, d)
case DataRecord(a, b, c) ⇒ println("invalid choice " + a + b)
case _ ⇒ error("???")
}
} catch {
case xml: SAXParseException =>
failCounter = failCounter +1
errorRecordsMeter.mark(1)
xmlparserErrorsFile.append(xml.getMessage() + newline + c + newline)
case e: Exception =>
failCounter = failCounter +1
errorRecordsMeter.mark(1)
println(e.getMessage())
}
}
}
}
// usses the action to send it on to the correct writer
private class WriterService extends Actor {
self.dispatcher = backendDispatcher
def receive = {
case m: WriteRequest ⇒ m.fileAction match {
case FileAction.INSERT ⇒ fileWriterServiceInsert ! m
case FileAction.DELETE ⇒ fileWriterServiceDelete ! m
case FileAction.UPDATE ⇒ fileWriterServiceUpdate ! m
case FileAction.REPORT ⇒ println("Report not handled")
}
}
}
trait FileWriter{
val sep: String = "|"
val newline = System.getProperty("line.separator")
val fileWriterName:String
val hasOld: Boolean
val hasNew: Boolean
lazy val subscriptionFile: Seekable = fromFile("C:\\temp\\" + fileWriterName + ".txt")
lazy val servicesFile: Seekable = fromFile("C:\\temp\\" + fileWriterName + "_ServicesNew.txt")
lazy val servicesAttributeFile: Seekable = fromFile("C:\\temp\\" + fileWriterName + "_ServicesAttributesNew.txt")
lazy val servicesFileOld: Seekable = fromFile("C:\\temp\\" + fileWriterName + "_ServicesOld.txt")
lazy val servicesAttributeFileOld: Seekable = fromFile("C:\\temp\\" + fileWriterName + "_ServicesAttributesOld.txt")
protected def writeFileHeaders() {
// write headers
subscriptionFile.write(Subscription_data.headerString(sep) + newline)
if(hasNew){
servicesFile.write(Service_data.headerString(sep) + newline)
servicesAttributeFile.write(Attribute.headerString(sep) + newline)
}
if(hasOld){
servicesFileOld.write(Service_data.headerString(sep) + newline)
servicesAttributeFileOld.write(Attribute.headerString(sep) + newline)
}
}
protected def writeServices(h: Header, services: Seq[Service], oldOrNew: FilePartState.EnumVal = FilePartState.NEW) {
for (service ← services) {
val serviceText = h.mkString(sep) + sep + service.service_data.mkString(sep) + newline
oldOrNew match {
case FilePartState.NEW => servicesFile.append(serviceText)
case FilePartState.OLD => servicesFileOld.append(serviceText)
}
service.attributes match {
case Some(ao) ⇒ handleServiceAttributes(h, ao.attributestypeoption.map(_.value),oldOrNew)
case None ⇒
}
}
}
private def handleServiceAttributes(h: Header, attributes: Seq[Attribute], oldOrNew: FilePartState.EnumVal) {
for (a ← attributes) {
val attributeText = h.mkString(sep) + sep + a.mkString(sep) + newline
oldOrNew match {
case FilePartState.NEW => servicesAttributeFile.append(attributeText)
case FilePartState.OLD => servicesAttributeFileOld.append(attributeText)
}
}
}
}
private class FileWriterServiceInsert extends Actor with FileWriter {
self.dispatcher = backendDispatcher
val fileWriterName:String = "Subscription_Insert"
val hasNew = true;
val hasOld = false;
writeFileHeaders()
def receive = {
case WriteRequest(id, h, fileAction, contents) ⇒ {
contents match {
case i: Subscription_insert ⇒ {
val subscriptionTxt = h.mkString(sep) + sep + i.subscription_new.subscription.subscription_data.mkString(sep) + newline
subscriptionFile.append(subscriptionTxt)
i.subscription_new.services match{
case Some(services) => writeServices(h,services.servicesoption.map(_.value))
case None =>
}
}
}
}
}
}
private class FileWriterServiceDelete extends Actor with FileWriter {
self.dispatcher = backendDispatcher
val fileWriterName:String = "Subscription_Delete"
val hasNew = false;
val hasOld = true;
writeFileHeaders()
def receive = {
case WriteRequest(id, h, fileAction, contents) ⇒ {
contents match {
case d: Subscription_delete ⇒ {
val subscriptionTxt = h.mkString(sep) + sep + d.subscription_old.subscription.subscription_data.mkString(sep) + newline
subscriptionFile.append(subscriptionTxt)
d.subscription_old.services match{
case Some(services) => writeServices(h,services.servicesoption.map(_.value),FilePartState.OLD)
case None =>
}
}
}
}
}
}
private class FileWriterServiceUpdate extends Actor with FileWriter {
self.dispatcher = backendDispatcher
val fileWriterName:String = "Subscription_Update"
val hasNew = true;
val hasOld = true;
writeFileHeaders()
def receive = {
case WriteRequest(id, h, fileAction, contents) ⇒ {
contents match {
case u: Subscription_update ⇒ {
val subscriptionTxt = h.mkString(sep) + sep + u.subscription_new.subscription.subscription_data.mkString(sep) + newline
subscriptionFile.append(subscriptionTxt)
u.subscription_new.services match{
case Some(services) => writeServices(h,services.servicesoption.map(_.value))
case None =>
}
u.subscription_old.services match{
case Some(services) => writeServices(h,services.servicesoption.map(_.value),FilePartState.OLD)
case None =>
}
}
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment