Skip to content

Instantly share code, notes, and snippets.

@bttger
Last active February 17, 2023 12:19
Show Gist options
  • Save bttger/ac9a26a53f55cc4c3d1effca4b91d283 to your computer and use it in GitHub Desktop.
Save bttger/ac9a26a53f55cc4c3d1effca4b91d283 to your computer and use it in GitHub Desktop.
My mdfc notes I collected throughout the semester with copied stuff from slides, papers, and the internet. NOT REVIEWED YET answers might be wrong and please don't copy them blindly

Overview

Why should you make a distributed system?

  • The system is inherently distributed (e.g. mobile devices like smartphones)
  • Reliability (some nodes can fail and the system keeps functioning; fault tolerance)
  • Performance (process data faster by utilizing multiple nodes or achieve better latency by requesting data from a nearby node)
  • The capacity of a single machine isn’t enough for the task (e.g. limited memory size, bandwidth, CPU cycles, etc.; scalability)

What are the Characteristics of distributed systems?

Failure is the defining difference between distributed and local programming. Distributed systems must be designed with the expectation of both hardware (connection, disk, etc.) and software failures.

  • Fault-Tolerant: It can recover from component failures without performing incorrect actions.
  • Highly Available: It can restore operations, permitting it to resume providing services even when some components have failed.
  • Recoverable: Failed components can restart themselves and rejoin the system, after the cause of failure has been repaired.
  • Consistent: The system can coordinate actions by multiple components often in the presence of concurrency and failure. This underlies the ability of a distributed system to act like a non-distributed system.
  • Scalable: It can operate correctly even as some aspect of the system is scaled to a larger size. For example, we might increase the size of the network on which the system is running. This increases the frequency of network outages and could degrade a ”non-scalable” system. Similarly, we might increase the number of users or servers, or overall load on the system. In a scalable system, this should not have a significant effect.
  • Predictable Performance: The ability to provide desired responsiveness in a timely manner.
  • Secure: The system authenticates access to data and services.

What are the types of failures that can occur in a distributed system?

  • Halting failures: A component simply stops. There is no way to detect the failure except by timeout: it either stops sending ”I’m alive” (heartbeat) messages or fails to respond to requests. Your computer freezing is a halting failure.
  • Fail-stop: A halting failure with some kind of notification to other components. A network file server telling its clients it is about to go down is a fail-stop.
  • Omission failures: Failure to send/receive messages primarily due to lack of buffering space, which causes a message to be discarded with no notification to either the sender or receiver. This can happen when routers become overloaded.
  • Network failures: A network link breaks.
  • Network partition failure: A network fragments into two or more disjoint sub-networks within which messages can be sent, but between which messages are lost. This can occur due to a network failure.
  • Timing failures: A temporal property of the system is violated. For example, clocks on different computers which are used to coordinate processes are not synchronized; when a message is delayed longer than a threshold period, etc.
  • Byzantine failures: A component such as a server can inconsistently appear both failed and functioning to failure-detection systems, presenting different symptoms to different observers. It is difficult for the other components to declare it failed and shut it out of the network, because they need to first reach a consensus regarding which component has failed in the first place.

What is the difference between bandwidth and throughput?

Bandwidth is the theoretical capacity of a data channel. It describes the maximum amount of data that the channel can transfer. The throughput on the other side describes the actual, maximum amount of data that the channel can transfer in a running system. This amount gets influenced by multiple factors including the latency and protocol. For example, TCP relies on socket setups and acknowledgements to detect lost packets but UDP doesn't. The additional round trips of TCP decrease the actual throughput. On the other hand, UDP, and protocols that rely on UDP like DNS and QUIC/HTTP3, have higher throughput since UDP is connectionless and doesn't guarantee ordering of packets.

Remote Procedure Calls (RPC)

What is an RPC?

A remote procedure call, RPC, is a technique based on extending the notion of local procedure calling, so that the called procedure may not exist in the same address space as the calling procedure. The two processes may be on the same system, or they may be on different systems with a network connecting them. An RPC is similar to a function call.

How do RPCs work on a high level?

Like a function call, when an RPC is made, the arguments are passed to the remote procedure and the caller waits for a response to be returned. The client process waits until either a reply is received, or it times out. When the request arrives at the server, it calls a dispatch routine that performs the requested service, and sends the reply to the client. After the RPC call is completed, the client process continues.

How does the communication between RPC agents look like?

The communication protocol is created by stubs generated by a protocol compiler. A stub is a routine that doesn't actually do much other than declare itself and the parameters it accepts. The stub contains just enough code to allow it to be compiled and linked.

The client and server programs must communicate via the procedures and data types specified in the protocol. The server side registers the procedures that may be called by the client and receives and returns data required for processing. The client side calls the remote procedure, passes any required data and receives the returned data.

Which error cases can happen for RPC applications?

RPC introduces a set of error cases that are not present in local procedure programming. Some RPC applications view these types of errors as unrecoverable. Fault-tolerant systems, however, have alternate sources for critical services and fail-over from a primary server to a backup server.

  • Binding error when server is not running
  • Version mismatches if client compiled against other version than the server
  • Network data loss resulting in retransmission
  • Server process crashes during RPC operation: If the server crashes before the response was sent but after a state change, duplicate requests can result
  • Client process crashes before receiving response: Client is restarted, and server discards response and rolls back operation

Apache MapReduce

How do the map and reduce interfaces look like?

map(k1, v1) -> (k2, v2)[] and reduce(k2, v2[]) -> (k2, v3)

Which challenges need to be solved repeatedly in big data scenarios?

  • Parallelization of application logic
  • Communication between nodes
  • Synchronization
  • Load balancing
  • Fault-tolerance
  • Scheduling of jobs

How does the MapReduce algorithm work? Also describe what happens with K1 and K2 keys and their data.

  1. Map: Each worker node applies the map function to the local data with keys K1, and writes the output to a temporary storage. A master node ensures that only one copy of the redundant input data is processed.
  2. Shuffle: Worker nodes redistribute data based on the output keys K2 (produced by the map function), such that all data belonging to one key is located on the same worker node.
  3. Reduce: Worker nodes now process each group of output data, per key K2, in parallel.

How does the MapReduce architecture look like?

The MapReduce framework employs a master-worker architecture (though Apache Hadoop calls their workers "data nodes"). The master is called the "name node" and it keeps the directory tree of all files in the file system, and tracks where across the cluster the file data is kept. It does not store the data of these files itself. Also, it keeps track of all the MapReduce jobs. The "data nodes" stores all the data and listens for new tasks. Thus, each data node consists of the distributed file system and a task tracker to run tasks on the stored data.

name node <==> job tracker data nodes <==> task trackers

What are limitations of MapReduce?

  • Graph algorithms
  • Iterative algorithms
  • Stream processing (and low latency)
  • .. overall it is just a very low level abstraction for common data analysis tasks (whereas Pig is a high level abstraction)

Apache Pig

What is Apache Pig?

Pig = SQL-like procedural queries + distributed execution

Apache Pig is a high-level platform for handling common data analytics tasks that run on Apache Hadoop. The language for this platform is called Pig Latin which are SQL-like queries with distributed execution. Pig Latin abstracts the programming from the Java MapReduce idiom into a notation which makes MapReduce programming high level, similar to that of SQL for relational database management systems.

A Pig Latin script gets compiled into several MapReduce jobs. The Pig compiler first creates a logical plan based on the SQL-like statements, then a physical plan with data rearranges and packaging, and at the last step, converts it to MapReduce jobs.

What is the advantage of the staged compilation of Pig?

  • SQL query optimizations
  • MapReduce specific optimizations

Leases

Why are leases better suited for distributed systems than plain locks?

A lease is a contract that gives its holder specified rights to some resource for a limited period. Because it is time-limited, a lease is an alternative to a lock. A traditional resource lock is granted until it is explicitly released by the locking client process. But that means that a process can hold a resource indefinitely which is a problem for several reasons:

  • The client failed before releasing the resources
  • The client deadlocked while attempting to allocate another resource
  • The client was blocked or delayed for an unreasonable period
  • The client neglected to free the resource, perhaps due to a bug
  • The request to free the resource was lost
  • The resource manager failed or lost track of the resource stated

Any of these could end the availability of an important reusable resource until the system is reset. By contract, a lease is valid for a limited period, after which it automatically expires, making the resource available for reallocation by a new client. A process can renew the lease before it expires if it wants to extend the access.

What types of leases exist? And are they granted concurrently or exclusively?

  • Read leases: allow clients to cache clean data. The guarantee is that no other client is modifying the data. Read leases may be granted concurrently.
  • Write leases: allows safe delayed writes. The guarantee is that no other client has data caches. A client can locally modify the data, and then write in a batch. Write leases are granted exclusively.

What are possible issues of leases?

  • There must be the possibility to notify lease holders of the expiration and preventing the agent to further rely on the resource => Access token that must be sent with resource request and which is invalidated if lease expired
  • Leases depend on well-behaved clocks => If the speeds mismatch, it can lead to inconsistencies

How are lease failures and conflicts resolved?

If a conflict exists, the server may send eviction notices to the clients. An evicted write lease must write its state back. Evicted read leases must flush/disable their cache. Clients must acknowledge when they completed the notice.

The bounded lease term simplifies the failure recovery. Before leases expire, clients must renew them. If a client fails while holding a lease, the server waits until the lease expires and then reclaims the resource. The same applies during an eviction. If the server fails while there are leases outstanding, it waits the lease period plus the clock skew time before it issues new leases.

Distributed File Systems (GFS)

What is a distributed file system?

A distributed file system is a client/server-based application that allows clients to access and process data stored on the server as if it were on their own computer.

What are design requirements of GFS and HDFS?

  • Large files
  • Scalable
  • Reliable
  • Write once (or append only in the case of HDFS; means it is not POSIX compatible)
  • Available
  • Concurrent
  • Namespace
  • Performance

What is the main difference between HDFS and GFS?

GFS can update specific file regions and append to files at an offset chosen by GFS, while HDFS can only append to files. The append operation is atomic and idempotent (following the at-least-once scheme) for both GFS and HDFS since the system choses an offset automatically.

Though, Google writes in their paper that "practically all [their] applications mutate files by appending rather than overwriting".

What does HDFS allow you to run on top of it?

Applications like MapReduce and Pig

How does the GFS architecture look like?

  • Single master
    • log replicated
    • serves the file metadata (namespace, access control, chunk handles mapping, location of chunks)
    • controls system wide services (chunk leases, garbage management of orphaned chunks, chunk migration between servers)
    • communicates with chunk servers in HeartBeat messages (instructions to chunk servers and retrieval of their state)
  • Multiple chunk servers
    • contains many 64MB chunks identified by a chunk handle assigned by the master during chunk creation
    • each chunk replicated on 3 chunk servers (Though, since the chunks are very big, it can lead to hot spots for small files that consist of only a single chunk and that need to be accessed by many clients. Thus the replication factor is higher for small files that are in high demand.)
  • Clients
    • caches the metadata responses but not the chunks (since chunks are plain files on Linux hosts which are cached automatically on the system)
    • communicate with the
      • master only for metadata operations
      • chunk servers for all data-bearing (both write and read) requests

Permissions for modifications are handled by a system of time-limited, expiring "leases", where the Master server grants permission to a process for a finite period of time during which no other process will be granted permission by the Master server to modify the chunk. The modifying chunkserver, which is always the primary chunk holder, then propagates the changes to the chunkservers with the backup copies. The changes are not saved until all chunkservers acknowledge, thus guaranteeing the completion and atomicity of the operation.

Programs access the chunks by first querying the Master server for the locations of the desired chunks; if the chunks are not being operated on (i.e. no outstanding leases exist), the Master replies with the locations, and the program then contacts and receives the data from the chunkserver directly.

How does GFS handle a write request and how does the data flow look like?

  1. The client asks the master which chunkserver holds the current lease for the chunk and the locations of the other replicas. If no one has a lease, the master grants one to a replica it chooses (not shown).
  2. The master replies with the identity of the primary and the locations of the other (secondary) replicas. The client caches this data for future mutations. It needs to contact the master again only when the primary becomes unreachable or replies that it no longer holds a lease.
  3. The client pushes the data to all the replicas by specifying an optimal replica order (depending on the network topology, i.e. distances in the network) and sending the data to the first replica of that order. Each chunkserver will store the data in an internal LRU buffer cache until the data is used or aged out. By decoupling the data flow from the control flow, we can improve performance by scheduling the expensive data flow based on the network topology regardless of which chunkserver is the primary. Each replica can use its full network bandwidth by pushing the data linearly along a chain.
  4. Once all the replicas have acknowledged receiving the data, the client sends a write request to the primary. The request identifies the data pushed earlier to all of the replicas. The primary assigns consecutive serial numbers to all the mutations it receives, possibly from multiple clients, which provides the necessary serialization. It applies the mutation to its own local state in serial number order.
  5. The primary forwards the write request to all secondary replicas. Each secondary replica applies mutations in the same serial number order assigned by the primary. This is why concurrent writes (to overlapping regions) by multiple clients may lead to consistent but undefined state. This is not the case for append only requests.
  6. The secondaries all reply to the primary indicating that they have completed the operation.
  7. The primary replies to the client. Any errors encountered at any of the replicas are reported to the client. In case of errors, the write may have succeeded at the primary and an arbitrary subset of the secondary replicas. (If it had failed at the primary, it would not have been assigned a serial number and forwarded.) The client request is considered to have failed, and the modified region is left in an inconsistent state. Our client code handles such errors by retrying the failed mutation. It will make a few attempts at steps (3) through (7) before falling back to a retry from the beginning of the write.

For record appends, the primary chooses the offset and tells all other replicas to do the same operation with the same offset.

How does GFS handle garbage collection?

Manually. It creates a deleted timestamp and puts the file to a hidden state. The garbage collector then periodically checks for deleted files and deletes them together with their chunks after a stale period of 3 days by default.

How does GFS handle data integrity?

  • checksumming to detect corruption stored data, recover using other chunk replicas
  • chunks are broken up into 64 KB blocks. Each has a corresponding 32 bit checksum. Like other metadata, checksums kept in memory and stored persistently with logging, separate from user data
  • divergent replicas be legal: the semantics of GFS mutations, in particular record append as discussed earlier, does not guarantee identical replicas. Therefore, each chunkserver must independently verify the integrity of its own copy by maintaining checksums

Read:

  • chunk server verifies the checksum of data blocks that overlap the read range before returning any data (whether to a client or other chunk server)
  • if a block does not match the checksum, the chunk server will reply with an error, report it to the master, and clone the chunk from another replica

Write (record append):

  • incrementally update the checksum for the last partial checksum block (incrementally meaning with the last checksum as input and not re-calculating it from the actual data; this way GFS does not need to detect the corruption during the write but as normal during a read)
  • compute new checksums for new checksum blocks filled by the append

d0,d1,d2,d3,d4 c0 ,c1 s0 ,s1 ,s2 (partial checksum needed for incremental update of checksum)

record append d5,d6 after d4 got corrupted: d0,d1,d2,d3,dx,d5,d6 c0 ,c1 ,c2 s0 ,s1 ,s2' ,s3 => s2'(s2) not match the data

Why is atomic record append at-least-once, rather than exactly once?

If a write fails at one of the secondaries, the client re-tries the write. That will cause the data to be appended more than once at the non-failed replicas.

How can clients find their data given that atomic record append writes it at an unpredictable offset in the file?

Append (and GFS in general) is mostly intended for applications that sequentially read entire files. Such applications will scan the file looking for valid records (see the previous question), so they don't need to know the record locations in advance. For example, the file might contain the set of link URLs encountered by a set of concurrent web crawlers. The file offset of any given URL doesn't matter much; readers just want to be able to read the entire set of URLs.

The GFS paper mentions reference counts -- what are they?

They are part of the implementation of copy-on-write for snapshots. When GFS creates a snapshot, it doesn't copy the chunks, but instead increases the reference counter of each chunk. This makes creating a snapshot inexpensive. If a client writes a chunk and the master notices the reference count is greater than one, the master first makes a copy so that the client can update the copy (instead of the chunk that is part of the snapshot). You can view this as delaying the copy until it is absolutely necessary. The hope is that not all chunks will be modified and one can avoid making some copies.

How does GFS handle snapshots?

  • On snapshot request, master revokes all chunk leases of file or namespace
  • When leases revoked or expired, log the operation to disk
  • The master then applies this log record to its in-memory state by duplicating the metadata for the source file or namespace
  • On write request, master notices reference count for chunk C is greater than one, so it asks each chunk server to create local copy of chunk C -> C' (Clients now write to C')

What's a lease for GFS?

For GFS, a lease is a period of time in which a particular chunk server is allowed to be the primary for a particular chunk. Leases are a way to avoid having the primary have to repeatedly ask the master if it is still primary -- it knows it can act as primary for the next minute (or whatever the lease interval is) without talking to the master again.

How does the GFS master place the replicas when creating a new chunk?

The chunk replica placement policy serves two purposes:

  • maximize data reliability and availability
  • maximize network bandwidth utilization

Criteria for a chunk placement during creation:

  • on chunkservers with average disk space utilization to equalize disk utilization across chunk servers over time
  • limit the number of “recent” creations on each chunkserver since creations are followed by heavy writes (due to the append-once-read-many workload)
  • spread replicas across racks

When does Zookeeper re-replicate chunks?

When the number of available replicas falls below a user-specified goal.

  • chunk server becomes unavailable
  • chunk server reports that replica is corrupted
  • chunk server reports disk errors etc.
  • or the replication goal got increased

Each chunk that needs to be re-replicated is prioritized based on several factors:

  • prioritize chunks that are far off from its replication goal
  • prefer to first re-replicate chunks for live files as opposed to chunks that belong to recently deleted files
  • boost the priority of any chunk that is blocking client progress.

What are the limitations of GFS/HDFS?

  • It can handle sequential reads well but not random reads
  • High throughput but with relatively high latency
  • Designed for unstructured data

Zookeeper

What are the key properties of monolithic kernels and microkernels

Monolithic kernels like Linux and BSD run all the OS services in kernel space. This leads to good performance. But has the following disadvantages:

  • many dependencies between system components
  • complex and huge code base => hard to maintain

Microkernels, like Minix, on the other hand follow a minimalist approach since they only handle IPC, virtual memory, and thread scheduling in the kernel. The rest, like FS, VFS, device drivers, etc., is handled in user space.

It's more stable and easier to maintain. But it has the drawback of many system calls and thus context switches.

What is Zookeeper?

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

https://www.youtube.com/watch?v=gZj16chk0Ss

What are the guarantees that Zookeeper gives?

  • Linearizable writes
  • Client requests are processed in FIFO order
    • writes => client specified order
    • reads => client remembers ZxID (highest log ID they have seen) and any read must be read from a log that has at least this ZxID (means a client has read-after-write consistency but not across multiple, individual clients)
  • Clients receive notifications of changes before the changed data becomes visible (or can be read; i.e. notifications and following reads are linearizable)

What are the common API operations in Zookeeper?

  • create(path, data, flags)
  • delete(path, version)
  • exists(path, watch)
  • getData(path, watch)
  • setData(path, data, version)
  • getChildren(path, watch)
  • sync() (if the server that the client is connected to has not been synced with the current ZxID that the client knows of; send this command instead of write request)

How does Zookeeper provide the group services like cluster configuration and synchronization?

It organizes a set of data nodes (called Znodes) in a hierachical namespace which can be read, written, watched, and locked by clients.

What are the two types of Znodes?

  • Regular: explicitly created and deleted by clients
  • Ephemeral: can be deleted explicitly or automatically when the session terminates (of the client that created the Znode)

Which flags can be used when working with Znodes?

  • Sequential: Monotonically increase a number suffix on the files in the subdirectory
  • Watch: Receive notifications of changes

How can you implement a lock in Zookeeper while also preventing the herding problem?

Lock:

  1. znode = create("locks/1/lock-", EPHEMERAL|SEQUENTIAL)
  2. childs = getChildren("locks/1", watch=false)
  3. if (znode is the one with lowest sequence number suffix in childs) exit("I have the lock");
  4. callback = exists(znode - 1, watch=true)
  5. if (callback => false) goto 2.

Unlock:

  1. delete(znode)

The removal of a node will only cause one client to wake up since each node is watched by exactly one client. In this way, you avoid the herd effect.

There is no polling or timeouts.

PS: Herding problem occurs when a large number of processes or threads waiting for an event are awoken when that event occurs, but only one process is able to handle the event. When the processes wake up, they will each try to handle the event, but only one will win. All processes will compete for resources, possibly freezing the computer, until the herd is calmed down again.

https://zookeeper.apache.org/doc/r3.1.2/recipes.html

How many servers does Zookeeper need to tolerate failures?

To tolerate f servers to fail the system needs 2f+1 servers because we need a majority of servers agreeing for a quorum.

What are synonyms for primary-backup replication?

  • master-slave
  • leader-follower
  • primary-secondary
  • coordinator-worker/participant
  • log shipping or mirroring

How does Zookeeper ensure that it is not a single-point of failure itself?

It uses primary-backup replication on many servers that all keep a copy of the configuration state (the namespaces) in memory. Updates go through the leader and followers serve all read requests.

It uses ZAB (Zookeeper Atomic Broadcasts) as its replication protocol which replicates a log to all replicas. The properties of ZAB are:

  • The leader node applies incremental, idempotent updates to the log
    • State changes are idempotent and applying the same state change multiple times does not lead to inconsistencies as long as the application order is consistent
  • Clients and the leader (when proposing new transactions to followers) must follow the FIFO order (ensured through TCP) and operations must allow at-least-once semantics (to be idempotent) with the delivery order
  • Like Raft, at most one primary is able to broadcast state changes (In general very similar to Raft)
  • Leaders and followers exchange heartbeats with each other to detect failures (if either leader or follower doesn't receive heartbeat for deadline, trigger leader election and move to discovery phase)
    • In case of the leader, it must receive heartbeats only from Q followers (dependent on the quorum size)

What are the three phases of ZAB?

  1. Discovery
  • Followers send the current epoch to prospective leader
  • Leader proposes a new epoch e' (if epoch received from quorum)
  • Followers acknowledge the new epoch proposal
  1. Synchronization
  • Prospective leader proposes itself as new leader (if ack received from quorum)
  • Followers acknowledge proposal
  • New leader commits the leader proposal to all followers (if ack received from quorum)
  1. Broadcasts
  • Leader proposes new transaction
  • Followers acknowledge transaction
  • Leader commits transaction (if ack received from quorum)

What is a 2PC?

2PC is an atomic commit protocol meaning all participants will eventually commit if all voted “YES” or leave the system unchanged otherwise.

  1. Coordinator requests vote from all participants
  • used to decide if transaction can be safely commited or needs to be aborted
  • some participant sends a "NO" if the transaction led to an error or can't be executed
  1. Coordinator sends a commit or abort message to all participants
  • All voted YES: Commit
  • At least one voted NO: Abort

TODO: check if Raft already commits and sends back the response after receiving all phase 1 acks or if it waits after receiving quorum acks from phase 2. Then I know when the coordinator actually commits the tx. I dont know if it does that before or after receiving all phase 2 acks.

Kleppmann Motivation

What do you need to implement a request-response protocol using TCP?

Some way of message framing, so that the agents know when a message begins and ends.

Either

  • encode the message size in bytes in header
  • delimit messages with specific separator sequence

How is the availability measured?

The availability of a service is typically measured in terms of its ability to respond correctly to requests within a certain time.

How are availability expactations of a service formalized?

A service-level objective typically specifies the percentage of requests that need to return a correct response within a specified timeout, as measured by a certain client over a certain period of time.

A service-level agreement specifies some SLO and the consequences if the SLO is not met.

What is the difference between a fault and a failure?

Failure: System as a whole isn't working Fault: Some part of the system isn't working (Node crash or deadlock, Network dropping or delaying messages, etc.)

How is RPC different from a local function call? Is location transparency achievable?

An RPC only calls a function stub which creates a network request instead of running the processing function locally.

  • RPCs may time out (libraries must have a checked type like a JS promise)
  • In case of a timeout, the client doesn't know if function got executed
  • slow and unpredictable speed due to network latency
  • clients must take measurements to make function calls idempotent
  • pass-by-reference and mutation not possible since server has no access to client's memory

Location transparency can only be achieved by making local function calls behave like RPC but this is undesirable.

How can an RPC framework make sure retried requests are idempotent?

The clients can send a request ID with each request and if it retries a request (without the outcome of the previous request) it can append the same request ID and the server checks if it has processed the ID yet.

Models of distributed systems

Distributed systems are kind of concurrent systems. What are the two models of concurrent systems?

  • Shared-memory concurrency (e.g. several threads running across CPU cores accessing the same memory; locks, semaphores, atomic operations)
  • Message-passing distributed systems (own OS and address space)

What problem shows the two generals problem transferred to distributed systems?

The only way to know about the state of another node is by exchanging messages. But no matter how many messages are exchanged between two nodes, none of them can ever be certain about the state of the other.

They can increase the confidence by repeatedly sending the state back-and-forth but but it can be proved that no node will ever reach certainty by exchanging any finite number of messages.

Why doesn't the two generals problem apply to the online shop example?

              customer
   *dispatch_goods*   *charge_card*
online shop   <- RPC ->     payments service

If, for example, the online shop does not get the acknowledgement message from the payments service, it would lead to the customer being charged without goods being dispatched. But since the payment can be undone, it solves the problem. Whereas the defeat of the army can not be undone.

What is the Byzantine generals problem?

It is similar to the generals problem but introduces a third general and assumes that messages are always correctly delivered.

The challenge in this setting is that some generals might be traitors: they deliberately and maliciously mislead other generals. The honest generals don't know who the malicious generals are, but the malicious generals may collude and secretely coordinate their actions.

Thus, it is impossible for some general to determine whether one general is malicious or honest.

Summary: Two generals problem: a model of networks Byzantine generals problem: a model of node behaviour

How can the Byzantine generals problem be solved?

It can only be solved if fewer than 1/3 of the generals are malicious. That is, in a system with 3f + 1 generals, no more than f generals may be malicious.

One common protocol for solving the Byzantine Agreement Problem is the Byzantine Fault Tolerance (BFT) protocol. In this protocol, each general sends their message (e.g. "attack" or "retreat") to all other generals. Each general then compares the messages they received from the other generals, and a decision is made based on a majority of the messages received.

For example, if 2/3 or more of the generals send the message "attack", then the decision will be to attack. If less than 2/3 of the generals send the message "attack", then the decision will be to retreat.

This protocol ensures that even if some of the generals are Byzantine (i.e., dishonest or faulty), the decision will still be correct as long as at least 2/3 of the generals are honest.

What are the network behaviour assumptions for bidirectional point-to-point communication? And how can a type of link converted into another?

  • Arbitrary links (active adversary): A malicious adversary may interfere with messages (eavesdrop, modify, drop/block, spoof, replay) -> Transport Layer Security (TLS)
  • Fair-los links: Messages may be lost, duplicated, or reordered. If you keep retrying, a message eventually gets through. -> retry + dedup (e.g. TCP for up to a minute of timeout)
  • Reliable links: A message is received if and only if it is sent. Messages may be reordered.

How can nodes behave when executing a specific distributed algorithm?

  • Crash-stop (fail-stop): A node is faulty if it crashes (at any moment). After crashing, it stops executing forever.
  • Crash-recovery (fail-recovery): A node may crash at any moment, losing its in-memory state. It may resume executing sometime later. Data stored on disk survives the crash. It makes no assumption on the length until the recovery or if the node is ever going to be recovered again.
  • Byzantine (fail-arbtirarly): A node is faulty if it deviates from the algorithm. Faulty nodes may do anything, including crashing or malicious behaviour.

A node that is not faulty is called "correct".

The crash-stop assumptions seems unlikely but some algorithms prefer this assumption since it makes the algorithm simpler.

For the network assumptions it was possible to convert one model to another but this is not possible for the node behaviour. The algorithms would look much different.

Which assumptions can we make about the timing and latencies in a distributed system?

  • Synchronous: Message latency no greater than a known upper bound. Nodes execute algorithm at a known speed.
  • Partially synchronous: The system is asynchronous for some finite (but unknown) period of time, synchronous otherwise.
  • Asychronous: Messages can be delayed arbitrarily (e.g. due to partition, congestion). Nodes can pause execution arbitrarily (e.g. due to GC, scheduling preemption). No timing guarantees at all.

Unfortunately, some problems in distributed computing are impossible to solve in an asynchronous model, and therefore we have the partially synchronous model as a compromise. In this model, we assume that our system is synchronous and well-behaved most of the time, but occasionally it may flip into asynchronous mode in which all timing guarantees are off, and this can happen unpredictably.

Summarize the distributed systems model assumptions.

For each distributed system a developer must make assumptions about the following 3 properties:

  • Network: reliable, fair-loss, or arbitrary
  • Nodes: crash-stop, crash-recovery, or Byzantine
  • Timing: synchronous, partially synchronous, or asynchronous

What is the problen when using heartbeats as failure detector?

After the timeout expiration, the client cannot tell the difference between a crashed node, temporarily unresponsive node, lost message, or a delayed message.

How can someone implement a failure detector for the crash-stop and crash-recovery model?

By sending heartbeats. Send a message, await the response, label node as crashed if no response within some timeout.

Which of the distributed systems model assumptions apply to the two generals problem?

  • Fair-loss link
  • Crash-stop (even though this does not apply actually)
  • Partially synchronous

Which model assumption must hold true to be able to implement a perfect failure detector?

A perfect timeout-based failure detector exists only in a synchronous crash-stop system with reliable network links.

In a partially synchronous system, a perfect failure detector does not exist.

Moreover, in an asynchronous system, no timeout-based failure detector exists, since timeouts are meaningless in the asynchronous model.

What is a suitable failure detector for a partially synchronous system? And what are their properties?

Partially synchronous systems can use the eventually perfect failure detector.

It may...

  • temporarily label a node as crashed, even though it is correct
  • temporarily label a node as correct, even though it has crashed
  • eventually label a node as crashed if and only if it has crashed

In summary, it reflects the node state, but not instantaneous, there may be incorrect timeouts.

Time, clocks, and ordering of events

What are examples for when a distributed system relies on time?

  • Schedulers, timeouts, failure detectors, retry timers
  • Performance measurements, statistics, profiling
  • Log files & databases: record when an event occurred
  • Data with time-limited validity (e.g. cache entries)
  • Determining order of events across several nodes

What are the two types of clocks?

  • Phyiscal clocks: count number of seconds elapsed (sometimes called wall clocks even though not attached to a wall)
  • Logical clocks: count events, e.g. messages sent

But note: A clock in digital electronics (oscillator) is not the same as a clock in a distributed system (source of timestamps).

What is the issue with quartz clocks?

The time drifts depending on the temperature and some clocks run faster or slower due to manufacturing imperfections.

What are two alternatives to quartz clocks?

  • Atom clocks (e.g. Caesium-133 which has certain resonant frequency at around 9Ghz)
  • GPS (Satellites broadcast their time with high resolution. Receivers measure the time it took the signal from each satellite to reach them, compute the distance from each satellite and the current time.)

Which time includes astronomic corrections and is based on atomic time?

Coordinated Universal Time (UTC) which includes leap seconds to keep in sync with the earth's rotation. Our local time is specified as an offset to UTC.

Why is it not true that in UTC timescale a day always has 86,400 seconds?

Because of a leap second a day can be 86,399 seconds, 86,400 seconds, or 86,401 seconds. During a positive leap second, for example, the time goes to the invalid state of 23:59:60 to account for the additional second.

How does software usually account for leap seconds?

They "smear" (spread out) it over the course of multiple hours to prevent the invalid time state. They do it by slowing down or speeding up the clock.

Describe some problems that may arise from leap second smearing

  • Measurements of time duration will be slightly wrong during the smearing period.
  • If a timestamp from a smeared clock is compared with a non-smeared timestamp, there will be a spurious difference of up to half a second. For example, this could arise if two nodes with different smearing policies attempt to measure the network latency between them by sending one node’s current timestamp as a message to the other node, and calculating the difference of timestamps when the message is received. The measured latency could even be negative.
  • When comparing two timestamps from clocks that use different approaches to smearing. For example, some implementations of smearing spread the extra second over different periods of time or use different functions like a cosine.

What is the clock skew?

It's the difference between two clocks at a point in time.

What is clock drift?

Pyscial clocks are not precise and the clock error gradually increases.

How can computers with quartz clocks prevent clock skew?

By utilizing the Network Time Protocol (NTP).

Explain how the NTP works and how the client estimates the current time based on the server's response.

  1. The NTP client sends his current time t1 to the NTP server
  2. The server saves its current timestamp t2 when a new request is received
  3. The server then processes the request and after the processing saves a new timestamp t3
  4. The server replies to the client by sending all three timestamps back
  5. The client estimates the current time by calculating the round trip network delay and adding half of it to t3.

The round trip time is calculated by the round trip time minus the processing time of the server: d = (t4 - t1) - (t3 - t2)

Estimated time: t3 + d/2

What is the maximum possible error in the NTP client's estimate of skew assuming both nodes follow the protocol?

The maximum possible error is d/2 in the extreme cases where either the first message takes d time and the second message 0, or vice versa.

Which ways does NTP use to adjust the clock?

  • Slew (gently adjusting the clock speed to gradually reduce the skew)
  • Step (for larger skews, forcibly sets the clock to the estimated correct time)
  • Panic (for very large skews, human operator needs to resolve it)

Why is a time-of-day clock not suited to measure elapsed time in a program?

The clock can be stepped by NTP so the time difference might not be accurate.

What are the advantage and disadvantage of monotonic clocks?

  • Always moves forward at near constant rate so good for measuring elapsed time on a single node
  • Time since arbitrary point so it is not comparable across nodes

What can be a valid condition for a happens-before relation assuming an event a happens before event b?

An event a happens before event b iff:

  • a and b occured at the same node, and a occurred before b in that node's local execution order; or
  • event a is the sending of some message m, and event b is the receipt of that same message m (assuming messages are unique); or
  • there exists an event c such that a -> c and c -> b (transitivity).

What does it mean that the happens-before relation is a partial order relation?

It is possible that neither a -> b nor b -> a. In that case a and b are concurrent (written a || b).

In a partial order not all elements are comparable while in a total order all elements are comparable.

What does causality mean?

The happens-before relation is a way of reasoning about causality in distributed systems. Causality considers whether information could have flowed from one event to another, and thus whether one event may have influenced another.

  • When a -> b, then a might have caused b
  • When a || b, we know that a cannot have caused b

The causal order is written as a ≺ b (a caused b).

How does the Lamport clock algorithm work?

on initialisation do: t := 0 => each node has its own local variable t end on

on any event occurring at the local node do: t := t + 1 end on

on request to send message m do: t := t + 1 send (t, m) via the underlying network link end on

on receiving (t', m) via the underlying network link do: t := max(t, t') + 1 deliver m to the application end on

What properties does the Lamport clock algorithm have?

  • Timestamps are consistent with causality: If a -> b then L(a) < L(b)
  • However, the converse is not true: An event a with a smaller timestamp than b does not imply that a happened before b: L(a) < L(b) does not imply a -> b since it can be that a -> b or a || b
  • Different events can have the same timestamp, i.e. on different nodes (if we need unique timestamp, it can be extended with unique node ID)

Can Lamport timestamps determine a happens-before relationship?

Lamport clocks cannot determine happens-before relation between nodes, but they are actually sufficient to do so within a single node, as a higher local logical time will always reflect a later event.

What does it mean that two events are in causal order?

It means that, if an event a happens before event b, then a causes b.

How can you define a total order over all events happening between nodes using Lamport timestamps?

We use the lexicographic order over (timestamp, node name) pairs: that is, we first compare the timestamps, and if they are the same, we break ties by comparing the node names:

Let N (e) be the node at which event e occurred. Then the pair (L(e), N (e)) uniquely identifies event e.

The total order ≺ using Lamport timestamps: (a ≺ b) ⇐⇒ (L(a) < L(b) ∨ (L(a) = L(b) ∧ N (a) < N (b)))

This relation ≺ puts all events into a linear order: for any two events a != b we have either a ≺ b or b ≺ a. It is a causal order: that is, whenever a → b we have a ≺ b. In other words, ≺ is a linear extension of the partial order →.

Can Lamport timestamps be used to correctly determine a total order?

No. In a total order, all events must be comparable. Though this is not possible with Lamport timestamps since events can happen concurrently. Thus, if a || b we could have either a ≺ b or b ≺ a, so the order of the two events is determined arbitrarily by the algorithm.

What type of logical time can be used to practically capture the happens-before relation used in partial order?

Vector clock

How can you tell wheter two events are concurrent or one happened before the other?

Given the Lamport timestamps of two events, it is in general not possible to tell whether those events are concurrent or whether one happened before the other. If we do want to detect when events are concurrent, we need a different type of logical time: a vector clock.

How does a vector clock work?

  • Each node initializes zero-vector with length of the number of nodes
  • The vector timestamp of event a is V(a) = <t0, t1, .., tn-1>
  • Each node has a current vector timestamp T
    • On event at node Ni, increment vector element at ti
    • Attach current vector timestamp to each message
    • On receive, merge received timestamp vector into local vector with element-wise max function

How can you define a partial order (happens-before relation) over all events happening between nodes using vector clocks?

We say that one vector is less than or equal to another vector if every element of the first vector is less than or equal to the corresponding element of the second vector.

One vector is strictly less than another vector if they are less than or equal, and if they differ in at least one element.

However, two vectors are incomparable if one vector has a greater value in one element, and the other has a greater value in a different element.

The partial order over vector timestamps corresponds exactly to the partial order defined by the happens-before relation. Thus, the vector clock algorithm provides us with a mechanism for computing the happens-before relation in practice.

Broadcast (or multicast)

What is the main difference between broadcast protocols?

The order in which they deliver the messages.

Why do broadcast algorithms differentiate between receiving and delivering?

Because between the network and the application there lies the broadcast algorithm as a middleware. The algorithm decides, whether a message which the node received is forwarded (i.e. delivered) to the application.

The same applies to the send. An application only sees the broadcast call but the middleware then spreads it into multiple point-to-point messages.

Which types of reliable broadcast are there?

  • FIFO broadcast
  • Causal broadcast
  • Total order broadcast
  • FIFO-total order broadcast

Describe the FIFO broadcast conditions:

Messages sent by the same node must be delivered in the order they were sent.

Messages sent by different nodes can be delivered in any order.

Formally: If m1 and m2 are broadcast by the same node, and broadcast(m1) → broadcast(m2), then m1 must be delivered before m2.

Describe the causal broadcast conditions:

Causally related messages must be delivered in causal order. Concurrent messages can be delivered in any order.

Nodes may need to hold back messages, waiting for other messages that happened before (i.e. are causally dependend).

Formally: If broadcast(m1) → broadcast(m2) then m1 must be delivered before m2.

Describe the total order (or atomic) broadcast conditions:

While FIFO and causal broadcast allow different nodes to deliver messages in different orders, total order broadcast enforces consistency across the nodes, ensuring that all nodes deliver messages in the same order. As with causal broadcast, nodes may need to hold back messages, waiting for other messages that need to be delivered first.

The precise delivery order is not defined, as long as it is the same on all nodes.

Another important detail can be seen on these diagrams: in the case of FIFO and causal broadcast, when a node broadcasts a message, it can immediately deliver that message to itself, without having to wait for communication with any other node.

Formally: If m1 is delivered before m2 on one node, then m1 must be delivered before m2 on all nodes.

Describe the FIFO-total order broadcast conditions:

FIFO-total order broadcast is like total order broadcast, but with the additional FIFO requirement that any messages broadcast by the same node are delivered in the order they were sent.

How does the eager reliable broadcast algorithm ensure that all non-faulty nodes receive the broadcast message, even if the node sending the broadcast crashes before sending all messages?

The first time a node receives a particular message, it re-broadcasts to each other node (via reliable links i.e. retry + dedup).

With O(n²) the eager reliable broadcast has very bad efficiency. How can it be improved?

By specifying only a small fixed number of nodes to which a receiving node re-broadcasts the message. The nodes to which a receiving node re-broadcasts the message are chosen randomly.

This yields to an efficiency of O(n) and is called a gossip protocol (or also epidemic protocol).

How does the implementation for the FIFO broadcast algorithm look like?

on initialisation do
  sendSeq := 0; deliveredSeqNr := <0, 0, . . . , 0>; buffer := {}
end on

on request to broadcast m at node Ni do
  send (i, sendSeq, m) via reliable broadcast
  sendSeq := sendSeq + 1
end on

on receiving msg from reliable broadcast at node Ni do
  buffer := buffer ∪ {msg}
  while ∃sender , m.(sender , deliveredSeqNr[sender], m) ∈ buffer do
    deliver m to the application
    deliveredSeqNr[sender] := deliveredSeqNr[sender] + 1
  end while
end on

The algorithm checks for messages from any sender that match the expected next sequence number, and then increments that number, ensuring that messages from each particular sender are delivered in order of increasing sequence number.

How does the implementation for the causal broadcast algorithm look like?

The causal broadcast algorithm is somewhat similar to FIFO broadcast; instead of attaching a sequence number to every message that is broadcast, we attach a vector of integers. This algorithm is sometimes called a vector clock algorithm, even though it is quite different from the algorithm on Slide 73. In the vector clock algorithm from Slide 73 the vector elements count the number of events that have occurred at each node, while in the causal broadcast algorithm, the vector elements count the number of messages from each sender that have been delivered.

on initialisation do
  sendSeq := 0; deliveredCount := <0, 0, . . . , 0>; buffer := {}
end on

on request to broadcast m at node Ni do
  deps := deliveredCount
  deps[i] := sendSeq
  send (i, deps, m) via reliable broadcast
  sendSeq := sendSeq + 1
end on

on receiving msg from reliable broadcast at node Ni do
  buffer := buffer ∪ {msg}
  while ∃(sender, deps, m) ∈ buffer where bufferEntry.deps ≤ deliveredCount do
    deliver m to the application
    buffer := buffer \ {(sender , deps, m)}
    deliveredCount[sender] := deliveredCount[sender] + 1
  end while
end on

The local state at each node consists of sendSeq, deliveredCount, and buffer, which have the same meaning as in the FIFO broadcast algorithm. When a node wants to broadcast a message, we attach the sending node number i and deps, a vector indicating the causal dependencies of that message. We construct deps by taking a copy of deliveredCount, the vector that counts how many messages from each sender have been delivered at this node. This indicates that all messages that have been delivered locally prior to this broadcast must appear before the broadcast message in the causal order. We then update the sending node’s own element of this vector to equal sendSeq, which ensures that each message broadcast by this node has a causal dependency on the previous message broadcast by the same node.

When receiving a message, the algorithm first adds it to the buffer like in FIFO broadcast, and then searches the buffer for any messages that are ready to be delivered. The comparison deps ≤ delivered uses the ≤ operator on vectors defined on Slide 75. This comparison is true if this node has already delivered all of the messages that must precede this message in the causal order. Any messages that are causally ready are then delivered to the application and removed from the buffer, and the appropriate element of the delivered vector is incremented.

What are two possible ways to implement total order broadcast algorithms?

Single leader approach:

  • One node is designated as leader (sequencer)
  • To broadcast message, send it to the leader; leader broadcasts it via FIFO broadcast.
  • Problem: leader crashes => no more messages delivered
  • Changing the leader safely is difficult

Lamport clocks approach:

  • Attach Lamport timestamp to every message
  • Deliver messages in total order of timestamps
  • Problem: how do you know if you have seen all messages with timestamp < T ? Need to use FIFO links and wait for message with timestamp ≥ T from every node

Replication

What does replication mean in the context of distributed systems?

  • Keeping and maintaining a copy of the data on multiple nodes (e.g. for datases, filesystems, caches)
  • A node that has a copy of the data is called replica
  • Advantages:
    • If some replicas are faulty, data still accessible on other replicas
    • Spreads read load onto multiple machines

How is RAID (Redundant Array of Independent Disks) different from DS replication?

RAID has a single controller; in a DS, each node acts independently and replicas can be distributed around the world near users.

One way to prevent updates to be applied multiple times is to deduplicate requests. But in a crash-recovery system model this requires to store the requests in stable storage. What is an alternative?

Requests could be made idempotent. Idempotence allows an update to have exactly-once semantics: that is, the update may actually be applied multiple times, but the effect is the same as if it had been applied exactly once.

What are the three retry behaviors for update requests?

  • At-most-once: send request, don't retry, update may not happen
  • At-least-once: retry request until acknowledged, may repeat update
  • Exactly-once: retry request, idempotence or deduplication (e.g. logical timestamp + client ID)

How can update retries be made safe when there are concurrent read-update requests? E.g. when client 1 adds his like without receiving the ack and client 2 removing the like from the set before client 1 retries the request.

The clients can attach a logical timestamp to their requests and the replication algorithm discards already applied updates.

See: Reconciliation algorithm

How can concurrent writes to replicas be handled by the replication algorithm?

Two common approaches:

  • Last writer wins (LWW) register:

    • Use timestamps with total order (e.g. Lamport clock)
    • Keep v2 and discard v1 if t2 > t1.
    • Can lead to data loss since total order algorithm based on Lamport clock arbitrarily choses what happens with concurrent writes
  • Multi-value register:

    • Use timestamps with partial order (e.g. vector clock)
    • Replace v1 by v2 if t2 > t1
    • Preserve both {v1, v2} in the register if t1 || t2 (called conflicts; must be resolved later by the application)

What is the downside of using multi-value registers paired with vector clocks?

It can become very expensive since every client needs an entry in the vector clock. Thus, the vector clock can take up more space than the data itself.

Let's assume two scenarios. In the first, a client adds x to replica A and B, then removes x from A and B, but the request to B fails and the client crashes before retrying. In the second, a client adds x to A and B but the request to B fails and the client crashes before retrying. Both scenarios lead to the same end state, even though it should be different. How can it be solved?

To solve this problem, we can do two things:

  1. Attach a logical timestamp to every update operation, and store that timestamp in the database as part of the data written by the update.
  2. Second, when asked to remove a record from the database, we don’t actually remove it, but rather write a special type of update (called a tombstone) marking it as deleted. E.g. (t2, false) indicating that the record is invisible.

Further, we run a reconciliation algorithm, called the anti-entropy process, to check for inconsistencies between the replicas.

The replicas periodically communicate with each other, check for inconsistencies, and reconcile their state with the latest record based on the highest timestamp. Since records are not actually deleted, they (i.e. their timestamps and tombstone indicator) can be compared with the ones on other replicas.

What is read-after-write consistency?

It ensure that after a client writes a value, at least the same client will be able to read back the value it has just written.

Strictly speaking, with read-after-write consistency, after writing a client may not read the value it wrote because concurrently another client may have overwritten the value. Therefore we say that read-after-write consistency requires reading either the last value written, or a later value.

How can read-after-write consistency be ensured in a DS?

We can introduce read and write quorums paired with timestamps.

In a system with n replicas:

  • If a write is acknowledged by w replicas (write quorum)
  • If a read is acknowledged by r replicas (read quorum)
  • and r + w > n,
  • then the read will see the latest written value

Typical: r = w = (n+1)/2 for n = 3, 5, 7, ... (majority)

Even after introducing read and write quorums, some nodes may deliver inconsistent values which the client needs to compare and choose the latest value. How can this be solved?

To bring replicas back in sync with each other, one approach is to rely on an anti-entropy process.

Another approch is the "read repair" where clients that read from a majority of nodes but with inconsistent values from some nodes, write the latest value back to all replicas from which the client did not receive the latest value (Also nodes from which it did not receive anything).

A client making read and write requests on replicas with a quorum is essentially a best-effort broadcast since the underlying network is unreliable (requests might be lost and no ordering guarantees). What is an alternative replication method?

Using FIFO-total order broadcast it is easy to build a replicated system: we broadcast every update request to the replicas, which update their state based on each message as it is delivered. This is called state machine replication (SMR), because a replica acts as a state machine whose inputs are message deliveries. We only require that the update logic is deterministic: any two replicas that are in the same state, and are given the same input, must end up in the same next state.

What are the downsides of using FIFO-total order broadcasting as a replication method?

  • The state can not be updated immediately. The replica might has to wait for other deliveries through broadcast.
  • Needs fault-tolerant total order broadcast

What is one way to implement a total order broadcast in a DS which needs to replicate state on multiple nodes?

Designate one node/replica as the leader and route all broadcast messages through it in order to impose a delivery oder. Any state changes must be executed on the leader.

What conditions must state changes fulfill if we don't use the strongest form of broadcast (FIFO-total order broadcast)?

As for FIFO-total order broadcast, state changes must always be deterministic. The same condition applies to all other broadcast forms.

Causal broadcast:

  • Concurrent updates must commute (i.e. transactions can be applied in arbitrary order with same final state)

Reliable broadcast:

  • All updates commute

Best effort broadcast:

  • Commutative
  • Idempotent
  • Tolerate message loss

Replica consistency

For which use case is a two-phase commit useful?

A key property of a transaction is atomicity. When a transaction spans multiple nodes, we still want atomicity for the transaction as a whole (i.e. either all nodes must commit the transaction and make its updates durable, or all nodes must abort the transaction and discard or roll back its updates).

So it is useful when we want to achieve atomicity for a transaction spanning multiple nodes.

How does the two-phase commit (2PC) work on a high level?

  1. A client starts a transaction by sending begin() to all replicas. It can then perform the usual reads and writes within this transaction.
  2. When the client is done, it sends then commit() to the coordinator. The coordinator then starts the 2PC.
  3. Coordinator sends prepare() to all replicas, then waits for their acknowledgement (ok or abort; first phase) 4.1. If any replica replied with abort, then the coordinator sends abort() to all replicas (second phase) 4.2. If all replicas replied with ok, then the coordinator sends commit() to all replicas (second phase)
  4. The replicas do a commit or abort the transaction depending on the second phase of the 2PC

What is the problem or risk with 2PC?

The problem is, that the coordinator is a single point of failure. It can happen that the coordinator crashes after sending the prepare command to replicas and before making a decision. Other nodes are now in a bad state since they don't know how to proceed with the transaction.

The algorithm is blocked until the coordinator recovers.

For that, the coordinator must write decisions to disk and read from it when recovering.

How is it possible to avoid a single-point of failure when doing 2PC?

The failure of the coordinator can be avoided when using a total order broadcast protocol. The client then doesn't send the commit() to the coordinator but to one of the replicas which then total order broadcasts it. Votes are then collected via total order broadcast.

What is the purpose of an atomic commitment protocol (e.g. 2PC) vs linearizability?

2PC:

  • preserves consistency across multiple replicas when there are faults
  • ensures that a transaction is either commited or aborted by all participants

Linearizability:

  • Reasons about the case when mulitple clients/nodes reading and writing shared/replicated data concurrently
  • Also takes timing (real time; start and end of an operation) into account (which is NOT a happens-before relation)
  • Guarantees that nodes observe the system in an “up-to-date” state (i.e. they do not read stale (outdated) values)
  • Compared to read-after-write consistency, it guarantees that ALL clients must read the new state after a write by any client (i.e. linearizability generalizes the idea of read-after write consistency to concurrent operations by different nodes)

What are the features of linearizability?

Every operation..

  • takes effect atomically sometime after it started and before it finished (an operation has a start and end)
  • behaves as if executed on a single copy of the data (even if there are multiple replicas)

The consequence is, that every operation returns an "up-to-date" value (strong consistency).

Even when a system uses read and write quorums, it is non-linearizable. How can we make it linearizable?

By employing the ABD algorithm on top of read and write quorums:

  • write operations remain the same (still use same write quorum)
  • read operations read from the quorum and if there are replicas replying with stale values (according to their timestamp), the client updates the stale replicas with the recent value before finishing the operation (basically doing read repair)

The client finishes its operation only after it made sure that a quorum of replicas have the stored the recent value, and thus every subsequent (by real time) quorum read is guaranteed to observe that value.

What is the problem with the ABD algorithm when having multiple concurrent writers?

Client requests must reflect the real-time ordering of operations.

A client that wants to write must request the latest timestamps from a quorum of replicas. It then sends the write request with the quorums maximum timestamp + 1.

This may lead to the situation that two set operations were sent with the same timestamp by concurrent writers. That is why they must also send their client ID so that read clients can determine a "winner" (similar to the Lamport timestamp total order condition).

What is the conflict resolution policy for the ABD algorithm?

If multiple clients concurrently write to a register, it uses a last-writer-wins policy and simply overwrites the value of the register. This is called a blind write.

What is the signature of a compare-and-swap operation and what does it do?

CAS(x, oldValue, newValue)

Sets variable x to newValue iff current value of x is oldValue.

Can we implement linearizable compare-and-swap in a distributed system?

Yes, with total order broadcast to make sure that the compare and swap operations are sequentially executed on all machines (in the same order).

Consensus and total order broadcast

Total order broadcast is useful for state machine replication. But implementing it involves a single leader, i.e. a single point of failure. What are the ways to transfer the leadership to a new node?

  • Manual failover: Human operator chooses a new leader and reconfigures the nodes; usually planned ahead
  • Automatic consensus: Nodes agree about new leader when leader node becomes unavailable

What is the difference between atomic commit and consensus?

  • Instead of voting commit/abort for a specific value, in consensus one or more nodes propose the value
  • In an atomic commit, all nodes must commit (otherwise TX aborted) whereas in consensus a quorum must agree on the proposed value(s)
  • If only one node crashes, the atomic commit must abort whereas in consensus a quorum of nodes is enough to agree on the value

What system model do Paxos, Multi-Paxos, Raft, etc. assume and why can't it be weakened?

A partially synchronous crash-recovery system model.

We can't weaken this assumption to an asynchronous model since we need a failure detector, which in turn requires a local clock to trigger timeouts.

Describe the two property categories of total order broadcast.

  • Safety ("nothing bad happens")
    1. Let N1 and N2 be two nodes that each deliver two messages m1 and m2, and assume that N1 delivers m1 before m2. Then N2 also delivers m1 before m2.
    2. If some node delivers a message m, then m was previously broadcasted by sme node.
    3. A node does not deliver the same message more than once.
  • Liveness ("something good eventually happens") 4. If a node broadcasts a message m and does not crash, then eventually that node delivers m. 5. If one node delivers a message m, then every other node that does not crash eventually delivers m.

A safety property is an invariant that must never be violated: that is, at any point in the execution we can check this property and it must always be true.

A liveness property is an expectation of a future state: initially it might not be satisfied (for example, a node may have broadcast a message, but it has not yet been delivered), but we can ensure that we wait long enough, it will eventually become true. Liveness properties do not have a time bound on how long it might take until the property is satisfied: we only require them to hold eventually, i.e. after some arbitrary but finite amount of time.

Which of the two property categories of consensus/total order broadcast is allowed to depend on clocks?

Only the liveness properties may depend on clocks and timing. Paxos, Raft, etc. use clocks only for timeouts/failure detection to ensure liveness.

What is a split brain in the context of consensus algorithms and how can it happen?

It describes the case when there are two leaders at the same time.

Raft ensures that there is at maximum only one leader per term. Thus, if there is a split brain, the two leaders are in different terms.

It might happen when there is a network partition and some node proposes itself as the new leader. If it gets a majority quorum, then it becomes the new leader. But there is the other partition with the old leader still running that does not know about the newly elected leader.

How can a leader, which does not know about a new leader due to a network partition, check if it got voted out?

Leaders can make sure that they have not been voted out by requesting a confirmation by a quorum whether it is allowed to deliver a message.

Everytime a leader wants to deliver a message, it first asks all nodes if it can deliver message m. If a quorum replies with yes, then the leader delivers the message m to all nodes.

If the leader does not get a confirmation from a quorum, we know that there must be a higher term with a new leader and thus the leader should step down.

Which states can a node have in Raft?

  • Leader
  • Candidate
  • Follower

What are possible log configurations of followers in Raft?

Followers might

  • be missing some log entries,
  • have extra uncommitted entries (but the term must be lower or equal than the current leader's term up until the log index when the current leader got elected (which got partitioned afterwards)),
  • or both at the same time.

The term in log entries must not decrease.

Eventual consistency

What are the advantages of linearizability?

  • Makes a distributed system behave as if it were non-distributed (e.g. work on a single copy of the data)
  • Simple for applications to use

What are the downsides of linearizability?

  • Performance cost: lots of messages and waiting for a response (ABD algorithm employs write and read quorums and read repair messages)
  • Scalability limits: leader can be a bottleneck since it needs to sequence and process all write requests, and also needs to communicate with all nodes at the same time
  • Availability problems: if you can't contact a quorum of nodes, then you can't process any operations (e.g. during partition)

Describe between which options a distributed system has to make a trade-off when there is a network partition.

  1. Either linearizable consistency (some replicas will not be able to respond to requests because they cannot communicate with a quorum; this makes these nodes effectively being unavailable)
  2. Or allow replicas to respond to requests even if they cannot communicate with other replicas (they stay available but cannot guarantee linearizability)

This theorem is called CAP (consistency, availability, partition): choose between the first two in case of a partition.

What is optimistic replication?

It's the approach of allowing each replica to process reads and writes based only on its local state, and without waiting for communication with other replicas.

The consistency model of that approach is called eventual consistency.

What is the definition of eventual consistency?

Replicas process operations based only on their local state.

If there are no more updates, eventually all replicas will converge to the same state. (No guarantees about how long it will take.)

What is the definition of strong eventual consistency?

  • Eventual delivery: every update made to a non-faulty replica is eventually processed by every non-faulty replica.
  • Convergence: any two replicas that have processed the same set of updates are in the same state (even if updates were processed in a different order)

What is the challenge with eventual consistency?

Since it does not require waiting for network communication, there can be concurrent updates on different nodes which can cause conflicts. These conflicts need to be resolved.

Summarise the key properties (problem, must wait for communication, requires synchrony) of the consistency models in descending order of the minimum strength of assumptions that they must make about the system model.

Problem Must wait for communication Requires synchrony
atomic broadcast all participating nodes partially synchronous
consensus, quorum partially synchronous
total order broadcast,
linearizable CAS
linearizable get/set quorum asynchronous
eventual consistency, local replica only asynchronous
causal broadcast,
FIFO broadcast

Atomic commit makes the strongest assumptions, since it must wait for communication with all nodes participating in a transaction (potentially all of the nodes in the system) in order to complete successfully.

Consensus, total order broadcast, and linearizable algorithms make weaker assumptions since they only require waiting for communication with a quorum, so they can tolerate some unavailable nodes.

It can be shown that a linearizable CAS operation is equivalent to consensus, and thus it requires partial synchrony.

On the other hand, the ABD algorithm for a linearizable register (supporting get/set operations) is asynchronous, since it does not require any clocks or timeouts.

Finally, eventual consistency and strong eventual consistency make the weakest assumptions: operations can be processed without waiting for any communication with other nodes, and without any timing assumptions. Similarly, in causal broadcast and weaker forms of broadcast (FIFO, reliable, etc.), a node broadcasting a message can immediately deliver it to itself without waiting for communication with other nodes

What is local-first software and what are their advantages?

Local-first software treats an end-user device as a full replica and the servers are there just for backups and synchronization.

The advantages compared to cloud services are:

  • App works offline
  • fast (no need to wait for network round trip)
  • sync with other devices when online
  • real-time collaboration with other users
  • longevity (even if the cloud service shuts down, data stays on your device)
  • simpler programming model than RPC
  • supports end-to-end encryption for better privacy
  • user control over their own data

What is the challenge of modern collaboration software and how can it be solved?

The challenge is to reconcile concurrent updates made by different clients. Ways to solve this is by

  • Conflict-free replicated data types (CRDTs)
    • operation-based
    • state-based
  • Operational transform (OT; old <~2006)

How does an operation-based map CRDT work?

On initialization, the map is empty.

On a read request for key k, the CRDT returns the value v of that key if it is present (null if not).

On a write/set request for key k, the CRDT increments the lamport timestamp t for the particular key, and then reliably broadcasts the operation (set, t, k, v) to all replicas including itself.

On delivering an operation (set, t, k, v), the CRDT looks for previous operations on the same key k. If there are no previous operations or all previous operations have a lower timestamp, then it overwrites the key k in the map.

Thus, it resolves concurrent writes to the same key using the last-writer-wins approach.

How does an operation-based CRDT ensure the strong eventual consistency guarantees?

Eventual delivery: Reliable broadcast ensures every operation is eventually delivered to every (non-crahsed) replica

Convergence: Applying an operation is commutative (i.e. order of delivery doesn't matter)

How is a state-based map CRDT different from an operation-based CRDT?

They share their behavior in the initialization and read requests. However, updates are handled differently: instead of broadcasting each operation, it directly updates values and then broadcasts the whole map.

On delivering this message at another replica requires a merge function to merge the two replicas' state. This merge function compares the timestamps of each key and keeps those with the greater timestamp.

What assumptions must a merge operator for a state-based CRDT satisfy?

  • commutative: s1 U s2 = s2 U s1
  • associative: (s1 U s2) U s3 = s1 U (s2 U s3)
  • idempotent: s1 U s1 = s1

What are the tradeoffs between operation-based and state-based CRDTs?

Op-based:

  • requires reliable broadcast so that all changes reach all replicas
  • has smaller messages

State-based:

  • bigger messages
  • can tolerate message loss/duplication

How does operational transformation work for text?

The OT function reads two concurrent operations (local operation + incoming operation; e.g. insert characters at index n) and transforms them so that the incoming concurrent operation can be securely applied to the local state. It makes sure that the text converges to the same state between all replicas.

What is the limitation with operational transformation?

It requires the communication between the replicas and total order broadcast to sequence the updates. So it needs e.g. a leader node to which all replicas send their update and which FIFO broadcasts the updates to all replicas.

How does an operation-based text CRDT work?

The CRDT consists of a chars set which contains (position, nodeId*, character) triples. When a client reads this CRDT, the chars are ordered by their position.

When a client inserts a character v at index i at node n, the replica first gets the position numbers of the immediate predecessor and successor, and then computes the new position number as (p1+p2)/2. This operation is then broadcasted by a causl broadcast. On delivery of that insert, it is simply inserted at the computed position. If there is already a char at the same position, the character of the higher nodeId takes precedence.

When a client wants to delete a character v at index i, the replica causal broadcasts the position of the character (+1 to skip the begin marker) and on delivery it removes it from the chars set.

*Or replicaId

Why does an operation-based text CRDT require causal broadcast and not just reliable broadcast?

We must ensure that if a character is deleted, all replicas process the insertion of the character before processing the deletion. This restriction is necessary because the operations to insert and delete the same character do not commute. However, insertions and deletions of different characters commute, allowing this algorithm to ensure convergence and strong eventual consistency.

KV Stores and consistent hashing

What are key-value stores?

  • Container for key-value pairs (database)
  • Distributed, multi-component systems
  • NoSQL semantics (non-relational)
    • Simple query semantics in exchange for increased scalability, speed, availability, and flexibility
    • No aggregation, no table joins, no transactions

Why are key-value stores needed?

  • Horizontal scalability (user growth, traffic patterns change, data size)
  • Performance (high speed for single record read and write operations)
  • Flexibility (adapt to changing data definitions)
  • Reliability (uses commodity hardware: failure is norm, provides failure recovery)
  • Availability and geo distribution (fast access)

What's the issue with horizontal shards/partitions by their ID space for caching?

  • doesn't work for skewed ID distributions
  • caches should be able to come and go without disrupting the cache

What's the issue with hashing for caching?

  • since it solves the issue with skewed ID distributions (hashes distributes the input uniformly across a range; hash range modulo n_caches) the load is evenly distributed to many caches
  • but if just one cache fails, the whole mapping from IDs to nodes changes and much data has to move

What are the goals of consistent hashing?

  • uniform distribution of objects across nodes
  • easilfy find objects
  • allow nodes to be added/removed without much disruption

How does consistent hashing work?

  • MD5 hash output range mapped to circular space (each distinct value represents a slot for an object or node)
  • Each object and node is mapped to a slot via the hashing function
  • Assigns each object to the closest cache node in clockwise direction on the circle

What's the issue with consistent hashing?

You need a large number of cache nodes to ensure uniform load distribution. Also, removing and adding a node can lead to downtime where data is not available for a moment and it needs to be planned ahead of time to move data. Thus it is recommended to have a replication factor that assigns an object to N nodes in clockwise direction.

What's the advantage of a B+ tree?

The primary value of a B+ tree is in storing data for efficient retrieval in a block-oriented storage context — in particular, filesystems. This is primarily because unlike binary search trees, B+ trees have very high fanout (number of pointers to child nodes in a node,[1] typically on the order of 100 or more), which reduces the number of I/O operations required to find an element in the tree.

How is the cached data stored on each node?

  • B+ tree
  • LSM tree

How does the architecture of Cassandra/Amazon Dynamo look like?

  • LSM tree nodes
  • Exchange meta information and failures by gossiping => no master/decentralized
  • Eventual consistency
  • Consistent hashing for load balancing
  • Data replication via replication factor on consistent hashing ring (chooses replicas based on physical nodes not virtual nodes)
  • Clients send requests to any node

Describe the architecture of an LSM tree.

  • Sorted string tables (SSTables) used to store immutable, ordered key-value pairs on disk
  • Memtable (e.g. balanced binary tree or skip list) to store mutable, ordered key-value pairs in memory until predifined maximum tree size
  • Immutable memtables (created from memtable when threshold reached; flushed to disk (C1) when memory exhausted)
  • Write ahead log (WAL) as an append-only disk-resident structure used for crash and transaction recovery (writes are first appended to the log, then in the memtable)
  • Bloom filter to check if a key might be present
  • Sparse index (SSIndex; sparse in the sense that it does not map to each position of a key on the disk but to the block which in turn contains multiple key-value pairs)

How does an LSM tree handle deletions?

Since the SSTables are immutable, it can only insert new data. Thus it inserts tombstones to mark keys as deleted. In the compaction phase, it then deletes the original keys and the tombstone.

How does the LSM tree compaction work?

C0 is the memtable and immutable memtables in the memory. After the memory is exhausted, the immutable memtables get flushed to C1. If the threshold size of one layer on the disk gets reached, the compaction starts by merging multiple SSTables to larger ones (cf. merge sort). In a multi-layer/component LSM tree, the merged SSTables get moved down one layer during the compaction.

What are the shortcomings of LSM trees?

  • Space amplification (out-of-place updates take up more space than necessary since updated data can only be appended; stale data of deleted or updated keys remain in the system for a period of time)
  • Read amplification (sequentially reads whole SSTable to find a single key-value pair; improved by bloom filters to check if key might exist and sparse indexes to find the block range where the key relies)
  • Write amplification (during compaction, multiple files will be read, merged, sorted, and new files written; each key might be written multiple times in each layer)

Google BigTable

Describe the architecture and data model of BigTable

  • sparse, distributed, persistent, multi-dimensional sorted map
  • indexed by a row key, column key, and a timestamp: row(string), columnFamily:qualifier(string), time(int64) -> byte[]
  • Each cell can contain multiple versions of the same data by having multiple timestamps (most recent version is being read first)
  • lexicographic order by row key
  • rows are dynamically partitioned for load distribution/balancing and each row range is called a tablet (which clients can exploit to get fast access times by retrieving keys that are lexicographically close to each other)
  • ACL (access-control list) is performed at the column family level
  • Uses Chubby (ZooKeeper) for locking and metadata storing (tablet location via three level B+ tree, schema information, ACL)
  • Master: assign/load-balance tablets to tablet servers; balance tablet server load; detect up/down tablet servers; garbage collect deleted tablets; coordinate metadata/schema updates;
  • Tablet server: handle R/W requests; split tablets if grown too large; stateless since SSTables and WAL stored in GFS; handles many tablets; LSMTree memtable in memory

How does the startup/recovery of a tablet server look like and where are files stored?

On the startup/recovery of tablet server, it reads its metdata from the root node in Chubby and then reconstructs the state in its memtable from a set of SSTable files which are stored on GFS. If there are pending updates from the WAL log (also in GFS), apply them.

For what can the timestamp be used to?

  • Versioning: new writes default to current time but clients can set it explicitly
  • Lookup options: return k most recent values or values in time range
  • Garbage collection: only retain most recent k values

Google Spanner

What is Spanner?

A scalable, multi-version, globally-distributed, and synchronously-replicated relational database.

Why isn't BigTable as well suited as Spanner for large-scale data management (OLTP use cases)?

  • BigTable doesn't support multi-row transactions
  • Bigtable is strongly consistent within a row; Spanner is consistent across rows, regions and continents with serializability

Describe the architecture of Spanner.

  • State machine replication (Paxos) within a shard
  • Two-phase locking (2PL) for serialization
  • Two-phase commit (2PC) over Paxos for distributed transactions or cross-shard transactions
  • Multi-version database systems
  • Snapshot isolation

What is external consistency in the context of a relational database?

Strict serializability = Serializability + Linearizability

External consistency = Serializability + Linearizability + Concurrency

Serializability is a transactional and multi-object model: transactions can involve several sub-operations performed in-order on multiple objects in the system. Serializability guarantees that operations take place atomically: a transaction’s sub-operations do not appear to interleave with sub-operations from other transactions. It does not impose any real-time, or even per-process constraints. If process A completes write w, then process B begins a read r, r is not necessarily guaranteed to observe w. Informally, serializability means that transactions appear to have occurred in some total (single) order but the system is permitted to reshuffle transactions as long as it fulfills the constraints.

Linearizability is a single-object consistency model: every operation appears to take place atomically, in some total order, consistent with the real-time ordering of those operations: e.g., if operation A completes before operation B begins, then B should logically take effect after A.

Strict serializability imposes no order on concurrent transactions, whereas external consistency imposes a total order on all transactions. Linearizability can be viewed as a special case of external consistency, where a transaction can only contain a single read or write operation on a single object. Thus, external consistency introduces a real-time observation constraint for concurrent transactions. If a transaction T2 starts to commit after a transaction T1 finishes, then the system assigns a timestamp T2 which is higher than T1.

How can Spanner provide read-only transactions without the need to lock the data?

A read-only transaction observes a consistent snapshot. A snapshort that reflects writes by T2 also reflects writes made by T1 and vice versa. Reads are always causally consistent.

It does that via MVCC (multi-version concurrency control). Each read-write transaction Tw has a commit timestamp tw. Every value is tagged with this timestamp that wrote the data (without overwriting the previous data so that previous read-only transactions can still read from the previous version). And now, a read-only transaction can read from this timestamp tw.

How does Spanner solve external consistency?

It uses the TrueTime API (system of physical clocks) which captures uncertainty in the timestamps. It replies with an interval of earliest and latest possible physical timestamp. When a client wants to finally commit a transaction, the system waits for the uncertainty (latest-earliest) and then commits it.

If there is now a real-time dependency between two transactions, then the uncertainty intervals from two transactions will not overlap.

How does Spanner calculate the TrueTime?

By using Atomic clocks and GPS receivers in each datacenter as a clock source. Commodity servers can now sync every 30 seconds with the TrueTime API.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment