import java.lang.reflect.{Method, InvocationHandler, Proxy}
import scala.language.reflectiveCalls, scala.language.postfixOps
import scala.collection.mutable, java.io._, Util._
// I called this version "reduced" because proxies are not GC-ed when no longer referenced by the user.
// This makes the code a bit simpler and, probably, even faster if the proxies are indeed small.
// The experiment
// timeit "scala -J-Xmx33m ProxyDemo + 16 100 > nul"
// however suggests that this implementation is slower: 144 sec vs 129 sec for the version with GC-managed proxies.
// Tree: the following command now succeeds
// scala -J-Xmx33m ProxyDemo + 10 1m
// It fits into 33 MB of memory even though 2**10 2-MB strings
// take more than 2 GB in main memory. In the latest version
// we partition both the tree and the string values that its nodes
// may hold. We would need (10+2) * 2 MB = 24 MB if
// we did not proxify the string values, because the final printing
// is recursive and the printing thread would not release
// the nodes whose methods are currently under invocation.
// Since the depth of the tree is 10 nodes, all 10 would be
// locked in memory. The cache also keeps two objects. However,
// since we know that tree values can be very large, we
// proxified them as well, letting them go while the thread descends
// into the 10 small nodes.
object ProxyDemo extends App {
if (args.length < 3) {
println ("usage: <managed: + or -> <tree depth: int> <node size in unicode kilo chars>")
println ("example: + 3 3M")
println ("It will tree with (2^3)-1 = 7 nodes which will occupy 7 * 3 mln unicode chars = 21 mb")
} else {
val managed = args(0) == "+" //false
val levels = args(1).toInt
val a2 = args(2).toLowerCase ; val a2i = a2.filter { c => c.isDigit }.toInt
val nodeSize = a2i * (?(a2.contains("k"), 1000, 1)) * (?(a2.contains("m"), 1000*1000, 1))
val db = cleandb
println("db = " + db)
//iterate(pairs) ; println("---") ; iterate(pairs)
def gen(level: Int, cnt: Int): (Node, Int) = if (level == 0) (End, cnt) else {
val (left, cnt2) = gen(level - 1, cnt)
val (right, cnt3) = gen(level - 1, cnt2)
println(s"creating node $cnt3 at level $level")
def rnd = Array.fill(nodeSize)('1').mkString("") // real random is too slow
val value = Some(Value(s"$cnt3 $rnd")).map (value =>
if (managed) db.manage(value, classOf[IValue]) else value
).get
val tree = Some(Tree(value, left, right)).map (tree =>
if (managed) db.manage(tree, classOf[ITree]) else tree).get
tree -> (cnt3 + 1)
} ; val (tree, cnt) = gen(levels, 0)
println(s"Generated $cnt nodes. Taking into acccount " +
s"cache of size $cacheSize, we need " +
(cnt + cacheSize) * nodeSize + " of memory to store all the strucutre")
println("---") ; println(tree.str())
db.stop;
}
}
trait IValue // wraps a string into an interface so that it can be proxified
case class Value(value: String) extends IValue {
override def toString = if (value.length < 20) value else value.hashCode + ""
private def writeObject(oos: ObjectOutputStream) {
//throw new Exception
oos.defaultWriteObject
}
}
trait Node extends Serializable {
def str(indent: String = ""): String = "node"
}
object End extends Node
trait ITree extends Node with ProxifiableField {
def value: IValue
def left: Node ; def right: Node
override def str(indent: String = ""): String = {
s"\n$indent$value " + left.str(indent + " ") +
"," + right.str(indent + " ")
}
}
// We need to introduce a private variable coupled with def value
// for deserialization reasons http://stackoverflow.com/questions/33843884
case class Tree(@transient private var pValue: IValue, @transient private var pLeft: Node, @transient private var pRight: Node) extends ITree {
def value = pValue
def left = pLeft ; def right = pRight
println("created " + this)
def writeFields(out: ObjectOutputStream, db: Db) {
//out.defaultWriteObject() // serializes enclosing object
Seq(value, left, right) foreach (wr(out, db, _))
}
def readFields(in: ObjectInputStream, db: Db) {
//in.defaultReadObject()
def rd[T] = super.rd[T](in, db)
pValue = rd; pLeft = rd ; pRight = rd
}
}
object PersistenceDemo extends App {
def usage {
println ("usage1: <#nodes to generate>")
println ("usage2: load")
}
if (args.length < 1) usage else args(0) match {
case "load" =>
val db = Db(false) // load existing db
println("loaded " + db.byName("root").toString())
case int => val len = args(0).toInt
val db = cleandb
def rnd = scala.util.Random.alphanumeric.take(10).mkString
val tree = (1 to len).foldLeft(End: Node) { case (tail, i) =>
val value = db.manage(Value(i + " " + rnd), classOf[IValue])
db.manage(Tree(value, tail, End), classOf[ITree])
}
val lastTreeId = db.name("root", tree)
println("generated " + tree)
println(" ---------- last tree id = " + lastTreeId)
db.stop
}
}
object MutableDemo extends App {
trait IList {
override def toString = str(2) ; def str(depth: Int): String
def next: IList ; def next_=(next: IList)
}
case class Cons(var value: String, @transient var next: IList) extends IList with ProxifiableField {
// User logic
def str (depth: Int) = {
s"$value->" + ((next == null, depth == 1) match {
case (true, _) => "null"
case (_, true) => "..."
case _ => next.str(depth-1)
})
}
// handling proxifiable fields
def writeFields(out: ObjectOutputStream, db: Db) =
{wr(out, db, value) ; wr(out, db, next)}
def readFields(in: ObjectInputStream, db: Db) =
{value = rd(in, db) ; next = rd(in, db)}
}
def usage {
println ("usage1: gen <ring size> <disaplay length>")
println ("usage1: bug <ring size> <disaplay length>")
println ("usage2: load <display length>")
println ("example: gen 3 20")
}
def gen(useUpdateWrapper: Boolean) = {
val db = pass(new Db(defaultDir, 0, true)){ _.softCache = (_: Object) => null}
def manage(name: String, next: IList) = {
db.manage(Cons(name, next), classOf[IList])
}
val ringLen = args(1).toInt
val end = Cons("end1", null) ; val preEnd = manage("1", end)
val head = (2 to args(1).toInt).foldLeft(preEnd){
case(head, i) => manage(i + " ", head)
}
println("updating preEnd.next = " + preEnd.next + " to head = " + head)
if (!useUpdateWrapper) preEnd.next = head
else db.updating(preEnd){preEnd.next = head}
println("in result, generated " + head.str(args(2).toInt))
db.name("head", head) ; db.restart
val end2 = (1 to ringLen).foldLeft(db.byName[IList]("head")){
case(head, _) => head.next
}
println("end2 = " + end2)
assert(end2.next != null, "BUG! BUG! BUG!\n"
+ "BUG! BUG! BUG! You must have forgotten to perform the object update "
+ "inside an updating(object) section, so your updates were not"
+ " saved to disk.")
}
if (args.length < 1) usage else args(0) match {
case "bug" =>gen(false)
case "gen" => gen(true)
case "load" =>
val head: IList = Db(false).byName("head")
println(head.str(args(1) toInt))
case uncom => println("unknown command " + uncom) ; usage
}
def cast[T](o: Object) = o.asInstanceOf[T]
}
class Db(dir: File, var cacheSize: Int, clean: Boolean) {
import java.lang.ref._
println("dir = " + dir)
// We probably do not need this since soft references are employed.
val hardCache = mutable.Queue[Proxy]()
object Dirty extends Enumeration {
type Dirty = Value
val Clean, Updating, Yes = Value
} ; import Dirty._
// I would like to merge the ProxyClass with WeakReference but
// constructor of weakref needs a proxy, which needs proxy class
// for its constructor.
// When proxee != null in constructor, must set dirty = Yes and call activate. This will force serialization on eviction.
class ProxyClass (
val dbid: Int, var proxee: Object, // non-null proxees must be exactly those in the active (hard) cache
val implements: Class[_],
var dirty: Dirty
) extends InvocationHandler {
// Physical ID might be used to support mutability. When object
// is updated, a new revision is created. In append-only DB,
// physical ID may be the offset of the serialized object
//var physicalDBID: Int
println("creating " + this)
// When new object is created we may evict something from mem
def activate(proxy: Proxy) {
hardCache.dequeueAll(_ eq proxy)
evict(cacheSize) ; assert(proxee != null, "null proxee is not allowed")
hardCache.enqueue(proxy)
}
override def invoke(proxy: Object, method: Method, args: Array[Object]): AnyRef = {
println("calling1 " + this +"."+ method.getName + ", victims = " + victims)
assert(allProxies(dbid) == proxy, "proxy in the weak map mismatches the invoked one")
//assert(wr.get eq proxy, this + s" is accessed through ${System.identityHashCode(proxy)} but is known as ${wr.get} in our weakMap")
proxee = proxee match {
case ref: Reference[Object] => val res = ref.get ; if (res != null) println(s"restored $this from memory") ; res
case a => a
}
if (proxee == null) { println(s"restoring $this from disk")
proxee = usingProxeeFile(dbid) { in =>
in.readUTF(); in.readObject match {
case ts: ProxifiableField => ts.readFields(in, Db.this) ; ts
case o => o // this is conventional object
}
}
}
activate(proxy.asInstanceOf[Proxy])
//println("calling2 " + this +"."+ method.getName)
invokeProxee(method, args)
}
def invokeProxee(method: Method, args: Array[Object]): AnyRef = {
method.invoke(proxee, args: _*)
}
def serializeDirty = if (dirty == Yes) {
println(dbid + " was dirty")
closing(new ObjectOutputStream(new FileOutputStream(fname(dbid))))
{ oos => oos.writeUTF(implements.getName)
oos.writeObject(proxee) ; proxee match {
case ts: ProxifiableField => ts.writeFields(oos, Db.this)
case _ => // this is conventional object
}
}
//serialize(fname(pc.dbid), pc.proxee, Some(f))
dirty = Clean
}
override def toString = (if (proxee == null) "-" else "+") + dbid
}
def evict(threshold: Int) =
while (hardCache.length > threshold) {
val pc = proxyClass(hardCache.dequeue())
val dbid = pc.dbid ; val proxee = pc.proxee
//println("passivating " + dbid)
assert(hardCache map proxyClass forall { _.proxee != null }) // check that all are active
pc.serializeDirty // after serialization, dirty can be clean or updating
pc.proxee = softCache(proxee)
}
// setting softCache to (_ => null) disables the memory cache and forces use of the disk alone; useful for testing
var softCache = (proxee: Object) => new SoftReference(proxee)
def proxyClass(p: Proxy) = Proxy.getInvocationHandler(p).asInstanceOf[ProxyClass]
def victims = hardCache.map(proxyClass(_)).mkString(",")
// includes both active and not-yet-GC'ed proxies
val allProxies: mutable.Map[Int, Object] = mutable.Map()
//import scala.reflect.runtime.universe._
//def getType[T: TypeTag](a: T): Type = typeOf[T]
def manage[T <: AnyRef with Serializable, I](proxee: T, supportedInterfaces: Class[I]) = {
val pc = new ProxyClass(allProxies.size, proxee, supportedInterfaces, Yes) // if we set proxee, we must call pc.activate
val proxy = makeProxy(pc).asInstanceOf[Proxy]
pc.activate(proxy) ; proxy.asInstanceOf[I]
}
//pc must have proxee initialized
def makeProxy(pc: ProxyClass) = {
assert(!allProxies.contains(pc.dbid) , "proxy for " + pc.dbid + " already exists")
pass(Proxy.newProxyInstance( pc.implements.getClassLoader, Array(pc.implements), pc))
{allProxies(pc.dbid) = _}
}
def fname(dbid: Int/*, physicalID: Int*/): File = fname("" + dbid/* + "." + physicalID*/)
def fname(dbid: String) = new File(dir + "/" + dbid)
// names are used for persistence
val names = mutable.Map[String, Int]() ; val fNames = fname("names")
def name(name: String, p: Object) = {
val dbid = proxyClass(p.asInstanceOf[Proxy]).dbid
names(name) = dbid ; println("names = " + names); dbid
}
def byName[T](name: String) = allProxies(names(name)).asInstanceOf[T]
def updating[T](proxy: Object)(code: => T) = {
val pc = proxyClass(proxy.asInstanceOf[Proxy])
pc.dirty = Updating ; try code finally pc.dirty = Yes
}
def flush {
//serialize(fNames, names toList)
closing(new PrintWriter(fNames))( pw => names.foreach {
case (n, dbid) => pw.println(n + " " + dbid)
})
hardCache map proxyClass foreach { pc =>
assert (pc.dirty != Updating)
pc.serializeDirty
}
}
def stop {
println("number of known proxies in db = " + allProxies.size) ;
flush ; hardCache.clear ; names.clear ; allProxies.clear()
// stop any threads if running
}
def start() = {
closing(scala.io.Source.fromFile(fNames))(_.getLines().foreach {
line => val nameval = line split ' ' take 2
names(nameval(0)) = nameval(1) toInt
})
dir.listFiles().foreach{f => if (f.getName != "names") {
val dbid = f.getName.toInt;
usingProxeeFile(dbid) { in =>
val cl = Class.forName(in.readUTF(), false, getClass.getClassLoader)
makeProxy(new ProxyClass(dbid, null, cl, Clean))
}
}}
}
def restart() = { stop ; start }
if (clean) {
dir.mkdir()
dir.listFiles().foreach { f => java.nio.file.Files.deleteIfExists(f.toPath())}
assert(dir.list().length == 0)
stop // this will create names file that we needed for restart
}
start()
def usingProxeeFile[T](dbid: Int)(code: ObjectInputStream => T) = {
closing(new ObjectInputStream(new FileInputStream(fname(dbid)))){code}
}
}
// Every class whose fields can be proxified must implement
// this trait.
//todo: use Scala macros to obviate the need to call wr(field),
// field = rd(). The user could just list the fields for the macro.
// Even better would be if we could collect the transient fields
// ourselves at compile time.
trait ProxifiableField extends Serializable {
// writeFields is called during serialization. The user must
// call wr(field) for every @transient-marked field
def writeFields(out: ObjectOutputStream, db: Db)
// ReadFields must call field = rd(in, db) for every transient field
def readFields(in: ObjectInputStream, db: Db)
// auxiliary methods
def wr(out: ObjectOutputStream, db: Db, field: AnyRef) =
field match {
case p: Proxy => out.writeBoolean(true) ; out.writeInt(db.proxyClass(p).dbid) // for managed object, serialize object id
case o => out.writeBoolean(false) ; out.writeObject(o) // otherwise serialize normally
}
def rd[T](in: ObjectInputStream, db: Db) = {
val res = (in.readBoolean() match {
case false => in.readObject() // normal object
case true => db.allProxies(in.readInt)
}).asInstanceOf[T]
assert(res != null); res
}
}
object Util {
//def using[Resource <: Closeable, T](resource: Resource)(code: Resource => T) = try code(resource) finally resource.close
def closing[A, B <: {def close(): Unit}] (closeable: B) (f: B => A): A =
try { f(closeable) } finally { closeable.close() }
def pass[T](t: T)(f: T => Unit): T = {f(t) ; t}
def id(obj: AnyRef) = if (obj == null) "null" else obj.getClass.getSimpleName + "" + System.identityHashCode(obj)
val cacheSize = 2
def defaultDir = new File("""C:\Users\valentin\AppData\Local\Temp\zdb_swap7797261140896837624""")
// hold no more than 2 objects in mem
def cleandb = Db(true)
def Db(clean: Boolean) = {
println("cachesize = " + cacheSize)
new Db(defaultDir, cacheSize, clean)
}
}
// This implementation can run all the applications (ProxyDemo, PersistenceDemo and MutableDemo)
// from the first implementation. However, in contrast with the reduced version, it allows proxies
// to be GC-ed, which is supposed to be more efficient in the long run when we have really many
// managed objects. The experiment
// timeit "scala -J-Xmx33m ProxyDemo + 16 100 > nul"
// suggests that it is indeed faster.
// Probably this does not make a lot of sense as long as we store each object in a separate file,
// because maintaining a large folder is more memory demanding than maintaining the proxy list.
// At least I should compare how much sooner the first version fails than this one.
class Db(dir: File, var cacheSize: Int, clean: Boolean) {
import java.lang.ref._
println("dir = " + dir)
// We probably do not need this since soft references are employed.
val hardCache = mutable.Queue[Proxy]()
// I would like to merge the ProxyClass with WeakReference but
// constructor of weakref needs a proxy, which needs proxy class
// for its constructor.
// When proxee != null in constructor, must set dirty = Yes and call activate. This will force serialization on eviction.
class ProxyClass (
val dbid: Int, var proxee: Object, // non-null proxees must be exactly those in the active (hard) cache
val implements: Class[_],
var dirty: Boolean) extends InvocationHandler {
// Physical ID might be used to support mutability. When object
// is updated, a new revision is created. In append-only DB,
// physical ID may be the offset of the serialized object
//var physicalDBID: Int
//println("creating " + this)
// When new object is created we may evict something from mem
var updatingReentrance = 0
def activate(proxy: Proxy) {
hardCache.dequeueAll(_ eq proxy)
evict(cacheSize) ; assert(proxee != null, "null proxee is not allowed")
hardCache.enqueue(proxy)
}
override def invoke(proxy: Object, method: Method, args: Array[Object]): AnyRef = {
//println("calling1 " + this +"."+ method.getName + ", victims = " + victims)
assert(weakProxies(dbid).get == proxy, "proxy in the weak map mismatches the invoked one")
//assert(wr.get eq proxy, this + s" is accessed through ${System.identityHashCode(proxy)} but is known as ${wr.get} in our weakMap")
proxee = proxee match {
case ref: Reference[Object] => ref.get //val res = ref.get ; if (res != null) println(s"restored $this from memory") ; res
case a => a
}
if (proxee == null) { //println(s"restoring $this from disk")
proxee = closing(new ObjectInputStream(new FileInputStream(fname(dbid)))){
in => in.readObject match {
case ts: ProxifiableField => ts.readFields(in, Db.this) ; ts
case o => o // this is conventional object
}
}
}
activate(proxy.asInstanceOf[Proxy])
//println("calling2 " + this +"."+ method.getName)
// this cleans up weak proxies. It is probably too much to do
// the cleanup at every invocation. Consider other places
rqClean // for refqueue.poll: e.g. weakProxies(dbid).get == null means there are already empty refs in the queue, or poll only on every 100th request
invokeProxee(method, args)
}
def invokeProxee(method: Method, args: Array[Object]): AnyRef = {
method.invoke(proxee, args: _*)
}
def serializeDirty = if (dirty) {
//println(dbid + " was dirty")
closing(new ObjectOutputStream(new FileOutputStream(fname(dbid))))
{ oos => oos.writeObject(proxee) ; proxee match {
case ts: ProxifiableField => ts.writeFields(oos, Db.this)
case _ => // this is conventional object
}} ; dirty = false
}
override def toString = (if (proxee == null) "-" else "+") + dbid
}
def evict(threshold: Int) =
while (hardCache.length > threshold) {
val pc = proxyClass(hardCache.dequeue())
val dbid = pc.dbid ; val proxee = pc.proxee
//println("passivating " + dbid)
assert(hardCache map proxyClass forall { _.proxee != null }) // check that all are active
pc.serializeDirty // after serialization, dirty can be clean or updating
pc.proxee = softCache(proxee)
}
// setting softCache to (_ => null) disables the memory cache and forces use of the disk alone; useful for testing
var softCache = (proxee: Object) => new SoftReference(proxee)
def proxyClass(p: Proxy) = Proxy.getInvocationHandler(p).asInstanceOf[ProxyClass]
def victims = hardCache.map(proxyClass(_)).mkString(",")
// includes both active and not-yet-GC'ed proxies
val weakProxies: mutable.Map[Int, MyWR] = mutable.Map()
var total: Int = _
//import scala.reflect.runtime.universe._
//def getType[T: TypeTag](a: T): Type = typeOf[T]
def manage[T <: AnyRef with Serializable, I](proxee: T, supportedInterfaces: Class[I]) = {
total += 1 ;
val pc = new ProxyClass(total, proxee, supportedInterfaces, true) // if we set proxee, we must call pc.activate
val proxy = makeProxy(pc).asInstanceOf[Proxy]
pc.activate(proxy) ; proxy.asInstanceOf[I]
}
private val rq = new ReferenceQueue[Proxy]()
@annotation.tailrec final def rqClean {rq.poll match {
case null =>
case top => val dbid = top.asInstanceOf[MyWR].dbid
assert(weakProxies.contains(dbid)) ; val current = weakProxies(dbid)
//println((if (current != top) "a previous copy of " else "") + s"proxy with dbid " + dbid + " was finalized")
if (current == top) weakProxies -= dbid ; rqClean
}
}
//class HardReference[T](val referent: Proxy, rq1: Object) extends SoftReference[Proxy](referent, rq)
class MyWR(val dbid: Int, proxy: Proxy) extends WeakReference[Proxy](proxy, rq)
//pc must have proxee initialized
def makeProxy(pc: ProxyClass) = {
assert(!weakProxies.contains(pc.dbid) || weakProxies(pc.dbid).get == null)
val proxy = Proxy.newProxyInstance(pc.implements.getClassLoader, Array(pc.implements), pc)
weakProxies(pc.dbid) = new MyWR(pc.dbid, proxy.asInstanceOf[Proxy])
proxy
}
def fname(dbid: Int/*, physicalID: Int*/): File = fname("" + dbid/* + "." + physicalID*/)
def fname(dbid: String) = new File(dir + "/" + dbid)
// names are used for persistence
val names = mutable.Map[String, (Int, Class[_])]() ; val fNames = fname("names")
def name(name: String, p: Object) = {
val pc = proxyClass(p.asInstanceOf[Proxy])
val interface = pc.implements // the proxy must still be in the weak map because it is referenced by the calling thread
names(name) = (pc.dbid, interface) ; pc.dbid
}
def byName[T](name: String) = {
val (dbid, interface) = names(name)
fromWeak(dbid, interface).asInstanceOf[T]
}
def updating[T](proxy: Object)(code: => T) = {
val pc = proxyClass(proxy.asInstanceOf[Proxy])
pc.updatingReentrance += 1 ; try code finally {
pc.updatingReentrance -= 1
if (pc.updatingReentrance == 0) pc.dirty = true
}
}
def flush {
//serialize(fNames, names toList)
closing(new PrintWriter(fNames))( pw => names.foreach {
case (n, (dbid, interface)) => pw.println(n + " " + dbid + " " + interface.getName)
})
hardCache map proxyClass foreach { pc =>
assert (pc.updatingReentrance == 0 , pc + " dirty = " + pc.dirty + ", updating = " + pc.updatingReentrance)
pc.serializeDirty
}
}
def stop {
rqClean ; println("number of known proxies in db = " + weakProxies.size) ;
flush ; hardCache.clear ; names.clear
// stop any threads if running
}
def start() = {
closing(scala.io.Source.fromFile(fNames))(_.getLines().foreach {
line => val nameval = line split ' ' take 3
names(nameval(0)) = (nameval(1) toInt, class4name(nameval(2)))
})
total = dir.list().length - 1 // one file is for names, the others are regular objects
}
def restart() = { stop ; start }
if (clean) {
dir.mkdir()
dir.listFiles().foreach { f => java.nio.file.Files.deleteIfExists(f.toPath())}
assert(dir.list().length == 0)
stop // this will create names file that we needed for restart
}
start()
def fromWeak(dbid: Int, implements: Class[_]) = {
weakProxies.get(dbid) map {_.get} filter(_!= null) getOrElse {
makeProxy(new ProxyClass(dbid, null, implements, false))
}
}
}
// Every class whose fields can be proxified must implement
// this trait.
//todo: use Scala macros to obviate the need to call wr(field),
// field = rd(). The user could just list the fields for the macro.
// Even better would be if we could collect the transient fields
// ourselves at compile time.
trait ProxifiableField extends Serializable {
// writeFields is called during serialization. The user must
// call wr(field) for every @transient-marked field
def writeFields(out: ObjectOutputStream, db: Db)
// ReadFields must call field = rd(in, db) for every transient field
def readFields(in: ObjectInputStream, db: Db)
// auxiliary methods
def wr(out: ObjectOutputStream, db: Db, field: AnyRef) =
field match {
case p: Proxy => out.writeBoolean(true) // for managed object, serialize object id
val pc = db.proxyClass(p); val dbid = pc.dbid
out.writeInt(dbid) ; out.writeUTF(pc.implements.getName)
case o => out.writeBoolean(false) ; out.writeObject(o) // otherwise serialize normally
}
def rd[T](in: ObjectInputStream, db: Db) = {
val res = (in.readBoolean() match {
case false => in.readObject() // normal object
case true => db.fromWeak(in.readInt, class4name(in.readUTF))
}).asInstanceOf[T]
assert(res != null); res
}
}
object Util {
//def using[Resource <: Closeable, T](resource: Resource)(code: Resource => T) = try code(resource) finally resource.close
def closing[A, B <: {def close(): Unit}] (closeable: B) (f: B => A): A =
try { f(closeable) } finally { closeable.close() }
def pass[T](t: T)(f: T => Unit): T = {f(t) ; t}
def ?[T](sel: Boolean, a: => T, b: => T) = if (sel) a else b
def id(obj: AnyRef) = if (obj == null) "null" else obj.getClass.getSimpleName + "" + System.identityHashCode(obj)
def class4name(name: String) = Class.forName(name, false, getClass.getClassLoader)
val cacheSize = 2
def defaultDir = new File("""C:\Users\valentin\AppData\Local\Temp\zdb_swap7797261140896837624""")
// hold no more than 2 objects in mem
def cleandb = Db(true)
def Db(clean: Boolean) = {
println("cachesize = " + cacheSize)
new Db(defaultDir, cacheSize, clean)
}
}
object Garbage extends App { // demonstrates how to poll weak references for reclaimed objects and how this interacts with GC under low memory.
import java.lang.ref._
if (args.length < 1) println("usage: <size of objects; k and m suffixes supported>\nexample: 100k")
else {
var rq = new ReferenceQueue[Any]()
class MyRef(val id: Int, target: Any) extends WeakReference(target, rq)
val inuse = collection.mutable.Map[Int, MyRef]()
def pass[T](a: T)(f: T => Unit) = {f(a) ; a}
def ?[T](sel: Boolean, a: => T, b: => T) = if (sel) a else b
def parse(arg: String) = { arg.filter(_.isDigit).toInt *
?(arg.contains("k"), 1000, 1) * ?(arg.contains("m"), 1000*1000, 1)
}
def finalize_(acc: Int): Int = rq.poll match {
case top: MyRef => inuse -= top.id ; finalize_(acc + 1)
case null => acc } ;
def arr = Array.ofDim[Byte](parse(args(0).toLowerCase())) // Byte elements, so the argument is the allocation size in bytes
def forever(i: Int, variable: List[Object], killer: List[Object]){
inuse(i) = new MyRef(i, arr) ; val finalized = finalize_(0) ; if (finalized > 0)
println(finalized + " finalized(" + inuse.size + " remains in use)")
forever(i+1, ?(variable.length>30, Nil, arr :: variable), arr :: killer) // fill the memory slowly
} ; forever(0, Nil, Nil)
}
}
1. Recursion depth must not be very high because an object is strongly referenced while its method is being executed by a thread.
2. "this" must be used with precaution when executing managed object methods. You should not let the this reference live for a long time, let alone store it in a managed object. This is a great limitation which probably invalidates my whole design. But, to my knowledge, children in Zamia did not hold a reference to the parent object (I considered it a flaw since occasionally I wanted to know the current path, but it seems more runtime-efficient to supply it to the tree traversal procedure than to store it in the DB, particularly because the same variable may be contained in multiple identical GENERATED blocks).
This limitation makes me think that I could take Guenter's approach: proxify the object itself when it is evicted and check whether it is loaded on every method invocation. The transparency may be achieved through AOP/macros. This could reduce overhead since we would not need additional Proxy objects in memory and could keep using 'this'; a sketch of the idea follows the snippet below.
trait ITree extends Node with ProxifiableField {
  override def str(indent: String = ""): String = {
    //s"\n$indent$value " + left.str(indent + " ") + " ," + right.str(indent + " ")
    print(s"\n$indent$value "); left.str(indent + " ") ; print(" ,") ; right.str(indent + " ") // don't accumulate the long string in memory
  }
}
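A hedged sketch of the alternative mentioned above: the object acts as its own proxy and every
public method first checks whether its payload is loaded, so no separate java.lang.reflect.Proxy
object is needed and 'this' stays safe to pass around. The names below (SelfProxified, SelfNode,
ensureLoaded, load) are illustrative only; the transparency of the check is exactly what AOP or
macros would have to inject.

trait SelfProxified extends Serializable {
  @transient private var loaded = false
  protected def load(): Unit                       // restore the @transient fields from disk
  protected def ensureLoaded(): Unit = if (!loaded) { load(); loaded = true }
}

class SelfNode(val id: Int) extends SelfProxified {
  @transient private var payload: String = _
  protected def load(): Unit = { payload = s"payload of node $id, read back from its file" } // stand-in for real deserialization
  def value: String = { ensureLoaded(); payload }  // every accessor checks first; no extra Proxy object in memory
}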
// This Db attempts to overcome two drawbacks of Guenter's ZDB:
-- Despite being called so, it is not transparent at all: the user has to check whether a field is null
on every method invocation.
-- Caching does not exploit the GC mechanism and, as such,
- cannot dynamically adjust to the available JVM heap size
- admits identity violations because, despite claims to the contrary, the user has direct
access to the Java objects and can keep a ref while ZDB evicts them. The user can then request
a copy of the object from disk, by object id, resulting in two copies in one JVM.
I use proxies to break the graph of objects apart and load parts of the graph automatically,
behind the scenes, as the user accesses it. Weak refs are employed to keep proxies in memory until
they are GC-ed. The managed objects themselves are evicted behind soft references so that
they stay in memory as long as memory is available. The cache of active proxies is used to keep
hard references, but it seems that it could be eliminated now that this soft-reference
mechanism for proxees is in place.
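The identity violation mentioned above is easy to reproduce without proxies: deserializing the same
stored object twice yields two distinct Java objects in one JVM, which a single proxy per dbid rules
out. A minimal sketch (the temp file is illustrative; Value is the case class defined above):

object IdentitySketch extends App {
  import java.io._
  val f = File.createTempFile("zdb-identity", ".obj")
  val out = new ObjectOutputStream(new FileOutputStream(f))
  try out.writeObject(Value("payload")) finally out.close()
  def load() = {
    val in = new ObjectInputStream(new FileInputStream(f))
    try in.readObject() finally in.close()
  }
  val a = load(); val b = load()
  println(a == b) // true:  structurally equal (case class equality)
  println(a eq b) // false: two copies of one logical object in the same JVM
}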
// Todo:
-2. Use implicits in API like STM does http://www.codecommit.com/blog/scala/software-transactional-memory-in-scala
-1. Look at ActiveObjects http://www.codecommit.com/blog/java/an-easier-java-orm example
0. Eliminate indirection for Long references. We can build the DB as an immutable structure,
whereas indirection is needed only for mutable structures.
// 1. Handle object field updates through an update method. We cannot simply mark setters with annotations and
// flag the managed object dirty whenever such a method is called, because this is not only inefficient
// but also ineffective. It is more efficient to mark dirty only once, after all modifications
// are applied, no matter to how many fields. Moreover, a proxy can track only access to the
// immediate managed object fields but not accesses to the contained subobjects. This means
// that updates cannot be transparent. The user must know not only how to partition the graph,
// turning some objects into managed ones, but also explicitly call setDirty on the managed object
// as required.
// Although read accesses occur transparently, the user must also understand that by keeping references
// and making deep recursions he blocks GC of passivated objects.
// We will need to expand our ProxyClass with a physicalID in addition to the logicalID.
// Field updates are best demonstrated with a circle and persistence.
// The current implementation relies on setDirty, but the db may passivate and reactivate your object
// many times while you are editing it through the proxy, rolling back all the updates. Intro-
// ducing an update method, which re-adds the object into the db under the given id, would not change
// anything. It seems that mutation needs to be done under the `updating` function, which would
// set the proxified object's state to `updating` and keep a reference to the proxified object
// (pc.proxee would not go to a soft ref during passivation). In the end, the state is changed to
// dirty, which will cause re-serialization. The flags can be stored in the ProxyClass, to relieve
// the user from declaring/implementing the DirtyState.
// Make serializeDirty return the isUpdating state, to stop proxee => null in evict and to
// raise an error in flush. A short usage sketch of the `updating` wrapper follows this item.
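// A short usage sketch of the `updating` wrapper; it mirrors MutableDemo above and reuses its
// Cons/IList types, so the only assumption is the default directory chosen by Util.Db.
object UpdatingSketch extends App {
  import MutableDemo.{Cons, IList}
  val db = Util.Db(true)                                    // fresh db in Util.defaultDir
  val head = db.manage(Cons("head", null), classOf[IList])  // a managed, mutable cons cell
  db.updating(head) { head.next = head }                    // mutate only inside the block;
                                                            // the proxy is flagged dirty when it ends
  println(head.str(3))
  db.stop
}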
// 2. Handle other types (Done). Actually, the current demo fails at Pair.toStr recursion because
// Pair objects cannot be GC-ed until the thread exits their methods. This can be cured by
// creating a wrapper for the strings. This needs wrappers for objects other than IPair.
// 3. Compact and automate the proxification using instrumentation by compiler macros/plugins or AspectJ weaving.
// This has the advantage that objects can probably work as their own proxies, saving memory and,
// for sure, it would make it possible to disable all the management altogether, with its memory and computation
// overhead.
// 4 (Done). Invent something about "cache identity", to avoid duplicates. The isolation of managed
// objects with proxy wrappers that we already have seems sufficient at first glance. Why did
// I then capture weak references from memory? Rolled back; see the MyWR class and proxee activation in the
// invocation procedure to see what is necessary to come back to weak references in revision 4 https://gist.github.com/valtih1978/8f71943d3205bfd15510/107dfa6c4fba655bd3b7f176597613f894b6adc4
// It is only 10 lines longer.
// The user can still take a direct ref to some subobject of a managed object, let the managed object
// be passivated, update the subobject in the meantime and then call managed.setDirty. This will cause
// the managed object to be activated and written back to disk, but what is the point? We will have two copies
// of the subobject: the user has modified one and saves the unmodified one. This won't fix the identity violation.
// We need all subobjects to be immutable or enclosed, i.e. to keep a ref to the parent.
Identity should cover the proxies: if two objects refer to the same managed object, the proxies must be
the same Java object.
// 5 (Done). Persistence. Just store the `all` index and the dbid of some "root" object which can reach all others.
// The fragile part is that the user can get direct references to subobjects of managed objects and modify them,
// forgetting to setDirty the parent object.
// 6 (Done). Manage the `all` index, unload unreferenced proxies from memory. But how do we do so if we track
// references to managed objects rather than to their proxies? This means that we might also need
// to get rid of the `all` file. Dear, we could also get rid of the proxies! If we have weak references
// then we do not need the proxies. No, proxies exist for automatic decoupling of the objects.
// If we got rid of proxies, we would have to nullify all object fields prematurely, as soon as
// the object is passivated. But this is done now anyway.
// 7 (Done). Evict proxees to memory. Currently, the proxee is lost once it is evicted from the cache because we do not keep
// a weak ref to it. Probably we can fix this with yet another weak map. Soft references
// would be used for proxees in addition to weak ones for proxies (we cannot use phantom references for
// proxies because we would not be able to retrieve live values).
// 8. Serialize all objects into one large file. It should be faster because opening many files takes
// a lot of time from the file management subsystem and disk rotation point of view, especially if
// we
// 8.2 (Done) Combine multiple object writes into a single file operation. The hard cache would play the
// role of keeping a number of objects in memory until they appear on disk.
// The beauty of this approach is that isDirty transforms directly into physicalID = -1.
// It will however need to store a map dbid => physicalID for every managed object. This defeats the
// advantages of proxy eviction. So, we may try it first in the reduced implementation, which holds
// all proxies anyway. Then, we can try direct buffers (longs) or a memory-mapped file for the map,
// just as they maintain offsets here http://www.javacodegeeks.com/2012/12/escaping-the-jvm-heap-for-memory-intensive-applications.html
// A minimal sketch of such a single-file, append-only layout follows the 8.x sub-items below.
// 8.1 Since the index is going to be sparse, it seems quite suboptimal to use Windows paging. It will load whole pages
// whereas only a couple of longs will be used. Furthermore, loading almost all pages at startup, when an existing DB
// is reopened, will flood the system memory with index pages, and practice demonstrates that Windows won't manage
// to copy them into the paging file and will die thrashing. It means that we will need to replace the mmapped index with
// an extendible hash (a cache).
// 8.3 Maintain the object file length manually. I guess that reading the length from a RandomAccessFile may hit the system, including taking locks.
// This actually caused a huge slowdown when the soft cache was enabled. Probably save the bundle with a RAF or FileChannel rather than a memory-mapped file.
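// A hypothetical sketch of the single-file, append-only layout discussed in the 8.x items above:
// each managed object is appended to one RandomAccessFile and its physicalID is simply the offset
// at which its bytes start, so rewriting an object just appends a new revision. Names and layout
// are illustrative, not part of the Db above.
class AppendOnlyStore(file: java.io.File) {
  import java.io._
  private val raf = new RandomAccessFile(file, "rw")

  def append(obj: AnyRef): Long = {                // returns the physicalID (= file offset)
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos); oos.writeObject(obj); oos.close()
    val bytes = bos.toByteArray
    val offset = raf.length()
    raf.seek(offset); raf.writeInt(bytes.length); raf.write(bytes)
    offset
  }

  def read(physicalID: Long): AnyRef = {           // random access by offset, no per-object file
    raf.seek(physicalID)
    val bytes = new Array[Byte](raf.readInt()); raf.readFully(bytes)
    new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject()
  }

  def close(): Unit = raf.close()
}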
// 9. Concurrent file save. Can we serialize objects concurrently, while the user does some operations
// on them? I do not think so, because they can be in a non-serializable state, not ready for serializa-
// tion. It seems that we need to serialize when the user calls update(obj). Both update and add may take the serialized
// object as an optional argument; in case multiple user threads operate on a single db, this may boost
// the performance. On the other hand, once we have implemented update through the updating() wrapper
// function, we can block the object when the update is started and release it when setDirty is reached.
// The serializer won't capture updating objects anyway. Object serialization should start as soon as
// dirty is set, and we know that this happens when an object is added into the db or an update has just finished.
// 10 (Done). Store the implemented interface together with the referencing fields rather than with the implementing instance.
// This will eliminate the need to store class names in memory but, more importantly, will allow creating
// proxies without opening another file, because we do not store all class names in soft memory
// anyway (this may not apply to the first implementation, based on the hard proxy list).
// Done: timeit "scala -J-Xmx33m ProxyDemo + 16 100"
// demonstrates a speedup of 129 sec -> 106 sec.
// 11. Combine WeakRef with ProxyClass (Done). Because there are a lot of proxies, with weak refs to them and
// their invocation handlers, it is better to minimize the memory consumed by these objects. One way would be
// to merge them, because fewer objects => less memory consumed and less GC hassle. The weak ref to a
// proxy could also handle its invocations. The problem with merging, however, is that to create a weak ref
// you need to create a proxy, and to create a proxy you need a reference to the invocation handler. Probably
// we could do the job using Scala's lazy val mechanism. However, I have found that it is possible using
// Java reflection: we create the proxy with a fake handler and, once it is created, update its "h"
// (handler) field to the weak reference using reflection (a sketch of the trick follows item 11.2 below).
// The result, however, seems to be a huge slowdown of everything.
// Performance improved considerably after I factored the common part
// ru.typeOf[Proxy].declaration(ru.TermName("h")).asTerm
// out of the Utils.setInvHandler() procedure. So it is reflection
// that makes things so slow.
// But it was still not nearly as good as without the merge.
11.2 Rename ProxyClass => ProxyHandler in all commits where the handler is merged with WeakReference. It is a handler because we can use it to access the object as if through a reference; it is also a handler because it handles invocations for the proxy.
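A hedged sketch of the merge described in item 11. It assumes the JDK keeps the proxy's handler in a
protected field named "h" on java.lang.reflect.Proxy (true for the JDK 8 era this gist targets; newer,
module-enforcing JDKs may refuse the reflective access). All names below are illustrative.

object MergedHandlerSketch {
  import java.lang.ref.WeakReference
  import java.lang.reflect.{InvocationHandler, Method, Proxy}

  // factor the Field lookup out once; the note above reports that repeating such
  // reflective lookups on every call was the main source of the slowdown
  private val hField = classOf[Proxy].getDeclaredField("h")
  hField.setAccessible(true)

  // the weak reference to the proxy is itself the invocation handler
  class WeakHandler(proxy: Proxy, var proxee: Object)
      extends WeakReference[Proxy](proxy) with InvocationHandler {
    override def invoke(p: Object, m: Method, args: Array[Object]): AnyRef =
      m.invoke(proxee, args: _*)                  // a real Db would restore proxee from disk when null
  }

  def makeMerged[I](iface: Class[I], proxee: Object): I = {
    val placeholder = new InvocationHandler {     // fake handler, only used during construction
      def invoke(p: Object, m: Method, args: Array[Object]): AnyRef = null
    }
    val proxy = Proxy.newProxyInstance(iface.getClassLoader, Array(iface), placeholder)
    hField.set(proxy, new WeakHandler(proxy.asInstanceOf[Proxy], proxee)) // swap in the merged handler
    proxy.asInstanceOf[I]
  }
}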
// 12. Compact the DB on every restart. Place all "root" nodes into the index first and then
// incrementally add the nodes reached at the previous step. It is especially/only useful after extensive
// updates of a single-file DB, because we will have many new versions and many outdated ones. Keeping
// a smaller object file will allow better caching of it.
13. Use physicalID = -2 for dirty: physicalID = -1 => unloaded, physicalID = -2 => dirty.