Created October 20, 2020 08:17
Implementation of an internal event-logger library backed by Logback's RollingFileAppender.
package com.sparkbeyond.engine.util.logging.eventslogger

import java.nio.charset.StandardCharsets

import akka.Done
import ch.qos.logback.core.rolling.{FixedWindowRollingPolicy, RollingFileAppender, SizeBasedTriggeringPolicy}
import ch.qos.logback.core.util.FileSize
import ch.qos.logback.core.{ContextBase, LayoutBase}
import ch.qos.logback.core.encoder.LayoutWrappingEncoder
import com.sparkbeyond.commons.util.logging.SBLogger
import com.sparkbeyond.eventlogging.encoding.SBJson
import com.sparkbeyond.eventlogging.logic.{EventsLogger, SharedMetadata}
import com.sparkbeyond.eventlogging.logic.EventsLogger.EventLoggerSettings
import com.typesafe.config.Config
import com.typesafe.config.ConfigException.BadValue

import scala.concurrent.{ExecutionContext, Future}

object LogbackEventLogger {

  def apply(sharedMetadata: SharedMetadata,
            settings: EventLoggerSettings,
            config: Config): LogbackEventLogger = {

    val totalDirSize = config.getMemorySize("archive.overflow-size").toBytes
    val maxFileSize  = config.getMemorySize("max-size").toBytes
    // keep enough rolled files to cover the configured overflow size
    val rollingFiles = math.ceil(totalDirSize.toDouble / maxFileSize).intValue()
    val eventLogFile = config.getString("dir.parent") + "/events.log"

    val compression = config.getString("archive.compression") match {
      case "none"                                => ""
      case validCompressionMode @ ("gz" | "zip") => "." + validCompressionMode
      case wrong                                 =>
        throw new BadValue("eventslogger.file.archive.compression", s"unrecognized compression[$wrong]")
    }

    // fixed-window rollover: events.log.1[.gz|.zip] ... events.log.<rollingFiles>[.gz|.zip]
    val rollingPolicy = new FixedWindowRollingPolicy
    rollingPolicy.setMaxIndex(rollingFiles)
    rollingPolicy.setFileNamePattern(eventLogFile + ".%i" + compression)

    new LogbackEventLogger(sharedMetadata, settings, eventLogFile, rollingPolicy, maxFileSize)
  }
}

class LogbackEventLogger(override val sharedMetadata: SharedMetadata,
                         override val settings: EventLoggerSettings,
                         currentFile: String,
                         rollingPolicy: FixedWindowRollingPolicy,
                         maxFileSize: Long) extends EventsLogger with SBLogger {

  private[this] val appender: RollingFileAppender[Event] = {
    val a = new RollingFileAppender[Event]
    a.setAppend(true)
    a.setFile(currentFile)

    val ctx = new ContextBase
    a.setContext(ctx)

    // JSON-lines layout: one {"event": ..., "metadata": ...} object per line
    a.setEncoder {
      val e = new LayoutWrappingEncoder[Event]
      e.setLayout(new LayoutBase[Event] {
        override def doLayout(event: Event): String = {
          val eventJson = SBJson.JsonObj(Map("event" -> event.event, "metadata" -> event.metadata))
          SBJson.format(eventJson) + "\n"
        }
      })
      e.setCharset(StandardCharsets.UTF_8)
      e
    }

    a.setName("EventAppender")
    // a.setPrudent(true) // we have multiple JVMs (master & workers) that will write to same file

    rollingPolicy.setContext(ctx)
    a.setRollingPolicy(rollingPolicy)
    rollingPolicy.setParent(a)

    // roll over once the active file reaches max-size
    val triggeringPolicy = new SizeBasedTriggeringPolicy[Event]
    triggeringPolicy.setMaxFileSize(new FileSize(maxFileSize))
    a.setTriggeringPolicy(triggeringPolicy)
    a.setImmediateFlush(true)

    rollingPolicy.start()
    triggeringPolicy.start()
    a.start()
    a
  }

  // appends are synchronous, so a single pre-completed Future is reused for every send
  private[this] val memoizedDone = Future.successful(Done)

  override protected def doSend(event: Event)(implicit ec: ExecutionContext): Future[Done] = {
    appender.doAppend(event)
    memoizedDone
  }
}
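For context, here is a minimal usage sketch. The HOCON keys mirror the ones read in LogbackEventLogger.apply above; the concrete values, the /var/log/engine directory, and the LogbackEventLoggerUsage wrapper are illustrative assumptions, and SharedMetadata, EventLoggerSettings and Event are internal SparkBeyond types whose construction is not shown in this gist.

import com.sparkbeyond.engine.util.logging.eventslogger.LogbackEventLogger
import com.sparkbeyond.eventlogging.logic.SharedMetadata
import com.sparkbeyond.eventlogging.logic.EventsLogger.EventLoggerSettings
import com.typesafe.config.ConfigFactory

// Hypothetical wiring: sharedMetadata and settings come from the host application.
object LogbackEventLoggerUsage {

  def build(sharedMetadata: SharedMetadata, settings: EventLoggerSettings): LogbackEventLogger = {
    // Presumably the `eventslogger.file` sub-config, judging by the BadValue path above.
    val config = ConfigFactory.parseString(
      """
        |# hypothetical log directory; any writable path works
        |dir.parent = "/var/log/engine"
        |# roll the active events.log once it reaches 64 MiB
        |max-size = 64m
        |# keep at most ~1 GiB of rolled archives
        |archive.overflow-size = 1g
        |# one of "none", "gz" or "zip"
        |archive.compression = gz
        |""".stripMargin)

    LogbackEventLogger(sharedMetadata, settings, config)
  }
}

With these example values the factory computes ceil(1 GiB / 64 MiB) = 16 rolled files, so the appender writes /var/log/engine/events.log and rotates it through events.log.1.gz up to events.log.16.gz, after which the oldest (highest-numbered) archive is dropped on each rollover.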