Created
November 15, 2016 10:37
-
-
Save codemartial/20f8101f4cd4ac1ce67cd7f90cc10c6d to your computer and use it in GitHub Desktop.
A distributed filesystem prototype
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"fmt" | |
"sync" | |
"strings" | |
) | |
// Stores the filesystem hierarchy, data node members and their capacity | |
type MetaServer struct { | |
dataNodes map[int]int // NodeID: Capacity | |
dataNodeInstances map[int]*DataNode // NodeID: DataNode Instance Mapping | |
files *FSNode // Filesystem Hierarchy | |
dnMu sync.RWMutex // Data Node Mutex | |
fsMu sync.RWMutex // File System Mutex | |
} | |
func NewMetaServer() *MetaServer { | |
return &MetaServer{ | |
map[int]int{}, | |
map[int]*DataNode{}, | |
NewDirectory(), | |
sync.RWMutex{}, | |
sync.RWMutex{}, | |
} | |
} | |
func (meta *MetaServer) AddNode(nodeID, capacity int) bool { | |
meta.dnMu.Lock() | |
defer meta.dnMu.Unlock() | |
if _, ok := meta.dataNodes[nodeID]; ok { | |
return false | |
} | |
meta.dataNodes[nodeID] = capacity | |
meta.dataNodeInstances[nodeID] = NewDataNode(nodeID) | |
return true | |
} | |
func (meta *MetaServer) GetNode(nodeID int) (*DataNode, bool) { | |
meta.dnMu.RLock() | |
defer meta.dnMu.RUnlock() | |
node, ok := meta.dataNodeInstances[nodeID] | |
return node, ok | |
} | |
// Return the node with the maximum capacity | |
func (meta *MetaServer) GetFreeNode() (*DataNode, bool) { | |
meta.dnMu.RLock() | |
defer meta.dnMu.RUnlock() | |
var nodeID, capacity int | |
for _nodeID, _capacity := range(meta.dataNodes) { | |
if _capacity > capacity { | |
capacity = _capacity | |
nodeID = _nodeID | |
} | |
} | |
if capacity <= 0 { | |
return nil, false | |
} | |
node, ok := meta.dataNodeInstances[nodeID] | |
return node, ok | |
} | |
func (meta *MetaServer) ValidateNode(nodeID int) bool { | |
if _, ok := meta.dataNodes[nodeID]; !ok { | |
return false | |
} | |
if meta.dataNodes[nodeID] <= 0 { // Out of capacity | |
return false | |
} | |
return true | |
} | |
// Add a file into the directory listing | |
func (meta *MetaServer) RegisterFile(path string, nodeID int) bool { | |
meta.dnMu.RLock() | |
if valid := meta.ValidateNode(nodeID); !valid { | |
meta.dnMu.RUnlock() | |
return false // Data Node non-existent or out of capacity | |
} | |
meta.dnMu.RUnlock() | |
components := strings.Split(path, "/") | |
fileName := components[len(components)-1] | |
if len(components) == 0 || fileName == "" { | |
return false // Invalid path | |
} | |
meta.dnMu.Lock() | |
defer meta.dnMu.Unlock() | |
if valid := meta.ValidateNode(nodeID); !valid { | |
return false // Re-validating after aquiring write lock | |
} | |
meta.fsMu.Lock() | |
defer meta.fsMu.Unlock() | |
var node *FSNode | |
// Set up the top-level directory | |
node, ok := meta.files.Find(components[0]) | |
if !ok { | |
if added := meta.files.AddChild(components[0], NewDirectory()); !added { | |
return false // Error creating path component | |
} | |
node,_ = meta.files.Find(components[0]) | |
if node == nil { | |
fmt.Println("Newly created directory not retrieved") | |
return false | |
} | |
} | |
// Navigate File system hierarchy | |
for _,dirname := range(components[1:len(components)-1]) { | |
if !node.IsDirectory() { | |
return false // Path terminated in a file | |
} | |
if _, ok := node.Find(dirname); !ok { | |
if added := node.AddChild(dirname, NewDirectory()); !added { | |
return false // Error creating path component | |
} | |
} | |
node, _ = node.Find(dirname) | |
} | |
if _, ok := node.Find(fileName); ok { | |
return false // Already exists | |
} | |
// Register File | |
if added := node.AddChild(fileName, NewFile(nodeID)); !added { | |
return false // Couldn't add file to the hierarchy | |
} | |
meta.dataNodes[nodeID] -= 1 | |
return true | |
} | |
func (meta *MetaServer) ListPath(path string) (*FSNode, bool) { | |
components := strings.Split(path, "/") | |
if len(components) == 0 { | |
return nil, false | |
} | |
node, ok := meta.files.Find(components[0]) | |
if !ok { | |
return nil, false // File not found | |
} | |
node,_ = meta.files.Find(components[0]) | |
for _,dirname := range(components[1:]) { | |
if !node.IsDirectory() { | |
return nil, false // Path terminated in a file | |
} | |
node, ok = node.Find(dirname) | |
if !ok { | |
return nil, false // File not found | |
} | |
} | |
return node, true | |
} | |
// FSNode is one entry in the filesystem tree: either a directory or a file.
// A directory has a non-nil listing; a file has a nil listing.
type FSNode struct {
	// listing maps child names to child nodes. It is nil for files.
	listing map[string]*FSNode
	// dataNodeID is the ID of the data node holding a file's contents.
	// NewDirectory sets it to -1.
	dataNodeID int
}
func NewFile(nodeID int) *FSNode { | |
return &FSNode{nil, nodeID} | |
} | |
func NewDirectory() *FSNode { | |
return &FSNode{map[string]*FSNode{}, -1} | |
} | |
func (node *FSNode) ListDir() (map[string]*FSNode, bool) { | |
if node.IsDirectory() { | |
return node.listing, true | |
} | |
return nil, false | |
} | |
func (node *FSNode) GetStorageNodeID() (int, bool) { | |
if node.IsDirectory() { | |
return 0, false | |
} | |
return node.dataNodeID, true | |
} | |
func (node *FSNode) Find(name string) (*FSNode, bool) { | |
if !node.IsDirectory() { | |
return nil, false | |
} | |
child, ok := node.listing[name] | |
return child, ok | |
} | |
func (node *FSNode) IsDirectory() bool { | |
return node.listing != nil | |
} | |
func (node *FSNode) AddChild(name string, ch *FSNode) bool { | |
if !node.IsDirectory() { | |
return false | |
} | |
if _, ok := node.Find(name); ok { | |
return false | |
} | |
node.listing[name] = ch | |
return true | |
} | |
// DataNode is a storage server in the prototype. Each file placed on it is
// stored whole, keyed by its full path.
type DataNode struct {
	// ID is the node's identifier.
	ID int
	// files maps a file path to that file's contents.
	files map[string]bytes.Buffer
	// mu is locked by StoreFile, FetchFile and DeleteFile around their
	// access to files.
	mu sync.RWMutex
}
func NewDataNode(nodeID int) *DataNode { | |
return &DataNode{nodeID, map[string]bytes.Buffer{}, sync.RWMutex{}} | |
} | |
func (dn *DataNode) StoreFile(path string, contents bytes.Buffer) bool { | |
dn.mu.Lock() | |
defer dn.mu.Unlock() | |
if _, ok := dn.files[path]; ok { | |
return false | |
} | |
dn.files[path] = contents | |
return true | |
} | |
func (dn *DataNode) FetchFile(path string) (bytes.Buffer, bool) { | |
dn.mu.RLock() | |
defer dn.mu.RUnlock() | |
contents, ok := dn.files[path] | |
return contents, ok | |
} | |
func (dn *DataNode) DeleteFile(path string) bool { | |
dn.mu.Lock() | |
defer dn.mu.Unlock() | |
if _, ok:= dn.files[path]; !ok { | |
return false | |
} | |
delete(dn.files, path) | |
return true | |
} | |
// Client interface for creating, fetching and listing files | |
type Client struct { | |
meta *MetaServer | |
} | |
func NewClient(meta *MetaServer) *Client { | |
return &Client{meta} | |
} | |
func (c *Client) Put(path string, contents bytes.Buffer) bool { | |
dn, ok := c.meta.GetFreeNode() | |
if !ok { | |
return false | |
} | |
if stored := dn.StoreFile(path, contents); !stored { | |
return false | |
} else { | |
fmt.Println("Client.Put: (",path,") File stored on data node ", dn.ID) | |
} | |
if registered := c.meta.RegisterFile(path, dn.ID); registered { | |
return true | |
} | |
dn.DeleteFile(path) | |
return false | |
} | |
func (c *Client) ListPath(path string) (*FSNode, bool) { | |
listing, ok := c.meta.ListPath(path) | |
return listing, ok | |
} | |
func (c *Client) GetFile(path string) (bytes.Buffer, bool) { | |
file, ok := c.ListPath(path) | |
if !ok { | |
fmt.Println("Client.GetFile: (",path,") File not found") | |
return bytes.Buffer{}, false | |
} | |
if file.IsDirectory() { | |
fmt.Println("Client.GetFile: (",path,") Found directory instead of file") | |
return bytes.Buffer{}, false | |
} | |
nodeID, ok := file.GetStorageNodeID() | |
if !ok { | |
fmt.Println("Client.GetFile: (",path,") Invalid file location") | |
return bytes.Buffer{}, false | |
} | |
dn, ok := c.meta.GetNode(nodeID) | |
if !ok { | |
fmt.Println("Client.GetFile: (",path,") Couldn't access data node") | |
return bytes.Buffer{}, false | |
} | |
if contents, ok := dn.FetchFile(path); !ok { | |
fmt.Println("Client.GetFile: (",path,") Couldn't fetch file data") | |
return bytes.Buffer{}, false | |
} else { | |
return contents, ok | |
} | |
} | |
func bootstrapServer() *MetaServer { | |
meta := NewMetaServer() | |
meta.AddNode(1, 2) | |
meta.AddNode(2, 2) | |
meta.AddNode(3, 2) | |
return meta | |
} | |
func main() { | |
server := bootstrapServer() | |
fmt.Println("Server up") | |
c1 := NewClient(server) | |
c2 := NewClient(server) | |
c3 := NewClient(server) | |
fmt.Println("Clients up") | |
done := make(chan struct{}, 2) | |
go func() { | |
if !c1.Put("/frob/test", *bytes.NewBufferString("Test")) { | |
fmt.Println("Failed to create file with contents: ","Test") | |
} | |
if !c1.Put("/foo/bar.txt", *bytes.NewBufferString("Hello")) { | |
fmt.Println("Failed to create file with contents: ","Hello") | |
} | |
v:= struct{}{} | |
done <- v | |
}() | |
go func() { | |
if !c2.Put("/foo/bar.txt", *bytes.NewBufferString("Hi")) { | |
fmt.Println("Failed to create file with contents: ","Hi") | |
} | |
if contents, ok := c2.GetFile("/frob/test"); ok { | |
fmt.Println("File contents: ", contents.String()) | |
} else { | |
fmt.Println("File not found: /frob/test") | |
} | |
v := struct{}{} | |
done <- v | |
}() | |
for i:= 0; i < 2; i++ { | |
<- done | |
} | |
if contents, ok := c3.GetFile("/foo/bar.txt"); ok { | |
fmt.Println("File contents: ", contents.String()) | |
} else { | |
fmt.Println("File not found: /foo/bar.txt") | |
} | |
if !c2.Put("/foo/baz.txt", *bytes.NewBufferString("World")) { | |
fmt.Println("Failed to create file with contents: ","Worls") | |
} | |
if contents, ok := c3.GetFile("/foo/baz.txt"); ok { | |
fmt.Println("File contents: ", contents.String()) | |
} else { | |
fmt.Println("File not found: /foo/baz.txt") | |
} | |
if node, ok := c2.ListPath("/foo"); ok { | |
fmt.Println(node.ListDir()) | |
} else { | |
fmt.Println("Path not found: /foo") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment