Skip to content

Instantly share code, notes, and snippets.

@iomonad
Created December 27, 2018 09:58
Show Gist options
  • Save iomonad/61925e815bdff7ca71627a4d2c46b92d to your computer and use it in GitHub Desktop.
Save iomonad/61925e815bdff7ca71627a4d2c46b92d to your computer and use it in GitHub Desktop.
/**
 * Source-shaped graph that periodically lists `directory` through `client`
 * and emits a [[FileMessage]] for every newly seen media file.
 *
 * A tick fires immediately and then once per `interval`. Each tick triggers one
 * directory listing, which is bounded by a completion timeout of `interval`.
 * Paths are kept only when they end (case-insensitively) with `.mxf`, `.mov`,
 * `.wav` or `.mpeg`. Each filtered listing is compared with the one produced by
 * the previous tick, and only the paths missing from that previous listing are
 * emitted.
 *
 * @note SourceShape graph, runnable as a Source.
 * @return a graph whose single outlet emits one `FileMessage(path, rule)` per new path
 */
override def graph: Graph[SourceShape[FileMessage], NotUsed] = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  // File-name suffixes accepted by the listing filter, compared against the lower-cased path.
  val mediaSuffixes = List(".mxf", ".mov", ".wav", ".mpeg")

  // True when the path ends with one of the accepted suffixes, ignoring case.
  def isMediaFile(path: String): Boolean = {
    val lowered = path.toLowerCase
    mediaSuffixes.exists(suffix => lowered.endsWith(suffix))
  }

  // Emits `client` right away, then again every `interval`.
  val ticker = builder.add(Source.tick[SmartjogClient](0.second, interval, client))

  // One remote listing per tick; the listing must complete within `interval`.
  val rawListings = Flow[SmartjogClient].flatMapConcat { polledClient =>
    Source
      .fromFuture(polledClient.listDir(directory.toString))
      .completionTimeout(interval)
  }

  // Reduce every listing to the paths of the accepted media files.
  val mediaListings = rawListings.map { entries =>
    entries.toList.map(_.getPath).filter(isMediaFile)
  }

  // State is (paths to emit, full listing of this tick). The full listing has to be
  // carried along because scan feeds its own output back in as the next accumulator.
  val freshPaths = mediaListings
    .scan[(List[String], List[String])]((Nil, Nil)) {
      case ((_, previousListing), listing) =>
        (listing.diff(previousListing), listing)
    }
    .mapConcat { case (added, _) => added }

  val lister: FlowShape[SmartjogClient, FileMessage] =
    builder.add(freshPaths.map(path => FileMessage(Paths.get(path), rule)))

  ticker ~> lister
  SourceShape(lister.out)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment