Chapter 3. Distributed Systems Essentials

As I described in [[Chapter 2]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch02.html#distributed_systems_architectures_an_in), scaling a system naturally involves adding multiple independently moving parts. We run our software components on multiple machines and our databases across multiple storage nodes, all in the quest of adding more processing capacity. Consequently, our solutions are distributed across multiple machines in multiple locations, with each machine processing events concurrently, and exchanging messages over a network.

This fundamental nature of distributed systems has some profound implications on the way we design, build, and operate our solutions. This chapter provides the basic information you need to know to appreciate the issues and complexities of distributed software systems. I'll briefly cover communications networks hardware and software, remote method invocation, how to deal with the implications of communications failures, distributed coordination, and the thorny issue of time in distributed systems.

Communications Basics

Every distributed system has software components that communicate over a network. If a mobile banking app requests the user's current bank account balance, a (very simplified) sequence of communications occurs along the lines of: 1. 2. 3. 4. 5.

It almost sounds simple when you read the above, but in reality, there's a huge amount of complexity hidden beneath this sequence of communications. Let's examine some of these complexities in the following sections.

**Communications Hardware**

The bank balance request example above will inevitably traverse multiple different networking technologies and devices. The global internet is a heterogeneous machine, comprising different types of network communications channels and devices that shuttle many millions of messages per second across networks to their intended destinations.
Different types of communications channels exist. The most obvious categorization is wired versus wireless. For each category there are multiple network transmission hardware technologies that can ship bits from one machine to another. Each technology has different characteristics, and the ones we typically care about are speed and range.

For physically wired networks, the two most common types are local area networks (LANs) and wide area networks (WANs). LANs are networks that can connect devices at "building scale," being able to transmit data over a small number (e.g., 1--2) of kilometers. Contemporary LANs in data centers can transport between 10 and 100 gigabits per second (Gbps). This is known as the network's bandwidth, or capacity. The time taken to transmit a message across a LAN, known as the network's latency, is submillisecond with modern LAN technologies.

WANs are networks that traverse the globe and make up what we collectively call the internet. These long-distance connections are the high-speed data pipelines connecting cities, countries, and continents with fiber optic cables. These cables support a networking technology known as [[wavelength division multiplexing]](https://oreil.ly/H7uwR), which makes it possible to transmit up to 171 Gbps over each of 400 different channels, giving close to 70 terabits per second (Tbps) of total bandwidth for a single fiber link (171 Gbps × 400 channels is about 68 Tbps). The fiber cables that span the world normally comprise four or more strands of fiber, giving bandwidth capacity of hundreds of Tbps for each cable.

Latency is more complicated with WANs, however. WANs transmit data over hundreds and thousands of kilometers, and the maximum speed that the data can travel in fiber optic cables is the theoretical speed of light. In reality, these cables can't reach the speed of light, but do get pretty close to it, as shown in [[Table 3-1]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#wan_speeds).
| **Path** | **Distance** | **Travel time (speed of light)** | **Travel time (fiber optic cable)** |
|---|---|---|---|
| New York to San Francisco | 4,148 km | 14 ms | 21 ms |
| New York to London | 5,585 km | 19 ms | 28 ms |
| New York to Sydney | 15,993 km | 53 ms | 80 ms |

Table 3-1. WAN speeds

Actual times will be slower than the fiber optic travel times in [[Table 3-1]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#wan_speeds) as the data needs to pass through networking equipment known as [[routers]](https://oreil.ly/t7I0Y). The global internet has a complex hub-and-spoke topology with many potential paths between nodes in the network. Routers are therefore responsible for transmitting data on the physical network connections to ensure data is transmitted across the internet from source to destination.

Routers are specialized, high-speed devices that can handle several hundred Gbps of network traffic, pulling data off incoming connections and sending the data out to different outgoing network connections based on their destination. Routers at the core of the internet comprise racks of these devices and can process tens to hundreds of Tbps. This is how you and thousands of your friends get to watch a steady video stream on Netflix at the same time.

Wireless technologies have different range and bandwidth characteristics. WiFi routers that we are all familiar with in our homes and offices are wireless Ethernet networks and use 802.11 protocols to send and receive data. The most widely used WiFi protocol, 802.11ac, allows for maximum (theoretical) data rates of up to 5,400 Mbps. The most recent 802.11ax protocol, also known as WiFi 6, is an evolution of 802.11ac technology that promises increased throughput speeds of up to 9.6 Gbps.
The range of WiFi routers is of the order of tens of meters and of course is affected by physical impediments like walls and floors.

Cellular wireless technology uses radio waves to send data from our phones to routers mounted on cell towers, which are generally connected by wires to the core internet for message routing. Each cellular technology introduces improved bandwidth and other dimensions of performance. The most common technology at the time of writing is 4G LTE wireless broadband. 4G LTE is around 10 times faster than the older 3G, able to handle sustained download speeds around 10 Mbps (peak download speeds are nearer 50 Mbps) and upload speeds between 2 and 5 Mbps.

Emerging 5G cellular networks promise 10x bandwidth improvements over existing 4G, with 1--2 millisecond latencies between devices and cell towers. This is a great improvement over 4G latencies, which are in the 20--40 millisecond range. The trade-off is range. 5G base stations have a range of about 500 meters at most, whereas 4G provides reliable reception at distances of 10--15 km.

This whole collection of different hardware types for networking comes together in the global internet. The internet is a heterogeneous network, with many different operators around the world and every type of hardware imaginable. [[Figure 3-1]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#simplified_view_of_the_internet) shows a simplified view of the major components that comprise the internet. Tier 1 networks are the global high-speed internet backbone. There are around 20 Tier 1 internet service providers (ISPs) who manage and control global traffic. Tier 2 ISPs are typically regional (e.g., one country), have lower bandwidth than Tier 1 ISPs, and deliver content to customers through Tier 3 ISPs. Tier 3 ISPs are the ones that charge you exorbitant fees for your home internet every month.

Figure 3-1. Simplified view of the internet

There's a lot more complexity to how the internet works than described here. That level of networking and protocol complexity is beyond the scope of this chapter. From a distributed systems software perspective, we need to understand more about the "magic" that enables all this hardware to route messages from, say, my cell phone to my bank and back. This is where the *Internet Protocol (IP)* comes in.

**Communications Software**

Software systems on the internet communicate using the [[Internet Protocol (IP) suite]](https://oreil.ly/DJf0L). The IP suite specifies host addressing, data transmission formats, message routing, and delivery characteristics. There are four abstract layers, which contain related protocols that support the functionality required at that layer. These are, from lowest to highest:

1. The data link layer
2. The internet layer
3. The transport layer
4. The application layer

Each of the higher-layer protocols builds on the features of the lower layers. In the following section, I'll briefly cover IP for host discovery and message routing, and TCP and UDP that can be utilized by distributed applications.

**Internet Protocol (IP)**

IP defines how hosts are assigned addresses on the internet and how messages are transmitted between two hosts that know each other's addresses.

Every device on the internet has its own address. These are known as Internet Protocol (IP) addresses. The IP address of a named host can be found using an internet-wide directory service known as the Domain Name System (DNS). DNS is a widely distributed, hierarchical database that acts as the address book of the internet.

The technology currently used to assign IP addresses, known as Internet Protocol version 4 (IPv4), will eventually be replaced by its successor, IPv6. IPv4 is a 32-bit addressing scheme that before long will run out of addresses due to the number of devices connecting to the internet. IPv6 is a 128-bit scheme that will offer an (almost) infinite number of IP addresses.
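To put the 32-bit and 128-bit figures in perspective, here is a minimal sketch that computes the size of each address space and checks the width of a literal address. The class and method names are hypothetical, and the two sample addresses are documentation-only examples rather than real hosts.

```java
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper class, for illustration only.
class AddressSpace {
    /** Number of distinct addresses in a scheme with the given bit width. */
    static BigInteger addressCount(int bits) {
        return BigInteger.TWO.pow(bits);
    }

    /** Width in bits of a literal IP address such as "192.0.2.1". */
    static int bitLength(String literal) {
        try {
            // For a literal address, getByName only parses the text;
            // no DNS lookup takes place.
            return InetAddress.getByName(literal).getAddress().length * 8;
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not an IP literal: " + literal, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("IPv4 addresses: " + addressCount(32));
        System.out.println("IPv6 addresses: " + addressCount(128));
        System.out.println("192.0.2.1 is " + bitLength("192.0.2.1") + " bits");
        System.out.println("2001:db8::1 is " + bitLength("2001:db8::1") + " bits");
    }
}
```

The IPv4 count comes to roughly 4.3 billion, which is why the scheme is running short of addresses; the IPv6 count is a 39-digit number.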
As an indicator, in July 2020 about [[33% of the traffic processed by Google.com]](https://oreil.ly/3ix6W) was IPv6.

DNS servers are organized hierarchically. A small number of root DNS servers, which are highly replicated, are the starting point for resolving a hostname to an IP address. When an internet browser tries to find a website, a network host known as the local DNS server (managed by your employer or ISP) will contact a root DNS server with the requested hostname. The root server replies with a referral to a so-called *authoritative* DNS server that manages name resolution for, in our banking example, *.com* addresses. There is an authoritative name server for each top-level internet domain (*.com*, *.org*, *.net*, etc.).

Next, the local DNS server will query the *.com* DNS server, which will reply with the address of the DNS server that knows about all the IP addresses managed by *igbank.com*. This DNS server is queried, and it returns the actual IP address we need to communicate with the application. The overall scheme is illustrated in [[Figure 3-2]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#example_dns_lookup_for_igbankdotcom).

![Example DNS lookup for igbank.com](media/image2.png)

Figure 3-2. Example DNS lookup for igbank.com

The whole DNS database is highly geographically replicated so there are no single points of failure, and requests are spread across multiple physical servers. Local DNS servers also remember the IP addresses of recently contacted hosts, which is possible as IP addresses don't change very often. This means the complete name resolution process doesn't occur for every site we contact.

Armed with a destination IP address, a host can start sending data across the network as a series of IP data packets. IP delivers data from the source to the destination host based on the IP addresses in the packet headers.
IP defines a packet structure that contains the data to be delivered, along with header data including source and destination IP addresses. Data sent by an application is broken up into a series of packets which are independently transmitted across the internet.

IP is known as a best-effort delivery protocol. This means it does not attempt to compensate for the various error conditions that can occur during packet transmission. Possible transmission errors include data corruption, packet loss, and duplication. In addition, every packet is routed across the internet from source to destination independently. Treating every packet independently is known as packet switching. This allows the network to dynamically respond to conditions such as network link failure and congestion, and hence is a defining characteristic of the internet. This does mean, however, that different packets may be delivered to the same destination via different network paths, resulting in out-of-order delivery to the receiver.

Because of this design, IP is unreliable. If two hosts require reliable data transmission, they need to add additional features to make this occur. This is where the next layer in the IP protocol suite, the transport layer, enters the scene.

**Transmission Control Protocol (TCP)**

Once an application or browser has discovered the IP address of the server it wishes to communicate with, it can send messages using a transport protocol API. This is achieved using TCP or UDP, which are the popular standard transport protocols for the IP network stack.

Distributed applications can choose which of these protocols to use. Implementations are widely available in mainstream programming languages such as Java, Python, and C++. In reality, use of these APIs is not common as higher-level programming abstractions hide the details from most applications.
In fact, the IP protocol suite application layer contains several of these application-level APIs, including HTTP, which is very widely used in mainstream distributed systems.

Still, it's important to understand TCP, UDP, and their differences. Most requests on the internet are sent using TCP. TCP is:

- Connection-oriented
- Stream-oriented
- Reliable

I'll explain each of these qualities, and why they matter, below.

TCP is known as a connection-oriented protocol. Before any messages are exchanged between applications, TCP uses a three-step handshake to establish a two-way connection between the client and server applications. The connection stays open until the TCP client calls close() to terminate the connection with the TCP server. The server responds by acknowledging the close() request before the connection is dropped.

Once a connection is established, a client sends a sequence of requests to the server as a data stream. When a data stream is sent over TCP, it is broken up into individual network packets, with a maximum packet size of 65,535 bytes. Each packet contains a source and destination address, which is used by the underlying IP protocol to route the messages across the network.

The internet is a packet-switched network, which means every packet is individually routed across the network. The route each packet traverses can vary dynamically based on the conditions in the network, such as link congestion or failure. This means the packets may not arrive at the server in the same order they are sent from the client. To solve this problem, a TCP sender includes a sequence number in each packet so the receiver can reassemble the packets into a stream in the same order they were sent.

Reliability is needed as network packets can be lost or delayed during transmission between sender and receiver. To achieve reliable packet delivery, TCP uses a cumulative acknowledgment mechanism.
This means a receiver will periodically send an acknowledgment packet that contains the highest sequence number of the packets received without gaps in the packet stream. This implicitly acknowledges all packets sent with a lower sequence number, meaning all have been successfully received. If a sender doesn't receive an acknowledgment within a timeout period, the packet is resent.

TCP has many other features, such as checksums to check packet integrity, and dynamic flow control to ensure a sender doesn't overwhelm a slow receiver by sending data too quickly. Along with connection establishment and acknowledgments, this makes TCP a relatively heavyweight protocol, which trades efficiency for reliability.

This is where UDP comes into the picture. UDP is a simple, connectionless protocol, which exposes the user's program to any unreliability of the underlying network. There is no guarantee that delivery will occur in a prescribed order, or that it will happen at all. It can be thought of as a thin veneer (layer) on top of the underlying IP protocol, and deliberately trades reliability for raw performance. This, however, is highly appropriate for many modern applications where the odd lost packet has very little effect. Think streaming movies, video conferencing, and gaming, where one lost packet is unlikely to be perceptible by a user.

[[Figure 3-3]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#comparing_tcp_and_udp) depicts some of the major differences between TCP and UDP. TCP incorporates a connection establishment three-packet handshake (SYN, SYN ACK, ACK), and piggybacks acknowledgments (ACK) of packets so that any packet loss can be handled by the protocol. There's also a TCP connection close phase involving a four-way handshake that is not shown in the diagram. UDP dispenses with connection establishment, tear down, acknowledgments, and retries.
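The cumulative acknowledgment idea described earlier in this section can be sketched from the receiver's side. This is a deliberately simplified model, not how a real TCP stack is written: it numbers whole packets starting at 1, whereas TCP sequence numbers count bytes, and the class name is hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of a receiver that sends cumulative ACKs.
class CumulativeAckReceiver {
    // Packets that arrived ahead of a gap and are waiting for it to be filled.
    private final Set<Integer> buffered = new HashSet<>();
    // Invariant: packets 1..highestInOrder have all been received.
    private int highestInOrder = 0;

    /** Records an arriving packet and returns the cumulative ACK to send back. */
    int receive(int sequenceNumber) {
        if (sequenceNumber > highestInOrder) {
            buffered.add(sequenceNumber);
        }
        // Advance past every packet that is now contiguous with the stream.
        while (buffered.remove(highestInOrder + 1)) {
            highestInOrder++;
        }
        return highestInOrder;
    }

    public static void main(String[] args) {
        CumulativeAckReceiver receiver = new CumulativeAckReceiver();
        int[] arrivals = {1, 3, 4, 2}; // packet 2 is delayed in the network
        for (int seq : arrivals) {
            System.out.println("received " + seq + ", ACK " + receiver.receive(seq));
        }
    }
}
```

While packet 2 is missing, the acknowledgment stays at 1 even though later packets have arrived. Once packet 2 shows up, a single acknowledgment of 4 covers everything received so far.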
Therefore, applications using UDP need to be tolerant of packet loss and client or server failures (and behave accordingly).

Figure 3-3. Comparing TCP and UDP

Remote Method Invocation

It's perfectly feasible to write our distributed applications using low-level APIs that interact directly with the transport layer protocols TCP and UDP. The most common approach is the standardized sockets library (see the brief overview in the sidebar). This is something you'll hopefully never need to do, as sockets are complex and error prone. Essentially, sockets create a bidirectional pipe between two nodes that you can use to send streams of data. There are (luckily) much better ways to build distributed communications, as I'll describe in this section. These approaches abstract away much of the complexity of using sockets. However, sockets still lurk underneath, so some knowledge is necessary.

**An Overview of Sockets**

A socket is one endpoint of a two-way network connection between a client and a server. Sockets are identified by a combination of the node's IP address and an abstraction known as a *port*. A port is a unique numeric identifier, which allows a node to support communications for multiple applications running on the node.

Each IP address can support 65,535 TCP ports and another 65,535 UDP ports. On a server, each *{\<IP address\>, \<port\>}* combination can be associated with an application. This combination forms a unique endpoint that the transport layer uses to deliver data to the desired server.

A socket connection is identified by a unique combination of client and server IP addresses and ports, namely *\<client IP address, client port, server IP address, server port\>*. Each unique connection also allocates a socket descriptor on both the client and the server. Once the connection is created, the client sends data to the server in a stream, and the server responds with results. The sockets library supports both protocols, with the SOCK\_STREAM option for TCP, and the SOCK\_DGRAM for UDP.
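As a rough illustration of the sidebar, the sketch below opens a TCP connection over the loopback interface and sends one line each way. In Java, `ServerSocket` and `Socket` are the stream (TCP) sockets, and `DatagramSocket` is the UDP counterpart. The class name, the one-shot echo behavior, and the message text are assumptions made for the example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical one-shot echo exchange over a loopback TCP connection.
class LoopbackEcho {
    /** Starts a server on a free port, sends it one line, and returns the reply. */
    static String roundTrip(String message) {
        // Port 0 asks the operating system to pick any free port.
        try (ServerSocket listener = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread server = new Thread(() -> {
                try (Socket conn = listener.accept();
                     BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                     PrintWriter out = new PrintWriter(conn.getOutputStream(), true)) {
                    out.println("echo: " + in.readLine());
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            server.start();

            // The client side of the connection gets its own OS-assigned port.
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), listener.getLocalPort());
                 PrintWriter out = new PrintWriter(client.getOutputStream(), true);
                 BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()))) {
                out.println(message);
                String reply = in.readLine();
                server.join();
                return reply;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("balance 000169990"));
    }
}
```

Even this tiny exchange needs explicit stream handling, threading, and error handling, which hints at why the higher-level approaches are usually preferable.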
You can write distributed applications directly to the sockets API, which is an operating system core component. Socket APIs are available in all mainstream programming languages. However, the sockets library is a low-level, hard-to-use API. You should avoid it unless you have a real need to write system-level code.

In our mobile banking example, the client might request a balance for the user's checking account using sockets. Ignoring specific language issues (and security!), the client could send a message payload as follows over a connection to the server:

*{"balance", "000169990"}*

In this message, "balance" represents the operation we want the server to execute, and "000169990" is the bank account number.

In the server, we need to know that the first string in the message is the operation identifier, and based on this value being "balance", the second is the bank account number. The server then uses these values to presumably query a database, retrieve the balance, and send back the results, perhaps as a message formatted with the account number and current balance, as below:

*{"000169990", "220.77"}*

In any complex system, the server will support many operations. In *igbank.com*, there might be for example "login", "transfer", "address", "statement", "transactions", and so on. Each will be followed by different message payloads that the server needs to interpret correctly to fulfill the client's request.

What we are defining here is an application-specific protocol. As long as we send the necessary values in the correct order for each operation, the server will be able to respond correctly. If we have an erroneous client that doesn't adhere to our application protocol, well, our server needs to do thorough error checking. The socket library provides a primitive, low-level method for client/server communications. It provides highly efficient communications but is difficult to correctly implement and evolve the application protocol to handle all possibilities.
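To make the burden of an implicit application protocol concrete, here is a minimal sketch of the server-side dispatch for the message above. The class name, the in-memory balance table, and the format of the error replies are all assumptions for illustration; a real server would read these fields from a socket and query a database.

```java
import java.util.Map;

// Hypothetical handler for the ad hoc {"operation", arguments...} protocol.
class BankProtocol {
    // Assumed in-memory data standing in for the bank's database.
    private static final Map<String, String> BALANCES = Map.of("000169990", "220.77");

    /** Interprets one request: the first field is the operation, the rest are its arguments. */
    static String[] handle(String[] request) {
        if (request == null || request.length == 0) {
            return new String[] {"error", "empty request"};
        }
        switch (request[0]) {
            case "balance":
                if (request.length != 2) {
                    return new String[] {"error", "balance expects an account number"};
                }
                String balance = BALANCES.get(request[1]);
                return balance == null
                        ? new String[] {"error", "unknown account"}
                        : new String[] {request[1], balance};
            default:
                // "login", "transfer", and the rest would each need their own case.
                return new String[] {"error", "unknown operation: " + request[0]};
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", handle(new String[] {"balance", "000169990"})));
        System.out.println(String.join(", ", handle(new String[] {"transfer"})));
    }
}
```

Every check here exists only because the protocol is positional and untyped. Each new operation adds another case with its own validation rules, and nothing stops a client from sending fields in the wrong order.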
There are better mechanisms.

Stepping back, if we were defining the *igbank.com* server interface in an object-oriented language such as Java, we would have each operation it can process as a method. Each method is passed an appropriate parameter list for that operation, as shown in this example code:

    // Simple igbank.com server interface
    public interface IGBank {
        public float balance(String accNo);
        public boolean statement(String month);
        // other operations
    }

There are several advantages of having such an interface, namely: - - -

These benefits of an explicit interface are of course well known in software engineering. The whole discipline of object-oriented design is pretty much based upon these foundations, where an interface defines a contract between the caller and callee. Compared to the implicit application protocol we need to follow with sockets, the advantages are significant.

This fact was recognized reasonably early in the creation of distributed systems. Since the early 1990s, we have seen an evolution of technologies that enable us to define explicit server interfaces and call these across the network using essentially the same syntax as we would in a sequential program. A summary of the major approaches is given in [[Table 3-2]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#summary_of_major_rpcsolidusrmi_technolo). Collectively, they are known as Remote Procedure Call (RPC), or Remote Method Invocation (RMI) technologies.

| **Technology** | **Date** | **Main features** |
|---|---|---|
| [[Distributed Computing Environment (DCE)]](https://oreil.ly/bvbR3) | Early 1990s | DCE RPC provides a standardized approach for client/server systems. Primary languages were C/C++. |
| [[Common Object Request Broker Architecture (CORBA)]](https://oreil.ly/IO9qD) | Early 1990s | Facilitates language-neutral client/server communications based on an object-oriented interface definition language (IDL). Primary language support in C/C++, Java, Python, and Ada. |
| [[Java Remote Method Invocation (RMI)]](https://oreil.ly/1fvGm) | Late 1990s | A pure Java-based remote method invocation that facilitates distributed client/server systems with the same semantics as Java objects. |
| XML web services | 2000 | Supports client/server communications based on HTTP and XML. Servers define their remote interface in the Web Services Description Language (WSDL). |
| gRPC | 2015 | Open source, based on HTTP/2 for transport, and uses [[Protocol Buffers (Protobuf)]](https://oreil.ly/ytHhl) as the interface description language. |

Table 3-2. Summary of major RPC/RMI technologies

While the syntax and semantics of these RPC/RMI technologies vary, the essence of how each operates is the same. Let's continue with our Java example of *igbank.com* to examine the whole class of approaches. Java offers a Remote Method Invocation (RMI) API for building client/server applications.

Using Java RMI, we can trivially make our IGBank interface example from above into a remote interface, as illustrated in the following code:

    import java.rmi.*;

    // Simple igbank.com server interface
    public interface IGBank extends Remote {
        public float balance(String accNo) throws RemoteException;
        public boolean statement(String month) throws RemoteException;
        // other operations
    }

The java.rmi.Remote interface serves as a marker to inform the Java compiler we are creating an RMI server. In addition, each method must throw java.rmi.RemoteException. These exceptions represent errors that can occur when a distributed call between two objects is invoked over a network. The most common reasons for such an exception would be a communications failure or the server object having crashed.
We then must provide a class that implements this remote interface. The sample code below shows an extract of the server implementation:

    import java.rmi.registry.LocateRegistry;
    import java.rmi.registry.Registry;
    import java.rmi.server.UnicastRemoteObject;

    public class IGBankServer extends UnicastRemoteObject implements IGBank {
        // constructor/method implementations omitted

        public static void main(String[] args) {
            try {
                IGBankServer server = new IGBankServer();
                // create a registry in local JVM on default port
                Registry registry = LocateRegistry.createRegistry(1099);
                registry.bind("IGBankServer", server);
                System.out.println("server ready");
            } catch (Exception e) {
                // code omitted for brevity
            }
        }
    }

Points to note are: - -

An extract from the client code to connect to the server is shown in the following example. It obtains a reference to the remote object by performing a lookup operation in the RMI registry and specifying the logical name that identifies the server. The reference returned by the lookup operation can then be used to call the server object in the same manner as a local object. However, there is a difference: the client must be ready to catch a RemoteException that will be thrown by the Java runtime when the server object cannot be reached:

    // requires: import java.rmi.Naming;
    // obtain a remote reference to the server
    IGBank bankServer = (IGBank) Naming.lookup("rmi://localhost:1099/IGBankServer");
    // now we can call the server
    System.out.println(bankServer.balance("000169990"));

[[Figure 3-4]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#schematic_depicting_the_call_sequence_f) depicts the call sequence among the components that comprise an RMI system. The Stub and Skeleton are objects generated by the compiler from the RMI interface definition, and these facilitate the actual remote communications. The skeleton is in fact a TCP network endpoint (*host, port*) that listens for calls to the associated server.

![Schematic depicting the call sequence for establishing a connection and making a call to an RMI server object](media/image4.png)

Figure 3-4.
Schematic depicting the call sequence for establishing a connection and making a call to an RMI server object

The sequence of operations is as follows: 1. 2. 3. 4. 5. 6. 7. 8.

This Java RMI example illustrates the basics that are used for implementing any RPC/RMI mechanism, even in modern languages like [[Erlang]](https://oreil.ly/D5biM) and [[Go]](https://oreil.ly/zD8dS). You are most likely to encounter Java RMI when using the Java Enterprise JavaBeans (EJB) technology. EJBs are a server-side component model built on RMI, which has seen wide usage in the last 20 or so years in enterprise systems.

Regardless of the precise implementation, the basic attraction of RPC/RMI approaches is to provide an abstract calling mechanism that supports *location transparency* for clients making remote server calls. Location transparency is provided by the registry, or in general any mechanism that enables a client to locate a server through a directory service. This means it is possible for the server to update its network location in the directory without affecting the client implementation.

RPC/RMI is not without its flaws. Marshalling and unmarshalling can become inefficient for complex object parameters. Cross-language marshalling (client in one language, server in another) can cause problems due to types being represented differently in different languages, causing subtle incompatibilities. And if a remote method signature changes, all clients need to obtain a new compatible stub, which can be cumbersome in large deployments.

For these reasons, most modern systems are built around simpler protocols based on HTTP and using JSON for parameter representation. Instead of operation names, HTTP verbs (PUT, GET, POST, etc.) have associated semantics that are mapped to a specific URL.
This approach originated in the work by Roy Fielding on the REST approach. REST has a set of semantics that comprise a *RESTful* architecture style, and in reality most systems do not adhere to these. We'll discuss REST and HTTP API mechanisms in [[Chapter 5]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch05.html#application_services).

Partial Failures

The components of distributed systems communicate over a network. In communications technology terminology, the shared local and wide area networks that our systems communicate over are known as *asynchronous* networks.

With asynchronous networks: - - - - - -

**Note**

This is in contrast with synchronous networks, which essentially are full duplex, transmitting data in both directions at the same time with each node having an [[identical clock for synchronization]](https://oreil.ly/SEPCs).

What does this mean for our applications? Well, put simply, when a client sends a request to a server, how long does it wait until it receives a reply? Is the server node just being slow? Is the network congested and the packet has been dropped by a router? If the client doesn't get a reply, what should it do?

Let's explore these scenarios in detail. The core problem here, namely whether and when a response is received, is known as handling partial failures, and the general situation is depicted in [[Figure 3-5]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#handling_partial_failures).

Figure 3-5. Handling partial failures

When a client wishes to connect to a server and exchange messages, the following outcomes may occur: - - - - - -

The first three points are easy for the client to handle, as a response is received rapidly. Either a result from the server or an error message allows the client to proceed.
Failures that can be detected quickly are easy to deal with.

The rest of the outcomes pose a problem for the client. They do not provide any insight into the reason why a response has not been received. From the client's perspective, these three outcomes look exactly the same. The client cannot know without waiting (potentially forever) whether the response will arrive eventually or never arrive, and waiting forever doesn't get much work done.

More insidiously, the client cannot know if the operation succeeded and a server or network failure caused the result to never arrive, or if the request is still on its way, delayed simply due to congestion in the network or server. These faults are collectively known as [*[crash faults]*](https://oreil.ly/AAc9M).

The typical solution that clients adopt to handle crash faults is to resend the request after a configured timeout period. However, this is fraught with danger, as [[Figure 3-6]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#client_retries_a_request_after_timeout) illustrates. The client sends a request to the server to deposit money in a bank account. When it receives no response after a timeout period, it resends the request. What is the resulting balance? The server may have applied the deposit, or it may not, depending on the partial failure scenario.

![Client retries a request after timeout](media/image6.png)

Figure 3-6. Client retries a request after timeout

The chance that the deposit may occur twice is a fine outcome for the customer, but the bank is unlikely to be amused by this possibility. Therefore, we need a way to ensure in our server operations implementation that retried, duplicate requests from clients only result in the request being applied once. This is necessary to maintain correct application semantics.

This property is known as *idempotence*. Idempotent operations can be applied multiple times without changing the result beyond the initial application.
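A tiny sketch may help make the definition concrete. The two methods below are hypothetical: one adds to a balance, which is not idempotent, and one sets a balance to a given value, which is.

```java
// Hypothetical operations contrasting non-idempotent and idempotent updates.
class IdempotenceDemo {
    /** Not idempotent: every application changes the result again. */
    static int deposit(int balance, int amount) {
        return balance + amount;
    }

    /** Idempotent: applying it once or many times gives the same result. */
    static int setBalance(int balance, int newBalance) {
        return newBalance;
    }

    public static void main(String[] args) {
        int once = deposit(0, 100);
        int twice = deposit(once, 100);
        System.out.println("deposit: " + once + " then " + twice);

        int setOnce = setBalance(0, 100);
        int setTwice = setBalance(setOnce, 100);
        System.out.println("setBalance: " + setOnce + " then " + setTwice);
    }
}
```

A retried `deposit` changes the balance a second time, whereas a retried `setBalance` leaves it where the first call put it. The goal for the server is to make operations like `deposit` behave, from the client's point of view, like the second kind.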
This means that for the example in [[Figure 3-6]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#client_retries_a_request_after_timeout), the client can retry the request as many times as it likes, and the account will only be increased by \$100. Requests that make no persistent state changes are naturally idempotent. This means all read requests are inherently safe and no extra work is needed on the server. Updates are a different matter. The system needs to devise a mechanism such that duplicate client requests do not cause any state changes and can be detected by the server. In API terms, these endpoints cause mutation of the server state and must therefore be idempotent. The general approach to building idempotent operations is as follows: - - - The database used to store idempotency keys can be implemented in, for example: - - Unlike application data, idempotency keys don't have to be retained forever. Once a client receives an acknowledgment of a success for an individual operation, the idempotency key can be discarded. The simplest way to achieve this is to automatically remove idempotency keys from the store after a specific time period, such as 60 minutes or 24 hours, depending on application needs and request volumes. In addition, an idempotent API implementation must ensure that the application state is modified *and* the idempotency key is stored. Both must occur for success. If the application state is modified and, due to some failure, the idempotent key is not stored, then a retry will cause the operation to be applied twice. If the idempotency key is stored but for some reason the application state is not modified, then the operation has not been applied. If a retry arrives, it will be filtered out as duplicate as the idempotency key already exists, and the update will be lost. 
The implication here is that the updates to the application state and idempotency key store must *both* occur, or *neither* must occur. If you know your databases, you'll recognize this as a requirement for transactional semantics. We'll discuss how distributed transactions are achieved in [[Chapter 12]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch12.html#strong_consistency). Essentially, transactions ensure *exactly-once semantics for operations*, which guarantees that all messages will always be processed exactly once---precisely what we need for idempotence. Exactly once does not mean that there are no message transmission failures, retries, and application crashes. These are all inevitable. The important thing is that the retries eventually succeed and the result is always the same. We'll return to the issue of communications delivery guarantees in later chapters. As [[Figure 3-7]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#communications_delivery_guarantees) illustrates, there's a spectrum of semantics, each with different guarantees and performance characteristics. *At-most-once* delivery is fast and unreliable---this is what the UDP protocol provides. *At-least-once* delivery is the guarantee provided by TCP/IP, meaning duplicates are inevitable. *Exactly-once* delivery, as we've discussed here, requires guarding against duplicates and hence trades off reliability against slower performance. Communications delivery guarantees Figure 3-7. Communications delivery guarantees As we'll see, some advanced communications mechanisms can provide our applications with exactly-once semantics. However, these don't operate at internet scale because of the performance implications. That is why, as our applications are built on the at-least-once semantics of TCP/IP, we must implement exactly-once semantics in our APIs that cause state mutation. 
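As a minimal sketch of the idempotency approach described in this section, the following Java class applies a deposit and records its idempotency key in one atomic step. The class name, method names, and key format are hypothetical and not taken from any library. An in-memory map guarded by a lock stands in for the database transaction that a real service would need.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: every name here is an assumption made for this sketch.
class IdempotentAccount {
    private long balance = 0;
    // Maps each idempotency key already processed to the balance it produced.
    private final Map<String, Long> processedKeys = new HashMap<>();

    // synchronized stands in for a transaction: the balance update and the
    // key record are applied together, and no other caller sees one without the other.
    public synchronized long deposit(String idempotencyKey, long amount) {
        Long previousResult = processedKeys.get(idempotencyKey);
        if (previousResult != null) {
            // Duplicate request: return the original result and change nothing.
            return previousResult;
        }
        balance += amount;
        processedKeys.put(idempotencyKey, balance);
        return balance;
    }

    public synchronized long getBalance() {
        return balance;
    }

    public static void main(String[] args) {
        IdempotentAccount account = new IdempotentAccount();
        String key = "client-42-request-1"; // hypothetical client-generated key
        account.deposit(key, 100); // original request
        account.deposit(key, 100); // retry after a timeout
        System.out.println("Balance after retry: " + account.getBalance());
    }
}
```

Running main prints a balance of 100, because the retry carries the same key and is filtered out. A production version would also expire old keys, as discussed above, and would rely on the database to make the two writes atomic across process crashes.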
Consensus in Distributed Systems Crash faults have another implication for the way we build distributed systems. This is best illustrated by the [[Two Generals' Problem]](https://oreil.ly/ap5eq), which is depicted in [[Figure 3-8]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#the_two_generalsapostrophe_problem). ![The Two Generals' Problem](media/image8.png) Figure 3-8. The Two Generals' Problem Imagine a city under siege by two armies. The armies lie on opposite sides of the city, and the terrain surrounding the city is difficult to travel through and visible to snipers in the city. In order to overwhelm the city, it's crucial that both armies attack at the same time. This will stretch the city's defenses and make victory more likely for the attackers. If only one army attacks, then they will likely be repelled. Given these constraints, how can the two generals reach agreement on the exact time to attack, such that both generals know for certain that agreement has been reached? They both need certainty that the other army will attack at the agreed time, or disaster will ensue. To coordinate an attack, the first general sends a messenger to the other, with instructions to attack at a specific time. As the messenger may be captured or killed by snipers, the sending general cannot be certain the message has arrived unless they get an acknowledgment messenger from the second general. Of course, the acknowledgment messenger may be captured or killed, so even if the original messenger does get through, the first general may never know. And even if the acknowledgment message arrives, how does the second general know this, unless they get an acknowledgment from the first general? Hopefully the problem is apparent. With messengers being randomly captured or extinguished, there is no guarantee the two generals will ever reach consensus on the attack time. 
In fact, it can be proven that it is not possible to *guarantee* agreement will be reached. There are solutions that increase the likelihood of reaching consensus. For example, *Game of Thrones* style, each general may send 100 different messengers every time, and even if most are killed, this increases the probability that at least one will make the perilous journey to the other friendly army and successfully deliver the message. The Two Generals' Problem is analogous to two nodes in a distributed system wishing to reach agreement on some state, such as the value of a data item that can be updated at either. Partial failures are analogous to losing messages and acknowledgments. Messages may be lost or delayed for an indeterminate period of time---the characteristics of asynchronous networks, as I described earlier in this chapter. In fact it can be demonstrated that consensus on an asynchronous network in the presence of crash faults, where messages can be delayed but not lost, is impossible to achieve within bounded time. This is known as the FLP Impossibility Theorem.[****](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#ch01fn11) Luckily, this is only a theoretical limitation, demonstrating it's not possible to *guarantee* consensus will be reached with unbounded message delays on an asynchronous network. In reality, distributed systems reach consensus all the time. This is possible because while our networks are asynchronous, we can establish sensible practical bounds on message delays and retry after a timeout period. FLP is therefore a worst-case scenario, and as such I'll discuss algorithms for establishing consensus in distributed databases in [[Chapter 12]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch12.html#strong_consistency). Finally, we should note the issue of Byzantine failures. 
Imagine extending the Two Generals' Problem to *N* generals who need to agree on a time to attack. However, in this scenario, traitorous messengers may change the value of the time of the attack, or a traitorous general may send false information to other generals. These *malicious* failures are known as Byzantine faults and are particularly sinister in distributed systems. Luckily, the systems we discuss in this book typically live behind well-protected, secure enterprise networks and administrative environments. This means we can in practice exclude handling Byzantine faults. Algorithms that do address such malicious behaviors exist, and if you are interested in a practical example, take a look at [[blockchain consensus mechanisms]](https://oreil.ly/r3vQT) and [[Bitcoin]](https://oreil.ly/IPohu).

Time in Distributed Systems

Every node in a distributed system has its own internal clock. If all the clocks on every machine were perfectly synchronized, we could always simply compare the timestamps on events across nodes to determine the precise order they occurred in. If this were reality, many of the problems I'll discuss with distributed systems would pretty much go away. Unfortunately, this is not the case. Clocks on individual nodes *drift* due to environmental conditions like changes in temperature or voltage. The amount of drift varies on every machine, but values such as 10--20 seconds per day are not uncommon. (Or with my current coffee machine at home, about 5 minutes per day!) If left unchecked, clock drift would render the time on a node meaningless---like the time on my coffee machine if I don't correct it every few days. To address this problem, a number of *time services* exist. A time service represents an accurate time source, such as a GPS or atomic clock, which can be used to periodically reset the clock on a node to correct for drift on packet-switched, variable-latency data networks.
The most widely used time service is [[Network Time Protocol (NTP)]](http://www.ntp.org/), which provides a hierarchically organized collection of time servers spanning the globe. The root servers, of which there are around 300 worldwide, are the most accurate. Time servers in the next level of the hierarchy (approximately 20,000) synchronize to within a few milliseconds of the root server periodically, and so on throughout the hierarchy, with a maximum of 15 levels. Globally, there are more than 175,000 NTP servers. Using the NTP protocol, a node in an application running an NTP client can synchronize to an NTP server. The time on a node is set by a UDP message exchange with one or more NTP servers. Messages are time stamped, and through the message exchange the time taken for message transit is estimated. This becomes a factor in the algorithm used by NTP to establish what the time on the client should be reset to. A simple NTP configuration is shown in [[Figure 3-9]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#illustrating_using_the_ntp_service). On a LAN, machines can synchronize to an NTP server within a small number of milliseconds accuracy. One interesting effect of NTP synchronization for our applications is that the resetting of the clock can move the local node time forward or backward. This means that if our application is measuring the time taken for events to occur (e.g., to calculate event response times), it is possible that the end time of the event may be earlier than the start time if the NTP protocol has set the local time backward. Illustrating using the NTP service Figure 3-9. Illustrating using the NTP service In fact, a compute node has two clocks. These are: Applications can use an NTP service to ensure the clocks on every node in the system are closely synchronized. It's typical for an application to resynchronize clocks on anything from a one hour to one day time interval. 
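The backward clock step described above matters most when code measures durations on a single node. In Java, System.currentTimeMillis() reads the time-of-day clock, which a time service may reset, while System.nanoTime() is documented as suitable only for measuring elapsed time and is unrelated to wall-clock time. The sketch below times the same event with both. The class name, the helper method names, and the 50 ms sleep standing in for an event are assumptions made for illustration.

```java
class ElapsedTimeDemo {
    // Times a task with the clock intended for elapsed-time measurement.
    static long elapsedNanos(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return System.nanoTime() - start;
    }

    // Times a task with the time-of-day clock. The result can be wrong,
    // even negative, if the clock is reset while the task is running.
    static long elapsedWallClockMillis(Runnable task) {
        long start = System.currentTimeMillis();
        task.run();
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        // A stand-in for some event whose response time we want to measure.
        Runnable event = () -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("nanoTime elapsed (ms): " + elapsedNanos(event) / 1_000_000);
        System.out.println("wall clock elapsed (ms): " + elapsedWallClockMillis(event));
    }
}
```

On a quiet machine both measurements will usually agree. They diverge only when the time-of-day clock is adjusted during the measurement. Timestamps that must be compared across nodes still come from the time-of-day clock, which is why each node resynchronizes it with a time service at regular intervals.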
This ensures the clocks remain close in value. Still, if an application really needs to know the precise order of events that occur on different nodes, clock drift is going to make this fraught with danger.

There are other time services that provide higher accuracy than NTP. [[Chrony]](https://oreil.ly/ylrw3) supports the NTP protocol but provides much higher accuracy and greater scalability than the reference NTP implementation---the reason it has been [[adopted by Facebook]](https://oreil.ly/JqKHP). Amazon has built the Amazon Time Sync Service by installing GPS and atomic clocks in its data centers. This service is available for free to all AWS customers.

The takeaway from this discussion is that our applications cannot rely on timestamps of events on different nodes to represent the actual order of these events. Clock drift even by a second or two makes cross-node timestamps meaningless to compare. The implications of this will become clear when we start to discuss distributed databases in detail.

Summary and Further Reading

This chapter has covered a lot of ground to explain some of the essential characteristics of communications and time in distributed systems. These characteristics are important for application designers and developers to understand. The key issues that should resonate from this chapter are as follows: 1. 2. 3. 4. 5. These issues will pervade the discussions in the rest of this book. Many of the unique problems and solutions that are adopted in distributed systems stem from these fundamentals. There's no escaping them!

An excellent source for more detailed, more theoretical coverage of all aspects of distributed systems is George Coulouris et al., *Distributed Systems: Concepts and Design*, 5th ed. (Pearson, 2011). Likewise for computer networking, you'll find out all you wanted to know and no doubt more in James Kurose and Keith Ross's *Computer Networking: A Top-Down Approach*, 7th ed. (Pearson, 2017).
[****](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#ch01fn10-marker) Roy T. Fielding, "Architectural Styles and the Design of Network-Based Software Architectures." Dissertation, University of California, Irvine, 2000. [****](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#ch01fn11-marker) Michael J. Fischer et al., "Impossibility of Distributed Consensus with One Faulty Process," *Journal of the ACM* 32, no. 2 (1985): 374--82. [*[https://doi.org/10.1145/3149.214121]*](https://doi.org/10.1145/3149.214121). Chapter 4. An Overview of Concurrent Systems Distributed systems comprise multiple independent pieces of code executing in parallel, or concurrently, on many processing nodes across multiple locations. Any distributed system is hence by definition a concurrent system, even if each node is processing events one at a time. The behavior of the various nodes must of course be coordinated in order to make the application behave as desired. As I described in [[Chapter 3]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch03.html#distributed_systems_essentials), coordinating nodes in a distributed system is fraught with danger. Luckily, our industry has matured sufficiently to provide complex, powerful software frameworks that hide many of these distributed system perils from our applications (most of the time, anyway). The majority of this book focuses on describing how we can utilize these frameworks to build scalable distributed systems. This chapter, however, is concerned with concurrent behavior in our systems on a single node. By explicitly writing our software to perform multiple actions concurrently, we can optimize the processing and resource utilization on a single node, and hence increase our processing capacity both locally and system-wide. 
I'll use the Java 7.0 concurrency capabilities for examples, as these are at a lower level of abstraction than those introduced in Java 8.0. Knowing how concurrent systems operate "closer to the machine" is essential foundational knowledge when building concurrent and distributed systems. Once you understand the lower-level mechanisms for building concurrent systems, the more abstract approaches are easier to optimally exploit. And while this chapter is Java-specific, the fundamental problems of concurrent systems don't change when you write systems in other languages. Mechanisms for handling concurrency exist in all mainstream programming languages. [["Concurrency Models"]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#concurrency_models) gives some more details on alternative approaches and how they are implemented in modern languages. One final point. This chapter is a concurrency *primer*. It won't teach you everything you need to know to build complex, high-performance concurrent systems. It will also be useful if your experience writing concurrent programs is rusty, or you have some exposure to concurrent code in another programming language. The further reading section at the end of the chapter points to more comprehensive coverage of this topic for those who wish to delve deeper. Why Concurrency? Think of a busy coffee shop. If everyone orders a simple coffee, then the barista can quickly and consistently deliver each drink. Suddenly, the person in front of you orders a soy, vanilla, no sugar, quadruple-shot iced brew. Everyone in line sighs and starts reading their social media. In two minutes the line is out of the door. Processing requests in web applications is analogous to our coffee example. In a coffee shop, we enlist the help of a new barista to simultaneously make coffees on a different machine to keep the line length in control and serve customers quickly. 
In software, to make applications responsive, we need to somehow process requests in our server in an overlapping manner, handling requests concurrently. In the good old days of computing, each CPU was only able to execute a single machine instruction at any instant. If our server application runs on such a CPU, why do we need to structure our software systems to potentially execute multiple instructions concurrently? It all seems slightly pointless. There is actually a very good reason. Virtually every program does more than just execute machine instructions. For example, when a program attempts to read from a file or send a message on the network, it must interact with the hardware subsystem (disk, network card) that is peripheral to the CPU. Reading data from a magnetic hard disk takes around 10 milliseconds (ms). During this time, the program must wait for the data to be available for processing. Now, even an ancient CPU such as a [[1988 Intel 80386]](https://oreil.ly/NK6NC) can execute [[more than 10 million instructions per second (mips)]](https://oreil.ly/8jWH5). 10 ms is one hundredth of a second. How many instructions could our 80386 execute in a hundredth second? Do the math. (Hint---it's a lot!) A lot of wasted processing capacity, in fact. This is how operating systems such as Linux can run multiple programs on a single CPU. While one program is waiting for an I/O event, the operating system schedules another program to execute. By explicitly structuring our software to have multiple activities that can be executed in parallel, the operating system can schedule tasks that have work to do while others wait for I/O. We'll see in more detail how this works with Java later in this chapter. In 2001, IBM introduced the world's first multicore processor, a chip with two CPUs---see [[Figure 4-1]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#simplified_view_of_a_multicore_processo) for a simplified illustration. 
Today, even my laptop has 16 CPUs, or "cores," as they are commonly known. With a multicore chip, a software system that is structured to have multiple parallel activities can be executed concurrently on each core, up to the number of available cores. In this way, we can fully utilize the processing resources on a multicore chip, and thus increase our application's processing capacity. ![Simplified view of a multicore processor](media/image10.png) Figure 4-1. Simplified view of a multicore processor The primary way to structure a software system as concurrent activities is to use *threads*. Virtually every programming language has its own threading mechanism. The underlying semantics of all these mechanisms are similar---there are only a few primary threading models in mainstream use---but obviously the syntax varies by language. In the following sections, I'll explain how threads are supported in Java, and how we need to design our programs to be safe (i.e., correct) and efficient when executing in parallel. Armed with this knowledge, leaping into the concurrency features supported in other languages shouldn't be too arduous. **Concurrency Models** This chapter describes one model for concurrent systems, based on independently executing threads using locks to operate on shared, mutable resources. Concurrency models have been a much studied and explored topic in computer science for roughly the last 50 years. Many theoretical proposals have been put forward, and some of these are implemented in modern programming languages. These models provide alternative approaches for structuring and coordinating concurrent activities in programs. Here's a sampler that you might well encounter in your work: Hopefully this gives you a feel for the diversity of concurrency models and primitives in modern programming languages. Luckily, when you know the fundamentals and one model, the rest are straightforward to learn. 
Threads

Every software process has a single thread of execution by default. This is the thread that the operating system manages when it schedules the process for execution. In Java, for example, the main() function you specify as the entry point to your code defines the behavior of this thread. This single thread has access to the program's environment and resources such as open file handles and network connections. As the program calls methods in objects instantiated in the code, the program's runtime stack is used to pass parameters and manage variable scopes. Standard programming language runtime stuff, that we all know and love. This is a sequential process.

In your systems, you can use programming language features to create and execute additional threads. Each thread is an independent sequence of execution and has its own runtime stack to manage local object creation and method calls. Each thread also has access to the process' global data and environment. A simple depiction of this scheme is shown in [[Figure 4-2]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#comparing_a_single_threaded_and_multith).

Figure 4-2. Comparing a single-threaded and multithreaded process

In Java, we can define a thread using a class that implements the Runnable interface and defines the run() method. Let's look at a simple example:

    class NamingThread implements Runnable {

        private String name;

        public NamingThread(String threadName) {
            name = threadName;
            System.out.println("Constructor called: " + threadName);
        }

        public void run() {
            // Display info about this thread
            System.out.println("Run called : " + name);
            System.out.println(name + " : " + Thread.currentThread());
            // and now terminate ....
        }
    }

To execute the thread, we need to construct a Thread object using an instance of our Runnable and call the start() method to invoke the code in its own execution context.
This is shown in the next code example, along with the output of running the code. Note this example has two threads: the main() thread and the NamingThread. The main thread starts the NamingThread, which executes asynchronously, and then waits for 1 second to give our run() method in NamingThread ample time to complete:

    public static void main(String[] args) {
        NamingThread name0 = new NamingThread("My first thread");

        // Create the thread
        Thread t0 = new Thread(name0);

        // Start the thread
        t0.start();

        // Delay the main thread for a second (1000 milliseconds)
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {}

        // Display info about the main thread and terminate
        System.out.println(Thread.currentThread());
    }

    ===EXECUTION OUTPUT===
    Constructor called: My first thread
    Run called : My first thread
    My first thread : Thread[Thread-0,5,main]
    Thread[main,5,main]

For illustration, we also call the static currentThread() method, which returns a reference to the currently executing Thread object. Printing that object displays, as the output above shows, the thread's name, its priority, and the name of its thread group.

Note that to start a new thread of execution, we call the start() method, not the run() method we define in the Runnable. The start() method contains the internal system magic to create the execution context for a separate thread to execute. If we call run() directly, the code will execute, but no new thread will be created. The run() method will execute as part of the main thread, just like any other Java method invocation that you know and love. You will still have single-threaded code.

In the example, we use sleep() to pause the execution of the main thread and make sure it does not terminate before the NamingThread. This approach, namely coordinating two threads by delaying for an absolute time period (1 second in the example), is not a very robust mechanism. What if for some reason---a slower CPU, a long delay reading disk, additional complex logic in the method---our thread doesn't terminate in the expected timeframe?
In this case, main will terminate first---this is not what we intend. In general, if you are using absolute times for thread coordination, you are doing it wrong. Almost always. Like 99.99999% of the time.

A simple and robust mechanism for one thread to wait until another has completed its work is to use the join() method. We could replace the call to sleep() inside the try block in the above example with:

    t0.join();

The call stays inside the try-catch block because join(), like sleep(), can throw an InterruptedException. This method causes the calling thread (in this case, main) to block until the thread referenced by t0 terminates. If the referenced thread has terminated before the call to join(), then the method call returns immediately. In this way we can coordinate, or synchronize, the behavior of multiple threads. Synchronization of multiple threads is in fact the major focus of the rest of this chapter.

Order of Thread Execution

The system scheduler (in Java, this lives in the Java virtual machine [JVM]) controls the order of thread execution. From the programmer's perspective, the order of execution is *nondeterministic*. Get used to that term; I'll use it a lot. The concept of nondeterminism is fundamental to understanding multithreaded code. I'll illustrate this by building on the earlier NamingThread example. Instead of creating a single NamingThread, I'll create and start up a few. Three, in fact, as shown in the following code example.
Again, sample output from running the code is shown beneath the code itself:

    NamingThread name0 = new NamingThread("thread0");
    NamingThread name1 = new NamingThread("thread1");
    NamingThread name2 = new NamingThread("thread2");

    // Create the threads
    Thread t0 = new Thread(name0);
    Thread t1 = new Thread(name1);
    Thread t2 = new Thread(name2);

    // Start the threads
    t0.start();   // (1)
    t1.start();   // (1)
    t2.start();   // (1)

    ===EXECUTION OUTPUT===
    Run called : thread0
    thread0 : Thread[Thread-0,5,main]    (2)
    Run called : thread2                 (3)
    Run called : thread1
    thread1 : Thread[Thread-1,5,main]    (4)
    thread2 : Thread[Thread-2,5,main]
    Thread[main,5,main]

The output shown is a sample from just one execution. You can see the code starts three threads sequentially, namely *t0*, *t1*, and *t2* (see 1). Looking at the output, we see thread *t0* completes (see 2) before the others start. Next *t2*'s run() method is called (see 3) followed by *t1*'s run() method, even though *t1* was started before *t2*. Thread *t1* then runs to completion (see 4) before *t2*, and eventually the *main* thread and the program terminate.

This is just one possible order of execution. If we run this program again, we will almost certainly see a different execution trace. This is because the JVM scheduler is deciding which thread to execute, and for how long. Put very simply, once the scheduler has given a thread an execution time slot on a CPU, it can interrupt the thread after a specified time period and schedule another one to run. This interruption is known as preemption. Preemption ensures each thread is given an opportunity to make progress. Hence the threads run independently and asynchronously until completion, and the scheduler decides which thread runs when based on a scheduling algorithm.
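To observe this nondeterminism without relying on a timed delay, the following sketch starts three threads, records the order in which they run, and uses join() so that the caller waits for all of them. The class name, the runThreads helper, and the synchronized list used to record the order are choices made for this illustration and are not part of the original example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ExecutionOrderDemo {
    // Starts one thread per name and returns the order in which they ran.
    static List<String> runThreads(String... names) {
        // A synchronized list, so concurrent add() calls are safe.
        final List<String> order = Collections.synchronizedList(new ArrayList<String>());
        List<Thread> threads = new ArrayList<>();
        for (final String name : names) {
            Thread t = new Thread(() -> order.add(name));
            threads.add(t);
            t.start();
        }
        // join() blocks until each thread has terminated.
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // All three names are always present, but their order can differ between runs.
        System.out.println(runThreads("thread0", "thread1", "thread2"));
    }
}
```

Every run prints all three names, yet repeated runs may list them in different orders. Which order appears depends entirely on how the scheduler allocates time slots to the threads.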
There's more to thread scheduling than this, and I'll explain the basic scheduling algorithm used later in this chapter. For now, there is a major implication for programmers; regardless of the order of thread execution---which you don't control---your code should produce correct results. Sounds easy? Read on. Problems with Threads The basic problem in concurrent programming is coordinating the execution of multiple threads so that whatever order they are executed in, they produce the correct answer. Given that threads can be started and preempted nondeterministically, any moderately complex program will have essentially an infinite number of possible orders of execution. These systems aren't easy to test. There are two fundamental problems that all concurrent programs need to avoid. These are race conditions and deadlocks, and these topics are covered in the next two subsections. **Race Conditions** Nondeterministic execution of threads implies that the code statements that comprise the threads: - - Hence, when many threads are executed on a single processor, their execution is *interleaved*. The CPU executes some steps from one thread, then performs some steps from another, and so on. If we are executing on a multicore CPU, then we can execute one thread per core. The statements of each thread execution are still however interleaved in a nondeterministic manner. Now, if every thread simply does its own thing and is completely independent, this is not a problem. Each thread executes until it terminates, as in our trivial Naming​Thread example. This stuff is a piece of cake! Why are these thread things meant to be complex? Unfortunately, totally independent threads are not how most multithreaded systems behave. If you refer back to [[Figure 4-2]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#comparing_a_single_threaded_and_multith), you will see that multiple threads share the global data within a process. 
In Java this is both global and static data. Threads can use shared data structures to coordinate their work and communicate status across threads. For example, we may have threads handling requests from web clients, one thread per request. We also want to keep a running total of how many requests we process each day. When a thread completes a request, it increments a global RequestCounter object that all threads share and update after each request. At the end of the day, we know how many requests were processed. A simple and elegant solution indeed. Well, maybe?

The code below shows a very simple implementation that mimics the request counter example scenario. It creates 50,000 threads to update a shared counter. Note we use a lambda function for brevity to create the threads, and a (really bad idea) 5-second delay in main to allow the threads to finish:[****](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#ch01fn12)

    public class RequestCounter {
        private static final int NUMTHREADS = 50000;
        private int count = 0;

        // Not thread-safe: count++ is a read-modify-write sequence
        public void inc() {
            count++;
        }

        public int getVal() {
            return this.count;
        }

        public static void main(String[] args) throws InterruptedException {
            final RequestCounter counter = new RequestCounter();

            for (int i = 0; i < NUMTHREADS; i++) {
                // lambda runnable creation
                Runnable thread = () -> { counter.inc(); };
                new Thread(thread).start();
            }

            // Crude wait for the threads to finish
            Thread.sleep(5000);
            System.out.println("Value should be " + NUMTHREADS + ". It is: " + counter.getVal());
        }
    }

What you can do at home is clone this code from [[the book's GitHub repo]](https://oreil.ly/fss-git-repo), run this code a few times, and see what results you get. In 10 executions my mean was 49,995. I didn't once get the correct answer of 50,000. Weird. Why? The answer lies in how abstract, high-level programming language statements, in Java in this case, are executed on a machine.
In this example, to perform an increment of a counter, the CPU must:

- Load the current value into a register
- Increment the register value
- Write the result back to the memory location

This simple increment is actually a sequence of three machine-level operations. As [[Figure 4-3]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#increments_are_not_atomic_at_the_machin) shows, at the machine level these three operations are independent and not treated as a single *atomic* operation. By atomic, we mean an operation that cannot be interrupted and hence once started will run to completion.

As the increment operation is not atomic at the machine level, one thread can load the counter value into a CPU register from memory, but before it writes the incremented value back, the scheduler preempts the thread and allows another thread to start. This second thread loads the old value of the counter from memory, increments it, and writes back the incremented value. Eventually the original thread executes again and writes back its incremented value, which just happens to be the same as what is already in memory. This means we've lost an update. From my 10 tests of the counter code above, this happens on average 5 times in 50,000 increments. Such events are rare, but even if one happens only 1 time in 10 million, you still have an incorrect result.

![Increments are not atomic at the machine level](media/image16.png)

Figure 4-3. Increments are not atomic at the machine level

When we lose updates in this manner, it is called a race condition. Race conditions can occur whenever multiple threads make changes to some shared state, in this case a simple counter. Essentially, different interleavings of the threads can produce different results.

Race conditions are insidious, evil errors, because their occurrence is typically rare, and they can be hard to detect as most of the time the answer will be correct. Try running the multithreaded counter code example with 1,000 threads instead of 50,000, and you will see this in action.
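
For that experiment, it may be convenient to make the thread count a parameter and to wait for the threads with join() rather than a fixed delay. The following is a minimal sketch under those assumptions, not the book's listing; the class name RaceDemo and the method runOnce are hypothetical, and the counter is left unsynchronized on purpose so that lost updates remain possible.

```java
// Hypothetical variant of the counter experiment; names are illustrative only.
final class RaceDemo {
    private int count = 0;

    // Unsynchronized on purpose: count++ is a load, an add, and a store.
    void inc() {
        count++;
    }

    int getVal() {
        return count;
    }

    // Starts numThreads threads that each increment the counter once,
    // waits for all of them with join(), and returns the final value.
    static int runOnce(int numThreads) {
        RaceDemo counter = new RaceDemo();
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(counter::inc);
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return counter.getVal();
    }

    public static void main(String[] args) {
        // Thread count defaults to 1,000; pass another value as the first argument.
        int numThreads = args.length > 0 ? Integer.parseInt(args[0]) : 1000;
        for (int run = 1; run <= 10; run++) {
            System.out.println("Run " + run + ": expected " + numThreads
                    + ", got " + runOnce(numThreads));
        }
    }
}
```

The result of any single run depends on the scheduler, so the printed values can differ between runs and between machines.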
I got the correct answer nine times out of ten. So, this situation can be summarized as “same code, occasionally different results.” Like I said, race conditions are evil!

Luckily, eradicating them is straightforward if you take a few precautions. The key is to identify and protect *critical sections*. A critical section is a section of code that updates shared data structures and hence must be executed atomically if accessed by multiple threads. Incrementing a shared counter is one example of a critical section. Another is removing an item from a list. We need to delete the head node of the list and move the reference to the head of the list from the removed node to the next node in the list. Both operations must be performed atomically to maintain the integrity of the list. This is a critical section.

In Java, the synchronized keyword defines a critical section. If used to decorate a method, then when multiple threads attempt to call that method on the same shared object, only one is permitted to enter the critical section. All others block until the thread exits the synchronized method, at which point the scheduler chooses the next thread to execute the critical section. We say the execution of the critical section is serialized, as only one thread at a time can be executing the code inside it.

To fix the counter example, you therefore just need to identify the inc() method as a critical section and make it a synchronized method, i.e.:

    public synchronized void inc() {
        count++;
    }

Test it out as many times as you like. You'll always get the correct answer. Slightly more formally, this means any interleaving of the threads that the scheduler throws at us will always produce the correct results.

The synchronized keyword can also be applied to blocks of statements within a method.
For example, we could rewrite the above example as:

    public void inc() {
        synchronized (this) {
            count++;
        }
    }

Underneath the covers, every Java object has a *monitor lock*, sometimes known as an intrinsic lock, as part of its runtime representation. The monitor is like the bathroom on a long-distance bus: only one person is allowed to (and should!) enter at once, and the door lock stops others from entering when in use.

In our totally sanitary Java runtime environment, a thread must acquire the monitor lock to enter a synchronized method or synchronized block of statements. Only one thread can own the lock at any time, and hence execution is serialized. This, very basically, is how Java and similar languages implement critical sections.

As a rule of thumb, you should keep critical sections as small as possible so that the serialized code is minimized. This can have positive impacts on performance and hence scalability. I'll return to this topic later, but I'm really talking about [[Amdahl's law]](https://oreil.ly/kLFs1) again, as introduced in [[Chapter 2]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch02.html#distributed_systems_architectures_an_in). Synchronized blocks are the serialized parts of a system as described by Amdahl, and the longer they execute, the less potential we have for system scalability.

**Deadlocks**

To ensure correct results in multithreaded code, I explained that we have to restrict the inherent nondeterminism to serialize access to critical sections. This avoids race conditions. However, if we are not careful, we can write code that restricts nondeterminism so much that our program stops. And never continues. This is formally known as a deadlock.

A deadlock occurs when two or more threads are blocked forever, and none can proceed. This happens when threads need exclusive access to a shared set of resources and the threads acquire locks in different orders.
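
A deadlock of this kind can be reproduced deterministically. The sketch below is an illustration under stated assumptions rather than code from the book: the class DeadlockDemo, the helper worker, and the thread names are hypothetical. A CountDownLatch makes sure each thread holds its first lock before asking for the second, the threads are daemons so the JVM can still exit, and the standard ThreadMXBean.findMonitorDeadlockedThreads() call is used only to confirm that the JVM sees the deadlock.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; the names below are illustrative only.
final class DeadlockDemo {

    // Builds a daemon thread that locks 'first', waits until its peer also
    // holds a lock, and then tries to lock 'second'.
    private static Thread worker(String name, Object first, Object second,
                                 CountDownLatch bothHoldFirst) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                bothHoldFirst.countDown();
                try {
                    bothHoldFirst.await(); // peer now holds its first lock too
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                synchronized (second) { // blocks forever: the peer owns 'second'
                    System.out.println(name + " acquired both locks");
                }
            }
        }, name);
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        return t;
    }

    // Starts two threads that take the same two locks in opposite orders and
    // returns how many threads the JVM reports as monitor-deadlocked.
    static int deadlockedThreadCount() {
        Object lockA = new Object();
        Object lockB = new Object();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        worker("thread-1", lockA, lockB, bothHoldFirst).start();
        worker("thread-2", lockB, lockA, bothHoldFirst).start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long deadline = System.currentTimeMillis() + 5000;
        while (System.currentTimeMillis() < deadline) {
            long[] ids = mx.findMonitorDeadlockedThreads();
            if (ids != null) {
                return ids.length;
            }
            try {
                Thread.sleep(50); // poll until the deadlock has formed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println("Deadlocked threads detected: " + deadlockedThreadCount());
    }
}
```

Swapping the lock arguments of the second worker so that both threads take lockA first removes the circular wait, and both threads then run to completion.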
This is illustrated in the example below, in which two threads need exclusive access to critical sections A and B. Thread 1 acquires the lock for critical section A, and thread 2 acquires the lock for critical section B. Both then block forever as they cannot acquire the locks they need to continue.

Two threads sharing access to two shared variables via synchronized blocks:

- Thread 1 enters critical section A
- Thread 2 enters critical section B
- Thread 1 blocks on entry to critical section B
- Thread 2 blocks on entry to critical section A
- Both threads wait forever

A deadlock, also known as a deadly embrace, causes a program to stop. It doesn't take a vivid imagination to realize that this can cause all sorts of undesirable outcomes. I'm happily texting away while my autonomous vehicle drives me to the bar. Suddenly, the vehicle code deadlocks. It won't end well.

Deadlocks occur in more subtle circumstances than the simple example above. The classic example is the dining philosophers problem. The story goes like this.

Five philosophers sit around a shared table. Being philosophers, they spend a lot of time thinking deeply. In between bouts of deep thinking, they replenish their brain function by eating from a plate of food that sits in front of them. Hence a philosopher is either eating or thinking or transitioning between these two states.

In addition, the philosophers must all be physically very close, highly dexterous, and COVID-19 vaccinated friends, as they share chopsticks to eat. Only five chopsticks are on the table, one between each pair of neighboring philosophers. When one philosopher wishes to eat, they follow a protocol of picking up their left chopstick first, then their right chopstick. Once they are ready to think again, they first return the right chopstick, then the left.

[[Figure 4-4]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#the_dining_philosophers_problem) depicts our philosophers, each identified by a unique number. As each is either concurrently eating or thinking, we can model each philosopher as a thread.

Figure 4-4. The dining philosophers problem

The code is shown in [[Example 4-1]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#example_four-onedot_the_philosopher_thr). The shared chopsticks are represented by instances of the Java Object class. As only one thread can hold the monitor lock on an object at any time, the chopstick objects are used as entry conditions to the critical sections in which the philosophers acquire the chopsticks they need to eat. After eating, the chopsticks are returned to the table and the lock is released on each so that neighboring philosophers can eat whenever they are ready.

**Example 4-1. The philosopher thread**

    public class Philosopher implements Runnable {

        private final Object leftChopStick;
        private final Object rightChopStick;

        Philosopher(Object leftChopStick, Object rightChopStick) {
            this.leftChopStick = leftChopStick;
            this.rightChopStick = rightChopStick;
        }

        // Prints the event with the thread name, then pauses for one second
        private void LogEvent(String event) throws InterruptedException {
            System.out.println(Thread.currentThread().getName() + " " + event);
            Thread.sleep(1000);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    LogEvent(": Thinking deeply");
                    // acquire the left chopstick's monitor lock first
                    synchronized (leftChopStick) {
                        LogEvent(": Picked up left chopstick");
                        // then the right chopstick's monitor lock
                        synchronized (rightChopStick) {
                            LogEvent(": Picked up right chopstick -- eating");
                            LogEvent(": Put down right chopstick");
                        }
                        LogEvent(": Put down left chopstick. Ate too much");
                    }
                } // end while
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

To bring the philosophers described in [[Example 4-1]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#example_four-onedot_the_philosopher_thr) to life, we must instantiate a thread for each and give each philosopher access to their neighboring chopsticks.
This is done through the constructor call marked (1) in [[Example 4-2]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#example_four-twodot_dining_philosophers). In the for loop we create five philosophers and start these as independent threads, where each chopstick is accessible to two threads, one as a left chopstick, and one as a right.

**Example 4-2. Dining philosophers: deadlocked version**

    private final static int NUMCHOPSTICKS = 5;
    private final static int NUMPHILOSOPHERS = 5;

    public static void main(String[] args) throws Exception {
        final Philosopher[] ph = new Philosopher[NUMPHILOSOPHERS];
        Object[] chopSticks = new Object[NUMCHOPSTICKS];

        for (int i = 0; i < NUMCHOPSTICKS; i++) {
            chopSticks[i] = new Object();
        }

        for (int i = 0; i < NUMPHILOSOPHERS; i++) {
            Object leftChopStick = chopSticks[i];
            // the last philosopher's right chopstick wraps around to chopSticks[0]
            Object rightChopStick = chopSticks[(i + 1) % chopSticks.length];
            ph[i] = new Philosopher(leftChopStick, rightChopStick); // (1)
            Thread th = new Thread(ph[i], "Philosopher " + i);
            th.start();
        }
    }

Running this code produces the following output on my first attempt. If you run the code you will almost certainly see different outputs, but the final outcome will be the same:

    Philosopher 3 : Thinking deeply
    Philosopher 4 : Thinking deeply
    Philosopher 0 : Thinking deeply
    Philosopher 1 : Thinking deeply
    Philosopher 2 : Thinking deeply
    Philosopher 3 : Picked up left chopstick
    Philosopher 0 : Picked up left chopstick
    Philosopher 2 : Picked up left chopstick
    Philosopher 4 : Picked up left chopstick
    Philosopher 1 : Picked up left chopstick

Ten lines of output, then...nothing! We have a deadlock. This is a classic circular waiting deadlock. Imagine the following scenario:

- Each philosopher indulges in a long thinking session
- Simultaneously, they all decide they are hungry and reach for their left chopstick
- No philosopher can eat (proceed), as none can pick up their right chopstick

Real philosophers in this situation would figure out some way to proceed by putting down a chopstick or two until one or more of their colleagues can eat.
We can sometimes do this in our software by using timeouts on blocking operations. When the timeout expires, a thread releases the critical section and retries, allowing other blocked threads a chance to proceed. This is not optimal though, as blocked threads hurt performance and setting timeout values is an inexact science.

It is much better, therefore, to design a solution to be deadlock-free. This means that one or more threads will always be able to make progress. With circular wait deadlocks, this can be achieved by imposing a resource allocation protocol on the shared resources, so that the threads' requests can never form a closed cycle.

In the dining philosophers problem, we can do this by making sure one of our philosophers picks up their right chopstick first. Let's assume we instruct Philosopher 4 to do this. This leads to a possible sequence of operations such as below:

- Philosopher 0 requests chopStick\[0\] (left), then chopStick\[1\] (right)
- Philosopher 1 requests chopStick\[1\] (left), then chopStick\[2\] (right)
- Philosopher 2 requests chopStick\[2\] (left), then chopStick\[3\] (right)
- Philosopher 3 requests chopStick\[3\] (left), then chopStick\[4\] (right)
- Philosopher 4 requests chopStick\[0\] (right), then chopStick\[4\] (left)

In this example, Philosopher 4 must block, as Philosopher 0 already has acquired access to chopStick\[0\]. With Philosopher 4 blocked, Philosopher 3 is assured access to chopStick\[4\] and can then proceed to satisfy their appetite.

The fix for the dining philosophers problem is shown in [[Example 4-3]](https://learning.oreilly.com/library/view/foundations-of-scalable/9781098106058/ch04.html#example_four-threedot_solving_the_dinin).

**Example 4-3. Solving the dining philosophers deadlock**

    if (i == NUMPHILOSOPHERS - 1) {
        // The last philosopher picks up the right chopstick first
        ph[i] = new Philosopher(rightChopStick, leftChopStick);
    } else {
        // all others pick up the left chopstick first
        ph[i] = new Philosopher(leftChopStick, rightChopStick);
    }

More formally we are imposing an ordering on the acquisition of shared resources, such that: chopStick\[0\] \
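
The timeout-based approach mentioned earlier in this section can be sketched as well. Java's intrinsic monitor locks do not support timeouts, so this sketch swaps the synchronized blocks for java.util.concurrent.locks.ReentrantLock and its tryLock(long, TimeUnit) method. It is an assumption-laden illustration, not the book's code: the class TimedChopsticks, the methods tryToEat and demo, and the 100 ms timeout are all hypothetical choices.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; class and method names are illustrative only.
final class TimedChopsticks {

    // Locks 'first', then waits at most timeoutMillis for 'second'.
    // Returns true if both were held (the philosopher ate), or false if the
    // wait timed out or was interrupted. Both locks are released on return.
    static boolean tryToEat(ReentrantLock first, ReentrantLock second, long timeoutMillis) {
        first.lock();
        try {
            if (!second.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // give up; 'first' is released in the finally block
            }
            try {
                return true; // eating would happen here
            } finally {
                second.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            first.unlock();
        }
    }

    // Runs one attempt while a neighbor holds the second chopstick and one
    // after the neighbor has released it; returns both results in that order.
    static boolean[] demo() {
        ReentrantLock left = new ReentrantLock();
        ReentrantLock right = new ReentrantLock();
        CountDownLatch neighborHoldsRight = new CountDownLatch(1);
        CountDownLatch neighborMayRelease = new CountDownLatch(1);

        Thread neighbor = new Thread(() -> {
            right.lock();
            try {
                neighborHoldsRight.countDown();
                neighborMayRelease.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                right.unlock();
            }
        });
        neighbor.start();

        try {
            neighborHoldsRight.await();
            boolean whileHeld = tryToEat(left, right, 100);
            neighborMayRelease.countDown();
            neighbor.join();
            boolean afterRelease = tryToEat(left, right, 100);
            return new boolean[] { whileHeld, afterRelease };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println("Ate while neighbor held the chopstick: " + results[0]);
        System.out.println("Ate after neighbor released it: " + results[1]);
    }
}
```

A caller would typically retry tryToEat in a loop, ideally after a short randomized pause, which is where the difficulty of choosing timeout values shows up in practice.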
