Created
April 22, 2019 16:07
-
-
Save hadoopsters/3fd7c585d3e320ab0daaede801ee9a7b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tv.spotx.scala.dbutils | |
import org.apache.logging.log4j.scala.Logging | |
import scalaj.http.{Http, HttpOptions, HttpResponse} | |
/**
 * Connection settings for an InfluxDB endpoint.
 *
 * @param hostname Host name; defaults to "console".
 * @param port     Port number; defaults to 8086.
 * @param database Database name; defaults to "devtest".
 * @param ssl      Whether SSL is requested; defaults to false.
 * @param username Optional user name; defaults to None.
 * @param password Optional password; defaults to None.
 */
case class InfluxConfig(
    hostname: String = "console",
    port: Int = 8086, // scalastyle:off magic.number
    database: String = "devtest",
    ssl: Boolean = false,
    username: Option[String] = None,
    password: Option[String] = None
)
/**
 * Enumeration of the timestamp precisions understood by the InfluxDB write endpoint.
 *
 * Every member is registered under an explicit name, so `toString` yields exactly
 * the short code ("ns", "u", "ms", "s", "m", "h").
 *
 * @see https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
 */
object InfluxPrecision extends Enumeration {
  type InfluxPrecision = Value

  val ns: Value = Value("ns")
  val u: Value = Value("u")
  val ms: Value = Value("ms")
  val s: Value = Value("s")
  val m: Value = Value("m")
  val h: Value = Value("h")
}
/**
 * Base class for InfluxDB writers.
 *
 * Holds the timestamp precision used for writes, declares the abstract `write`
 * operation, and provides helpers for building the tag/field portion of a
 * line-protocol entry.
 */
abstract class InfluxDBWriter extends Serializable with Logging {

  /**
   * Precision used for writes; defaults to seconds.
   */
  protected var _precision: InfluxPrecision.Value = InfluxPrecision.s

  /**
   * Set precision.
   * @see https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
   * @param newValue Preferred precision.
   */
  def precision_= (newValue: InfluxPrecision.Value): Unit = _precision = newValue // scalastyle:off

  /**
   * Return precision.
   * @return The precision currently configured on this writer.
   */
  def precision: InfluxPrecision.Value = _precision

  /**
   * Invoke a write on influx.
   * @see https://docs.influxdata.com/influxdb/v1.7/introduction/getting-started/
   * @param database The database containing the measurement to write to.
   * @param measurement The measurement to write to.
   * @param tags A sequence of an arbitrary number of tags in key/value tuples.
   * @param fields A sequence of an arbitrary number of fields in key/value tuples.
   * @return HttpResponse from api write.
   */
  def write(database: String,
            measurement: String,
            tags: Seq[(String, Any)],
            fields: Seq[(String, Any)]): HttpResponse[String]

  /**
   * Sanitises a data string for use as a tag or field value.
   *
   * Newlines are removed; commas, equals signs and spaces are prefixed with a
   * backslash; double quotes are replaced with single quotes.
   *
   * @param data Arbitrary data string.
   * @return Sanitised data.
   */
  def prepareInfluxData(data: String): String =
    // Literal (non-regex) replacements: no pattern or replacement escaping to get wrong.
    data
      .replace("\n", "")
      .replace(",", "\\,")
      .replace("=", "\\=")
      .replace(" ", "\\ ")
      .replace("\"", "'")

  /**
   * Escapes double quotes inside a string field value so that the surrounding
   * quotes added by [[generateTagFieldClause]] stay balanced.
   *
   * Quotes that are already preceded by a backslash are left untouched, so
   * values a caller has escaped beforehand are not escaped twice.
   *
   * @param value Raw string field value.
   * @return The value with every unescaped `"` turned into `\"`.
   */
  private def escapeStringField(value: String): String =
    value.replaceAll("(?<!\\\\)\"", java.util.regex.Matcher.quoteReplacement("\\\""))

  /**
   * Generate a clause containing tags and fields for influx write api.
   *
   * Tags are rendered as comma-separated `key=value` pairs, followed by a single
   * space, followed by the comma-separated fields. String field values are
   * wrapped in double quotes (with embedded quotes escaped); every other field
   * value is rendered via its `toString`. Tag keys/values and field keys are
   * emitted as given; callers are responsible for sanitising them
   * (see [[prepareInfluxData]]).
   *
   * @param tags A sequence of an arbitrary number of tags in key/value tuples.
   * @param fields A sequence of an arbitrary number of fields in key/value tuples.
   * @return The clause as a String; it starts with a space when `tags` is empty.
   */
  protected def generateTagFieldClause(tags: Seq[(String, Any)], fields: Seq[(String, Any)]): String = {
    val tagClause = tags.map { case (key, value) => s"$key=$value" }.mkString(",")
    val fieldClause = fields.map {
      // Strings need to be wrapped in quotes.
      case (key, text: String) => key + "=\"" + escapeStringField(text) + "\""
      case (key, other) => s"$key=$other"
    }.mkString(",")
    s"$tagClause $fieldClause"
  }
}
/**
 * Implements an InfluxDBWriter that writes one line to the HTTP write API.
 *
 * @param host     InfluxDB host name.
 * @param port     InfluxDB port; defaults to 8086.
 * @param ssl      When true, requests use https and carry the given credentials.
 * @param username User name; must be non-empty when `ssl` is true.
 * @param password Password; must be non-empty when `ssl` is true.
 */
class InfluxHTTPWriter(host: String,
                       port: Int = 8086,
                       ssl: Boolean = false,
                       username: String = "",
                       password: String = "") extends InfluxDBWriter {

  /**
   * Validates the combination of ssl related fields passed through the constructor.
   * An alternative option would be to validate this in a constructor and throw an exception
   * in the event of an invalid combination of fields.
   * @return true when ssl is off, or when ssl is on and both username and password are non-empty.
   */
  protected def hasValidCredentials: Boolean = !ssl || (username.nonEmpty && password.nonEmpty)

  /**
   * Posts a single line-protocol entry to `/write` on the configured host.
   *
   * When the credentials are invalid (see [[hasValidCredentials]]) no request is
   * made; an error is logged and a synthetic 403 response with an empty body is
   * returned instead.
   *
   * @param database The database containing the measurement to write to.
   * @param measurement The measurement to write to.
   * @param tags A sequence of an arbitrary number of tags in key/value tuples.
   * @param fields A sequence of an arbitrary number of fields in key/value tuples.
   * @return HttpResponse from api write, or the synthetic 403 described above.
   */
  override def write(database: String,
                     measurement: String,
                     tags: Seq[(String, Any)],
                     fields: Seq[(String, Any)]): HttpResponse[String] =
    if (hasValidCredentials) {
      val scheme = if (ssl) "https" else "http"
      val endpointUrl = s"$scheme://$host:$port/write?db=$database&precision=${_precision}"
      val baseRequest = Http(endpointUrl)
      // HttpRequest is immutable: options/auth return a NEW request, so the result
      // must be kept, otherwise the SSL option and credentials are silently dropped.
      // NOTE(review): allowUnsafeSSL disables certificate validation; confirm this is intended.
      val request =
        if (ssl) baseRequest.options(HttpOptions.allowUnsafeSSL).auth(username, password)
        else baseRequest
      val line =
        if (tags.isEmpty && fields.isEmpty) {
          measurement
        } else {
          // Only separate measurement and tag set with a comma when there are tags;
          // "measurement, field=1" is not valid line protocol.
          val separator = if (tags.nonEmpty) "," else ""
          measurement + separator + generateTagFieldClause(tags, fields)
        }
      request.postData(line).asString
    } else {
      logger.error("Username and/or password is empty! Provide official credentials.")
      HttpResponse[String]("", 403, Map())
    }
}
/**
 * Implements an InfluxDBWriter that logs the influx write instead of sending it.
 *
 * @param output Stream selector supplied by callers.
 *               NOTE(review): this value is currently not used; the line is always
 *               emitted through the logger at info level. Presumably 1 was meant to
 *               select stdout and anything else stderr; confirm before wiring it up.
 */
class InfluxConsoleWriter(output: Int) extends InfluxDBWriter {

