Created
October 9, 2015 15:45
-
-
Save dat-vikash/ae93b96466b110f5a8fe to your computer and use it in GitHub Desktop.
geoip sink example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
trait EventStoreServiceWithLogstashSink extends EventStoreService | |
{ | |
this: EventStore => | |
// Collector configuration for the Logstash sink. The endpoint is read from the
// "logstash-endpoint" entry of Application.CONFIGS and converted to a String.
// NOTE(review): the `.get` fails at initialisation when the entry is missing;
// this looks like deliberate fail-fast behaviour - confirm.
val config = {
  val endpoint = Application.CONFIGS.get("logstash-endpoint").get.convertTo[String]
  EventStoreCollectorConfig(logstashEndpoint = Some(endpoint))
}
/**
  * Event carrying a fact plus optional request metadata, serialised by hand to JSON.
  *
  * @param fact          payload interpolated verbatim into the JSON output; callers are
  *                      expected to pass text that is already valid JSON
  * @param timestamp     optional timestamp (not part of the JSON produced here)
  * @param apiVersion    optional API version
  * @param originHost    optional originating host
  * @param originService optional originating service
  * @param httpData      optional HTTP header data extracted from the request
  * @param nonce         optional nonce
  */
case class MyCustomEvent(fact: Any,
                         timestamp: Option[Long] = None,
                         apiVersion: Option[String] = None,
                         originHost: Option[String] = None,
                         originService: Option[String] = None,
                         httpData: Option[HttpHeaderData],
                         nonce: Option[Long] = None
                        ) extends EventBase {

  /**
    * Renders the event as a JSON object.
    *
    * Every header-derived field is emitted as the bare JSON literal `null` when
    * `httpData` is absent. `contentLength` and `nonce` are emitted as bare numbers
    * (or `null`); previously `contentLength` interpolated the Option itself and
    * produced `Some(n)` / `None`, which is not valid JSON.
    */
  override def toJson : String =
    s"""
       |{"resourceURI" : ${stringLiteralForJson(resourceURI)},
       |"apiVersion": ${stringLiteralForJson(apiVersion)},
       |"checksum": ${stringLiteralForJson(checksum)},
       |"originHost": ${stringLiteralForJson(originHost)},
       |"originService": ${stringLiteralForJson(originService)},
       |"userAgent": ${httpData.map(d => stringLiteralForJson(d.userAgent)).getOrElse("null")},
       |"acceptCharset": ${httpData.map(d => stringLiteralForJson(d.acceptCharset)).getOrElse("null")},
       |"acceptDateTime": ${httpData.map(d => stringLiteralForJson(d.acceptDatetime)).getOrElse("null")},
       |"authorization": ${httpData.map(d => stringLiteralForJson(d.authorization)).getOrElse("null")},
       |"contentLength": ${httpData.flatMap(_.contentLength).map(_.toString).getOrElse("null")},
       |"contentType": ${httpData.map(d => stringLiteralForJson(d.contentType)).getOrElse("null")},
       |"server": ${httpData.map(d => stringLiteralForJson(d.server)).getOrElse("null")},
       |"date": ${httpData.map(d => stringLiteralForJson(d.date)).getOrElse("null")},
       |"nonce": ${nonce.map(_.toString).getOrElse("null")},
       |"fact": $fact
       |} """.stripLineEnd.stripMargin
}
// Logstash sink instance used by putItem; constructed from `config` and Application.system.
object ConcreteSink extends LogstashSink(Some(config), Application.system ) | |
/**
  * Builds a [[MyCustomEvent]] from an HTTP request and a fact payload.
  *
  * Each header of interest is looked up by name (ignoring case, since HTTP header
  * names are case-insensitive); a missing header is recorded as the empty string,
  * and a missing or unparsable Content-Length is recorded as 0.
  *
  * @param event the incoming request whose headers and URI are captured
  * @param fact  the payload stored in the event's `fact` field
  * @return always `Some(event)`; the Option return type is kept for callers
  */
implicit def geoipRequestToEvent(event: HttpRequest, fact: Any) : Option[MyCustomEvent]=
{
  import spray.http.HttpHeaders._

  // Value of the first header whose name matches `name`, or "" when absent.
  def headerValue(name: String): String =
    event.headers.find(_.name.equalsIgnoreCase(name)).map(_.value).getOrElse("")

  // A malformed or out-of-Int-range Content-Length must not abort event creation,
  // so any parse failure (including the "" used for a missing header) yields 0.
  val contentLength: Int =
    scala.util.Try(headerValue(`Content-Length`.name).trim.toInt).getOrElse(0)

  // NOTE(review): acceptDatetime is populated from the `Date` header, not an
  // Accept-Datetime header - kept as in the original; confirm this is intended.
  val httpHeaderData = HttpHeaderData(
    userAgent = Some(headerValue(`User-Agent`.name)),
    acceptCharset = Some(headerValue(`Accept-Charset`.name)),
    acceptDatetime = Some(headerValue(`Date`.name)),
    authorization = Some(headerValue(`Authorization`.name)),
    contentLength = Some(contentLength),
    contentType = Some(headerValue(`Content-Type`.name)),
    host = Some(headerValue(`Host`.name)),
    requestUri = Some(event.uri.toString()),
    accept = Some(headerValue(`Accept`.name)),
    remoteAddress = Some(headerValue(`Remote-Address`.name)))

  Some(MyCustomEvent(httpData = Some(httpHeaderData), fact = fact))
}
/**
  * Converts the request and JSON fact into an event, persists it through
  * ConcreteSink and completes the request according to the outcome.
  *
  * Responds 201 Created when the persist future succeeds, and
  * 500 Internal Server Error when it fails or when no event could be built.
  *
  * @param jsonFactToInsert the fact to store; its `toString()` form is embedded in the event
  * @param ctx              the request context that is completed with the result
  */
def putItem(jsonFactToInsert: spray.json.JsValue, ctx: RequestContext): Unit =
{
  geoipRequestToEvent(ctx.request, jsonFactToInsert.toString()) match {
    case Some(event) =>
      ConcreteSink.persist(event).onComplete {
        case Success(_) =>
          ctx.complete(StatusCodes.Created, "Sent to logstash")
          eventStoreServiceLog.info("Link info data saved to Logstash")
        case Failure(error) =>
          eventStoreServiceLog.error("Error occurred with EventStore",error)
          // Throwable.getMessage may be null; fall back to the exception's string
          // form so the response body is never a null String.
          val reason = Option(error.getMessage).getOrElse(error.toString)
          ctx.complete(StatusCodes.InternalServerError, reason)
      }
    case None =>
      eventStoreServiceLog.info("Unable to marshall to Event")
      ctx.complete(StatusCodes.InternalServerError, "Unable to marshall to Event")
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment