Last active
April 2, 2023 10:10
Drools banking skeleton application wired to a kafka topic / published by https://github.com/dacr/code-examples-manager #cf0d98c9-9383-4297-b6f3-ff1fed217e23/32aa5e00a6df1ec847a4d36786e6a0e92b4a8dcb
// summary : Drools banking skeleton application wired to a kafka topic
// keywords : scala, drools, mvel, ai, knowledgebase, kafka, cep, complex-event-processing
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : cf0d98c9-9383-4297-b6f3-ff1fed217e23
// execution : scala ammonite script (http://ammonite.io/) - run as follows 'amm scriptname.sc'
// created-on : 2019-10-03T08:46:39+02:00
// managed-by : https://github.com/dacr/code-examples-manager

/*
PLEASE FIRST START A REPL SESSION AND EVAL THE CONTENT OF
the "drools-kb-banking-events-stream-generator.sc" SCRIPT
to start an embedded kafka server and get a simple way to generate events
(See : https://gist.github.com/dacr/da70d346f0e477c1a6e78bc15405cfd3)
then make a call such as this one in THE REPL :
  Generator.operate("send", "aaa-bbb", "bbb-ddd", "Give me cash", 32)
and watch the effect on this script
*/

import $ivy.`fr.janalyse::drools-scripting:1.0.11`
import $ivy.`org.apache.kafka::kafka:2.6.0`

import fr.janalyse.droolscripting._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Knowledge base: one declared event type and a rule that logs every incoming event
val drl =
  """package banking
    |
    |declare OperationRequest @role(event) @timestamp(timestamp)
    |  timestamp: java.util.Date
    |  operationId: String
    |  fromAccountId: String
    |  toAccountId: String
    |  label: String
    |  amount: double
    |end
    |
    |rule "Log events"
    |when
    |  OperationRequest($label:label, $amount:amount)
    |then
    |  System.out.println(""+$label+" "+$amount);
    |end
    |
    |""".stripMargin
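
// A minimal sketch of the kind of JSON document that could be mapped onto
// banking.OperationRequest. The field names come from the DRL declaration above;
// the helper names (jsonEscape, sampleOperationJson) are hypothetical, and the
// timestamp is passed through as an already encoded string because the date
// format expected on the topic is not stated here (check the generator script).
def jsonEscape(s: String): String =
  s.replace("\\", "\\\\").replace("\"", "\\\"") // escape backslashes first, then double quotes

def sampleOperationJson(
  timestamp: String,
  operationId: String,
  fromAccountId: String,
  toAccountId: String,
  label: String,
  amount: Double
): String =
  s"""{"timestamp":"${jsonEscape(timestamp)}","operationId":"${jsonEscape(operationId)}","fromAccountId":"${jsonEscape(fromAccountId)}","toAccountId":"${jsonEscape(toAccountId)}","label":"${jsonEscape(label)}","amount":$amount}"""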

val config = DroolsEngineConfig(
  withDroolsLogging = false,
  equalsWithIdentity = false,
  pseudoClock = false
)

val engine = DroolsEngine(drl, config)

// Kafka consumer settings: keys and values are read as plain strings
val props = new java.util.Properties()
props.put("bootstrap.servers", "127.0.0.1:4242")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest") // without a committed offset, start from new messages only; use "earliest" to replay the topic
props.put("group.id", "consumer-group")

import scala.jdk.CollectionConverters._
import java.time.Duration

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("banking-operations").asJavaCollection)

// Poll forever: insert each received JSON message as an OperationRequest event, then fire the rules
while (true) {
  val records = consumer.poll(Duration.ofSeconds(1)).asScala
  for (data <- records.iterator) {
    val json = data.value()
    engine.insertJson(json, "banking.OperationRequest")
  }
  engine.fireAllRules()
}