  /**
   * Logs a description of the write and reports success without any network call.
   *
   * @param database The database containing the measurement to write to.
   * @param measurement The measurement to write to.
   * @param tags A sequence of an arbitrary number of tags in key/value tuples.
   * @param fields A sequence of an arbitrary number of fields in key/value tuples.
   * @return A synthetic response with an empty body and status code 204.
   */
  override def write(database: String,
                     measurement: String,
                     tags: Seq[(String, Any)],
                     fields: Seq[(String, Any)]): HttpResponse[String] = {
    val clause = generateTagFieldClause(tags, fields)
    logger.info(s"influx-write:///write?db=$database&precision=${_precision}/$measurement,$clause")
    HttpResponse[String]("", 204, Map())
  }
}
/** Factory that picks the InfluxDBWriter implementation based on the host name. */
object InfluxDBWriter {

  /**
   * Builds a writer for the given target.
   *
   * A host of exactly "console" yields an InfluxConsoleWriter, which receives
   * `port` as its `output` argument; any other host yields an InfluxHTTPWriter
   * built from all of the supplied arguments.
   *
   * @param host     Target host name, or "console".
   * @param port     Port number; defaults to 8086.
   * @param ssl      Whether to use SSL; defaults to false.
   * @param username User name; defaults to the empty string.
   * @param password Password; defaults to the empty string.
   * @return The selected writer.
   */
  def create(host: String,
             port: Int = 8086,
             ssl: Boolean = false,
             username: String = "",
             password: String = ""): InfluxDBWriter =
    host match {
      case "console" => new InfluxConsoleWriter(port)
      case _ => new InfluxHTTPWriter(host, port, ssl, username, password)
    }
}
/** Implement a generic InfluxDB Bulk Writer */
abstract class InfluxDBBulkWriter {

  /**
   * Write a pre-formatted payload to the given database.
   *
   * @param database The database to write to.
   * @param value    The payload to post, passed through as-is.
   */
  // Explicit result type: procedure syntax (no ": Unit") is deprecated.
  def write(database: String, value: String): Unit
}
/**
 * InfluxDBBulkWriter that posts the whole payload in a single api call.
 *
 * @param host InfluxDB host name.
 * @param port InfluxDB port; defaults to 8086.
 */
class InfluxHTTPBulkWriter(host: String, port: Int = 8086) extends InfluxDBBulkWriter {

  /**
   * Posts `value` to the plain-http `/write` endpoint for `database`.
   * The HTTP response is discarded.
   *
   * @param database The database to write to.
   * @param value    The payload to post.
   */
  override def write(database: String, value: String): Unit = {
    val endpoint = s"http://$host:$port/write?db=$database"
    Http(endpoint).postData(value).asString
    ()
  }
}
/**
 * Implement an InfluxDBBulkWriter that logs what would be posted in bulk
 * instead of sending it.
 *
 * @param output Stream selector supplied by callers.
 *               NOTE(review): this value is currently not used; the message is always
 *               emitted through the logger at info level. Presumably 1 was meant to
 *               select stdout and anything else stderr; confirm before wiring it up.
 */
class InfluxConsoleBulkWriter(output: Int) extends InfluxDBBulkWriter with Logging {

  /**
   * Logs the database name and payload; performs no network call.
   *
   * @param database The database that would be written to.
   * @param value    The payload that would be posted.
   */
  override def write(database: String, value: String): Unit =
    logger.info(s"Would post to Influx db $database, with data \n $value")
}
/** Factory that picks the InfluxDBBulkWriter implementation based on the host name. */
object InfluxDBBulkWriter {

  /**
   * Builds a bulk writer for the given target.
   *
   * A host of exactly "console" yields an InfluxConsoleBulkWriter, which receives
   * `port` as its `output` argument; any other host yields an InfluxHTTPBulkWriter.
   *
   * @param host Target host name, or "console".
   * @param port Port number; defaults to 8086.
   * @return The selected bulk writer.
   */
  def create(host: String, port: Int = 8086): InfluxDBBulkWriter =
    host match {
      case "console" => new InfluxConsoleBulkWriter(port)
      case _ => new InfluxHTTPBulkWriter(host, port)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment