Skip to content

Instantly share code, notes, and snippets.

@slorber
Created November 13, 2013 13:10
Show Gist options
  • Select an option

  • Save slorber/7448862 to your computer and use it in GitHub Desktop.

Select an option

Save slorber/7448862 to your computer and use it in GitHub Desktop.
Akka actor to watch FS changes
import akka.actor.{ ActorRef, ActorSystem, Props, Actor }
import java.nio.file._
import java.io.File
import scala.concurrent.duration._
import scala.collection.JavaConversions._
import scala.Some
/**
 * Message asking the watcher actor to start watching a directory.
 *
 * @param path      directory to register with the watch service
 * @param resursive (sic: spelling kept so existing named arguments keep compiling)
 *                  when `true`, every directory found below `path` is registered as well
 */
case class WatchDirectory(
  path: Path,
  resursive: Boolean = true
)
/** Message telling `FsWatcherActor` to check its watch service for pending file-system events. */
case object DoPoll
/** Kinds of file-system change a notification can describe. */
object FsNotifType extends Enumeration {
  /** Something was created. */
  val Create: Value = Value

  /** Something was modified. */
  val Update: Value = Value

  /** Something was deleted. */
  val Delete: Value = Value
}
/**
 * A single file-system change.
 *
 * @param notifType kind of change (create, update or delete)
 * @param path      path the change refers to
 */
case class FsNotif(notifType: FsNotifType.Value, path: Path) {

  /** Whether `path` denotes a directory; computed on first access and then cached. */
  lazy val isDirectory: Boolean = path.toFile.isDirectory

  /** Whether this notification has type `FsNotifType.Create`. */
  lazy val isCreation: Boolean = FsNotifType.Create == notifType
}
/** Factory building [[FsNotif]] values from raw `java.nio.file` watch events. */
object FsNotif {

  /**
   * Converts an event polled from a watch key into an [[FsNotif]].
   *
   * The resulting path is the event's context resolved against the key's watched directory.
   *
   * @param watchKey key the event was polled from; its watchable must be a `Path`
   * @param event    event to convert; its context must be a `Path`
   * @return the notification carrying the mapped kind and the resolved path
   * @throws IllegalStateException if the event kind is not ENTRY_CREATE / ENTRY_MODIFY /
   *                               ENTRY_DELETE (e.g. OVERFLOW), or if the watchable or the
   *                               event context is not a `Path`
   */
  def apply(watchKey: WatchKey, event: java.nio.file.WatchEvent[_]): FsNotif = {
    val fsNotifType = event.kind() match {
      case StandardWatchEventKinds.ENTRY_CREATE => FsNotifType.Create
      case StandardWatchEventKinds.ENTRY_MODIFY => FsNotifType.Update
      case StandardWatchEventKinds.ENTRY_DELETE => FsNotifType.Delete
      // Report the kind's name: the event object itself only prints as an opaque Class@hash.
      case other => throw new IllegalStateException("Unknown FS event kind: " + other.name())
    }

    // TODO: doesn't work well when parent directory has been renamed :(
    val watchedPath: Path = watchKey.watchable() match {
      case dir: Path => dir
      case other     => throw new IllegalStateException(s"Watch key is not bound to a Path: $other")
    }

    val relativePath: Path = event.context() match {
      case relative: Path => relative
      case other          => throw new IllegalStateException(s"FS event context is not a Path: $other")
    }

    FsNotif(fsNotifType, watchedPath.resolve(relativePath))
  }
}
/**
 * Actor that watches directories through a `java.nio.file.WatchService`.
 *
 * Handled messages:
 *  - `WatchDirectory(path, recursive)`: registers `path` and, when `recursive` is true,
 *    every directory below it.
 *  - `DoPoll`: processes all currently signalled watch keys and sends one `FsNotif` per
 *    event to the sender of the `DoPoll` message.
 *
 * Directories created inside a watched directory are registered (recursively) when their
 * creation event is processed.
 */
class FsWatcherActor extends Actor {

  // Kept as a `var` for source compatibility; this class never reassigns it.
  var watchService = FileSystems.getDefault.newWatchService()

  def receive = {
    case WatchDirectory(path, recursive) =>
      if (recursive) registerPathRecursion(path) else registerPath(path)
    case DoPoll => doPoll()
  }

  /** Closes the watch service when the actor stops so its resources are not leaked. */
  override def postStop(): Unit = watchService.close()

  /**
   * Lists every directory below `file`, at any depth.
   *
   * Not tail-recursive (it recurses once per sub-directory); the recursion depth equals the
   * nesting depth of the directory tree.
   *
   * @param file directory to explore
   * @return the direct child directories followed by the directories found below each of them;
   *         empty when `file` cannot be listed
   */
  def getAllChildDirectories(file: File): Array[File] = {
    // File.listFiles() returns null when `file` is not a directory or an I/O error occurs.
    val childDirectories: Array[File] =
      Option(file.listFiles()).map(_.filter(_.isDirectory)).getOrElse(Array.empty[File])
    childDirectories ++ childDirectories.flatMap(getAllChildDirectories)
  }

  /** Registers `path` and every directory below it with the watch service. */
  def registerPathRecursion(path: Path): Unit = {
    val childDirectories = getAllChildDirectories(path.toFile)
    val allPathsToRegister = path :: childDirectories.map(_.toPath).toList
    allPathsToRegister.foreach(registerPath)
  }

  /** Registers a single directory for create, modify and delete events. */
  def registerPath(path: Path): Unit = {
    println(s"Registering path to watch service -> $path")
    path.register(watchService,
      StandardWatchEventKinds.ENTRY_CREATE,
      StandardWatchEventKinds.ENTRY_MODIFY,
      StandardWatchEventKinds.ENTRY_DELETE
    )
  }

  /** Starts watching the notified path when the notification is the creation of a directory. */
  def registerIfNewFolder(fsNotif: FsNotif): Unit =
    if (fsNotif.isCreation && fsNotif.isDirectory) registerPathRecursion(fsNotif.path)

  /**
   * Processes every watch key that is currently signalled.
   *
   * `WatchService.poll()` is non-blocking and returns null once no key is queued. Draining
   * all keys per tick keeps events for several directories from backing up between polls.
   */
  def doPoll(): Unit =
    Iterator
      .continually(Option(watchService.poll()))
      .takeWhile(_.isDefined)
      .foreach(_.foreach(pollKey))

  /**
   * Sends one `FsNotif` per pending event of `key` to the current sender, then resets the key
   * so it can be signalled again.
   */
  def pollKey(key: WatchKey): Unit = {
    key.pollEvents().toList.foreach { event: WatchEvent[_] =>
      if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
        // OVERFLOW can be delivered without being registered for and carries no path,
        // so it cannot be turned into an FsNotif; report it instead of crashing the actor.
        println(s"Watch service overflow, some events may have been lost -> ${key.watchable()}")
      } else {
        val notif = FsNotif(key, event)
        registerIfNewFolder(notif)
        sender ! notif
      }
    }
    key.reset()
  }
}
/** Actor that prints every `FsNotif` it receives to standard output. */
class PrintFsNotifActor extends Actor {
  def receive = {
    case event: FsNotif =>
      val line = s"New event: $event"
      println(line)
  }
}
/**
 * Demo entry point: watches a directory recursively and prints every change.
 *
 * The directory is taken from the first command-line argument when one is given; otherwise
 * the original default location is used.
 */
object HelloAkkaScala extends App {
  val pathToWatch = Paths.get(args.headOption.getOrElse("/home/sebastien/Bureau/myfolder"))
  val system = ActorSystem("helloakka")
  val watcherActor = system.actorOf(Props[FsWatcherActor], "FsWatcherActor")
  val printFsEventsActor = system.actorOf(Props[PrintFsNotifActor], "PrintFsNotifActor")

  // Start watching (recursive by default); nobody needs a reply to this message.
  watcherActor.tell(WatchDirectory(pathToWatch), ActorRef.noSender)

  // Send DoPoll to the watcher every second, starting immediately. The printer actor is passed
  // as the message sender, so the notifications the watcher sends back are delivered to it.
  system.scheduler.schedule(0.seconds, 1.second, watcherActor, DoPoll)(system.dispatcher, printFsEventsActor)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment