Last active
November 8, 2015 14:50
-
-
Save haoch/ea86d4d76a49648d2c34 to your computer and use it in GitHub Desktop.
Eagle DSL
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Example StreamApp demonstrating the three core DSL capabilities:
 * stream schema declaration, configuration access, and the processing DAG.
 */
MyApp extends StreamApp {
  /**
   * Schema definition.
   * Attributes are declared inline as name -> typed-field pairs
   * (display name, description).
   */
  stream("hdfsAuditLogStream",
    datasource = "HDFS",
    attributes = {
      "path" -> string("Path", "HDFS Path"),
      "size" -> long("Size", "HDFS Path Size")
    }
  )
  // Alternatively, a stream schema can be loaded from an external resource file.
  stream("someOtherStream", resource = "schema.json")

  /**
   * Configuration: typed getters and a setter keyed by config path.
   */
  val strConfVal = get[String]("config.key")   // was `strtConfVal` — typo fixed
  val boolConfVal = get[Bool]("config.key")    // NOTE(review): `Bool` — confirm the DSL type name (vs `Boolean`)
  set("config.key", "config.value")

  /**
   * Stream processing DAG.
   * No need to execute manually — the framework wires and runs the pipeline.
   */
  stream("hdfsAuditLogStream")
    .from(KafakaSourceProvider())   // NOTE(review): likely a typo for KafkaSourceProvider — confirm against the provider API
    .flatMap(parserFunc)
    .alert()
}
/**
 * Stream schema declared inline
 */
SampleApp1 extends StreamApp {
  // Declare the schema and build the alert pipeline in a single chain.
  stream(
    "hdfsAuditLogStream",
    datasource = "HDFS",
    attributes = {
      "path" -> string("Path", "HDFS Path"),
      "size" -> long("Size", "HDFS Path Size")
    }
  )
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .alert();
}
/**
 * Stream schema loaded from the database
 */
SampleApp2 extends StreamApp {
  // Schema for "hdfsAuditLogStream" is resolved by name from stored metadata;
  // only the pipeline is defined here.
  stream("hdfsAuditLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .alert();
}
/**
 * Inline policy passed directly to alert()
 */
SampleApp3 extends StreamApp {
  // Alert routing read from configuration.
  val sender = get[String]("eagle.alert.sender")
  val recipient = get[String]("eagle.alert.recipient")
  stream("hdfsAuditLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    // Fixed: the raw string was terminated with four quotes (`""""`) and the
    // argument list was closed before sender/recipient, leaving unbalanced
    // parentheses. The Siddhi policy is now a proper triple-quoted literal and
    // sender/recipient are named arguments of alert().
    .alert(
      """from hdfsAuditLogEventStream[(src == '/tmp/')]#window.externalTime(timestamp,10 min) select count(timestamp) as aggValue having aggValue >= 2 insert into outputStream;""",
      sender = sender,
      recipient = recipient
    );
}
/**
 * Use Siddhi for general processing such as transformation, filtering, aggregation, and so on
 */
SampleApp4 extends StreamApp {
  // Alert routing read from configuration.
  val sender = get[String]("eagle.alert.sender")
  val recipient = get[String]("eagle.alert.recipient")
  // The final alert stream's schema comes from an external resource file.
  stream("finalAlertStream", resource = "finalAlertStream.json")
  // Two chained Siddhi stages: the first derives hdfsAlertStream, the second
  // refines it into finalAlertStream before raising the alert.
  stream("hdfsAuditLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .process("from hdfsAuditLogEventStream[(user == 'somebody')]#window.externalTime(timestamp,10 min) select count(timestamp) as aggValue having aggValue >= 2 insert into hdfsAlertStream;")
    .process("from hdfsAlertStream[(user == 'somebody')]#window.externalTime(timestamp,10 min) select count(timestamp) as aggValue having aggValue >= 2 insert into finalAlertStream;")
    .alert()
}
/**
 * Multi-alert join
 */
SampleApp5 extends StreamApp {
  // Alert routing read from configuration.
  val sender = get[String]("eagle.alert.sender")
  val recipient = get[String]("eagle.alert.recipient")
  // Fixed in both alert() calls below: the raw strings were terminated with
  // four quotes (`""""`) and the argument lists were closed before
  // sender/recipient, leaving unbalanced parentheses.
  val hdfsAlert = stream("hdfsAuditLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .alert(
      """from hdfsAuditLogEventStream[(user == 'somebody')]#window.externalTime(timestamp,10 min) select count(timestamp) as aggValue having aggValue >= 2 insert into hdfsAlertStream;""",
      sender = sender,
      recipient = recipient
    )
  val hiveAlert = stream("hiveLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .alert(
      """from hiveLogStream[(user == 'somebody')]#window.externalTime(timestamp,10 min) select count(timestamp) as aggValue having aggValue >= 2 insert into hiveAlertStream;""",
      sender = sender,
      recipient = recipient
    )
  // NOTE(review): `window` is not defined anywhere in this sketch — supply a
  // concrete window specification (e.g. a time window) before this can run.
  hdfsAlert.innerJoinByWindow(hiveAlert, "hdfsAlertStream.user == hiveAlertStream.user", window)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment