Last active
April 2, 2023 10:10
-
-
Save dacr/c56a579cca9daf3290ee2aae4f347f3c to your computer and use it in GitHub Desktop.
Drools banking example application wired to a kafka topic / published by https://github.com/dacr/code-examples-manager #c7c25a3a-9167-4950-b390-42187623e086/851299fb2f8e81747800934dfbe574765040e6ec
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : Drools banking example application wired to a kafka topic | |
// keywords : scala, drools, mvel, ai, knowledgebase, kafka, cep, complex-event-processing | |
// publish : gist | |
// authors : David Crosson | |
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
// id : c7c25a3a-9167-4950-b390-42187623e086 | |
// execution : scala ammonite script (http://ammonite.io/) - run as follow 'amm scriptname.sc' | |
// created-on : 2019-10-03T08:46:39+02:00 | |
// managed-by : https://github.com/dacr/code-examples-manager | |
/* | |
PLEASE FIRST START A REPL SESSION AND EVAL THE CONTENT OF | |
the "drools-kb-banking-events-stream-generator.sc" SCRIPT | |
to start an embedded kafka server and get simple way to generate events | |
(See : https://gist.github.com/dacr/da70d346f0e477c1a6e78bc15405cfd3) | |
then quickly makes such call on THE REPL : | |
Generator.operate("send", "aaa-bbb", "bbb-ddd", "Give me cash", 32) | |
And see the effect on this script | |
*/ | |
import $ivy.`fr.janalyse::drools-scripting:1.0.11` | |
import $ivy.`org.apache.kafka::kafka:2.6.0` | |
import fr.janalyse.droolscripting._ | |
import org.apache.kafka.clients.consumer.KafkaConsumer | |
val drl = | |
"""package banking | |
| | |
|global org.slf4j.Logger logger | |
| | |
|declare OperationRequest @role(event) @timestamp(timestamp) @expires(10s) | |
| timestamp: java.util.Date | |
| operationId: String | |
| fromAccountId: String | |
| toAccountId: String | |
| label:String | |
| amount: double | |
|end | |
| | |
|declare Account | |
| id:String | |
| balance:double | |
|end | |
|// ------------------------------------------------------------------------------- | |
|rule "init" when then | |
| insert(new Account("aaa-bbb", 100000)); | |
| insert(new Account("bbb-ddd", 100000)); | |
|end | |
|// ------------------------------------------------------------------------------- | |
|rule "operation received" when | |
| OperationRequest($timestamp:timestamp, $label:label) | |
|then | |
| logger.info($timestamp+" "+$label); | |
|end | |
|// ------------------------------------------------------------------------------- | |
|rule "two many operations in a short period of time" | |
|when | |
| $first:OperationRequest($from:fromAccountId, $to:toAccountId) | |
| $second:OperationRequest(this after[0,10s] $first, this != $first, fromAccountId == $from) | |
|then | |
| logger.warn("OPERATION CANCELLED - TOO NEAR FROM PREVIOUS ONE"); | |
| delete($second) | |
|end | |
|// ------------------------------------------------------------------------------- | |
|rule "update balance" no-loop when | |
| $operation: OperationRequest( | |
| $fromId:fromAccountId, | |
| $toId:toAccountId, | |
| $amount:amount | |
| ) | |
| $from:Account(id == $fromId, $fromBalance:balance, $fromBalance-$amount>0) | |
| $to:Account(id == $toId, $toBalance:balance) | |
|then | |
| modify($from) { | |
| setBalance($fromBalance-$amount); | |
| } | |
| modify($to) { | |
| setBalance($toBalance+$amount); | |
| } | |
|end | |
|// ------------------------------------------------------------------------------- | |
|""".stripMargin | |
val config = DroolsEngineConfig( | |
withDroolsLogging = true, | |
equalsWithIdentity = false, | |
pseudoClock = false | |
) | |
val engine = DroolsEngine(drl,config) | |
val props = new java.util.Properties() | |
props.put("bootstrap.servers", "127.0.0.1:4242") | |
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") | |
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") | |
props.put("auto.offset.reset", "latest") // latest message; versus "earliest" | |
props.put("group.id", "consumer-group") | |
import scala.jdk.CollectionConverters._ | |
import java.time.Duration | |
val consumer = new KafkaConsumer[String, String](props) | |
consumer.subscribe(List("banking-operations").asJavaCollection) | |
while (true) { | |
val record = consumer.poll(Duration.ofSeconds(1)).asScala | |
for (data <- record.iterator) { | |
val json = data.value() | |
engine.insertJson(json, "banking.OperationRequest") | |
} | |
engine.fireAllRules() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment