// File-backed implementation of the ScalaCache Cache.
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler, FileLock}
import java.nio.file.StandardOpenOption.{CREATE, READ, WRITE}
import java.nio.file._
import java.util.concurrent.{Executors, TimeUnit}
import scala.compat.java8.FunctionConverters.asJavaConsumer
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
import scalacache.Cache
import scalacache.serialization.Codec
// Cache whose entries live in files; `prefix` is the leading part of every entry's location (see `path`).
class FileCache(prefix: String) extends Cache[Array[Byte]] {
// Scheduled pool with two threads; used directly for delayed tasks and, via `ec`, for Future work.
private val scheduler = Executors.newScheduledThreadPool(2)
// ExecutionContext that runs its tasks on `scheduler`.
private val ec = ExecutionContext.fromExecutor(scheduler)
// Looks up `key` and decodes what was stored using `codec`.
// A NoSuchFileException is recovered to None, so a missing file reads as a cache miss.
override def get[V](key: String)(implicit codec: Codec[V, Array[Byte]]): Future[Option[V]] = {
// NOTE(review): the argument list of this call is truncated in this copy (only `READ` survives);
// presumably it opens a read channel for the key's file -- TODO confirm against version control.
Future(, READ))(ec).flatMap{
channel =>
// Hand the channel produced above to readFile, which yields the decoded value.
readFile(codec, channel)
}.recover { case _: NoSuchFileException => None }
// Note: As it stands, if the process is restarted, entries with a TTL won't get deleted :(
// Future work: maybe store the TTL and creation time on disk, then remove during get... or remove on close.
// Stores `value` under `key` through writeFile and, when a TTL is supplied,
// schedules a one-shot task on `scheduler` to run after that TTL.
override def put[V](key: String, value: V, ttl: Option[Duration])(implicit codec: Codec[V, Array[Byte]]): Future[Unit] = {
// Start the write first; `res` holds whatever writeFile returns.
val res = writeFile(key, value, codec)
// Only executes when a TTL was given.
ttl.foreach { ttl =>
val delete = runnable {
val location = path(key)
// NOTE(review): the expression closed by the `)` below is missing from this copy;
// presumably it deletes `location` -- TODO confirm. Its failures are printed, not propagated.
).recover { case e => e.printStackTrace() }
// Run `delete` once, `ttl` (converted to milliseconds) from now.
scheduler.schedule(delete, ttl.toMillis, TimeUnit.MILLISECONDS)
/**
 * Deletes the file that backs `key`.
 *
 * The deletion runs on `ec`. Any failure captured by the Future (for example the
 * file not existing) is recovered to `()`, so the returned Future completes successfully.
 *
 * @param key cache key whose backing file should be deleted
 * @return a Future that completes once the delete attempt has finished
 */
override def remove(key: String): Future[Unit] = {
  val deletion = Future {
    val target = path(key)
    Files.delete(target)
  }(ec)
  deletion.recover { case _ => () }
}
// Going by its name, removes every entry of this cache.
// NOTE(review): most of this method's body is missing from this copy; only fragments remain.
override def removeAll(): Future[Unit] = {
// Presumably completed once all deletions have finished -- TODO confirm; the completing code is not visible here.
val completion = Promise[Unit]()
def deleteAll = {
// asJavaConsumer adapts a Scala function to a java.util.function.Consumer.
asJavaConsumer(item =>
val deleteAndComplete = runnable {
/**
 * Shuts the cache's thread pool down and waits for outstanding work.
 *
 * `awaitTermination` only returns early after a shutdown request; without one it
 * simply blocks for the full timeout and the pool threads stay alive. The pool is
 * therefore shut down first: no new tasks are accepted, already submitted tasks
 * still run, and this method blocks for at most one minute while they finish.
 */
override def close(): Unit = {
  scheduler.shutdown()
  // The Boolean result (terminated vs. timed out) is intentionally discarded.
  scheduler.awaitTermination(1, TimeUnit.MINUTES)
}
/**
 * Location of the file that backs `key`: the cache `prefix` and the key joined with a `/`.
 *
 * @param key cache key
 * @return the path built from `prefix` and `key`
 */
private def path(key: String): Path = {
  val location = s"$prefix/$key"
  Paths.get(location)
}
/**
 * Reads the content of `channel` from position 0 and decodes it with `codec`.
 *
 * The file size is converted with `Math.toIntExact`, so a file too large for a single
 * `ByteBuffer` (more than `Int.MaxValue` bytes, about 2.147 GB) fails the returned Future
 * with an `ArithmeticException` instead of silently wrapping to a wrong, smaller size.
 * A failure while querying the size also surfaces as a failed Future.
 *
 * The channel is closed once the read has finished, whether it succeeded or failed,
 * so that each lookup does not leak a file handle. Callers must not reuse `channel`
 * afterwards.
 *
 * @param codec   codec used to deserialize the bytes that were read
 * @param channel open channel positioned on the entry's file; closed by this method
 * @return the decoded value, or a failed Future if sizing, reading or decoding fails
 */
private def readFile[V](codec: Codec[V, Array[Byte]], channel: AsynchronousFileChannel): Future[V] = {
  val decoded = for {
    fileSize <- Future.fromTry(Try(Math.toIntExact(channel.size())))
    buffer = ByteBuffer.allocate(fileSize)
    _ <- read(channel, buffer)
  } yield codec.deserialize(buffer.array())
  // andThen keeps the original outcome; it only adds the cleanup side effect.
  decoded.andThen { case _ => channel.close() }
}
/**
 * Requests a lock on `channel` and exposes the outcome as a Future.
 *
 * The promise used to be created and handed to the completion handler but never
 * returned, so a caller could neither wait for the lock nor release it. The
 * promise's future is now the result of the method.
 *
 * @param channel channel to lock
 * @return a Future completed with the acquired `FileLock`, or failed with the error
 *         reported to the completion handler; releasing the lock is up to the caller
 */
private def lock[V](channel: AsynchronousFileChannel): Future[FileLock] = {
  val acquired = Promise[FileLock]()
  channel.lock((), completionHandler(acquired))
  acquired.future
}
/**
 * Starts an asynchronous read from position 0 of `channel` into `buffer`.
 *
 * The promise declaration and the read call had been fused into one syntactically
 * invalid line; they are separate statements again and the promise's future is returned.
 *
 * @param channel channel to read from
 * @param buffer  destination buffer
 * @return a Future completed with the value the channel reports to the completion
 *         handler (the number of bytes read), or failed with the read error
 */
private def read(channel: AsynchronousFileChannel, buffer: ByteBuffer): Future[Integer] = {
  val completion = Promise[Integer]()
  channel.read(buffer, 0, (), completionHandler(completion))
  completion.future
}
// Serializes `value` with `codec` and writes the resulting bytes at position 0 of a channel.
private def writeFile[V](key: String, value: V, codec: Codec[V, Array[Byte]]) = {
// Completed by `handler` with the write's result, or failed with the write's error.
val completion = Promise[Integer]()
// NOTE(review): the start of this call is missing from this copy; presumably it opens the
// key's file with the CREATE and WRITE options -- TODO confirm.
val channel =, CREATE, WRITE)
val bytes = codec.serialize(value)
// wrap() uses `bytes` as the buffer's backing array.
val buffer = ByteBuffer.wrap(bytes)
val handler = completionHandler(completion)
// NOTE(review): the trailing `{ _ =>` looks like the start of a separate expression fused
// onto this line -- TODO confirm what follows the write.
channel.write(buffer, 0, (), handler) { _ =>
/**
 * Bridges an NIO `CompletionHandler` to a Scala `Promise`.
 *
 * @param promise promise to complete when the asynchronous operation ends
 * @return a handler that succeeds `promise` with the operation's result, or fails it
 *         with the reported error; the `Unit` attachment is ignored
 */
private def completionHandler[V](promise: Promise[V]): CompletionHandler[V, Unit] = {
  final class PromiseHandler extends CompletionHandler[V, Unit] {
    override def completed(result: V, attachment: Unit): Unit = {
      promise.complete(scala.util.Success(result))
      ()
    }

    override def failed(exc: Throwable, attachment: Unit): Unit = {
      promise.complete(scala.util.Failure(exc))
      ()
    }
  }
  new PromiseHandler
}
/**
 * Wraps a by-name block in a `Runnable`.
 *
 * @param f block to run; it is evaluated each time `run()` is called, not when the
 *          Runnable is created
 * @return a Runnable whose `run()` evaluates `f`
 */
private def runnable(f: => Unit): Runnable = {
  final class Deferred extends Runnable {
    override def run(): Unit = f
  }
  new Deferred
}
import java.nio.file.Files
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.{FunSuite, Matchers}
import scala.collection.JavaConverters._
import scalacache.AnyRefBinaryCodec
import scala.concurrent.duration._
// ScalaTest suite for FileCache; every test points a new cache at its own freshly created temporary directory.
class FileCacheSpec extends FunSuite with Matchers with ScalaFutures with Eventually with IntegrationPatience{
// Checks the file count in the cache directory around removing an entry.
test("remove") {
// A fresh temporary directory serves as the cache prefix.
val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
val cache = new FileCache(cacheDir.toString)
// `def`, not `val`: the directory is listed again on every reference.
def files = Files.list(cacheDir).iterator().asScala.toList
files shouldBe empty
// The returned Futures are not awaited here, hence the polling with `eventually` below.
cache.put("a", 1, None)
cache.put("b", 2, None)
eventually(timeout(1.second)) {
files.size shouldBe 2
files.size shouldBe 2
// NOTE(review): no call to remove is visible between these assertions; lines appear to be
// missing from this copy -- TODO confirm.
files.size shouldBe 1
// A put without TTL completes with Unit and leaves exactly one file in the cache directory.
test("put without ttl") {
  val dir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
  val underTest = new FileCache(dir.toString)
  // Re-lists the directory on every reference.
  def entries = Files.list(dir).iterator().asScala.toList

  entries shouldBe empty

  val outcome = underTest.put("someKey", 1, None).futureValue
  outcome shouldBe (())

  entries.size shouldBe 1
}
// Checks that an entry written with a 300 ms TTL exists after the write and is gone again
// within a bounded time.
test("put with ttl") {
val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
val cache = new FileCache(cacheDir.toString)
// `def`, not `val`: the directory is listed again on every reference.
def files = Files.list(cacheDir).iterator().asScala.toList
files shouldBe empty
// Timestamp taken just before the put; `timeLived` below is measured from here.
val write = System.currentTimeMillis()
cache.put("someKey", 42, Option(300.millis)).futureValue shouldBe (())
// NOTE(review): `written` is not read anywhere in the visible code.
val written = System.currentTimeMillis()
files.size shouldBe 1
// NOTE(review): no wait is visible between the two size assertions; presumably the second was
// wrapped in `eventually` -- TODO confirm, lines appear to be missing from this copy.
files.size shouldBe 0
val timeLived = System.currentTimeMillis() - write
// Wall-clock bounds in milliseconds: longer than the TTL, shorter than twice the TTL.
timeLived shouldBe > (300L)
timeLived shouldBe < (600L)
// Checks the file count in the cache directory around removeAll, starting from four entries.
test("removeAll") {
val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
val cache = new FileCache(cacheDir.toString)
// `def`, not `val`: the directory is listed again on every reference.
def files = Files.list(cacheDir).iterator().asScala.toList
files shouldBe empty
// The returned Futures are not awaited here, hence the polling with `eventually` below.
cache.put("a", 1, None)
cache.put("b", 2, None)
cache.put("c", 3, None)
cache.put("d", 4, None)
eventually(timeout(1.second)) {
files.size shouldBe 4
// NOTE(review): no call to removeAll is visible, and this second block expects 4 files again;
// lines appear to be missing from this copy -- TODO confirm the intended expectation.
eventually(timeout(1.second)) {
files.size shouldBe 4
// get returns None for an unknown key, Some(value) after a put, and fails when the stored
// bytes are read back as a different type.
test("get") {
  val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
  val cache = new FileCache(cacheDir.toString)
  // The unused directory-listing helper `files` was dropped: this test never inspects the directory.

  // Nothing stored yet: a miss, not a failure.
  cache.get[Int]("someKey").futureValue shouldBe None

  // Wait for the write to finish before reading back.
  cache.put("someKey", 42, None).futureValue
  cache.get[Int]("someKey").futureValue shouldBe Some(42)

  // Reading the Int entry as a String must fail the Future.
  cache.get[String]("someKey").failed.futureValue shouldBe a [Throwable]
}
// Checks that the cache directory is empty once the cache has been closed.
test("close") {
val cacheDir = Files.createTempDirectory("FileCacheSpec").toAbsolutePath
val cache = new FileCache(cacheDir.toString)
// `def`, not `val`: the directory is listed again on every reference.
def files = Files.list(cacheDir).iterator().asScala.toList
// Queue up some async actions; the returned Futures are not awaited.
cache.put("a", 1, None)
cache.put("b", 2, None)
cache.put("c", 3, None)
cache.put("d", 4, None)
// Close should return after removeAll is processed.
// NOTE(review): no call to close is visible before the assertion below; a line appears to be
// missing from this copy -- TODO confirm.
files shouldBe empty
