@gippy
Last active January 2, 2018 13:26
createKey(store, new count, count of added items) {
  start = new count - count of added items + 1
  end = new count
  return key in format '<store>/<padded start>-<padded end>'
}
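
A minimal Python sketch of createKey, for illustration only. The padding width is not stated in the pseudocode, so the `width=12` default is an assumption, as are the snake_case names and the input check. Zero-padding makes keys sort lexicographically in the same order as their numeric ranges, which is one plausible reason for padding here.

```python
def create_key(store: str, new_count: int, added: int, width: int = 12) -> str:
    """Build '<store>/<padded start>-<padded end>' for the batch just added."""
    if added < 1 or added > new_count:
        raise ValueError("added must be between 1 and new_count")
    start = new_count - added + 1  # 1-based number of the first record in the batch
    end = new_count                # 1-based number of the last record in the batch
    # width is an assumed value; zero-padding keeps string order equal to numeric order
    return f"{store}/{start:0{width}d}-{end:0{width}d}"
```

For example, a store that reaches 150 records after a batch of 50 gets a key covering records 101 to 150.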
class SequentialStore {
  # DI of s3 client and reporting
  constructor(s3, reporting) {
    set up buffer, s3 client and reporting
    every ... seconds
      call this.buffer.getOld and cycle through stores
        if timestamp is older than ... seconds
          call this.flush(store)
      call this.buffer.getPrepared and cycle through stores
        if timestamp is older than ... seconds
          call this.persist(store, timestamp)
  }
  push(store, record) {
    size = this.buffer.push(store, record)
    if size exceeds ... bytes call this.flush(store)
  }
  flush(store) {
    lockedSuccessfully = this.buffer.lock(3s, store)
    if not lockedSuccessfully then exit
    timestamp = this.buffer.prepareToPersist(store)
    call this.persist(store, timestamp)
  }
  persist(store, timestamp) {
    lockedSuccessfully = this.buffer.lock(150s, store, timestamp)
    if not lockedSuccessfully then exit
    data, fields = this.buffer.getContents(store)
    count of persisted items = this.reporting.beforePersist(store, length of data)
    key = createKey(store, count of persisted items, length of data)
    compressed data = compress(data)
    s3.put(key, compressed data)
    this.reporting.afterPersist(
      store,
      key,
      fields,
      count of persisted items,
      size of compressed data in bytes
    )
    this.buffer.empty(store, timestamp)
  }
  getStream(store, offset, limit) {
    count of persisted records, keys, fields = this.reporting.getStore(store)
    buffered data, buffered fields = this.buffer.getContents(store)
    totalCount = count of persisted records + length of buffered data
    if buffered data is not empty
      buffer key = createKey(store, totalCount, length of buffered data)
      add buffer key to keys array
    keys to download = empty array
    cycle through keys
      extract start and end of range
      # overlap test also catches a range that fully contains the requested window
      if range start..end overlaps offset..offset + limit
        push key to list of keys to download
    create a stream
    cycle through keys to download while stream is not paused
      if key is buffer key
        # buffered records are not in s3 yet
        data = buffered data
      else
        compressed data = s3.download(key)
        call this.reporting.afterDownload(
          store,
          size of compressed data in bytes
        )
        data = decompress(compressed data)
      cycle through items
        if item is between offset and offset + limit then output it to stream
    return stream
  }
  delete(store) {
    call this.buffer.empty(store)
    count of persisted records, keys, fields = this.reporting.getStore(store)
    create batches of keys
    for each batch call s3.delete(batch)
    this.reporting.afterDelete(store)
  }
}
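
Two steps in the class are easy to get subtly wrong: choosing which keys getStream has to download, and splitting keys into batches for delete. The Python sketch below illustrates both under stated assumptions. It assumes `offset` is a 0-based count of records to skip while record numbers in keys are 1-based (the pseudocode does not say). The batch size of 1000 is also an assumption, chosen to match the S3 multi-object delete limit of 1000 keys per request. All function names are hypothetical.

```python
import re

# Matches '<store>/<padded start>-<padded end>' as produced by createKey.
KEY_PATTERN = re.compile(r"^(?P<store>.+)/(?P<start>\d+)-(?P<end>\d+)$")


def parse_range(key):
    """Extract the (start, end) record numbers encoded in a key."""
    match = KEY_PATTERN.match(key)
    if match is None:
        raise ValueError(f"unexpected key format: {key!r}")
    return int(match["start"]), int(match["end"])


def keys_to_download(keys, offset, limit):
    """Return the keys whose record range overlaps the requested window."""
    if limit <= 0:
        return []
    first = offset + 1      # assumed: offset is 0-based, records are 1-based
    last = offset + limit
    selected = []
    for key in keys:
        start, end = parse_range(key)
        # Interval overlap test; it also selects a key whose range
        # fully contains the window, which a start-or-end check misses.
        if start <= last and end >= first:
            selected.append(key)
    return selected


def batched(keys, size=1000):
    """Yield consecutive slices of at most `size` keys (size is assumed)."""
    for i in range(0, len(keys), size):
        yield keys[i:i + size]
```

With keys covering records 1-100, 101-200 and 201-300, a request for offset 120 and limit 30 falls entirely inside the second key. Neither its start nor its end lies within the window, so only an overlap test selects it.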