Created
April 7, 2009 14:30
-
-
Save Arachnid/91254 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import StringIO | |
from google.appengine.ext import db | |
# Key-name format for DataBlock entities: '#' followed by the block number as
# eight zero-padded uppercase hex digits (block 10 -> '#0000000A').  The zero
# padding makes key names sort in block-number order for block numbers that
# fit in eight hex digits.
BLOCK_NAME_FMT = "#%.8X"
class Inode(db.Model):
    """Datastore entity describing one stored file.

    The file's bytes live in DataBlock entities whose parent is this inode;
    block N is keyed by ``BLOCK_NAME_FMT % N`` (see get_block_key).
    """

    DEFAULT_BLOCK_SIZE = 512 * 1024  # 524288 bytes

    size = db.IntegerProperty(required=True, default=0)
    block_size = db.IntegerProperty(required=True, default=DEFAULT_BLOCK_SIZE)
    reference_count = db.IntegerProperty(required=True, default=0)
    created = db.DateTimeProperty(required=True, auto_now_add=True)
    modified = db.DateTimeProperty(required=True, auto_now=True)

    @classmethod
    def kind(cls):
        """Return the datastore kind name used for inode entities."""
        return "DatastoreFile.Inode"

    def get_block_key(self, block_num):
        """Return the key of data block ``block_num``, a child of this inode."""
        key_name = BLOCK_NAME_FMT % block_num
        return db.Key.from_path(DataBlock.kind(), key_name, parent=self.key())

    def get_block(self, block_num):
        """Fetch data block ``block_num`` via DataBlock.get.

        DatastoreFile treats a falsy result as "block not stored".
        """
        block_key = self.get_block_key(block_num)
        return DataBlock.get(block_key)

    def open(self, *args, **kwargs):
        """Return a DatastoreFile over this inode.

        Extra arguments are passed through to DatastoreFile.

        Raises:
          db.NotSavedError: if this inode has not been stored yet.
        """
        if self.is_saved():
            return DatastoreFile(self, *args, **kwargs)
        raise db.NotSavedError()
class DataBlock(db.Model):
    """One block of a file's contents.

    Its parent is the owning Inode and its key name is the block number
    formatted with BLOCK_NAME_FMT (see Inode.get_block_key).
    """

    @classmethod
    def kind(cls):
        """Return the datastore kind name used for data block entities."""
        return "DatastoreFile.DataBlock"

    data = db.BlobProperty()  # raw bytes of this block
class DatastoreFile(object):
    """File-like handle that reads and writes the contents of an Inode.

    The file is stored as DataBlock children of the inode, each covering
    ``inode.block_size`` bytes of the file.  I/O goes through an in-memory
    StringIO buffer for one block (``_current``, whose block number is
    ``_current_block``).  When the position moves into the next block while
    ``_current`` has unstored writes, that buffer is retained as ``_previous``
    instead of being stored immediately.  The next block change, flush(),
    seek() or close() stores the pending blocks together with the inode's
    new size in one datastore transaction.

    Instances are normally obtained from Inode.open().
    """

    def __init__(self, inode, mode="r", max_uncommitted=None):
        """Open a handle positioned at the start of ``inode``.

        Args:
          inode: a saved Inode whose blocks this handle reads and writes.
          mode: recorded and exposed via the ``mode`` property only; it is not
            enforced (writes are accepted whatever its value).
          max_uncommitted: must not exceed ``inode.block_size`` and defaults
            to it.  It is recorded but not otherwise used by this class.

        Raises:
          ValueError: if ``max_uncommitted`` exceeds the inode's block size.
        """
        self._inode = inode
        self._mode = mode
        self._pos = 0
        self._current = None         # StringIO buffer for block _current_block
        self._current_block = None   # block number held in _current
        self._dirty = False          # _current has writes not yet stored
        self._previous = None        # dirty buffer retired by _advance()
        self._previous_block = None  # block number held in _previous
        if max_uncommitted is None:
            max_uncommitted = inode.block_size
        elif max_uncommitted > inode.block_size:
            raise ValueError("max_uncommitted must be <= block size")
        # An explicit value used to be validated but never stored.
        self._max_uncommitted = max_uncommitted

    def close(self):
        """Flush pending writes and detach from the inode.

        Closing an already-closed handle is a no-op.  Previously a second
        call, including the implicit one from __del__, could fail on the
        detached inode.
        """
        if self._inode is None:
            return
        self.flush()
        self._inode = None
        self._current = None
        self._current_block = None

    def _flush_blocks(self, blocks):
        """Store block buffers and grow the inode's size in one transaction.

        Args:
          blocks: non-empty sequence of (block_num, StringIO buffer) pairs.

        The stored size becomes max(current stored size, self._pos).  The
        local inode's ``size`` and ``modified`` are then refreshed from the
        stored entity.
        """
        block_nums = [num for num, _ in blocks]
        block_data = [buf.getvalue() for _, buf in blocks]
        keys = [self._inode.get_block_key(num) for num in block_nums]
        keys.append(self._inode.key())
        new_size = self._pos

        def _do_flush():
            entities = db.get(keys)
            inode = entities.pop()
            for i, num in enumerate(block_nums):
                if entities[i] is None:
                    entities[i] = DataBlock(key_name=BLOCK_NAME_FMT % num,
                                            parent=inode)
                entities[i].data = block_data[i]
            if new_size > inode.size:
                inode.size = new_size
            entities.append(inode)
            db.put(entities)
            return inode

        new_inode = db.run_in_transaction(_do_flush)
        self._inode.size = new_inode.size
        self._inode.modified = new_inode.modified

    def flush(self):
        """Store any pending blocks (previous and/or dirty current block)."""
        to_flush = []
        if self._previous is not None:
            to_flush.append((self._previous_block, self._previous))
        if self._dirty:
            # Use the buffer's own block number.  _pos // block_size is wrong
            # when the position sits exactly at the end of the buffered block.
            to_flush.append((self._current_block, self._current))
        if to_flush:
            self._flush_blocks(to_flush)
        self._previous = None
        self._previous_block = None
        self._dirty = False

    def _load_current(self):
        """Make ``_current`` the buffer for the block containing ``_pos``.

        The buffer is positioned at ``_pos``'s offset within the block.  If a
        different block is buffered (the position ran off its end), it is
        retired via _advance() first.  Stored data shorter than what the
        inode's size implies, or a block that was never stored, is padded
        with zero bytes.
        """
        block_size = self._inode.block_size
        block_num = self._pos // block_size
        if self._current is not None:
            if self._current_block == block_num:
                return
            self._advance()
        block_start = block_num * block_size
        block = self._inode.get_block(block_num)
        data = block.data if (block and block.data) else ''
        known = min(self._inode.size - block_start, block_size)
        if len(data) < known:
            data += '\0' * (known - len(data))
        self._current = StringIO.StringIO(data)
        # Writing past the end of a StringIO pads with zero bytes, so seeking
        # to the offset is correct even when the buffer is shorter.  The old
        # code left a zero-filled buffer at position 0.
        self._current.seek(self._pos - block_start)
        self._current_block = block_num

    def _advance(self):
        """Retire the buffered block before moving to another one.

        If an earlier dirty block is already pending, it and (if dirty) the
        current block are stored together.  Otherwise a dirty current block
        becomes the pending ``_previous`` block.
        """
        if self._previous is not None:
            to_flush = [(self._previous_block, self._previous)]
            if self._dirty:
                to_flush.append((self._current_block, self._current))
            self._flush_blocks(to_flush)
            self._previous = None
            self._previous_block = None
        elif self._dirty:
            self._previous = self._current
            self._previous_block = self._current_block
        self._dirty = False
        self._current = None
        self._current_block = None

    def next(self):
        """Return the next line, raising StopIteration at end of file."""
        line = self.readline()
        if not line:
            # The old version returned '' forever, so iteration never ended.
            raise StopIteration()
        return line

    def _read_upto(self, size, by_line):
        """Shared body of read() and readline().

        Reads block by block, never past the end of the buffered block.  It
        stops once ``size`` bytes were read (None or negative: no limit), the
        inode's recorded size is reached, the buffer has nothing more at the
        current position, or (when ``by_line``) a newline was read.
        """
        if size is not None and size < 0:
            size = None  # file-object convention: negative means "no limit"
        block_size = self._inode.block_size
        chunks = []
        while size is None or size > 0:
            self._load_current()
            room = (self._current_block + 1) * block_size - self._pos
            want = room if size is None else min(room, size)
            if by_line:
                part = self._current.readline(want)
            else:
                part = self._current.read(want)
            if not part:
                break
            chunks.append(part)
            self._pos += len(part)
            if size is not None:
                size -= len(part)
            if self._pos >= self._inode.size:
                break
            if by_line and part.endswith("\n"):
                break
        return "".join(chunks)

    def read(self, size=None):
        """Read up to ``size`` bytes (everything to EOF if None or negative)."""
        return self._read_upto(size, by_line=False)

    def readline(self, size=None):
        """Read one line, including its newline, but at most ``size`` bytes."""
        return self._read_upto(size, by_line=True)

    def readlines(self, sizehint=None):
        """Read whole lines until EOF or until about ``sizehint`` bytes.

        When EOF is reached the list ends with an empty string.  This
        existing behaviour is kept for compatibility.
        """
        if sizehint is not None and sizehint <= 0:
            sizehint = None
        lines = []
        while True:
            # sizehint is a total-size hint, not a per-line limit.
            line = self.readline()
            lines.append(line)
            if not line:
                break
            if sizehint is not None:
                sizehint -= len(line)
                if sizehint <= 0:
                    break
        return lines

    def xreadlines(self):
        """Return self; iterate it to get lines."""
        return self

    def seek(self, offset, whence=0):
        """Move the file position, storing pending writes first.

        ``whence`` follows the os.SEEK_* convention: 0 means absolute, 1 means
        relative to the current position, and 2 means relative to the end of
        file (``seek(-n, 2)`` is n bytes before EOF).

        Raises:
          IOError: for an unknown ``whence`` or a negative resulting position.
        """
        self.flush()
        if whence == 0:    # os.SEEK_SET
            new_pos = offset
        elif whence == 1:  # os.SEEK_CUR
            new_pos = self._pos + offset
        elif whence == 2:  # os.SEEK_END
            # Previously size - offset, the opposite of the SEEK_END rule.
            new_pos = self._inode.size + offset
        else:
            raise IOError("invalid whence (%r, should be 0, 1 or 2)" % (whence,))
        if new_pos < 0:
            raise IOError("negative seek position %r" % (new_pos,))
        block_size = self._inode.block_size
        if (self._current is not None
                and self._current_block == new_pos // block_size):
            # Same block: the buffer is clean after flush(), so reuse it.
            self._current.seek(new_pos - self._current_block * block_size)
        else:
            self._current = None
            self._current_block = None
        self._pos = new_pos

    def tell(self):
        """Return the current file position."""
        return self._pos

    def truncate(self, size=None):
        """Not supported."""
        raise NotImplementedError()

    def write(self, str):
        """Write ``str`` at the current position, one block-sized piece at a time.

        Data is buffered.  Earlier blocks may be stored when the position
        crosses into a new block (see _advance).  Everything else is stored on
        flush(), seek() or close().
        """
        data = str  # parameter name shadows the builtin; kept for callers
        block_size = self._inode.block_size
        while data:
            self._load_current()
            # Room left in *this* block.  The old loop reused the first
            # piece's size, which broke multi-block writes starting mid-block.
            room = (self._current_block + 1) * block_size - self._pos
            piece, data = data[:room], data[room:]
            self._current.write(piece)
            self._dirty = True
            self._pos += len(piece)

    def writelines(self, sequence):
        """Write each string in ``sequence`` in order."""
        for line in sequence:
            self.write(line)

    def __iter__(self):
        return self

    def __del__(self):
        # __init__ may not have got as far as setting _inode.
        if getattr(self, "_inode", None) is not None:
            self.close()

    @property
    def closed(self):
        """True once close() has been called."""
        return self._inode is None

    @property
    def mode(self):
        """The mode string passed to the constructor."""
        return self._mode
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import datastorefile | |
import unittest | |
from google.appengine.api import apiproxy_stub_map | |
from google.appengine.api import datastore_file_stub | |
from google.appengine.ext import db | |
class DatastoreFileTest(unittest.TestCase):
    """Tests for datastorefile.Inode / DatastoreFile against a datastore stub."""

    def setUp(self):
        # Register a fresh datastore file stub for app 'test'; it is
        # constructed with None for both of its path arguments.
        os.environ['APPLICATION_ID'] = 'test'
        apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
        datastore = datastore_file_stub.DatastoreFileStub('test', None, None)
        apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', datastore)

    def _new_inode(self, **kwargs):
        """Create an Inode with the given properties and store it."""
        inode = datastorefile.Inode(**kwargs)
        inode.put()
        return inode

    def testCreateFile(self):
        test_string = "Hello, world!"
        inode = self._new_inode()
        fh = inode.open()
        fh.write(test_string)
        self.assertEqual(fh._pos, len(test_string))
        fh.seek(0)
        self.assertEqual(fh.read(len(test_string)), test_string)
        self.assertEqual(fh._pos, len(test_string))
        fh.close()
        self.assertEqual(inode.size, len(test_string))
        self.assertNotEqual(inode.created, inode.modified)
        blocks = datastorefile.DataBlock.all().ancestor(inode).fetch(100)
        self.assertEqual(len(blocks), 1)
        self.assertEqual(blocks[0].key().name(), '#00000000')
        self.assertEqual(blocks[0].data, test_string)
        # Reopen the file and check it still has the same contents
        fh = inode.open()
        fh.seek(1)
        self.assertEqual(fh.read(len(test_string)), test_string[1:])
        fh.close()

    def testMultipleBlocks(self):
        inode = self._new_inode(block_size=32)
        fh = inode.open()
        fh.write('x' * 16)
        fh.flush()
        self.assertEqual(fh._dirty, False)
        # Crossing into block 1 keeps the dirty block 0 pending as _previous.
        fh.write('y' * 32)
        self.assertNotEqual(fh._previous, None)
        self.assertEqual(fh._previous.getvalue(), 'x' * 16 + 'y' * 16)
        self.assertEqual(fh._dirty, True)
        self.assertEqual(fh._pos, 48)
        # Crossing into block 2 stores blocks 0 and 1.
        fh.write('z' * 32)
        self.assertEqual(fh._previous, None)
        self.assertEqual(fh._dirty, True)
        self.assertEqual(fh._pos, 80)
        fh.close()
        self.assertEqual(fh._previous, None)
        self.assertEqual(fh._dirty, False)
        blocks = datastorefile.DataBlock.all().ancestor(inode).fetch(100)
        self.assertEqual(len(blocks), 3)
        self.assertEqual(blocks[0].key().name(), '#00000000')
        self.assertEqual(blocks[1].key().name(), '#00000001')
        self.assertEqual(blocks[2].key().name(), '#00000002')
        self.assertEqual(blocks[0].data, 'x' * 16 + 'y' * 16)
        self.assertEqual(blocks[1].data, 'y' * 16 + 'z' * 16)
        self.assertEqual(blocks[2].data, 'z' * 16)
        # Reopen and read the whole file
        fh = inode.open()
        self.assertEqual(fh.read(), 'x' * 16 + 'y' * 32 + 'z' * 32)
        fh.close()

    def testReadlines(self):
        inode = self._new_inode(block_size=32)
        fh = inode.open()
        fh.write("This is a test!!\n" * 10)
        fh.close()
        fh = inode.open()
        self.assertEqual(fh.readlines(90), ["This is a test!!\n"] * 6)
        self.assertEqual(fh.readlines(), ["This is a test!!\n"] * 4 + [''])
        fh.close()

    def testLongRead(self):
        inode = self._new_inode(block_size=32)
        fh = inode.open()
        fh.write('x' * 80 + '\n')
        fh.close()
        fh = inode.open()
        self.assertEqual(fh.read(65), 'x' * 65)
        self.assertEqual(fh._pos, 65)
        fh.close()
        fh = inode.open()
        self.assertEqual(fh.readline(81), 'x' * 80 + '\n')
        fh.close()
if __name__ == "__main__":
    # Running this module directly executes every DatastoreFileTest case.
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment