Skip to content

Instantly share code, notes, and snippets.

@a-agmon
a-agmon / gql_2.go
Last active September 12, 2023 06:41
import (
...
"database/sql"
"database/sql/driver"
"github.com/marcboeker/go-duckdb"
...
)
// the driver struct wrapping the duckdb connection
type DuckDBDriver struct {
@a-agmon
a-agmon / gql_1.go
Last active September 13, 2023 05:45
// DAO is a data-access object that holds the DataDriver it operates through.
type DAO struct {
// driver is the DataDriver held by this DAO; the DataDriver type is declared
// outside this snippet.
driver DataDriver
}
// the data we want to load
// SQL template that creates the users table from a parquet source; the %s
// placeholder stands for the parquet location (presumably filled in with
// fmt.Sprintf by the caller; confirm).
// The + must end the first line: Go inserts a semicolon after a line that ends
// in a string literal, so starting the continuation line with + does not compile.
QUsersTable := "CREATE TABLE users AS SELECT name, last_name, cast(age as integer)" +
	"FROM read_parquet('%s') "
// the parquet files location
/**
 * One scored record.
 *
 * @param key        identifier carried along with the result
 * @param ts         timestamp of the record
 * @param label      label value of the record
 * @param prediction value predicted for the record
 * @param ratio      ratio value; how it is derived is not visible in this
 *                   definition, so check the code that builds the result
 */
case class PredictionResult(
  key: String,
  ts: Timestamp,
  label: Float,
  prediction: Float,
  ratio: Float
)
private def predictXGBBooster(app_id: String, booster: Booster,
predictSeq: Seq[FeaturesRecord]):
Try[Seq[PredictionResult]] = Try {
val forecastedVal = booster.predict(predictSeq.toDMatrix)
predictSeq.zip(forecastedVal).map { case (FeaturesRecord(_, ts, _, label), forecast) =>
/**
 * Adds a `toDMatrix` extension method to a sequence of [[FeaturesRecord]].
 */
implicit class DMatrixConverter(seq: Seq[FeaturesRecord]) {

  /**
   * Builds a DMatrix with one LabeledPoint per record, in sequence order.
   *
   * Each point takes the record's label and feature values. The indices
   * argument is passed as null (presumably marking the vector as dense;
   * confirm against the xgboost4j LabeledPoint documentation).
   */
  def toDMatrix: DMatrix = {
    val denseRows = seq.map {
      case FeaturesRecord(_, _, featureVec, target) =>
        val values = featureVec.toArray
        LabeledPoint(target, featureVec.size, null, values)
    }
    new DMatrix(denseRows.iterator)
  }
}
/**
 * Trains an XGBoost booster on the given records.
 *
 * Relies on a `toDMatrix` extension for `Seq[FeaturesRecord]` being in scope.
 *
 * @param trainSeq records to train on
 * @return the trained booster, or a Failure holding any exception thrown
 *         while building the matrix or training
 */
private def trainXGBBooster(trainSeq: Seq[FeaturesRecord]): Try[Booster] = Try {
  val trainingMatrix = trainSeq.toDMatrix
  val boosterParams: Map[String, Any] = Map(
    "eta" -> 0.1f,
    "max_depth" -> 4,
    "objective" -> "reg:squarederror"
  )
  // third argument of XGBoost.train: the number of boosting rounds
  val boostingRounds = 50
  XGBoost.train(trainingMatrix, boosterParams, boostingRounds)
}
@a-agmon
a-agmon / ts3.scala
Last active February 26, 2023 12:18
/**
 * Splits the records, ordered by timestamp, into a training set and a
 * validation set.
 *
 * The newest observation is left out of both sets because it might be
 * partial. The validation set holds the (up to) two observations just before
 * it, and the training set holds everything before the last three.
 *
 * @param app_id          application identifier (not used by the split itself)
 * @param recordsIterator records to split, in any order
 * @return `(trainSeq, actualValSeq)`, or a Failure holding any exception thrown
 */
private def getForecastDatasets(
  app_id: String,
  recordsIterator: Seq[FeaturesRecord]
): Try[(Seq[FeaturesRecord], Seq[FeaturesRecord])] = Try {
  val chronological = recordsIterator.sortBy(_.ts.getTime)
  // everything except the three newest observations
  val trainSeq = chronological.dropRight(3)
  // drop the possibly-partial newest observation, then keep the two before it
  val actualValSeq = chronological.dropRight(1).takeRight(2)
  (trainSeq, actualValSeq)
}
@a-agmon
a-agmon / ts2.scala
Last active February 26, 2023 12:08
/**
 * One scored record.
 *
 * @param key        identifier carried along with the result
 * @param ts         timestamp of the record
 * @param label      label value of the record
 * @param prediction value predicted for the record
 * @param ratio      ratio value; how it is derived is not visible in this
 *                   definition, so check the code that builds the result
 */
case class PredictionResult(
  key: String,
  ts: Timestamp,
  label: Float,
  prediction: Float,
  ratio: Float
)
def predict(appId:String, recordsIter:Iterator[FeaturesRecord]):
Seq[PredictionResult] = {
val predDF = for {
(trainSeq, actualValSeq) <- getForecastDataset(appId, recordsIter.toSeq)
booster <- trainXGBBooster(trainSeq)
@a-agmon
a-agmon / ts1.scala
Last active February 26, 2023 07:26
/**
 * A single observation made of a feature vector and its label.
 *
 * @param key      identifier of the series the observation belongs to
 * @param ts       timestamp of the observation
 * @param features feature values of the observation
 * @param label    label value of the observation
 */
case class FeaturesRecord(
  key: String,
  ts: Timestamp,
  features: Seq[Float],
  label: Float
)
private def getFeaturesDataFrame(df: DataFrame): Try[Dataset[FeaturesRecord]] = Try {
df.map(row => {
val key = row.getAs[String]("app_id")
val label = row.getAs[Int]("installs").toFloat
val ts = row.getAs[Timestamp]("event_hour")
val dayOfWeek = row.getAs[Int]("day_of_week").toFloat
val hourOfDay = row.getAs[Int]("hour_of_day").toFloat
def getStreamTopology(inputTopic:String):Topology = {
val builder = new StreamsBuilder()
val reqStream = builder.stream[String, PredictRequest](inputTopic)
reqStream
.map( (_, request) => {
Classifier.predict(request.recordID, request.featuresVector)
})
.split()
/**
 * Wraps one feature vector as a single-row DMatrix.
 *
 * @param rawVector feature values of one record
 * @return a DMatrix with one row and `rawVector.length` columns, built with
 *         `Float.NaN` as the missing-value marker
 */
private def getInputVector(rawVector: Seq[Float]): DMatrix = {
  val values: Array[Float] = rawVector.toArray[Float]
  // one row, as wide as the feature vector; NaN is the missing-value argument
  new DMatrix(values, 1, values.length, Float.NaN)
}
def predict(recordID:String, features:Seq[Float]): (String, Float) = {
val xgbInput = getInputVector(features)