Distributed Computing: Part One PDF
Sara Caviglia, Prof. Dell’Amico Matteo
Summary
This document provides an introduction to distributed systems, covering fundamental concepts such as transactions, ACID properties, and the CAP theorem. The document also discusses the challenges and considerations involved in designing distributed systems.
Full Transcript
Distributed Computing: Part one — Sara Caviglia, A.Y. 2024/2025. Lecturer: Prof. Dell'Amico Matteo

Introduction; Transactions, ACID and CAP

What is a distributed system?
A distributed system is a collection of autonomous computing elements (nodes) that appears to its users as a single coherent system. Distributed systems are everywhere (for example, anything you see on the Internet is one). Even a single multi-core computer can be seen as a distributed system; distributed-system techniques can be, and are, applied there too.

Why do we need coherence? It is the biggest problem of these systems: distributed computing is hard. The "Eight fallacies of distributed computing" capture the most common wrong assumptions:
1. The network is reliable: cables can be cut, and problems happen.
2. Latency is zero: when systems become overloaded, messages get delayed and reordered (usually exactly when you need your system to work well).
3. Bandwidth is infinite: the amount of data you can send in a given time is limited.
4. The network is secure.
5. Topology doesn't change: you cannot base your system on fixed places/distances, because a data centre can be moved; the network can also split into two partitions (e.g., a site ends up under a different name).
6. There is one administrator: you need redundancy in your system, and different administrators may be doing different things.
7. Transport cost is zero.
8. The network is homogeneous: there are different kinds of connections (cellular, WiFi, cable, …).

💡 What is a fallacy? When people design systems they make mistakes by assuming something that is not necessarily true. Being a good architect of distributed systems is not easy.

Do I need a distributed system? Why?
- Availability: the system needs to answer and be ready almost always. The SLA (Service Level Agreement) specifies how available the service must be (e.g., 99% of the time); if the agreement is not respected there may be a fine (e.g., hospitals need their data, and the company could lose clients).
- Performance: one machine might not be enough to respect the SLA, to handle all the requests, or to be fast enough; we need something that scales.

💡 Scalability refers to a system's ability to handle an increased workload or grow in size without compromising performance. In the context of distributed systems, it means the capacity to efficiently manage growing amounts of work by adding resources to the system. Scalability is particularly important for distributed systems to maintain their effectiveness as user numbers or data volumes grow, ensuring consistent performance and availability.

- Decentralization: not necessarily present in all systems; we don't want the system to be controlled by a single entity. This has advantages (one machine shutting down does not shut down the whole system) and disadvantages (such as security and coherence).

All these things are related. Distributed systems are about making disparate machines coherent. In order to be coherent, nodes need to collaborate (and communicate): we need, somehow, synchronization, but we don't have a global clock. Latency on the Internet, for example, is on the order of tens of milliseconds: nothing for us, but a lot of time for computers (a lot of operations). We also need to manage group membership and authorization (privileges), and to deal with node failures (e.g., if a machine crashes it is not convenient to restart the computation from scratch).
Transactions, ACID and CAP

A transaction is an independent modification in a system that stores data, like a database or a file system. While it may change several parts of the system at once, we think of it as a single modification. Examples are a money transfer or the removal of a directory.

ACID properties
ACID stands for Atomicity, Consistency, Isolation and Durability. These are a set of properties coming from databases, and they help us implement transactions: they make it easier to reason about how the system behaves. The ideas were implemented in 1973, but the term ACID was only coined in 1983.
- Atomicity: "something you cannot divide". Every transaction is a single unit: either it happens and succeeds, or it fails completely.
- Consistency (correctness): the system remains in a valid state.
- Isolation: every transaction looks like it happens on its own, even when the system works on several transactions concurrently. Transactions are seen as ordered and executed sequentially.
- Durability: even in case of a system failure, the result of the transaction is not lost.

CAP theorem
The CAP theorem was proposed as a conjecture and later proved as a theorem. In a system (that allows transactions) you cannot have all three of these properties:
- Consistency: every read receives the most recent write or an error.
- Availability: every request receives a non-error response.
- Partition tolerance: the system keeps working even if an arbitrary number of messages between the nodes of our distributed system is dropped.

We have to choose two of these properties for our system, since we cannot have all three together. The proof is easy. Suppose the system is partitioned in two parts, G₁ and G₂, and no communication happens between them. A write happens in G₁, then a read happens in G₂. The result of the write is not accessible from G₂, so one of these happens:
- the system returns an error (we lose availability);
- the system returns old data (we lose consistency);
- the system doesn't reply (we lose partition tolerance).

In any distributed system you have a trade-off: either (part of) your system will be offline until the network partition is resolved, or you will have to live with inconsistent and stale data. Some systems can work with inconsistent functionality (non-ACID systems): Git (conflicts), DNS, social networks (if you connect to a different computer you might see a different timeline), NoSQL databases.

💡 If you do not design a system with consistency in mind, you don't get consistency.

Consensus
Why do we need a system that is consistent? How do we create it? Let's take an example: your start-up needs a server. You can use just one (or use the cloud) since you don't have many clients. With more clients you need a bigger server, which costs more than twice as much while handling only twice as many operations (cost does not scale linearly). If you continue with this approach the cost becomes unsustainable, and a single, ever-bigger server is not affordable and/or ideal. You also need to think about blackouts, floods, etc., which you cannot afford to be taken down by. This approach is called scaling vertically. Scaling horizontally means using multiple cheaper servers and dividing the work among them.
The advantages of this solution include the possibility of buying servers with the best performance/price ratio and, eventually, distributing them geographically for better redundancy (for example in case of catastrophes). This solution, however, brings two big problems:
- Coordination: our servers need to work together as if they were a single computer;
- Scalability: the cost of coordination cannot be huge.
Our system has to behave as a "single always-on machine", processing all transactions one after the other (ACID properties): how can we do it?

Introduction to state machine replication
Let's introduce deterministic state machines. Every machine has a state that determines its future behavior, and each machine has the same starting state. Receiving an input changes the state (and may produce output) deterministically. So two machines are in the same state if they have received the same inputs in the same order. For example, the state could be the money in my bank account: with input "deposit 50" the state becomes 50; with input "fetch 50" the state becomes 0 and the output is 50. If I try to fetch 50 but my state is 0, the output is an error message ("NO").

Read-intensive workloads
The input is a list (log) of commands. In many architectures most commands do not change the state: we can run them on a single machine, and we don't have to replicate them because they don't change the state on every machine. How can we implement distributed systems using servers that work like deterministic state machines? "Write" commands, which change the state, are put in a shared log and executed on all machines; "read" commands are run locally. If most of the workload is made of reads, the system will scale well. An example is a key-value store: a client communicates with one of the servers (e.g., set("cow", 🐄)); the command goes to the shared log, reaches all machines and is applied on all of them.

How do we implement a shared log? Consider the "fail-stop scenario":
- one-to-one messages only;
- messages can take arbitrary time (from the point of view of a computer, messages take a lot of time);
- messages can be lost;
- computers can stop for an arbitrary time, but they don't lose data.
This scenario is similar to having to communicate only via SMS when you have bad coverage. The fail-stop scenario is a strong assumption, but it captures the idea that machines either do things according to the protocol or do nothing at all. There are other consensus problems: a worse failure model than fail-stop is byzantine consensus, in which broken nodes may send any message, possibly acting maliciously (this is the kind of scenario you need for cryptocurrencies).

Paxos (or: How to confuse people for a quarter of a century)
In the '80s, Lynch and Liskov built a reliable redundant distributed system, but there was no proof that it was correct in every single corner case. Lamport (LaTeX's creator) wanted to show that they had made mistakes and that their goal was impossible: he failed, because instead he found an algorithm that could do it (Paxos). It was the first solution to the consensus problem in such systems. Lamport submitted the paper in 1989; since the reviewers did not understand it, it was only published in 1998, and was still considered difficult.
In 2001, Lamport made a “Paxos made simple”, but it was still not clear. In the end, Paxos was understood and Lamport won the Turing Award. Paxos is about consensus: creating a shared log in asynchronous distributed system and making it work. This problem, more specifically, is solved by Multi- Paxos (more than one log). The basic Paxos algorithm finds consensus on one log entry. How does Paxos work? Say you have an old mobile phone and you can communicate only via SMS: you want to decide with your friends whether to go for pizza or sushi (you want to all agree and go to the same place). Let’s see an example. 1. Send to all “Hey, what are we doing tonight?” 2. If most of your friends answered and nobody has plans, propose (for example, “pizza”) and send to all 3. If most of your friends agree, then it’s decided It’s important to note that we need an answer from the majority. It doesn’t sound complicated, but only if everything goes smoothly. Let’s see a fail-stop failure model, meaning failed nodes stop working (e.g., your mobile has no connectivity). They may resume (e.g., you eventually get signal) Any message may get lost or arrive even after days Distributed Computing: Part one 6 But Nobody loses their state Everybody is honest and makes no mistakes (to deal with malicious behavior you’ll need byzantine failure models) The consent is given when majority wins and we use “voting”. 💡 Paxos needs 2n+1 servers to tolerate n failures. The byzantine consensus needs 3n+1 to tolerate n failures. A decision is taken when a majority of n+1 nodes agree on it because two majorities must intersect (at least one participant will see a conflict and avoid it). Voting, in this case, is not based on conflicting interests: the participants just want to agree on something. Example: https://docs.google.com/presentation/d/1ossDCdSERSZmJXKfiCwDNqR- nOHTdIufAhERjQdndtM/edit#slide=id.g1df8353fe5_0_97 Nodes can be acceptors and one is the proposer, which proposes the decision to take. The proposer counts himself as a voter. Paxos: What possible failures can I encounter? What can go wrong? Some may machines stop working (sending and receiving messages). 💡 Maybe you failed because you don’t see messages coming (you are isolated in the network) or some other failed. You know because there is a timeout. The proposers’ failures vary for number of proposers. One proposer One or more acceptors fail → Still works as long as the majority is up A proposer fails in the “prepare” phase → Nothing happens, another proposer should eventually show up and start the algorithm Distributed Computing: Part one 7 💡 An acceptor can become a proposer. A proposer fails in the “accept” phase (usually discovered with a timeout) → Either another proposer overwrites the decision or it gets notified of what’s been done until now, and finishes the job Two or more proposers The algorithm will never result in a wrong output but (in theory) it can result in a livelock, an infinite loop of messages The acceptors’ failures are easier because they have a less critical role. We don’t have a problem as long as the majority doesn’t fail. In the real world is not the same thing. The new leader election can be done with Paxos since we only have a proposer at a time. Multi-Paxos build a full log of decisions and has an additional complexity, but is faster (one round-trip per transaction). Sometimes you have a cluster and you have to manage it. 
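To make the prepare/accept phases described above concrete, here is a minimal single-decree Paxos sketch in Python. It is only a synchronous, failure-free simulation under the fail-stop assumptions: the acceptor rules are the standard ones, but a real implementation also needs unique, increasing proposal numbers per proposer, timeouts and retries, real message passing, and a way for learners to discover the chosen value.

```python
# Single-decree Paxos sketch: a synchronous, failure-free simulation.
class Acceptor:
    def __init__(self):
        self.promised = -1        # highest proposal number promised
        self.accepted_n = -1      # proposal number of the accepted value
        self.accepted_v = None    # the accepted value, if any

    def prepare(self, n):
        # Phase 1b: promise not to accept proposals numbered below n,
        # and report any value already accepted.
        if n > self.promised:
            self.promised = n
            return ("promise", self.accepted_n, self.accepted_v)
        return ("nack",)

    def accept(self, n, value):
        # Phase 2b: accept unless a higher-numbered promise was made.
        if n >= self.promised:
            self.promised = self.accepted_n = n
            self.accepted_v = value
            return ("accepted",)
        return ("nack",)


def propose(value, acceptors, n):
    majority = len(acceptors) // 2 + 1
    # Phase 1a: prepare
    promises = [a.prepare(n) for a in acceptors]
    granted = [p for p in promises if p[0] == "promise"]
    if len(granted) < majority:
        return None               # no majority: retry with a higher n
    # If some acceptor already accepted a value, adopt the one with the
    # highest proposal number: this is what makes Paxos safe.
    highest = max(granted, key=lambda p: p[1])
    if highest[1] >= 0:
        value = highest[2]
    # Phase 2a: accept
    acks = [a.accept(n, value) for a in acceptors]
    if sum(r[0] == "accepted" for r in acks) >= majority:
        return value              # this value is now chosen
    return None


acceptors = [Acceptor() for _ in range(5)]    # 5 acceptors tolerate 2 failures
print(propose("pizza", acceptors, n=1))       # -> 'pizza'
print(propose("sushi", acceptors, n=2))       # -> 'pizza': the earlier choice wins
```

The second call shows the key property: once a value has been chosen by a majority, later proposers are forced to re-propose it, so the decision never changes.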
❓ With ACID, whatever set of questions or message exchange happens, I know that I can rely on them and my messages have arrived, but I don’t care what happened “behind the scene”. Raft Raft is designed for understandability, even if it does the same thing as Paxos. Because it’s simple, it has been implemented in 28 languages. In Raft there are 3 roles: The leader: voted and elected The follower: follows the leader The candidate: is proposing themselves as leader When there is not a leader, a follower becomes a candidate and proposes themselves as leader. Since there can be more than one candidate, the followers can decide who to vote for. When the leader is elected, he sends logs. Heartbeats are messages and are used even when there are no logs to send: the concept is to inform that the leader is still alive and there is no need for a candidate. When there are no more heartbeats (the leader has been disconnected) a follower proposes himself as candidate. Distributed Computing: Part one 8 Let’s see the protocol in detail. 1. Leader election In a given term you can only have one leader (in Italian is called legislatura). One follower becomes a candidate after an election timeout and requests votes. A candidate can receives votes from its peers and one vote from itself. If the votes are a majority, the candidate becomes a leader. We can also have a split vote situation. Say two followers become candidates simultaneously and begin requesting votes. Each candidate receives a vote from themselves and an equal number from the peers. Each candidate requests votes from the peers that have already voted: the vote requests are denied. The candidates try to request votes from each other, but they are denied because they have already voted for themselves. The candidates wait for a randomized election timeout to occur (150ms-300ms). One candidate, then, begins the election for the new term and requests votes. The peers vote for him and he casts a vote for itself, so he becomes the leader for the new term. The other candidate, however, doesn’t know the first candidate won the term (he was on timeout) and begins requesting votes. Since its peers already voted, his requests are denied. The leader notifies the peers of his election and the other candidate steps down. We have to remember that, after the timeout, we had a new set of candidates that was a subset of the old set of candidates (this makes it easier). 2. Log replication Say a new uncommitted log entry is added to the leader. At the next heartbeat, the log entry is replicated to followers. A majority of nodes have written the log entry to disk, so it becomes committed. At the next heartbeat, the leader notifies followers of the updated committed entries. At the next heartbeat, no new log information is sent. Then a new uncommitted log entry is added to the leader, and the process repeats itself. What happens with network partitions? A new uncommitted log entry is added to the leader. On the next heartbeat, the entry is replicated to the followers. The followers acknowledge the entry and the entry is committed. On the next heartbeat, the committed entry is replicated to the followers. Say a network partition makes a majority of nodes inaccessible from the leader. A new log entry is added to the leader. The leader replicates the entry to the accessible followers. The followers acknowledge the entry but there is not a quorum. The separated part will be without leader, so it’s going to time out and there will be a candidate (and then a new leader). 
A log entry is added to the new leader. The log entry is replicated to the accessible followers. Distributed Computing: Part one 9 A majority of nodes acknowledges the entry so it becomes committed. On the next heartbeat, the followers are notified the entry is committed. Then the network recovers and there is no longer a partition: now we have two leaders and there are conflicts on log entries that need to be addressed. The new leader sends a heartbeat on the next heartbeat timeout. The older leader steps down (becomes a follower) after seeing a leader from a more recent term. Uncommitted entries from disconnected nodes are discarded and new log entries are appended to the previously disconnected nodes. 3. Log compaction Unbounded log can grow until there is no more disk Recovery time increases as log length increases We have three log compaction strategies. 1. Leader-initiated, stored in log 2. Leader-initiated, stored externally 3. Indipendently-initiated, stored externally Distributed Computing: Part one 10 Resources: https://raft.github.io/ Conclusions Consensus comes with a cost. It is very difficult to implement and use correctly, but we have a lot of libraries. All the machines responsible for consensus risk being a bottleneck, meaning they would go at the speed of the slowest. In common cases, a transaction takes a network round-trip to complete: if the machines are all close, it’s quick but what about correlated failures (like air conditioning failures in the machines room)? It’s again a trade-off between reliability and latency. Consensus: Apache Zookeeper What is Zookeeper? Zookepeer is an open source application. It was originally developed at Yahoo! and the paper was presented in 2010 at the USENIX ATC conference. The idea behind it is the need for a coordination service for a very big system. What do you do with a coordination service? Group membership: you want to know which services are serving your system, add new ones, remove others, … Leader election: this is not the same of the consensus models, but this system is built on top of the consensus mechanism; you need to know who is the master and who are the replicas, and eventually elect the master Dynamic configuration: the ability to update or change the configuration of the service at runtime without requiring a complete restart or downtime Status monitoring: know “how the machines are doing” Queuing, barriers, critical sections, locks, …: these concepts from parallel computing can be applied at distributed systems All of these points together make a coordination system. Distributed Computing: Part one 11 💡 We want to use a single system for all the points seen before (the developer has to learn and use just one system). How Zookeper is used The design goals for Zookeper were: Multiple outstanding requests: you want to do different things at once Read-intensive workloads: is easier to use and cheaper; every write that you do you can have hundreds or thousands of reads (you can spend more on a single write and optimize every single read) General Reliable Easy to use Building blocks 1. Builds on top of a (Raft-like) consensus algorithm (not Raft because Zookeeper was built before Raft) 2. Wait-free architecture (synchronized around queues and not locks, which can mean deadlocks) a. No locks or other primitives that stop a server b. No blocking in the implementation c. Simplifies the implementation d. No deadlocks e. Needs an alterative solution to wait for conditions 3. Ordering, meaning ordering of transactions a. 
Writes are linearizable (not only are operations put in an order, they are put in the same order they happen in the outside world: they are executed in the order they are performed)
   b. Reads are serializable, so you might read old data (weaker than linearizable: every linearizable execution is also serializable; this is ACID isolation)
   c. If you only deal with a single machine, all operations of the same client are serialized in FIFO order
4. Change events: essentially, notifications
   a. Clients can request to be notified of changes
   b. When a change happens, the client gets notified
   c. Clients get the notification of a change before seeing its result
   d. You receive notifications on a channel: you write a callback and register it on that channel

The inspiration is a distributed file system: you deal with files that are not on your drive but on someone else's machine. It seems like a good solution because everything is together somewhere, but a file system does not guarantee everything we want (we can say it "almost works", but in case of a failure it's difficult to manage). So the model is: it works like a file system, but only for small pieces of data, plus notification of changes, minus partial reads/writes.

APIs
In Zookeeper there is no distinction between file and directory: there are ZNodes, which have content (like a file) and children (like a directory).

```
create(path, data, acl, flags)
delete(path, expectedVersion)
setData(path, data, expectedVersion)
getData(path, watch)
exists(path, watch)
getChildren(path, watch)
void sync()
setACL(path, acl, expectedVersion)
getACL(path)
```

- create: given a path, its (small) data, an access control list (ACL, as found in many Windows file systems) and flags (boolean variables that set some properties), it creates a new ZNode
- delete: given a path (deleting a node also deletes its children) and the expected version, because a newer version may exist that you don't want to delete
- setData: updates the data stored at the path (the version is used for concurrency control)
- getData: reads the data (watch means getting notified if the content changes)
- exists: checks whether a node exists at the given path
- getChildren: lists the children of a specific ZNode
- sync: makes sure that everything I have written is committed
- setACL: changes the access control list
- getACL: returns the access control list

The structure of Zookeeper

Data model: we have a hierarchical namespace. As seen before, ZNodes are like both files and directories. It's important to note that data is read and written in its entirety.

Flags: flags can be set at creation time and are used by some APIs.
- Ephemeral: the node is deleted when its creator fails (also important to release locks when a node disconnects)
- Sequence: appends a monotonically-increasing counter to the node name

Configuration: a worker starts and gets the configuration with getData("/myApp/config", watch=True). Admins can change the configuration with setData("/myApp/config", newConf, expectedVersion=-1); the workers get notified of the change and then re-run the getData.

Group membership: a worker starts and registers itself in the group (using an ephemeral node, so that when the creator dies the registration disappears too) with create("/myApp/workers/" + my_name, my_info, ephemeral=True). It can then list the members with getChildren("/myApp/workers", watch=True). (A code sketch of these two recipes follows below.)
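A minimal sketch of the two recipes above, assuming the third-party kazoo Python client and a Zookeeper server on the default local port; the paths and names mirror the examples in the text. Zookeeper watches are one-shot, so each callback re-registers its own watch.

```python
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

# --- Dynamic configuration: read the config, and re-read it on every change.
def on_config_change(event):
    data, stat = zk.get("/myApp/config", watch=on_config_change)
    print("new configuration (version %d): %s" % (stat.version, data))

config, stat = zk.get("/myApp/config", watch=on_config_change)

# An admin would update it with (version=-1 means "any version"):
# zk.set("/myApp/config", b"new-conf", version=-1)

# --- Group membership: register an ephemeral node and watch the group.
zk.create("/myApp/workers/worker-1", b"host:port",
          ephemeral=True, makepath=True)

def on_members_change(event):
    print("workers now:",
          zk.get_children("/myApp/workers", watch=on_members_change))

print("workers now:",
      zk.get_children("/myApp/workers", watch=on_members_change))
```

Because the worker's node is ephemeral, it disappears automatically if the worker crashes or loses its session, and every other watcher is notified of the membership change.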
Leader election: you can check who the current leader is (again, not the same as the leader inside the consensus algorithm) and, if that succeeds, follow the leader. The API call in this case is getData("/myApp/workers/leader", watch=True). Otherwise you can propose yourself as the leader and, if the call succeeds, you become the leader (else restart). The API call is create("/myApp/workers/leader", my_name, ephemeral=True). This call is not always successful, because two or more workers can propose themselves at the same moment. Workers watch for changes on the leader node, so they'll be notified if the leader fails or changes.

(Exclusive) locks: to find out whether it's your turn, you list who's there and, if you're first, the lock is yours. The more efficient way is to wait for the node in front of you to release the lock (wait for the event): every node only watches the one in front of it in the queue. In terms of the API calls above:

```
id = create("/myApp/locks/x-", sequence=True, ephemeral=True)
getChildren("/myApp/locks/")
```

If id is the first child, the lock is mine. Otherwise, if exists(lastChildBeforeId, watch=True), wait for the event (fired when the previous owner releases the lock); else go back to getChildren. (A code sketch of this recipe appears at the end of this section.)

Shared locks: while exclusive locks are for writers, shared locks are for readers, because readers can read together. The only constraint is that you have to check whether there is an exclusive lock ahead of you in the line; if so, you have to wait for it to be released. From the code's point of view:

```
id = create("/myApp/locks/s-", sequence=True, ephemeral=True)
getChildren("/myApp/locks/")
```

If there is no exclusive ("x-") lock before id, go ahead. Otherwise, if exists(lastXBeforeId, watch=True), wait for the event (fired when the previous owner releases the lock); else go back to getChildren.

How Zookeeper is implemented
We want systems like this to be fast. The fastest medium we can base it on is memory: all servers keep a full copy of the state in their memory (unlike the network). Zookeeper uses a consensus protocol similar to Raft: there is a leader, and an update is committed when a majority of servers have saved the change (we need 2m+1 servers to tolerate m failures). We might read stale data, since we are not necessarily talking to the leader. For a write, the server that receives it needs to notify the leader, which broadcasts the write; the others acknowledge it, and only then can the original server send an OK to the client.

💡 A write is more expensive than a read! If we add new nodes to the cluster, the system becomes faster for reads (every machine has a lower load) and slower for writes.
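Going back to the exclusive-lock recipe above, here is a sketch using the kazoo client (kazoo also ships a ready-made Lock recipe; this version only spells out the sequence/ephemeral mechanism described in the text, and assumes only "x-" lock nodes live under the lock path).

```python
import threading
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
zk.ensure_path("/myApp/locks")

def acquire_exclusive_lock():
    # 1. Create my ephemeral, sequential node: its name ends with a counter.
    me = zk.create("/myApp/locks/x-", ephemeral=True, sequence=True)
    my_name = me.rsplit("/", 1)[1]
    while True:
        children = sorted(zk.get_children("/myApp/locks"))
        if children[0] == my_name:
            return me                      # I am first: the lock is mine.
        # 2. Otherwise, watch only the node right before mine...
        previous = "/myApp/locks/" + children[children.index(my_name) - 1]
        released = threading.Event()
        if zk.exists(previous, watch=lambda event: released.set()):
            released.wait()                # ...and sleep until it goes away.
        # If it already disappeared in the meantime, loop and re-check.

lock_path = acquire_exclusive_lock()
# ... critical section ...
zk.delete(lock_path)                       # release the lock
```

Because the lock node is ephemeral, the lock is also released automatically if its owner disconnects, exactly as described above.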
Consensus: Google Cloud Spanner
Google Cloud Spanner is the system used by Google for the database dedicated to advertising. It is a database with several datacenters across the world, and Google surely doesn't want it to fail. It's a sharded (responsibility is divided) and replicated database. One key property is external consistency, or linearizability (as in Zookeeper): an example is removing someone from my friends list and then posting something that my ex-friend should not see; I care that the transactions are processed in the same order they are seen in the real world. We need something like Paxos or Raft, but we cannot send everything through a single global consensus instance. The best idea is to use Paxos locally and then resort to clocks (clocks can be synchronized pretty well).

Philosophy
You might want a single global lock, but with a system this big that is not possible: it would mean just one write at a time, and while someone is writing nobody can read. Instead, pieces of information are annotated with time. When we read, we ask for a piece of data at a precise timestamp; what comes back is, by then, probably a bit old. We know when a piece of data entered the system and, eventually, even when it was deleted. The clocks need to be as precise as they can be, but typical Internet latencies can be up to hundreds of milliseconds. What about uncertainties? Spanner handles clock uncertainty by waiting it out, so driving the uncertainty down is the key.

What's the Time?
How do we know the time with very good resolution? At Google they worked a lot on this. The system they use is TrueTime, which is highly reliable. The idea is that, when we ask for the time, we get an interval the system is really confident we are in (bounded uncertainty): TrueTime.now() returns an interval [earliest, latest], meaning the true time is between earliest and latest. Every datacenter has a set of machines called time masters, equipped with (regular) GPS receivers; some even have atomic clocks. Of course we talk to the time masters over the network, so we have to account for latency. We may assume the use of the Network Time Protocol (NTP), where t₀ is when the request is sent, t₁ when the master receives it, t₂ when the master replies and t₃ when the reply is received:

Time offset: θ = ((t₁ − t₀) + (t₂ − t₃)) / 2
Round-trip delay: δ = (t₃ − t₀) − (t₂ − t₁)

Measurements like these are repeated and statistics are extracted; the output is a confidence interval t ± ε = [earliest, latest]. However, one of the time masters could be wrong and report bad data, so we need an algorithm to weed out "liars". We can formalise the problem: given n intervals [aᵢ, bᵢ], find the sub-interval that is consistent with the most of them; in case of a tie, return any one. This is called the intersection algorithm. The idea is to add 1 whenever an interval opens and subtract 1 whenever an interval closes:
1. Create a list containing the pairs (a, +1) and (b, −1) for each interval [a, b], and sort it by the first element. Call these values (vᵢ, dᵢ), with dᵢ ∈ {−1, +1}.
2. Compute the cumulative sums sᵢ of all dⱼ with j ≤ i: sᵢ is the number of intervals overlapping in [vᵢ, vᵢ₊₁].
3. Find the maximum value s_M; the result is [v_M, v_{M+1}].
(Example: figure from Wikipedia.)

After the clock is synchronized, the uncertainty grows according to worst-case assumptions on the computers' clock drift. A clock is imperfect, but it doesn't "go crazy" (and you test how often that happens): clocks fail six times less often than CPUs do. What percentage of the time does the clock behave according to the assumed local clock error? What is the precision of a clock? These are metrics we can use to evaluate the behaviour of our distributed system in terms of time.

Making Transactions Linearizable
💡 We want linearizable transactions: operations should be executed in our system in the same order as they are executed in the real world.

Let's see an example of version management.

| Time | My friends | X's friends | My posts |
|------|------------|-------------|----------|
| 4 | [X] | [me] | |
| 8 | [] | [] | |
| 15 | | | ["The government is bad"] |

Since I removed X from my friends before posting, he will never read my post: the transaction that removes the friend happens before my complaint. A read timestamped before 8 won't see the post; with a read timestamped after 15, X won't see the post.
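A toy sketch (not Spanner's actual API) of the timestamped reads in this example: every write stores a (commit timestamp, value) version, and a read at time t returns the latest version with timestamp ≤ t.

```python
# Multi-version store for the example above; timestamps match the table.
versions = {
    "my_friends": [(4, ["X"]), (8, [])],
    "X_friends":  [(4, ["me"]), (8, [])],
    "my_posts":   [(15, ["The government is bad"])],
}

def read_at(key, t):
    """Return the latest version of `key` with commit timestamp <= t."""
    visible = [v for (ts, v) in versions.get(key, []) if ts <= t]
    return visible[-1] if visible else None

print(read_at("my_posts", 7))     # None: a read before 8 cannot see the post
print(read_at("my_friends", 20))  # []:   after 8, X is no longer a friend
print(read_at("my_posts", 20))    # the post is visible, but X reads my_friends
                                  # at the same timestamp and is excluded
```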
This behaviour is ensured even if the write transactions happen in completely different clusters. What characteristics does a data model need in order to allow linearizable transactions? A key-value store:
- we can look up the value for a given string, as if it were a huge hashtable;
- it holds data like (key: string, timestamp: int64) → string;
- the nodes responsible for a key are on multiple continents;
- Paxos is used to get consensus;
- it allows asking for the value at a given moment in time;
- SQL-like semantics were added afterwards.

We can assign timestamps to writes. What is the right time to assign a timestamp to a write? Any moment while the lock is acquired, since no one else can be writing.

💡 You take the time twice: when you acquire the lock and when you are ready to release it.

Some transactions may conflict: we only care about the timestamps taken while holding the lock, regardless of which transaction began first. If a transaction is executed before another, it will be timestamped before it. We use TrueTime to assign timestamps: if the system is afraid of finishing before ts, it simply waits until there is no doubt. If the uncertainty on time is too large, the system gets slowed down: the clock uncertainty should be smaller than the transaction length. Paxos intercontinental consensus latency: hundreds of ms; TrueTime latency: generally […]

[…] Each key is replicated on N nodes (a Dynamo-style key-value store): a write waits for W acknowledgements and a read queries R replicas. If R + W > N, we have a sort-of consensus algorithm, i.e. high consistency (except under failures). Some example configurations:
- N = 3, R = 2, W = 2 (default) → consistent and durable
- N = x, R = 1, W = x → slow writes and fast reads (great for read-intensive workloads)
- N = R = W = 1 → cache (like a web cache)

Some inconsistencies can be resolved with vector clocks. Based on an idea from 1986 (Ladin and Liskov), the version_info carries a counter for each machine the data has passed through. One copy supersedes another if none of its counters is smaller, for every machine; otherwise the copies are independent and we ask the client what to do.

What happens with failures? When machines go offline, it is considered transient: permanent addition or removal is an administrator action. Reads and writes spill over to the first machine in the ring after the N that should handle them by default; when the machine comes back online, the updates are reported to it. This can create some rare inconsistencies even when R + W > N.

Merkle trees are a data structure used to compare data between nodes that store replicas of a partition: if the root hashes differ, compare the children to find out which half is different, and so on recursively ("recursive hashing"). It is a fast way to spot differences and reconcile them. (Figure: a Merkle tree.)

If everything else fails, the client is presented with more than one return value. What to do depends on the application (in the Amazon cart, if in doubt, items are left in the cart) and is reminiscent of exception handling. This is a pretty rare case: Amazon reports only 0.06% of cases with more than one value returned.

Other optimizations:
- buffered writes: wait for a few writes to be committed before writing to disk (a performance/consistency tradeoff);
- throttling background operations (slow down gossip/maintenance when many requests are around);
- routing reads/writes to the nodes that are responding fastest (additional load balancing).
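Going back to conflict detection for a moment, here is a minimal sketch of the vector-clock comparison just described: each version carries a counter per machine, and one copy supersedes another only if none of its counters is smaller.

```python
def compare(vc_a, vc_b):
    """Compare two vector clocks given as {machine_id: counter} dicts."""
    machines = set(vc_a) | set(vc_b)           # missing entries count as 0
    a_ge = all(vc_a.get(m, 0) >= vc_b.get(m, 0) for m in machines)
    b_ge = all(vc_b.get(m, 0) >= vc_a.get(m, 0) for m in machines)
    if a_ge and b_ge:
        return "equal"
    if a_ge:
        return "a supersedes b"
    if b_ge:
        return "b supersedes a"
    return "concurrent"   # independent copies: ask the client to reconcile

print(compare({"s1": 2, "s2": 1}, {"s1": 1, "s2": 1}))  # a supersedes b
print(compare({"s1": 2, "s2": 0}, {"s1": 1, "s2": 1}))  # concurrent
```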
Now we can see some numbers coming from the paper:
- tens of thousands of machines;
- tens of millions of requests, with more than 3 million checkouts in a day;
- response time below 400 ms at the 99.9th percentile (average below 40 ms);
- in 99.94% of cases, requests return exactly one version;
- 99.9995% of responses are successful without timeouts, equivalent to about 2.5 minutes of unavailability in a year.

What about Cassandra?
Cassandra is a project by Facebook, now handled by the Apache foundation. It has a very similar architecture and uses Zookeeper for routing tables and seeds. Data placement is rack-aware and datacenter-aware: Zookeeper is used to elect a leader and coordinate it. There are no vector clocks: each value just gets a timestamp, and the latest wins. Lightly-loaded nodes get "migrated" on the ring.

Introduction to Large-Scale Data Processing: MapReduce

Big Data
What is Big Data? What is "big"? The WWW is, for sure, one of the biggest collections of data. Big Data is defined by the 3 Vs: Volume, Velocity (you keep receiving data) and Variety. We can say that data often counts more than the algorithms used. The MapReduce Programming Model is a programming model inspired by functional programming and Bulk Synchronous Parallelism (BSP). It has execution frameworks for large-scale data processing and is designed to run on "commodity hardware" (i.e., medium-range servers).

💡 Functional programming is a programming paradigm that treats computation as the evaluation of mathematical functions and avoids changing state or mutable data. It emphasizes immutable data, pure functions (same output for the same input), and handling data transformation through techniques like map, reduce, and filter rather than loops and state changes.

Principles
Our objective is to scale out, not up: many "commodity" servers are preferable to a few high-end ones (cost grows more than linearly with performance). So the first principle is to buy more, smaller machines (as opposed to big ones). In some cases a big enough server just doesn't exist: in 2013 Google's data was estimated at 15 Exabytes (15,000,000,000,000,000,000 bytes), while the Internet Archive was estimated at 200 Petabytes (2021).

In many workloads, disk I/O is the bottleneck:
- reading from an HDD → 200-300 MB/s
- SSDs → 2-13 GB/s
- Ethernet → up to 40 Gbps (i.e. 5 GB/s)
- RAM → about 20 GB/s (DDR4), 32-64 GB/s (DDR5)
We want to read from several disks at the same time.

By avoiding synchronization we get a Shared-Nothing architecture: independent entities with no common state. Synchronization introduces latencies and is difficult to implement without bugs; the goal is to minimize sharing, so that we synchronize only when we need to.

When a cluster is big enough, failures are the norm, not the exception: hardware, software, network, electricity, cooling, natural disasters, attacks, … Cascading failures happen when the failure of one service makes another unavailable. Most failures are transient (data can eventually be recovered).

Our objective is to move the processing to the data. High-Performance Computing (HPC) distinguishes between performance and storage nodes, and for CPU-intensive tasks computation is the bottleneck. For data-intensive workloads the network (if not the disks) is generally the bottleneck: we want to process the data close to the disks where it resides. Distributed filesystems are necessary, and they need to enable local processing. We also want to process data sequentially.
Our data is too large to fit in memory, so it's on disks. For an HDD we've seen 200 MB/s (for sequential reads): the disk seeks needed for random access make everything much slower. Consider a 1 TB database with 10¹⁰ 100-byte records: updating 1% of the records with random access takes around a month (with a seek latency of around 30 ms), while rewriting all the records takes around 3 hours (at 200 MB/s). There is a big advantage in organizing computation around sequential reads. MapReduce is for batch processing involving (mostly) full scans of a dataset, with data collected elsewhere and copied to a distributed filesystem. Some examples:
- compute PageRank, a score for the "reputation" of each page on the Web;
- process a very large social graph;
- train a large machine learning system;
- log analysis.

What are our scalability goals? In two dimensions:
- data → if we double the data size, the same algorithm should ideally take around twice as much time;
- resources → if we double the cluster size, the same algorithm should ideally run in around half the time.

We have embarrassingly parallel problems: shared-nothing computations that can be done separately on fragments of the dataset (e.g. converting data items between formats, filters, etc.). We have to exploit the embarrassingly parallel sub-problems.

💡 Embarrassingly parallel problems (also known as perfectly parallel problems) are computational tasks that can be easily divided into independent, non-overlapping sub-tasks. These sub-tasks require little to no communication or dependency between them and can be executed simultaneously, often leading to highly efficient parallelization.

Programming Model
Map and reduce (or fold) are higher-order functions, meaning they accept functions as arguments.

Functional map: takes a sequence as input, applies a single function m to each element, and produces a new sequence as output. Example: map(neg, [4, -1, 3]) = [-4, 1, -3].

Functional reduce: given a list l with n elements, an initial value v₀ and a function r, we compute
v₁ = r(v₀, l₀)
v₂ = r(v₁, l₁)
…
reduce(r, v₀, l) = vₙ = r(vₙ₋₁, lₙ₋₁)
For example, sum(l) = reduce(add, 0, l). It can be seen as an aggregation operation.
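In Python the same building blocks look like this (a small runnable illustration of the map and reduce just defined; note that functools.reduce takes its arguments in the order (function, list, initial value) rather than (function, initial value, list)):

```python
from functools import reduce

def neg(x):
    return -x

print(list(map(neg, [4, -1, 3])))   # [-4, 1, -3]

def add(acc, x):
    return acc + x

# reduce(add, l, 0) computes r(...r(r(v0, l0), l1)..., l_{n-1})
print(reduce(add, [4, -1, 3], 0))   # 6, i.e. sum([4, -1, 3])
```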
Data gets moved on the network: if data is well distributed along keys, work is well distributed between machines. Reduce Phase We have an aggregation operation, defined by the user, which is performed on all elements having the same key. The output is then written on the distributed file system. This output can be an input to a further map-reduce step. We can see an example: we want to count how many times each word is in a text. def map(text): for word in text emit word, 1 def reduce(word, counts): emit word, sum(counts) Combiners are a way to reduce the amount of data before sending it over the network. They are “mini-reducers”, run on mapper machines to pre-aggregate data. In Hadoop they’re not guaranteed to be run, so the algorithm must be correct without them. The combiner is run before the shuffle, while the reduce is run afterwards. What can we do with MapReduce? Everything. Trivially, we could send everything to a single reduce function and compute anything there, but it would scale terribly. Our question becomes “What can we do efficiently?”. This is about finding scalable solutions. This is non-trivial, because it’s about optimizing computation, Distributed Computing: Part one 63 communication, and sharing costs well. Many algorithms require multiple rounds of MapReduce. Patterns Our problem is building a co-occurrence matrix. M , a square n × nmatrix, where nis the number of words A cell mij contains the number of times the word wi occurs in the same context of wj (e.g. appear in the same sliding window of k words). This is a building block for more complex manipulations (e.g. natural language processing, NLP). A similar problem is a recommender system (Customers who buy X often also buy Y). M has size n2 , and can become very big quickly. In the English language we have hundreds of thousands of words, meaning we have tens of billions of cells. With other use cases we have billions of elements. Most of those cells will anyway have a value of 0, so we can just compute the nonzero values. Pairs approach This approach use complex keys: when the mapper encounters w1 close to w2 it will emit the ((w1 , w2 ), 1)pair, meaning “I’ve found the (w1 , w2 )pair once”. Stripes approach Say we have words [b, c, b, d]in the context around the word a. The mapper will return (a, {b : 2, c : 1, d : 1}): we associate to the key aa mapping to all the words corresponding non-empty columns in a matrix row. The combiner and the reducer will aggregate each of the stripes. Introduction to Large-Scale Data Processing: Hadoop Design Apache Hadoop is the software used for large datasets in most of the companies (apart from Google). It’s free, open source, under the Apache license and handled by the Apache foundation. It’s based on Java and is part of a large ecosystem. Since Java has some scalability problems, we might encounter some obstacles. HDFS: Hadoop Distributed File System MapReduce is based on: performing a map phase wherever data is read; Distributed Computing: Part one 64 a shuffle phase to move around processed data; a reduce phase to aggregate. HDFS is designed to enable this kind of computation: the idea is to move the computation near the data (the same machine does storage and computations). It’s inspired by GFS, Google File System. The HDFS is based on two principles. 1. Large datasets, that can’t be stored on a single machine a. Each “file” is partitioned in several machines. b. Network-based, with all the complications. c. Failure-tolerant 2. 
One distributed filesystem design, tailored to a. read-intensive workloads (many reads for a write); b. throughput, not latency (sequential reads); c. commodity hardware (cheap). HDFS Blocks Big files are broken in chunks (the default dimension is 128 MB). It is unrelated to space used on disks (a 1 MB file doesn’t use 128 MB): blocks are stored as files on the native filesystem. Blocks are replicated in different machines. We don’t use erasure coding because we can’t run processing right away (map). But why are blocks so large? To make seek times small compared to read. Considered 10ms seek and 100 MB/s read, for a 100 MB block seek time is 1%. Sequential reading is easier and faster! To handling metadata easily. HDFS Nodes NameNode: keeps metadata in RAM directory tree, and index of blocks per file (around 150 B/block) Metadata is around one million times smaller than the dataset (1 GB of RAM can index 1 PB of data). The load of NameNode is kept manageable exactly because there aren’t that many blocks. Writes are written in an atomic and synchronous way on a journal. Distributed Computing: Part one 65 It’s a good idea to put this journal somewhere on the net. Secondary NameNode It receives copies of the edit log from the NameNode. When the primary is down, the system uses it and stays read-only. If the journal is on the network, we can switch the secondary to primary. DataNode: store data, heartbeat to the NameNode with the list of their blocks HDFS Architecture History of a file read The client is often a machine in the same cluster. Get the block locations from the NameNode. Obtain a set of DataNodes, sorted by proximity to the client (i.e. first the same node, then the same rack). If MapReduce is reading, the data will be in the very same machine. Data is read in Map tasks, but there are some corner-case exceptions. History of a file write Distributed Computing: Part one 66 Client asks the NameNode for k DataNodes (default k = 3). k is the number of redundant copies. We have pipeline replication: the first DataNode will make a copy to the second, and that one to the third. Default: first replica off-rack, second replica in the same rack of the first (we have a trade-off between reliability and cluster bandwidth). Scheduling A job is made of tasks. A MapReduce job runs on a set of blocks specified by the programmer and has one phase each of map, shuffle and reduce. Map and reduce phases are divided in tasks: tasks are single-machine, independent jobs. The only “holistic” part is in the shuffle phase (where we have communications). Each machine runs a configurable number of tasks; often it’s one task per CPU so they don’t slow each other down. Map: by default, 1 HDFS block → 1 input split → 1 task The scheduler will do whatever is possible to run the tasks on a machine having that block. Distributed Computing: Part one 67 Map tasks are usually quick (a few seconds), unless they perform unusually large computations. Reduce: number of reduce tasks is user-specified Keys are partitioned randomly based on hash values: one task will handle several keys. Users can override this and write a custom partitioner (useful to handle skewed data). Reduce tasks have very variable runtime, depending on what they do. 💡 Sometimes time is proportional to the dimension of the data, if we don’t take in account the use of parallelism. Schedulers are usually FIFO, because it is intuitive. The priority of jobs is their arrival time. 
As soon as a machine is free, it’s given the first pending task by the first job. It may not be very efficient, since we could have very large jobs, and this penalizes small jobs which can wait forever. We might use fair schedulers, which take in account the dimensions of jobs and gives precedence to active jobs with least running tasks. It results in each job having roughly the same amount of work done in a given moment. Conceptually is very similar to processor-sharing and/or round-robin schedulers. It can also be configured to kill running tasks to free up space for jobs. Of course it has its deficits, since you can’t prioritize jobs in a queue. Other scheduling algorithm include capacity schedulers. The scheduler creates “virtual clusters” with a queue each and a dedicated amount of resources. It can be used to make sure that organizations/applications have access to a reasonable amount of computing power. Elasticity is possible: if a queue leaves unused resources, they can be used by another queue. What about shortest job first? Letting shortest jobs first ahead can be great for a loaded system, in particular for real-world jobs. The problem is that generally don’t know how long a job will need to run. There’s a big potential here to improve performance, but system designers are conservative people and you don’t know when the system will be problematic. Failures are common and can be related to both software and hardware problems. If a task fails, it’s retired a few times: after that, by default the job is marked as failed. Distributed Computing: Part one 68 If a task hangs (no progress), it’s killed and retried. If a worker machine fails, the scheduler notices the lack of heartbeats and removes it from the worker pool. If the scheduler fails, Zookeeper can be used to set up a backup and keep it updated. Shuffle (& Sort) Phase The output of a map stays in a buffer in memory, with a default size of 100 MB. A circular buffer is useful in cases where you have to continuously read and write (even simultaneously). When the buffer is filled, it’s partitioned (by destination reducer), sorted and saved to disk. There is an additional guarantee in Hadoop: reduce keys are always sorted. At the end of a map phase, spills are merged and sento to reducers. On the other hand, combiners are run right before spilling to disk, so they may not be run. 💡 A spill file is a temporary file created by a system or application when it runs out of memory (RAM) and needs to offload excess data to disk. This process is known as spilling to disk and is a common technique used in data processing frameworks, databases, and other applications to handle workloads that exceed available memory. Let’s have a look at a spill file. Distributed Computing: Part one 69 Reducers fetch data from mappers and run a merge, but mappers don’t delete data right after it’s sent to reducers. We’re essentially running a distributed mergesort. Output is saved (and replicated) on HDFS. Distributed Computing: Part one 70 Introduction to Large-Scale Data Processing: Apache Spark MapReduce has been a big improvement for “big data” on large clusters of unreliable machines. However, it’s less than perfect for important use cases, like multi-stage applications (e.g. iterative machine learning, graph processing) and interactive ad-hoc queries. Several specialized frameworks were designed to handle these cases. Each MapReduce job (i.e. something that has “one shuffle phase”) has to read and write from disk. 
This is the solution to deal with unreliability: write everything on disk and replicate it. However, this is slow. (Figure: everything on disk.) What if things could be kept in RAM, without going through the disk every time? We could get a huge speedup, from 10 to 100 times. The goal of Spark is to get in-memory processing and fault tolerance. (Figure: in-memory processing.)

Resilient Distributed Datasets (RDDs)
Resilient Distributed Datasets (RDDs) are immutable, distributed, in-memory data structures, partitioned among the machines of the cluster. They are built through coarse-grained operations that process the whole RDD, based on functional programming (map, filter, join, …). Fault recovery is performed using lineage: we keep a log of all the operations performed to get there, so we can recover lost partitions by recomputing what's missing, and we pay no cost if nothing fails. Databases and key-value stores handle small updates and store everything on disk: they're good for small modifications (transactions) that don't touch most of the state. RDDs, on the other hand, are efficient for large operations.

How can I read logs? Let's see an example of log mining.

```python
lines = spark.textFile("hdfs://...")

def is_error(line):
    return line.startswith('ERROR')

errors = lines.filter(is_error)

def get_message(line):
    return line.strip().split()

messages = errors.map(get_message)
messages.persist()

messages.filter(lambda m: 'foo' in m).count()
messages.filter(lambda m: 'bar' in m).count()
```

For fault recovery, RDDs track lineage (i.e. dependencies) for each block:

```python
messages = textFile(...) \
    .filter(lambda x: 'error' in x) \
    .map(lambda x: x.split())
```

Spark follows the lazy computation paradigm: you only compute when you need an output; otherwise nothing is executed.

💡 Spark does the combining on its own, without an explicit combiner.

Introduction to Large-Scale Data Processing: GraphX
When MapReduce showed up, several engines for efficient graph-based analytics were introduced: there were many machine learning and network analysis applications, and we wanted to exploit the same cluster while having ad-hoc optimizations. GraphX brings the same concepts to Spark, in a quite small package (the first version was only 2500 lines of code).

PageRank
An example is PageRank, the idea that made Google successful. It's used to evaluate how important a node is in a graph (centrality), so we can see which pages are more important in the Web graph (pages and links). The idea is that a random walker starts from a random page and, with probability d = 0.85, clicks a link at random; with probability 1 − d = 0.15, it jumps to another random page. The score of a page is the probability that the random walker is there. A single-machine PageRank looks like this:

```
rank = [1/n, 1/n, ...]                  # array of length n
for m iterations:
    new_rank = [(1-d)/n, (1-d)/n, ...]  # length n
    for each node x with t neighbors:
        for each neighbor y of x:
            new_rank[y] += d * rank[x] / t
    rank = new_rank
```

Pregel
💡 Pregel is a computational framework designed for large-scale graph processing, developed by Google. It is optimized for distributed systems and is specifically tailored to solving problems on massive graphs, such as social networks, web graphs, and other graph-based data structures. Pregel adopts the Bulk Synchronous Parallel (BSP) computation model and processes graphs in a vertex-centric manner, enabling efficient and scalable graph computations.

Let's see an example.
```
def PageRank(v: Id, msgs: List[Double]) {
  // Compute the message sum
  var msgSum = 0
  for (m
```
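The snippet above is cut off in the source. Purely as an illustration of what such a vertex-centric program computes, here is a Python sketch of Pregel-style (BSP) PageRank: in each superstep every vertex sums the messages it received, updates its rank with the same formula used in the single-machine version above, and sends rank/out-degree to its neighbours. This is a sequential simulation, not Pregel's or GraphX's actual API.

```python
def pregel_pagerank(graph, d=0.85, supersteps=20):
    """graph maps each vertex to its out-neighbours, e.g. {"a": ["b"], ...}."""
    n = len(graph)
    rank = {v: 1.0 / n for v in graph}
    for _ in range(supersteps):
        # "Message passing" phase: each vertex sends its share to neighbours.
        inbox = {v: [] for v in graph}
        for v, neighbours in graph.items():
            if neighbours:
                share = rank[v] / len(neighbours)
                for w in neighbours:
                    inbox[w].append(share)
        # Vertex program: combine incoming messages and update the rank
        # (like the single-machine version, dangling-node mass is dropped).
        rank = {v: (1 - d) / n + d * sum(msgs) for v, msgs in inbox.items()}
    return rank

print(pregel_pagerank({"a": ["b"], "b": ["a", "c"], "c": ["a"]}))
```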