Created
October 16, 2011 08:46
-
-
Save oluies/1290672 to your computer and use it in GitHub Desktop.
XMLToFileWriter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cusin | |
import xmlcusin.bo._ | |
import scalaxb.DataRecord | |
import scalax.io._ | |
import Resource._ | |
import java.util.concurrent.TimeUnit | |
import akka.actor.Actor.actorOf | |
import akka.actor.Actor | |
import akka.actor.ActorRef | |
import akka.dispatch.Dispatchers | |
import akka.routing.CyclicIterator | |
import akka.routing.Routing | |
import akka.dispatch.Future | |
import akka.dispatch.Futures | |
import akka.dispatch.Futures.future | |
import scala.collection.mutable.ArrayBuffer | |
import akka.actor.PoisonPill | |
import akka.actor.Actor.Timeout | |
import utils.Enum | |
import org.xml.sax.SAXParseException | |
import java.util.concurrent.TimeUnit | |
import com.yammer.metrics.Instrumented | |
object FileWriterBackend { | |
import Resource._ | |
implicit val codec = scala.io.Codec.ISO8859 | |
// The different actions available | |
object FileAction extends Enum { | |
sealed trait EnumVal extends Value | |
val INSERT = new EnumVal { val name = "INSERT" } | |
val DELETE = new EnumVal { val name = "DELETE" } | |
val UPDATE = new EnumVal { val name = "UPDATE" } | |
val REPORT = new EnumVal { val name = "REPORT" } | |
} | |
// The handled XML file parts are these | |
object FilePart extends Enum { | |
sealed trait EnumVal extends Value | |
val SUBSCRIPTION = new EnumVal { val name = "SUBSCRIPTION" } | |
val SERVICE = new EnumVal { val name = "SERVICE" } | |
val SERVICEATTRIBUTES = new EnumVal { val name = "SERVICEATTRIBUTES" } | |
} | |
// Some files has New and old parts, needs to be differentiated by the writers | |
object FilePartState extends Enum { | |
sealed trait EnumVal extends Value | |
val OLD = new EnumVal { val name = "OLD" } | |
val NEW = new EnumVal { val name = "NEW" } | |
} | |
// Actor messages | |
case class ParseLineRequest(id: Int, contents: String) | |
//case class ParseLineReply(id: Int, contents: String) | |
case class WriteRequest(id: Int, h: Header, fileAction: FileAction.EnumVal, contents: BodyOption) | |
// 8 threads in this pool | |
val backendDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("FileWriterBackend-dispatcher") | |
.setCorePoolSize(3) | |
.build | |
// actors 4 fileWriters and xmlparsers. One each of the writers (need to be singletins) | |
// dispatcher to the other filewriters (or could send to a db or wathever) | |
val writerService = actorOf[WriterService].start | |
// the xml parser | |
val xmlParserService = loadBalanced(8, actorOf[XMLParserService]) | |
// filewriter actors | |
val fileWriterServiceInsert = actorOf[FileWriterServiceInsert].start | |
val fileWriterServiceDelete = actorOf[FileWriterServiceDelete].start | |
val fileWriterServiceUpdate = actorOf[FileWriterServiceUpdate].start | |
// factory for load balanced actors | |
private def loadBalanced(poolSize: Int, actor: ⇒ ActorRef): ActorRef = { | |
val workers = Vector.fill(poolSize)(actor.start()) | |
Routing.loadBalancerActor(CyclicIterator(workers)).start() | |
} | |
// Parsers a string and sends it on to the filewriter | |
private class XMLParserService extends Actor with Instrumented { | |
self.dispatcher = backendDispatcher | |
lazy val newline = System.getProperty("line.separator") | |
lazy val xmlparserErrorsFile: Seekable = fromFile("C:\\temp\\XMLParserServiceErrors.txt") | |
private var counter = 0 | |
private var failCounter = 0 | |
private val xmlParseTimer = metrics.timer("xmlparse") | |
private val recordsMeter = metrics.meter("xmlrecords", "records") | |
private val errorRecordsMeter = metrics.meter("errorrecords", "records") | |
def receive = { | |
case ParseLineRequest(id, c) ⇒ { | |
counter = counter + 1 | |
recordsMeter.mark(1) | |
try{ | |
import scala.xml._ | |
val theXML = XML.loadString(c) // string to XML | |
val x = xmlParseTimer.time { scalaxb.fromXML[MobileSubscriptionServicesAndCustomer](theXML) } | |
val h = x.Header | |
x.Body.bodyoption match { | |
//case DataRecord(a,b,c) => print("\na="+ a + "\nb= " + b + "\nc=" + c) | |
case DataRecord(_, _, i: Subscription_insert) ⇒ writerService ! WriteRequest(id, h, FileAction.INSERT, i) | |
case DataRecord(_, _, u: Subscription_update) ⇒ writerService ! WriteRequest(id, h, FileAction.UPDATE, u) | |
case DataRecord(_, _, d: Subscription_delete) ⇒ writerService ! WriteRequest(id, h, FileAction.DELETE, d) | |
case DataRecord(a, b, c) ⇒ println("invalid choice " + a + b) | |
case _ ⇒ error("???") | |
} | |
} catch { | |
case xml: SAXParseException => | |
failCounter = failCounter +1 | |
errorRecordsMeter.mark(1) | |
xmlparserErrorsFile.append(xml.getMessage() + newline + c + newline) | |
case e: Exception => | |
failCounter = failCounter +1 | |
errorRecordsMeter.mark(1) | |
println(e.getMessage()) | |
} | |
} | |
} | |
} | |
// usses the action to send it on to the correct writer | |
private class WriterService extends Actor { | |
self.dispatcher = backendDispatcher | |
def receive = { | |
case m: WriteRequest ⇒ m.fileAction match { | |
case FileAction.INSERT ⇒ fileWriterServiceInsert ! m | |
case FileAction.DELETE ⇒ fileWriterServiceDelete ! m | |
case FileAction.UPDATE ⇒ fileWriterServiceUpdate ! m | |
case FileAction.REPORT ⇒ println("Report not handled") | |
} | |
} | |
} | |
trait FileWriter{ | |
val sep: String = "|" | |
val newline = System.getProperty("line.separator") | |
val fileWriterName:String | |
val hasOld: Boolean | |
val hasNew: Boolean | |
lazy val subscriptionFile: Seekable = fromFile("C:\\temp\\" + fileWriterName + ".txt") | |
lazy val servicesFile: Seekable = fromFile("C:\\temp\\" + fileWriterName + "_ServicesNew.txt") | |
lazy val servicesAttributeFile: Seekable = fromFile("C:\\temp\\" + fileWriterName + "_ServicesAttributesNew.txt") | |
lazy val servicesFileOld: Seekable = fromFile("C:\\temp\\" + fileWriterName + "_ServicesOld.txt") | |
lazy val servicesAttributeFileOld: Seekable = fromFile("C:\\temp\\" + fileWriterName + "_ServicesAttributesOld.txt") | |
protected def writeFileHeaders() { | |
// write headers | |
subscriptionFile.write(Subscription_data.headerString(sep) + newline) | |
if(hasNew){ | |
servicesFile.write(Service_data.headerString(sep) + newline) | |
servicesAttributeFile.write(Attribute.headerString(sep) + newline) | |
} | |
if(hasOld){ | |
servicesFileOld.write(Service_data.headerString(sep) + newline) | |
servicesAttributeFileOld.write(Attribute.headerString(sep) + newline) | |
} | |
} | |
protected def writeServices(h: Header, services: Seq[Service], oldOrNew: FilePartState.EnumVal = FilePartState.NEW) { | |
for (service ← services) { | |
val serviceText = h.mkString(sep) + sep + service.service_data.mkString(sep) + newline | |
oldOrNew match { | |
case FilePartState.NEW => servicesFile.append(serviceText) | |
case FilePartState.OLD => servicesFileOld.append(serviceText) | |
} | |
service.attributes match { | |
case Some(ao) ⇒ handleServiceAttributes(h, ao.attributestypeoption.map(_.value),oldOrNew) | |
case None ⇒ | |
} | |
} | |
} | |
private def handleServiceAttributes(h: Header, attributes: Seq[Attribute], oldOrNew: FilePartState.EnumVal) { | |
for (a ← attributes) { | |
val attributeText = h.mkString(sep) + sep + a.mkString(sep) + newline | |
oldOrNew match { | |
case FilePartState.NEW => servicesAttributeFile.append(attributeText) | |
case FilePartState.OLD => servicesAttributeFileOld.append(attributeText) | |
} | |
} | |
} | |
} | |
private class FileWriterServiceInsert extends Actor with FileWriter { | |
self.dispatcher = backendDispatcher | |
val fileWriterName:String = "Subscription_Insert" | |
val hasNew = true; | |
val hasOld = false; | |
writeFileHeaders() | |
def receive = { | |
case WriteRequest(id, h, fileAction, contents) ⇒ { | |
contents match { | |
case i: Subscription_insert ⇒ { | |
val subscriptionTxt = h.mkString(sep) + sep + i.subscription_new.subscription.subscription_data.mkString(sep) + newline | |
subscriptionFile.append(subscriptionTxt) | |
i.subscription_new.services match{ | |
case Some(services) => writeServices(h,services.servicesoption.map(_.value)) | |
case None => | |
} | |
} | |
} | |
} | |
} | |
} | |
private class FileWriterServiceDelete extends Actor with FileWriter { | |
self.dispatcher = backendDispatcher | |
val fileWriterName:String = "Subscription_Delete" | |
val hasNew = false; | |
val hasOld = true; | |
writeFileHeaders() | |
def receive = { | |
case WriteRequest(id, h, fileAction, contents) ⇒ { | |
contents match { | |
case d: Subscription_delete ⇒ { | |
val subscriptionTxt = h.mkString(sep) + sep + d.subscription_old.subscription.subscription_data.mkString(sep) + newline | |
subscriptionFile.append(subscriptionTxt) | |
d.subscription_old.services match{ | |
case Some(services) => writeServices(h,services.servicesoption.map(_.value),FilePartState.OLD) | |
case None => | |
} | |
} | |
} | |
} | |
} | |
} | |
private class FileWriterServiceUpdate extends Actor with FileWriter { | |
self.dispatcher = backendDispatcher | |
val fileWriterName:String = "Subscription_Update" | |
val hasNew = true; | |
val hasOld = true; | |
writeFileHeaders() | |
def receive = { | |
case WriteRequest(id, h, fileAction, contents) ⇒ { | |
contents match { | |
case u: Subscription_update ⇒ { | |
val subscriptionTxt = h.mkString(sep) + sep + u.subscription_new.subscription.subscription_data.mkString(sep) + newline | |
subscriptionFile.append(subscriptionTxt) | |
u.subscription_new.services match{ | |
case Some(services) => writeServices(h,services.servicesoption.map(_.value)) | |
case None => | |
} | |
u.subscription_old.services match{ | |
case Some(services) => writeServices(h,services.servicesoption.map(_.value),FilePartState.OLD) | |
case None => | |
} | |
} | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment