This document discusses the design of the Unified Chunk Management Layer, which is part of a larger effort: the Pravega Simplified Storage Subsystem (S3).
It also covers the design for PDP 34: CRWD Tier 2. https://github.com/pravega/pravega/wiki/PDP-34:-CRWD-Tier-2
Here are some of the problems we are facing today that motivate this design:
- Need to use complex fencing logic for HDFS, which makes it slow and hard to maintain.
- Impedance mismatch between the current model and the way Amazon S3 works.
- Need for performance improvements in general.
- Simplify API contract: Support additional cloud storage options by simplifying the API contract and requirements on the tier-2 storage bindings.
- Eliminate complex fencing: Tier-2 binding implementations should not require complex fencing logic. More specifically, provide stronger guarantees around isolation of writes from two different segment store instances. (In other words, two segment store instances will never write to the same underlying file.)
- Support Amazon S3: Support bindings that do not support append operations. Especially support storage bindings that only support Amazon S3 compatible APIs.
- Leverage merge/concat capabilities effectively whenever the underlying binding provides them with strong guarantees. (E.g. multipart upload in S3/ECS, concatenating existing files in HDFS.)
- Allow storage bindings complete freedom to optimize their own implementations by using additional capabilities effectively whenever/wherever the underlying binding provides them with strong guarantees. (E.g. conditional append in ECS/Azure/GCP, truncate.)
- Extension points: Enable basic architectural extension points/mechanisms to implement:
- Additional background services: Enable creation of additional background services, like a defragmenter that merges a number of smaller tier-2 objects into large contiguous ones by leveraging the concat operation where the underlying storage provides it.
- Integrity checks/self-repair: Enable basic support for additional admin functionality like self-repair and integrity checks (like fsck).
- Import/Export: Enable basic architectural extension points/mechanisms to implement importing and exporting of data in Pravega from pre-existing external sources.
- Ability to read data back without using Pravega or outside of Pravega is out of scope. The data written by Pravega must still be read back using Pravega.
- While our goal is to minimize storage overhead and keep it below 1%, it is not our goal to completely eliminate a possibility of temporarily creating extra data, metadata or even some garbage data on tier-2. There may be small amounts of extra metadata or additional orphaned or temporary data stored on the tier-2 at any point in time during normal operations and/or failover scenarios.
- Implementing import or export of pre-existing data in other formats (e.g. Avro, Parquet) is out of scope.
- Supporting additional features like encryption, compression, error/bit-rot resilience, interoperability with open formats (e.g. Avro) etc. is out of scope.
It is helpful to explicitly state these assumptions in order to better understand the available design space and its constraints.
It is a fundamental assumption of the Pravega architecture that Tier-1 provides low latency using high-end storage devices, whereas Tier-2 provides cheap long-term storage at higher throughput. The storage system design will take advantage of the following facts:
- Tier-2 is not on the critical path for write operations; writes to tier-2 are designed to be asynchronous.
- Tail reads should mostly be served from cache; tier-2 should not be on the critical path for tail reads.
- For historical/batch reads, throughput matters substantially more than fast “first byte read time”, and any latency in reading the first bytes is eventually amortized by the higher throughput.
- In addition, for sequential reads the data can be prefetched in large read operations to increase performance further.
There is a hard requirement that the actual sequence of data bytes stored inside a storage object/file must match, byte by byte, the sequence of bytes that the user provided.
- Tier-2 does not interpret the user-provided bytes in any way. To Pravega, the data is opaque.
- Tier-2 neither changes the original sequence of bytes nor mutates it.
- No additional headers or footers are added directly to, or embedded in, the user data.
- The actual sequence of bytes is not changed at all. E.g. data is not encrypted or compressed, and no erasure coding or error correction bits are added.
- However, we may store a single segment in multiple storage objects.
- There may be additional headers/footers stored as separate metadata objects, in addition to the segment data.
This design critically depends on the ability of the BookKeeper-based tier-1 to fence out an old/outdated ledger writer.
Any system design inevitably involves making tough choices when conflicting requirements cannot be achieved simultaneously. Below are our guidelines when making such choices.
- Prefer consistency and correctness guarantees over higher performance.
- Prefer write throughput over write latency.
- Prefer making the normal case fast while keeping performance just good enough (tolerable/acceptable) in exceptional situations.
- Prefer writing big chunks of data over eagerly evicting cache.
- Prefer not evicting system segments whenever possible.
- Prefer admin initiated maintenance operations.
- Prefer conservative approach to changing on-disk metadata formats by being backward and forward compatible.
A chunk is a basic unit of storage in tier-2. Conceptually, it is a contiguous range of bytes. A segment is made up of a sequence of non-overlapping chunks.
- A chunk is always stored contiguously on tier-2 as a single indivisible unit. It is never split into multiple tier-2 objects/files.
- For the current discussion, a chunk is simply a separate file/object on tier-2.
- For each chunk, the actual sequence of data bytes stored inside the storage object/file must match, byte by byte, the sequence of bytes that the user provided.
- The persisted data will not include any metadata in addition to the user-supplied bytes of data. That metadata resides in a separate object.
- We require tier-2 bindings to provide only the following operations on chunks: Create, Read, Write and Delete (CRWD).
- However, a specific binding may optimize its internal implementation by utilizing additional functionality like append. Append is not necessary and is not included in the API contract. It is entirely up to the tier-2 implementation to optimize using whatever primitives the underlying storage provides.
- In addition, the operations Merge (M) and Truncate (T) may optionally be provided.
- The List (L) operation is not required for normal operation but is generally available and may be required for implementing admin tool functionality.
The API contract is described later in the document.
- Each chunk is a unique file.
- If the storage provider does not support append (e.g. vanilla S3), then each append/write is written to a separate chunk on tier-2.
- If the storage provider does support conditional appends (or otherwise concurrency-safe appends), then multiple appends/writes can be stored in a single chunk on tier-2. Instead of creating a new chunk for each write, the data is simply appended to an existing chunk.
- Except possibly for the system segments, the actual chunk names used while persisting on tier-2 are arbitrary. There is no special meaning attached to the format of the name. The names could be UUIDs or may contain any combination of segment name, segment store epoch, offset etc. However, the name of a chunk is required to be globally unique.
- It is guaranteed that two segment store instances will never write to the same underlying file. (This is achieved by imposing the restriction that a segment store instance never appends to a file/object created by another instance. Explained in detail later.)
It is useful to have a rough estimate of the number of chunks and segments to understand the design space. The intention here is to capture estimates of orders of magnitude.
- A chunk could be as small as one byte, but we prefer chunks to be really large. There is no upper limit on the size of a chunk.
- The average chunk size is expected to be in the low tens of GB.
- There can be up to 25K active segments per container, with an estimated average of around 1K segments.
- There is no upper limit on the number of segments in a container.
- A segment can have up to hundreds of chunks. The average number of chunks is expected to be an order of magnitude lower with aggressive and efficient merging.
- Conceptually, the segment metadata consists of a header describing various properties of the segment, plus a linked list of chunk metadata records describing each chunk.
Below is how segment metadata records are organized.
- The segment metadata and layout information are stored as key-value tuples in a pinned table segment (as opposed to a single record containing all metadata, including the chunk layout data, of the segment).
There are two ways to store the metadata using the key-value store.
- Using a single key-value pair for all metadata (preferred for V1)
- The complete metadata is stored in a serialized value field.
- For any metadata update the entire value is re-written.
- Pros: The number of keys is kept to a minimum. This improves the table segment index performance.
- Con: Cannot support a large number of chunks, because the total number of bytes written grows as O(N²) in the number of metadata updates (every update rewrites the whole value).
- Using multiple key-value pairs for metadata (optional; implemented post V1)
- There is one KV pair for the header of each segment. In addition to segment properties, it contains pointers to the first/last chunk metadata records.
- There is one KV pair for each chunk. In addition to chunk properties, it contains pointers to the previous/next chunks.
- NOTE - The pointer here means a string that can be used as a key to retrieve the full record by calling get on the KV store.
- Pros: Only a small part of the metadata is updated at a time. Storage overhead is reduced.
- Con: Frequent metadata updates.
- All metadata operations are performed using conditional updates and a single transaction that updates multiple keys at once. This guarantees that concurrent operations will never leave the metadata in an inconsistent state.
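As a rough illustration, the multi-record layout can be sketched as plain value objects linked by string keys. All class, field and key names below are hypothetical, not Pravega's actual serialization format.

```java
import java.util.Map;

// Illustrative sketch of the multi-record metadata layout: chunk records
// "point" to each other by string keys stored in the KV store.
public class MetadataRecords {
    static class ChunkRecord {
        String tier2Name;   // file/object name on tier-2
        long length;        // number of bytes in this chunk
        String nextKey;     // key of the next chunk record, null for the last
        ChunkRecord(String tier2Name, long length, String nextKey) {
            this.tier2Name = tier2Name;
            this.length = length;
            this.nextKey = nextKey;
        }
    }

    // Walk the linked list of chunk records and sum their lengths, as a
    // reader resolving the segment layout would.
    static long segmentLength(Map<String, ChunkRecord> kv, String firstKey) {
        long total = 0;
        for (String key = firstKey; key != null; key = kv.get(key).nextKey) {
            total += kv.get(key).length;
        }
        return total;
    }
}
```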
- Given the append-only nature of segments, there can be only one file to which data is appended. When a new file is opened using OpenWrite, the old file is considered "non-active".
- In case of failover, the new segment store instance creates a new file and makes it active. The offset of the old file, as seen by the new instance at the time of opening the new file, is recorded in the metadata.
- To avoid frequent updates to metadata, the metadata about all the chunks in the file/object is updated lazily, only when required.
- More specifically, the length of the actively written file is not updated with each write. It is updated only when the file is no longer considered active.
- The updates are required only in the following cases:
- Creating a new file/object, so that the name of the new object can be recorded and the final offset can be updated for the previous file.
- When two segments are concatenated (e.g. transactions).
- When segments are truncated.
- When a segment is created, sealed, unsealed, or deleted.
- Table segments internally use tier-1. Therefore, a segment store instance is fenced in a failover scenario and can no longer make changes to segment metadata. This provides strong guarantees against data corruption by zombie segment stores.
- By splitting the data into multiple records, we avoid writing the same data again and again, and update only the records for the modified (or last) chunk.
- The problem of writing tiny metadata updates to tier-2 is also solved, as the table store aggregates metadata updates for a number of segments into large writes to tier-2. (Recall that the table store saves data in system segments.)
- Given the append-only semantics of segments, very few KV records are updated during segment metadata operations, except for the last chunk.
- Segment-level operations like concat, seal, and unseal are relatively infrequent.
- Therefore, metadata records are excellent candidates for caching (e.g. a Guava cache).
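Since metadata records are good caching candidates (the document suggests something like a Guava cache), here is a minimal LRU sketch built on the JDK's `LinkedHashMap` as a dependency-free stand-in; the class name and capacity are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal LRU cache for metadata records, a stand-in for something like a
// Guava cache. LinkedHashMap in access-order mode gives LRU eviction.
public class MetadataCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public MetadataCache(int maxEntries) {
        super(16, 0.75f, true); // accessOrder=true: iteration order is LRU order
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least-recently-used entry once capacity is exceeded.
        return size() > maxEntries;
    }
}
```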
- All persisted metadata should contain a version.
- In the future, segment metadata may contain some additional fields, e.g. Access Control Lists (ACLs).
- In the future, chunk metadata may contain some additional fields, e.g. CRC/checksums.
- This is useful for backward compatibility, backup and disaster recovery.
- A snapshot of segment metadata (with metadata for all its chunks) can be exported to tier-2 on demand. The current header file format will be supported.
- A snapshot of segment metadata (with metadata for all its chunks) can be imported from an existing tier-2 snapshot on demand.
- Automatic import of the current header file format will be implemented.
Segment merge and other layout-change operations are segment-level operations that change the number and layout of the chunks that make up a segment. The segment layout is specified by a linked list of chunk records. (Recall here that records are stored in the table store as KV pairs. However, they are conceptually linked-list nodes, and they “point” to other records using a string key as a “pointer”.)
- When two segments are merged via the concat operation, the data stored on tier-2 is not changed; only the layout (sequence) of chunks changes, which is purely a metadata operation. It consists of concatenating two linked lists of chunk metadata records by appending one list to the other: the last chunk metadata record of the target segment is made to “point” to the first chunk metadata record of the source. The properties of the target segment are updated in its header record to reflect the result of the concat, and the header record of the source is deleted.
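The concat-as-metadata operation described above can be sketched as a linked-list splice. All class, field and key names are illustrative.

```java
import java.util.Map;

// Sketch of segment concat as a pure metadata operation; no tier-2 data moves.
public class ConcatSketch {
    static class Chunk {
        long length;
        String next;        // key of the next chunk record, null for the last
        Chunk(long length, String next) { this.length = length; this.next = next; }
    }

    static class Header {
        String first, last; // keys of the first/last chunk records
        long length;
    }

    // Splice the source's chunk list onto the target's: only the target's
    // last chunk record and the two header records change.
    static void concat(Map<String, Chunk> chunks, Map<String, Header> headers,
                       String target, String source) {
        Header t = headers.get(target);
        Header s = headers.get(source);
        chunks.get(t.last).next = s.first; // last target chunk -> first source chunk
        t.last = s.last;
        t.length += s.length;
        headers.remove(source);            // the source header record is deleted
    }
}
```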
- Multiple small chunks are copied into a big chunk using the storage binding’s native capabilities (e.g. multipart upload for S3 or concat for HDFS).
- The sublist corresponding to the copied chunks is replaced with a single new chunk record corresponding to the new large file/object. The metadata operation is like deleting nodes from the middle of a list.
- It is assumed that both S3 and HDFS implement concat as a mostly-metadata operation on their side, without moving data.
- Therefore, once again, it ends up being mostly a metadata operation for Pravega.
- A special corner case of defrag is when a series of chunks already sits consecutively in the same file; such chunks can be merged in situ without moving data. This case can be optimized by merging inline while writing the new chunk.
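The defragmentation metadata update can be sketched as replacing a run of chunk records with a single record. For simplicity this sketch models the chunk list as a Java `List` rather than KV-linked records; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the defragmentation metadata update: a consecutive run of chunk
// records is replaced by a single record for the new large tier-2 object.
public class DefragSketch {
    static class Chunk {
        String name;
        long length;
        Chunk(String name, long length) { this.name = name; this.length = length; }
    }

    // Replace chunks[from..to] (inclusive) with one chunk covering their
    // total length, as if they had been copied into a single tier-2 object.
    static List<Chunk> defrag(List<Chunk> chunks, int from, int to, String newName) {
        long total = 0;
        for (int i = from; i <= to; i++) {
            total += chunks.get(i).length;
        }
        List<Chunk> out = new ArrayList<>(chunks.subList(0, from));
        out.add(new Chunk(newName, total));
        out.addAll(chunks.subList(to + 1, chunks.size()));
        return out;
    }
}
```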
- The chunks at the head of the segment that lie completely below the new start offset are deleted. The metadata operation is like deleting nodes from the start of a list.
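A sketch of this truncation rule, again assuming a simple list model of chunk records (names illustrative): chunks ending at or below the new start offset are dropped, while the first surviving chunk may still hold bytes below the cut, which readers simply skip.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the truncate metadata update: drop chunks that lie entirely
// below the new start offset.
public class TruncateSketch {
    static class Chunk {
        long startOffset, length;
        Chunk(long startOffset, long length) {
            this.startOffset = startOffset;
            this.length = length;
        }
    }

    static List<Chunk> truncate(List<Chunk> chunks, long newStartOffset) {
        List<Chunk> out = new ArrayList<>();
        for (Chunk c : chunks) {
            // Keep any chunk that still contains bytes at or above the cut.
            if (c.startOffset + c.length > newStartOffset) {
                out.add(c);
            }
        }
        return out;
    }
}
```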
- Whenever there is a need to roll over the segment, or when a large write needs to be split into smaller writes, new chunks are added as needed and the metadata is updated.
A new segment header is added to the KV store. No tier-2 object is created. (If the segment header record does not exist, an attempt is made to create and populate the segment header and chunk records using files found on tier-2, before creating a new segment header record.)
- If the segment header record exists in the KV store, then the layout information from the chunk metadata is read and cached.
- If the segment header record does not exist, then an attempt is made to create and populate the segment header and chunk records using files found on tier-2.
- The presence of a segment header record that is marked active indicates whether the segment exists.
- This is purely a metadata operation on segment header.
- This is purely a metadata operation on segment header.
- Mark the segment as deleted in the segment header.
- All tier-2 objects and chunk metadata are deleted asynchronously.
- The list of chunk metadata is used to identify the chunks containing the data.
- Using the chunk metadata, the corresponding tier-2 object/file is read. For efficient lookup, the chunk metadata is cached.
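The offset-to-chunk lookup implied by this read path can be sketched with a floor search over chunk start offsets; the `TreeMap` here is just one convenient stand-in for the cached metadata, and the names are illustrative.

```java
import java.util.TreeMap;

// Sketch of locating the chunk containing a given segment offset using a
// floor lookup over the chunk start offsets.
public class ChunkLookup {
    private final TreeMap<Long, String> chunksByStartOffset = new TreeMap<>();

    void addChunk(long startOffset, String tier2Name) {
        chunksByStartOffset.put(startOffset, tier2Name);
    }

    // Returns the name of the tier-2 object holding the byte at 'offset':
    // the chunk with the greatest start offset <= offset.
    String findChunk(long offset) {
        return chunksByStartOffset.floorEntry(offset).getValue();
    }
}
```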
- The list of chunk metadata is used to identify the last chunk, if any.
- A new chunk is written to a tier-2 file/object (either an existing file or a new one, depending on the underlying storage provider).
- The metadata for the new chunk is added.
- If the new chunk can be merged inline or triggers a merge, it is merged. The relevant header fields and affected chunk metadata are updated in a single transaction.
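The write-target decision above (append only when the provider supports it and this instance owns the active chunk, otherwise start a new chunk) can be sketched as follows; the method and parameter names are hypothetical.

```java
// Sketch of the write-target decision: a segment store instance never
// appends to a file/object created by another instance.
public class WriteSketch {
    // Returns the name of the chunk the write should go to. A new chunk name
    // is used whenever appending is not possible or not allowed.
    static String chooseTarget(boolean providerSupportsAppend,
                               boolean lastChunkOwnedByThisInstance,
                               String lastChunkName,
                               String newChunkName) {
        if (providerSupportsAppend && lastChunkOwnedByThisInstance && lastChunkName != null) {
            return lastChunkName; // append to the active chunk
        }
        return newChunkName;      // each write becomes a fresh tier-2 object
    }
}
```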
- Data need not be read sequentially, one chunk after another. All the relevant chunks can be read in parallel. This is beneficial where the underlying storage itself is highly distributed and replicated (e.g. HDFS, cloud storage like S3), giving higher effective throughput.
- Just like a normal read, the list of chunk metadata is used to identify the chunks containing the data.
- Using the chunk metadata, the corresponding tier-2 objects/files are read in parallel. For efficient lookup, the chunk metadata is cached.
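A sketch of the parallel read described above: each chunk is fetched by its own future, and the results are assembled in chunk order so the segment's byte sequence is preserved. `readChunk` here is a stand-in for a real tier-2 read.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Sketch of a parallel read: one future per chunk, joined in chunk order.
public class ParallelRead {
    // Stand-in for an asynchronous tier-2 read of a single chunk.
    static CompletableFuture<byte[]> readChunk(byte[] chunkData) {
        return CompletableFuture.supplyAsync(() -> chunkData);
    }

    static byte[] readAll(List<byte[]> chunks) {
        // Kick off all reads first so they run concurrently.
        List<CompletableFuture<byte[]>> futures =
                chunks.stream().map(ParallelRead::readChunk).collect(Collectors.toList());
        int total = chunks.stream().mapToInt(c -> c.length).sum();
        byte[] out = new byte[total];
        int pos = 0;
        // Reads may complete in any order; results are copied in chunk order.
        for (CompletableFuture<byte[]> f : futures) {
            byte[] part = f.join();
            System.arraycopy(part, 0, out, pos, part.length);
            pos += part.length;
        }
        return out;
    }
}
```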
- List all segments.
- For each segment, the metadata is exported to a file (using a well-known file naming convention).
- Import segment metadata previously exported to a file.
How do we guarantee a zombie segment store cannot write to tier-2? A write operation is not considered complete until the metadata about that write is committed. When the tier-1 ledger is closed, the zombie segment store is fenced out and cannot write to tier-1. This means it can neither update metadata in the table segment nor truncate the ledger. Whatever data is written to tier-2 by the zombie after its last successful metadata update is simply ignored. This guarantees consistency for tier-2.
Will this break if tier-1 is changed to something other than BookKeeper? Yes, unless its replacement also provides automatic fencing of closed ledgers.
How do we guarantee multiple segment stores never write to the same file? Each segment store instance gets a unique id when instantiated (the epoch, in the current case). This id is included when generating a file/object name for a given segment.
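A sketch of such name generation: embedding the instance epoch (here together with a random component) makes name collisions between instances impossible. The exact format is illustrative; the design leaves the naming scheme arbitrary as long as names are globally unique.

```java
import java.util.UUID;

// Sketch of chunk-name generation. Including the instance epoch guarantees
// two segment store instances never produce the same tier-2 object name;
// the UUID keeps names unique within one instance. Format is hypothetical.
public class ChunkNames {
    static String newChunkName(String segmentName, long instanceEpoch, long segmentOffset) {
        return segmentName + ".E-" + instanceEpoch + ".O-" + segmentOffset + "." + UUID.randomUUID();
    }
}
```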
You mentioned both Append and Merge. Aren’t we supposed to have CRWD only? The API contract does not require append, but an implementation is free to use append to improve performance by writing multiple chunks to the same underlying file/object. Merge is also optional, but if an efficient implementation is available we should be able to leverage it.
There will be a large number of small objects. Will this not be a problem? No.
- (a) For storage providers that do provide append operations, the same object/file will contain multiple consecutive chunks. Because of the guaranteed single-writer pattern, no fencing gymnastics are necessary, providing fast write performance. Because the same file is appended to, the resulting file contains several chunks, which improves read performance as well. The chunk metadata can be merged “inline”.
- (b) For storage providers that do not provide append, we defragment the segment using native concat capabilities, which are assumed to be efficiently implemented by the underlying storage.
- (c) For reads, intermediate results of the offset search will be aggressively cached.
- [WIP] How do we make sure the system is able to boot properly by loading the table segment?
Component | Responsibilities | Collaborations |
---|---|---|
SegmentMetadataTableStore (SMTS) | Stores all the metadata for the segments associated with the given container. Data is stored as a simple KV store. The interface follows the repository pattern. | |
SMTSCache | Write-through cache for SMTS entries. | |
StorageAdapter | Translates segment operations into operations on individual chunks. | |
StorageProvider(s) | Components that provide bindings for various storage providers. | |
StorageWriter | Periodically writes data to tier-2. | |
StorageReader | Prefetches data from tier-2. | |
StorageDefragmenter | Periodically merges sets of small tier-2 objects into a larger object. Removes deleted objects from tier-2. |
/**
* Defines an abstraction for Permanent Storage.
* Note: not all operations defined here are needed.
*/
public interface PersistentStorageProvider extends AutoCloseable {
/**
* Gets a value indicating whether this Storage implementation supports truncate operation on underlying storage object.
*
* @return True or false.
*/
boolean supportsTruncation();
/**
* Gets a value indicating whether this Storage implementation supports append operation on underlying storage object.
*
* @return True or false.
*/
boolean supportsAppend();
/**
* Gets a value indicating whether this Storage implementation supports the concat (merge) operation on the underlying storage object.
*
* @return True or false.
*/
boolean supportsConcat();
/**
* Determines whether named file/object exists in underlying storage.
*
* @param name Name of the storage object to check.
* @return True if the object exists, false otherwise.
*/
boolean exists(String name);
/**
* Creates a new file.
*
* @param name Name of the storage object to create.
* @return CompletableFuture A CompletableFuture corresponding to completion of the operation.
* @throws IOException If an I/O error occurs.
*/
CompletableFuture<Void> create(String name) throws IOException;
/**
* Deletes a file.
*
* @param name Name of the storage object to delete.
* @return CompletableFuture A CompletableFuture corresponding to completion of the operation.
* @throws IOException If an I/O error occurs.
*/
CompletableFuture<Void> delete(String name) throws IOException;
/**
* Reads a range of bytes from the underlying storage object.
*
* @param name Name of the storage object to read from.
* @param fromOffset Offset in the file from which to start reading.
* @param length Number of bytes to read.
* @param buffer Byte buffer to which data is copied.
* @param bufferOffset Offset in the buffer at which to start copying read data.
* @return CompletableFuture A CompletableFuture containing number of bytes read.
* @throws IOException If an I/O error occurs.
* @throws NullPointerException If buffer is null.
* @throws IndexOutOfBoundsException If fromOffset, length or bufferOffset is out of bounds.
*/
CompletableFuture<Integer> read(String name, long fromOffset, int length, byte[] buffer, int bufferOffset) throws IOException, NullPointerException, IndexOutOfBoundsException;
/**
* Writes the given data to the underlying storage object.
*
* @param name Name of the storage object to write to.
* @param offset Offset in the file to start writing.
* @param length Number of bytes to write.
* @param data An InputStream representing the data to write.
* @return CompletableFuture A CompletableFuture containing the number of bytes written.
* @throws IOException If an I/O error occurs.
*/
CompletableFuture<Integer> write(String name, long offset, int length, InputStream data) throws IOException;
/**
* Concatenates two or more files.
*
* @param target Name of the target file to concat to.
* @param sources Names of existing files to be appended to the target. The files are appended in the same sequence the names are provided.
* @return CompletableFuture A CompletableFuture corresponding to completion of the operation.
* @throws IOException If an I/O error occurs.
* @throws UnsupportedOperationException If the implementation does not support concat.
*/
CompletableFuture<Void> concat(String target, String... sources) throws IOException, UnsupportedOperationException;
/**
* Truncates the given storage object to the given offset.
*
* @param name Name of the storage object to truncate.
* @param offset Offset to truncate to.
* @return CompletableFuture A CompletableFuture corresponding to completion of the operation.
* @throws IOException If an I/O error occurs.
* @throws UnsupportedOperationException If the implementation does not support truncation.
*/
CompletableFuture<Void> truncate(String name, long offset) throws IOException, UnsupportedOperationException;
}
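To make the contract concrete, here is a minimal in-memory sketch of the CRWD subset of the interface above. It is illustrative only: the class name is hypothetical, and error handling and concurrency are elided.

```java
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Minimal in-memory sketch of the CRWD operations (Create, Read, Write, Delete).
public class InMemoryStorageProvider {
    private final Map<String, byte[]> objects = new HashMap<>();

    public CompletableFuture<Void> create(String name) {
        objects.put(name, new byte[0]);
        return CompletableFuture.completedFuture(null);
    }

    public CompletableFuture<Void> delete(String name) {
        objects.remove(name);
        return CompletableFuture.completedFuture(null);
    }

    public CompletableFuture<Integer> write(String name, long offset, int length, InputStream data) {
        try {
            byte[] old = objects.get(name);
            // Grow the backing array if the write extends the object.
            byte[] grown = new byte[(int) Math.max(old.length, offset + length)];
            System.arraycopy(old, 0, grown, 0, old.length);
            int written = data.read(grown, (int) offset, length);
            objects.put(name, grown);
            return CompletableFuture.completedFuture(written);
        } catch (Exception e) {
            CompletableFuture<Integer> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public CompletableFuture<Integer> read(String name, long fromOffset, int length,
                                           byte[] buffer, int bufferOffset) {
        byte[] src = objects.get(name);
        int n = Math.min(length, src.length - (int) fromOffset);
        System.arraycopy(src, (int) fromOffset, buffer, bufferOffset, n);
        return CompletableFuture.completedFuture(n);
    }
}
```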
- Key – segment name
- Mutable – yes
- Value – serialized record containing the following info:
- Version
- Status (sealed | deleted)
- Storage provider ID (enum)
- Start offset
- End offset
- Pointer to last chunk metadata record
- Pointer to first chunk metadata record
- Key – “chunk-<N>”, where N is a contiguous, monotonically increasing chunk number starting with 0
- Mutable – mostly immutable; once written, this data seldom changes
- Value – serialized record containing the following info:
- Version
- Chunk path/name on the tier-2 storage
- Start offset
- Length
- Status (sealed | deleted | merging)
- Pointer to next chunk metadata record
- Pointer to previous chunk metadata record (not strictly needed yet)