Created
November 26, 2012 19:48
-
-
Save arosien/4150216 to your computer and use it in GitHub Desktop.
ymmv scala zk dsl for associating domain models with zk paths and serialization formats
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Binds a ZooKeeper node `path` to a `client` and exposes typed read, write, watch and
 * delete operations on that node.
 *
 * (De)serialization is driven by the `Reads` / `Writes` type classes from the companion
 * object. Operations wrapped by `catcher` report a thrown `KeeperException` as a `Left`
 * instead of propagating it; any other exception type is not caught.
 *
 * @param path   node path, with segments separated by '/'
 * @param client client used to perform every operation
 */
case class WithZk(path: String, client: ZooKeeperClient) {
  import WithZk._
  import scala.util.control.Exception._

  /** Result of an operation: the failure on the left, the value on the right. */
  type R[A] = Either[Throwable, A]

  /**
   * `path` with its last segment removed.
   *
   * `dropRight(1)` is used instead of `reverse.tail.reverse` because `"/".split('/')`
   * yields an empty array, on which `tail` throws; such paths now give `""`.
   */
  lazy val dir: String = path.split('/').dropRight(1).mkString("/")

  // Turns a thrown KeeperException into a Left; other exceptions propagate.
  private val catcher = catching(classOf[KeeperException])

  /** A reader that decodes this node's data as `A`. */
  def reader[A: Reads]: Reader[A] = new Reader[A]

  /** A writer that encodes `value` for storage at this node. */
  def writer[A: Writes](value: A): Writer[A] = new Writer[A](value)

  /** A deleter for this node. */
  def deleter: Deleter = new Deleter

  // Shortcuts that delegate to a freshly created Reader, Writer or Deleter.
  def get[A: Reads]: R[A] = reader[A].get
  def getChildren[A: Reads, B]: R[Seq[Path[B]]] = reader[A].getChildren[B]
  def toOption[A: Reads]: Option[A] = reader[A].toOption
  def getOrElse[A: Reads](a: => A): A = reader[A].getOrElse(a)
  def watch[A: Reads](updated: Option[R[A]] => Unit) = reader[A].watch(updated)
  def watchChildren[A: Reads](updated: A => Unit, removed: A => Unit) =
    reader[A].watchChildren(updated, removed)

  /** Asks the client to create `dir`, the parent of `path`. */
  def createDir = catcher.either(client.createPath(dir))

  def create[A: Writes](value: A, mode: CreateMode) = writer[A](value).create(mode)
  def set[A: Writes](value: A) = writer[A](value).set
  def createAndSet[A: Writes](value: A, mode: CreateMode) = writer[A](value).createAndSet(mode)
  def delete = deleter.delete()

  /** Typed read access to the node at `path`. */
  class Reader[A: Reads] {

    /** The same client bound to the parent directory of `path`. */
    def parent = WithZk(dir, client)

    /** Fetches the node's bytes and decodes them; either step may yield a `Left`. */
    def get: R[A] = for {
      data <- catcher.either(client.get(path))
      r <- implicitly[Reads[A]].reads(data)
    } yield r

    /** Lists the node's children as full paths (`path + "/" + child`) tagged with `B`. */
    def getChildren[B]: R[Seq[Path[B]]] = {
      catcher.either(client.getChildren(path))
        .map(s => s.map(childPathSuffix => Path[B](path + "/" + childPathSuffix)))
    }

    /** The decoded value, or `None` when `get` fails for any reason. */
    def toOption: Option[A] = get.right.toOption

    /** The decoded value, or `b` when `get` fails for any reason. */
    def getOrElse[B >: A](b: => B): B = get.right.toOption.getOrElse(b)

    /**
     * Registers `updated` through `client.watchNode`. The callback receives the decoded
     * data wrapped in the `Option` supplied by the client.
     */
    def watch(updated: Option[R[A]] => Unit) =
      client.watchNode(path, { data: Option[Array[Byte]] =>
        updated(data.map(implicitly[Reads[A]].reads(_)))
      })

    /**
     * Watches the children of `path` through `client.watchChildrenWithData`.
     *
     * When the client notifies about a child, `updated` is called if the child is present
     * in the shared `children` map, otherwise `removed` is called with the last value
     * recorded for it.
     *
     * @param updated called with the decoded value of a child that is present
     * @param removed called with the last recorded value of a child that is absent
     */
    def watchChildren[B: Reads](updated: B => Unit, removed: B => Unit) = {
      import scala.collection.mutable

      // Handed to the client below; presumably the client keeps it current -- TODO confirm.
      val children = mutable.Map[String, B]()
      // Last value reported per child, so `removed` can be given the value that went away.
      val updates = children.empty

      def notifier(childPath: String): Unit = children.synchronized {
        children.get(childPath) match {
          case Some(update) =>
            updates(childPath) = update
            updated(update)
          case None =>
            // A child can be absent without ever having been recorded; skip the callback
            // then instead of failing inside the watcher on `Option.get`.
            updates.remove(childPath).foreach(removed)
        }
      }

      // The client expects a plain decoder, so a decode failure is rethrown here.
      def f(b: Array[Byte]): B = implicitly[Reads[B]].reads(b).fold(e => throw e, identity)

      client.watchChildrenWithData[B](path, children, f(_), notifier(_))
    }
  }

  /** Typed write access to the node at `path` for one fixed `value`. */
  class Writer[A: Writes](value: A) {

    /** The same client bound to the parent directory of `path`. */
    def parent = WithZk(dir, client)

    def createDir = WithZk.this.createDir

    /** Creates the node with the encoded value using `mode`. */
    def create(mode: CreateMode) =
      catcher.either(client.create(path, implicitly[Writes[A]].writes(value), mode))

    /** Overwrites the node's data with the encoded value. */
    def set = catcher.either(client.set(path, implicitly[Writes[A]].writes(value)))

    /**
     * Sets value, creating path if it does not exist.
     *
     * Steps: create the parent directory, try `set`, and only if `set` fails try
     * `create(mode)`. Note that any failed `set` (not only a missing node) falls through
     * to `create`, so the returned `Left` is then the failure of `create`.
     *
     * @return `Right(path)` on success, otherwise the failure of `createDir` or `create`
     */
    def createAndSet(mode: CreateMode): R[String] = {
      for {
        dirCreated <- createDir
        noNode <- set.map(_ => path).left
        created <- create(mode)
      } yield path
    }
  }

  /** Delete access to the node at `path`. */
  class Deleter {
    def delete(): R[Unit] = catcher.either(client.delete(path))
  }
}
/**
 * Yet another ZooKeeper DSL:
 * {{{
 * import WithZk._
 *
 * val client: ZooKeeperClient
 *
 * // Read a String from a known path (the result is an Either[Throwable, String]):
 * client.atPath("/foo").reader[String].get
 *
 * // Write a String to a known path:
 * client.atPath("/foo").writer("bar").createAndSet(CreateMode.PERSISTENT)
 *
 * // If an object of type A can be implicitly converted to a Path[A] then you can:
 *
 * case class Foo(bar: String, baz: String)
 * implicit def fooToPath(foo: Foo): Path[Foo] = Path("/foo/%s".format(foo.bar))
 *
 * val foo = Foo("bar", "baz")
 * client.atPathFor(foo).writer(foo.baz).set   // Zk node /foo/bar = baz
 * client.atPathFor(foo).reader[String].get    // reads "baz" back via StringFormat
 * }}}
 *
 * Reading the node as a `Foo` instead requires an implicit `Reads[Foo]` in scope.
 */
object WithZk {

  /** A node path tagged with the type `A` it refers to; `A` is not stored at runtime. */
  case class Path[+A](value: String)

  /** Decodes node data into an `A`, reporting failure as a `Left`. */
  trait Reads[A] {
    def reads(in: Array[Byte]): Either[Throwable, A]
  }

  /** Encodes an `A` into node data. */
  trait Writes[A] {
    def writes(value: A): Array[Byte]
  }

  /** Both directions of serialization for `A`. */
  trait Format[A] extends Reads[A] with Writes[A]

  /** Default format for `String`; delegates to `utf8Encoder`, which is defined elsewhere. */
  implicit object StringFormat extends Format[String] {
    def reads(in: Array[Byte]): Either[Throwable, String] = utf8Encoder.decode(in).right
    def writes(value: String): Array[Byte] = utf8Encoder.encode(value)
  }

  /** Binds `path` to `client`. */
  def withZk(path: String, client: ZooKeeperClient): WithZk = WithZk(path, client)

  /*
   * atPathFor and pathOf accept any A for which an implicit A => Path[A] is in scope.
   * This is what the former view bound `A <% Path[A]` expanded to; it is spelled out
   * as an explicit implicit parameter because view bounds are deprecated.
   */

  /** Syntax added to a `ZooKeeperClient` by `clientToWithZk`. */
  case class AtPath(client: ZooKeeperClient) {

    /** Binds the client to a raw path string. */
    def atPath(path: String): WithZk = withZk(path, client)

    /** Binds the client to a typed path. */
    def atPath[A](path: Path[A]): WithZk = withZk(path.value, client)

    /** Binds the client to the path derived from `path` by the implicit `toPath`. */
    def atPathFor[A](path: A)(implicit toPath: A => Path[A]): WithZk =
      withZk(toPath(path).value, client)
  }

  /** Enables `client.atPath(...)` / `client.atPathFor(...)` on any `ZooKeeperClient`. */
  implicit def clientToWithZk(client: ZooKeeperClient): AtPath = AtPath(client)

  /** The typed path derived from `value` by the implicit `toPath`. */
  def pathOf[A](value: A)(implicit toPath: A => Path[A]): Path[A] = toPath(value)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment