Last active
January 2, 2018 13:26
-
-
Save gippy/c4c6489c667bb238e5921f0dc51a42e4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
createKey(store, new count, count of added items) { | |
start = new count - count of added items + 1 | |
end = new count | |
return key in format '<store>/<padded start>-<padded end>' | |
} | |
class SequentialStore { | |
# dependency injection (DI) of s3 client and reporting
constructor(s3, reporting){ | |
setup buffer, s3 client and reporting | |
every ... seconds | |
call this.buffer.getOld and cycle through stores | |
if timestamp is older than ... seconds | |
call this.flush(store) | |
call this.buffer.getPrepared and cycle through stores | |
if timestamp is older than ... seconds | |
call this.persist(store, timestamp) | |
} | |
push(store, record) { | |
size = this.buffer.push(store, record) | |
if size exceeds ... bytes call this.flush(store) | |
} | |
flush(store) { | |
lockedSuccessfully = this.buffer.lock(3s, store) | |
if not lockedSuccessfully then exit | |
timestamp = this.buffer.prepareToPersist(store) | |
call this.persist(store, timestamp) | |
} | |
persist(store, timestamp) { | |
lockedSuccessfully = this.buffer.lock(150s, store, timestamp) | |
if not lockedSuccessfully then exit | |
data, fields = this.buffer.getContents(store) | |
count of persisted items = this.reporting.beforePersist(store, length of data) | |
key = createKey(store, count of persisted items, length of data) | |
compressed data = compress(data) | |
s3.put(key, compressed data) | |
this.reporting.afterPersist( | |
store, | |
key, | |
fields, | |
count of persisted items, | |
size of compressed data in bytes | |
) | |
this.buffer.empty(store, timestamp) | |
} | |
getStream(store, offset, limit){ | |
count of persisted records, keys, fields = this.reporting.getStore(store) | |
data, fields = this.buffer.getContents(store) | |
totalCount = count of persisted records + count of records in buffer | |
key = createKey(store, totalCount, length of data) | |
add key to keys array | |
keys to download = empty array | |
cycle through keys | |
extract start and end of range | |
if start or end is between offset and offset + limit, or the range from start to end spans that whole window | |
push key to list of keys to download | |
create a stream | |
cycle through keys to download while stream is not paused | |
compressed data = s3.download(key) | |
call this.reporting.afterDownload( | |
store, | |
size of compressed data in bytes | |
) | |
data = decompress(compressed data) | |
cycle through items | |
if item is between offset and offset + limit then output it to stream | |
return stream | |
} | |
delete(store) { | |
call this.buffer.empty(store) | |
count of persisted records, keys, fields = this.reporting.getStore(store) | |
create batches of keys | |
for each batch call s3.delete(batch) | |
this.reporting.afterDelete(store) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment