Last active
May 7, 2023 15:45
-
-
Save dacr/22b5389155471058438024c0918441fc to your computer and use it in GitHub Desktop.
Generate stock time series / published by https://github.com/dacr/code-examples-manager #e10d61c2-2916-43ed-a9db-af50e9ed1665/7d538ba91faf74196fc19e2943fb07ed2191455
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : Generate stock time series
// keywords : scala, elasticsearch, feed, bigdata, fractal, mandelbrot
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : e10d61c2-2916-43ed-a9db-af50e9ed1665
// created-on : 2019-11-24T15:53:31Z
// managed-by : https://github.com/dacr/code-examples-manager
// execution : scala ammonite script (http://ammonite.io/) - run as follows: 'amm scriptname.sc'
import $ivy.`com.sksamuel.elastic4s::elastic4s-core:7.3.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-client-esjava:7.3.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-json-json4s:7.3.1`
import $ivy.`org.json4s::json4s-native:3.6.7`
import $ivy.`org.json4s::json4s-ext:3.6.7`

import java.time.{Instant, OffsetDateTime, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter

import scala.concurrent._
import scala.concurrent.duration._
import scala.util.Properties.{envOrNone, propOrNone}
import scala.util.Random

import org.json4s.{DefaultFormats, native}
import org.json4s.ext.JavaTimeSerializers

import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Response}
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.json4s.ElasticJson4s.Implicits._
import com.sksamuel.elastic4s.requests.count.CountResponse
import com.sksamuel.elastic4s.requests.indexes.admin.{DeleteIndexResponse, RefreshIndexResponse}
import com.sksamuel.elastic4s.requests.mappings._
import com.sksamuel.elastic4s.requests.mappings.FieldType._
/**
 * Generates pseudo-random stock time series and feeds them into elasticsearch,
 * one index per day ("stocks-yyyy-MM-dd"), through bulk requests.
 *
 * @param elasticEndPoints elasticsearch endpoint(s), e.g. "http://127.0.0.1:9201"
 */
class Generator(elasticEndPoints: String) {
  import scala.concurrent.ExecutionContext.Implicits.global

  // json4s serialization support required by elastic4s' Json4s implicits
  implicit val serialization: org.json4s.Serialization = native.Serialization
  implicit val formats: org.json4s.Formats = DefaultFormats.lossless ++ JavaTimeSerializers.all

  // NOTE: indexBaseName already ends with '-'; daily indexes are "stocks-yyyy-MM-dd"
  val indexBaseName = "stocks-"
  val indexNamePattern = "stocks-*"
  val mappingName = "stocks-mapping"

  // NOTE(review): default client settings; tune connection/bulk limits for heavy ingestion
  val client = ElasticClient(JavaClient(ElasticProperties(elasticEndPoints)))

  /** Current wall-clock time in milliseconds (side-effecting, hence the parentheses). */
  def now(): Long = System.currentTimeMillis()

  /** Deletes every index matching the stocks pattern. */
  def doClean(): Future[Response[DeleteIndexResponse]] =
    client.execute {
      deleteIndex(indexNamePattern)
    }

  /** Forces a refresh so freshly indexed documents become searchable. */
  def doRefresh(): Future[Response[RefreshIndexResponse]] =
    client.execute {
      refreshIndex(indexNamePattern)
    }

  /** Counts all documents across the stocks indexes. */
  def doCount(): Future[Response[CountResponse]] =
    client.execute {
      count(indexNamePattern)
    }

  // Builds the daily index suffix from an entry timestamp (Paris local date)
  val dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Europe/Paris"))

  /** Bulk-indexes the given entries, routing each one to its daily index. */
  def insertBulk(entries: Seq[StockEntry]) = client.execute {
    bulk {
      for { entry <- entries } yield {
        // BUG FIX: indexBaseName already ends with '-'; the previous concatenation
        // (indexBaseName + "-" + dateSuffix) produced names like "stocks--2019-11-24".
        val indexName = indexBaseName + dateFormat.format(entry.timestamp)
        indexInto(indexName).doc(entry)
      }
    }
  }

  /** Installs the index template so every stocks-* index gets the right field mappings. */
  def doCreateIndexMapping() = client.execute {
    createIndexTemplate(mappingName, indexNamePattern).mappings(
      properties() as Seq(
        dateField("timestamp"),
        keywordField("name"),
        doubleField("value"),
      )
    )
  }

  // e.g. 2019-11-18 18:53:48,864 — kept as a public member for external users
  val timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS").withZone(ZoneId.of("Europe/Paris"))

  /** One stock quotation sample. */
  case class StockEntry(
    timestamp: OffsetDateTime,
    name: String,
    value: Double
  )

  /**
   * Generates stock samples between the two dates (inclusive) and bulk-inserts
   * them into daily elasticsearch indexes, then refreshes and prints the count.
   *
   * BUG FIX: the previous implementation was a leftover from an access-log feeding
   * script — it referenced undefined `logsDir`/`parseAccessLogEntry` symbols and
   * used none of this method's parameters, so it could not compile. It now really
   * generates `dailyTransactionsCount` samples per stock and per day, following a
   * seeded random walk so runs are reproducible.
   *
   * @param fromDate               first generated day (inclusive)
   * @param toDate                 last generated day (inclusive)
   * @param stockNames             stock identifiers to generate series for
   * @param dailyTransactionsCount number of samples per stock per day (must be > 0)
   */
  def generate(fromDate: OffsetDateTime, toDate: OffsetDateTime, stockNames: List[String], dailyTransactionsCount: Int): Unit = {
    require(dailyTransactionsCount > 0, "dailyTransactionsCount must be positive")
    val startedAt = now()
    val random = new Random(42L) // fixed seed => deterministic, reproducible series
    // last known value of each stock's random walk; every walk starts at 100.0
    val lastValues = scala.collection.mutable.Map.empty[String, Double].withDefaultValue(100d)

    // All samples for one day: evenly spread timestamps, +/-0.5% random step per sample
    def entriesFor(day: OffsetDateTime): Seq[StockEntry] =
      for {
        name <- stockNames
        tx   <- (0 until dailyTransactionsCount).toList
      } yield {
        val timestamp = day.plusSeconds(tx.toLong * 86400L / dailyTransactionsCount)
        val nextValue = math.max(0.01d, lastValues(name) * (1d + (random.nextDouble() - 0.5d) / 100d))
        lastValues(name) = nextValue
        StockEntry(timestamp, name, nextValue)
      }

    // Inserts day after day, keeping `parallelismLevel` bulk requests in flight at a time
    def writeData(): Future[Unit] = Future {
      val parallelismLevel = 10
      val dayBatches =
        Iterator
          .iterate(fromDate)(_.plusDays(1))
          .takeWhile(!_.isAfter(toDate))
          .grouped(parallelismLevel)
      while (dayBatches.hasNext) {
        print("*" * parallelismLevel) // cheap progress feedback
        val inFlight = dayBatches.next().map(day => insertBulk(entriesFor(day)))
        // Throttle: wait for the current wave of bulks before firing the next one
        Await.result(Future.sequence(inFlight), 5.minutes)
      }
    }

    val futureResponse = for {
      _        <- doClean()              // delete any pre-existing stocks indexes
      _        <- doCreateIndexMapping() // install the index template first
      _        <- writeData()            // bulk-insert all generated samples
      _        <- doRefresh()            // make the documents searchable before counting
      response <- doCount()
    } yield response

    // BUG FIX: the original awaited the future and then attached an asynchronous
    // `map` for the final println, which could be skipped if the JVM exited first;
    // it also printed the whole Response object. Use the awaited value directly.
    val countResponse = Await.result(futureResponse, 30.minutes)
    val duration = (now() - startedAt) / 1000
    println(s"${countResponse.result.count} documents inserted in $duration seconds")
  }
}
/**
 * Entry point: generates ~10 years of simulated quotations for a few stocks
 * and feeds them into the given elasticsearch endpoint.
 *
 * @param elasticEndPoints elasticsearch endpoint(s), defaults to a local instance
 */
@main
def main(elasticEndPoints: String = "http://127.0.0.1:9201"): Unit = {
  // Number of distinct stocks to simulate
  val stockToGenerateCount = 3
  // First N 3-letter names taken from the combinations of 'A'..'Z' ("ABC", "ABD", ...).
  // `take` before `toList` so only the needed combinations are materialized.
  // (Removed an unused `envOrPropOrNone` helper that was never called.)
  val names = ('A' to 'Z').combinations(3).map(_.mkString).take(stockToGenerateCount).toList
  new Generator(elasticEndPoints).generate(
    fromDate = OffsetDateTime.parse("2009-01-01T00:00:00.000Z"),
    toDate = OffsetDateTime.parse("2019-11-24T23:59:59.999Z"),
    stockNames = names,
    dailyTransactionsCount = 100
  )
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment