Created
December 27, 2018 09:58
-
-
Save iomonad/61925e815bdff7ca71627a4d2c46b92d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
  * @note Graph with a SourceShape; runnable as a Source.
  */
override def graph: Graph[SourceShape[FileMessage], NotUsed] =
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // File extensions (matched case-insensitively) a listed path must end with to be emitted.
    val mediaSuffixes = List(".mxf", ".mov", ".wav", ".mpeg")

    // True when `path`, lowercased, ends with one of `mediaSuffixes`.
    def isMediaPath(path: String): Boolean = {
      val lowered = path.toLowerCase
      mediaSuffixes.exists(suffix => lowered.endsWith(suffix))
    }

    // Ticks `client` with a zero initial delay, then once every `interval`.
    val ticker = builder.add(Source.tick[SmartjogClient](0.second, interval, client))

    val lister: FlowShape[SmartjogClient, FileMessage] = builder.add(
      Flow[SmartjogClient]
        // One listing of `directory` per tick; the listing sub-source is given
        // `interval` as its completion timeout.
        .flatMapConcat { polled =>
          Source
            .fromFuture(polled.listDir(directory.toString))
            .completionTimeout(interval)
        }
        .map(entries => entries.toList.map(_.getPath).filter(isMediaPath))
        // State is (paths absent from the previous listing, full current listing).
        // scan feeds its result back in as the next state, so the current listing
        // has to be carried along next to the diff.
        .scan[(List[String], List[String])]((Nil, Nil)) {
          case ((_, previous), current) => (current.diff(previous), current)
        }
        // Only the newly seen paths go downstream, one element per path.
        .mapConcat { case (added, _) => added }
        .map(path => FileMessage(Paths.get(path), rule))
    )

    ticker ~> lister
    SourceShape(lister.out)
  }
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment