Last active
August 29, 2015 13:57
-
-
Save alexanderdean/9588012 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Loader for Thrift SnowplowRawEvent objects which
 * are inbound as a simple Byte Array.
 */
object ThriftByteArrayLoader extends CollectorLoader[Array[Byte]] {

  import scala.util.control.NonFatal

  // One deserializer instance is shared by every call to this object;
  // each use below is wrapped in `synchronized`.
  private val thriftDeserializer = new TDeserializer

  /**
   * Deserializes a Thrift byte array into a SnowplowRawEvent and hands
   * it to ThriftRawEventLoader to build a MaybeCanonicalInput.
   *
   * @param line A serialized Thrift SnowplowRawEvent, as a Byte array.
   * @return either a set of validation errors or an Option-boxed
   *         CanonicalInput object, wrapped in a Scalaz ValidationNel.
   */
  def toCanonicalInput(line: Array[Byte]): ValidatedMaybeCanonicalInput = {
    // Populated in place by the deserializer; the reference itself never changes.
    val snowplowRawEvent = new SnowplowRawEvent()
    try {
      this.synchronized {
        thriftDeserializer.deserialize(
          snowplowRawEvent,
          line
        )
      }
      ThriftRawEventLoader.toCanonicalInput(snowplowRawEvent)
    } catch {
      // TODO: Check for deserialization errors.
      // Only non-fatal exceptions are reported as a schema mismatch; fatal
      // errors (e.g. OutOfMemoryError, InterruptedException) propagate.
      case NonFatal(_) =>
        "Record does not match Thrift SnowplowRawEvent schema".failNel[Option[CanonicalInput]]
    }
  }
}
/**
 * Loader for Thrift SnowplowRawEvent objects which have
 * already been unmarshalled.
 */
object ThriftRawEventLoader extends CollectorLoader[ThriftRawEvent] {

  /**
   * Converts an already-unmarshalled Thrift raw event into a
   * MaybeCanonicalInput.
   *
   * @param line The unmarshalled Thrift raw event to convert.
   * @return either a set of validation errors or an Option-boxed
   *         CanonicalInput object, wrapped in a Scalaz ValidationNel.
   */
  def toCanonicalInput(line: ThriftRawEvent): ValidatedMaybeCanonicalInput = {
    // NOTE(review): `line.payload.data` throws a NullPointerException when
    // `line.payload` is null - confirm the payload is always set by callers.
    val payload = TrackerPayload.extractGetPayload(
      Option(line.payload.data),
      line.encoding
    )

    // Required field; Option(...) still maps an unset (null) value to None
    // instead of wrapping it as Some(null).
    val ip = Option(line.ipAddress)

    // The remaining fields may be null on the Thrift object, hence Option(...).
    val hostname = Option(line.hostname)
    val userAgent = Option(line.userAgent)
    val refererUri = Option(line.refererUri)
    val networkUserId = Option(line.networkUserId)

    // Missing headers become an empty list.
    val headers = Option(line.headers)
      .map(_.toList)
      .getOrElse(Nil)

    payload.toValidationNel.map { (p: NameValueNel) =>
      Some(
        CanonicalInput(
          new DateTime(line.timestamp, DateTimeZone.UTC),
          new NvGetPayload(p),
          InputSource(line.collector, hostname),
          line.encoding,
          ip,
          userAgent,
          refererUri,
          headers,
          networkUserId
        )
      )
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment