Database Management Systems (Ramakrishnan) - Parallel & Distributed Databases - PDF

2018

Raghu Ramakrishnan

Summary

This chapter from Database Management Systems by Ramakrishnan (2018) explores parallel and distributed database systems. It covers core concepts like parallelism architectures, performance, data partitioning, and query optimization. The material is suitable for undergraduate-level computer science study.

22 PARALLEL AND DISTRIBUTED DATABASES

☛ What is the motivation for parallel and distributed DBMSs?
☛ What are the alternative architectures for parallel database systems?
☛ How are pipelining and data partitioning used to gain parallelism?
☛ How are dataflow concepts used to parallelize existing sequential code?
☛ What are alternative architectures for distributed DBMSs?
☛ How is data distributed across sites?
☛ How can we evaluate and optimize queries over distributed data?
☛ What are the merits of synchronous vs. asynchronous replication?
☛ How are transactions managed in a distributed environment?
➽ Key concepts: parallel DBMS architectures; performance, speed-up and scale-up; pipelined versus data-partitioned parallelism, blocking; partitioning strategies; dataflow operators; distributed DBMS architectures; heterogeneous systems; gateway protocols; data distribution, distributed catalogs; semijoins, data shipping; synchronous versus asynchronous replication; distributed transactions, lock management, deadlock detection, two-phase commit, Presumed Abort

No man is an island, entire of itself; every man is a piece of the continent, a part of the main.
-- John Donne

In this chapter we look at the issues of parallelism and data distribution in a DBMS. We begin by introducing parallel and distributed database systems in Section 22.1. In Section 22.2, we discuss alternative hardware configurations for a parallel DBMS. In Section 22.3, we introduce the concept of data partitioning and consider its influence on parallel query evaluation. In Section 22.4, we show how data partitioning can be used to parallelize several relational operations. In Section 22.5, we conclude our treatment of parallel query processing with a discussion of parallel query optimization.

The rest of the chapter is devoted to distributed databases. We present an overview of distributed databases in Section 22.6.
We discuss some alternative architectures for a distributed DBMS in Section 22.7 and describe options for distributing data in Section 22.8. We describe distributed catalog management in Section 22.9, then in Section 22.10, we discuss query optimization and evaluation for distributed databases. In Section 22.11, we discuss updating distributed data, and finally, in Sections 22.12 to 22.14 we describe distributed transaction management.

22.1 INTRODUCTION

We have thus far considered centralized database management systems in which all the data is maintained at a single site and assumed that the processing of individual transactions is essentially sequential. One of the most important trends in databases is the increased use of parallel evaluation techniques and data distribution.

A parallel database system seeks to improve performance through parallelization of various operations, such as loading data, building indexes, and evaluating queries. Although data may be stored in a distributed fashion in such a system, the distribution is governed solely by performance considerations.

In a distributed database system, data is physically stored across several sites, and each site is typically managed by a DBMS capable of running independent of the other sites. The location of data items and the degree of autonomy of individual sites have a significant impact on all aspects of the system, including query optimization and processing, concurrency control, and recovery. In contrast to parallel databases, the distribution of data is governed by factors such as local ownership and increased availability, in addition to performance issues.

While parallelism is motivated by performance considerations, several distinct issues motivate data distribution:

Increased Availability: If a site containing a relation goes down, the relation continues to be available if a copy is maintained at another site.
Distributed Access to Data: An organization may have branches in several cities. Although analysts may need to access data corresponding to different sites, we usually find locality in the access patterns (e.g., a bank manager is likely to look up the accounts of customers at the local branch), and this locality can be exploited by distributing the data accordingly.
Analysis of Distributed Data: Organizations want to examine all the data available to them, even when it is stored across multiple sites and on multiple database systems. Support for such integrated access involves many issues; even enabling access to widely distributed data can be a challenge.

22.2 ARCHITECTURES FOR PARALLEL DATABASES

The basic idea behind parallel databases is to carry out evaluation steps in parallel whenever possible, and there are many such opportunities in a relational DBMS; databases represent one of the most successful instances of parallel computing.

[Figure 22.1 Physical Architectures for Parallel Database Systems. Panels: SHARED NOTHING, SHARED MEMORY, SHARED DISK. Labels: P, M, D, Interconnection Network, Global Shared Memory]

Three main architectures have been proposed for building parallel DBMSs. In a shared-memory system, multiple CPUs are attached to an interconnection network and can access a common region of main memory. In a shared-disk system, each CPU has a private memory and direct access to all disks through an interconnection network. In a shared-nothing system, each CPU has local main memory and disk space, but no two CPUs can access the same storage area; all communication between CPUs is through a network connection. The three architectures are illustrated in Figure 22.1.

The shared-memory architecture is closer to a conventional machine, and many commercial database systems have been ported to shared memory platforms with relative ease.
Communication overhead is low, because main memory can be used for this purpose, and operating system services can be leveraged to utilize the additional CPUs. Although this approach is attractive for achieving moderate parallelism (a few tens of CPUs can be exploited in this fashion), memory contention becomes a bottleneck as the number of CPUs increases. The shared-disk architecture faces a similar problem because large amounts of data are shipped through the interconnection network.

The basic problem with the shared-memory and shared-disk architectures is interference: As more CPUs are added, existing CPUs are slowed down because of the increased contention for memory accesses and network bandwidth. It has been noted that even an average 1 percent slowdown per additional CPU means that the maximum speed-up is a factor of 37, and adding additional CPUs actually slows down the system; a system with 1000 CPUs is only 4 percent as effective as a single-CPU system! This observation has motivated the development of the shared-nothing architecture, which is now widely considered to be the best architecture for large parallel database systems.

The shared-nothing architecture requires more extensive reorganization of the DBMS code, but it has been shown to provide linear speed-up, in that the time taken for operations decreases in proportion to the increase in the number of CPUs and disks, and linear scale-up, in that performance is sustained if the number of CPUs and disks is increased in proportion to the amount of data. Consequently, ever-more-powerful parallel database systems can be built by taking advantage of rapidly improving performance for single-CPU systems and connecting as many CPUs as desired.

Speed-up and scale-up are illustrated in Figure 22.2. The speed-up curves show how, for a fixed database size, more transactions can be executed per second by adding CPUs.
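The interference figures quoted above (a maximum speed-up of about 37, and about 4 percent effectiveness at 1000 CPUs) can be reproduced with a small calculation. The sketch below assumes a compounding model that the text does not spell out: every additional CPU multiplies the effective speed of each CPU by 0.99. The function name `speedup` and this exact model are illustrative assumptions.

```python
def speedup(n_cpus: int, slowdown: float = 0.01) -> float:
    """Speed-up relative to one CPU under an ASSUMED compounding model:
    each additional CPU slows every CPU down by the fraction `slowdown`."""
    per_cpu_rate = (1.0 - slowdown) ** (n_cpus - 1)  # effective speed of one CPU
    return n_cpus * per_cpu_rate                     # total work rate of all CPUs


# Search a range of system sizes for the best achievable speed-up.
best_n = max(range(1, 2001), key=speedup)

print("best number of CPUs:", best_n)
print("maximum speed-up:", round(speedup(best_n), 2))
print("speed-up with 1000 CPUs:", round(speedup(1000), 4))
```

Under this model the speed-up peaks at roughly 100 CPUs and then declines, which is the sense in which adding CPUs eventually slows the system down.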
The scale-up curves show how adding more resources (in the form of CPUs) enables us to process larger problems. The first scale-up graph measures the number of transactions executed per second as the database size is increased and the number of CPUs is correspondingly increased. An alternative way to measure scale-up is to consider the time taken per transaction as more CPUs are added to process an increasing number of transactions per second; the goal here is to sustain the response time per transaction.

[Figure 22.2 Speed-up and Scale-up. Panels: SPEED-UP (# transactions per second vs. # of CPUs; linear speed-up (ideal), sublinear speed-up); SCALE-UP with DB SIZE (# transactions per second vs. # of CPUs, database size; linear scale-up (ideal), sublinear scale-up); SCALE-UP with # XACTS/SEC (time per transaction vs. # of CPUs, # transactions per second; linear scale-up (ideal), sublinear scale-up)]

22.3 PARALLEL QUERY EVALUATION

In this section, we discuss parallel evaluation of a relational query in a DBMS with a shared-nothing architecture. While it is possible to consider parallel execution of multiple queries, it is hard to identify in advance which queries will run concurrently. So the emphasis has been on parallel execution of a single query.

A relational query execution plan is a graph of relational algebra operators, and the operators in a graph can be executed in parallel. If one operator consumes the output of a second operator, we have pipelined parallelism (the output of the second operator is worked on by the first operator as soon as it is generated); if not, the two operators can proceed essentially independently. An operator is said to block if it produces no output until it has consumed all its inputs. Pipelined parallelism is limited by the presence of operators (e.g., sorting or aggregation) that block.

In addition to evaluating different operators in parallel, we can evaluate each individual operator in a query plan in a parallel fashion.
The key to evaluating an operator in parallel is to partition the input data; we can then work on each partition in parallel and combine the results. This approach is called data-partitioned parallel evaluation. By exercising some care, existing code for sequentially evaluating relational operators can be ported easily for data-partitioned parallel evaluation.

An important observation, which explains why shared-nothing parallel database systems have been very successful, is that database query evaluation is very amenable to data-partitioned parallel evaluation. The goal is to minimize data shipping by partitioning the data and structuring the algorithms to do most of the processing at individual processors. (We use processor to refer to a CPU together with its local disk.)

We now consider data partitioning and parallelization of existing operator evaluation code in more detail.

22.3.1 Data Partitioning

Partitioning a large dataset horizontally across several disks enables us to exploit the I/O bandwidth of the disks by reading and writing them in parallel. There are several ways to horizontally partition a relation. We can assign tuples to processors in a round-robin fashion, we can use hashing, or we can assign tuples to processors by ranges of field values. If there are n processors, the ith tuple is assigned to processor i mod n in round-robin partitioning. Recall that round-robin partitioning is used in RAID storage systems (see Section 9.2). In hash partitioning, a hash function is applied to (selected fields of) a tuple to determine its processor. In range partitioning, tuples are sorted (conceptually), and n ranges are chosen for the sort key values so that each range contains roughly the same number of tuples; tuples in range i are assigned to processor i.

Round-robin partitioning is suitable for efficiently evaluating queries that access the entire relation.
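The three partitioning schemes just described can be sketched as small functions that map a tuple to a processor number. This is a minimal illustration, not code from any DBMS: the function names, the use of a CRC32 checksum as the hash function, and the convention that range partitioning is driven by a sorted list of n - 1 split points are all assumptions made for the example. Processors are numbered from 0.

```python
import zlib
from bisect import bisect_right


def round_robin_partition(i: int, n: int) -> int:
    """The ith tuple goes to processor i mod n."""
    return i % n


def hash_partition(key, n: int) -> int:
    """Apply a hash function to the partitioning field(s) of a tuple.
    CRC32 is used here only because it is deterministic across runs."""
    return zlib.crc32(repr(key).encode("utf-8")) % n


def range_partition(key, split_points) -> int:
    """split_points is a sorted list of n - 1 boundaries. Keys below
    split_points[0] go to processor 0, keys in
    [split_points[j-1], split_points[j]) go to processor j, and so on."""
    return bisect_right(split_points, key)


ages = [18, 18, 19, 11, 12, 25, 31, 47]
n = 3
for i, age in enumerate(ages):
    print(age,
          round_robin_partition(i, n),
          hash_partition(age, n),
          range_partition(age, [15, 30]))
```

Note that only the hash and range functions look at the tuple's contents; round-robin uses the tuple's position alone, which is why it cannot direct a selection to a subset of the processors.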
If only a subset of the tuples (e.g., those that satisfy the selection condition age = 20) is required, hash partitioning and range partitioning are better than round-robin partitioning because they enable us to access only those disks that contain matching tuples. (Of course, this statement assumes that the tuples are partitioned on the attributes in the selection condition; if age = 20 is specified, the tuples must be partitioned on age.) If range selections such as 15 < age < 25 are specified, range partitioning is superior to hash partitioning because qualifying tuples are likely to be clustered together on a few processors. On the other hand, range partitioning can lead to data skew; that is, partitions with widely varying numbers of tuples across partitions or disks. Skew causes processors dealing with large partitions to become performance bottlenecks. Hash partitioning has the additional virtue that it keeps data evenly distributed even if the data grows and shrinks over time.

To reduce skew in range partitioning, the main question is how to choose the ranges by which tuples are distributed. One effective approach is to take samples from each processor, collect and sort all samples, and divide the sorted set of samples into equally sized subsets. If tuples are to be partitioned on age, the age ranges of the sampled subsets of tuples can be used as the basis for redistributing the entire relation.

22.3.2 Parallelizing Sequential Operator Evaluation Code

An elegant software architecture for parallel DBMSs enables us to readily parallelize existing code for sequentially evaluating a relational operator. The basic idea is to use parallel data streams. Streams (from different disks or the output of other operators) are merged as needed to provide the inputs for a relational operator, and the output of an operator is split as needed to parallelize subsequent processing.
A parallel evaluation plan consists of a dataflow network of relational, merge, and split operators. The merge and split operators should be able to buffer some data and should be able to halt the operators producing their input data. They can then regulate the speed of the execution according to the execution speed of the operator that consumes their output.

As we will see, obtaining good parallel versions of algorithms for sequential operator evaluation requires careful consideration; there is no magic formula for taking sequential code and producing a parallel version. Good use of split and merge in a dataflow software architecture, however, can greatly reduce the effort of implementing parallel query evaluation algorithms, as we illustrate in Section 22.4.3.

22.4 PARALLELIZING INDIVIDUAL OPERATIONS

This section shows how various operations can be implemented in parallel in a shared-nothing architecture. We assume that each relation is horizontally partitioned across several disks, although this partitioning may or may not be appropriate for a given query. The evaluation of a query must take the initial partitioning criteria into account and repartition if necessary.

22.4.1 Bulk Loading and Scanning

We begin with two simple operations: scanning a relation and loading a relation. Pages can be read in parallel while scanning a relation, and the retrieved tuples can then be merged, if the relation is partitioned across several disks. More generally, the idea also applies when retrieving all tuples that meet a selection condition. If hashing or range partitioning is used, selection queries can be answered by going to just those processors that contain relevant tuples.

A similar observation holds for bulk loading. Further, if a relation has associated indexes, any sorting of data entries required for building the indexes during bulk loading can also be done in parallel (see later).
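To make the remark about answering selection queries at "just those processors that contain relevant tuples" concrete, here is a minimal sketch for a range-partitioned relation. It assumes the relation is partitioned on the selection attribute using a sorted list of split points, with processors numbered from 0; the helper name `processors_for_range` and the example split points are hypothetical.

```python
from bisect import bisect_right


def processors_for_range(low, high, split_points):
    """Return the processors that may hold keys in the closed interval
    [low, high], ASSUMING keys in [split_points[j-1], split_points[j])
    are stored at processor j and keys below split_points[0] at processor 0."""
    first = bisect_right(split_points, low)
    last = bisect_right(split_points, high)
    return list(range(first, last + 1))


# Four processors, partitioned on age with split points 10, 20, 30.
split_points = [10, 20, 30]

# Integer ages satisfying 15 < age < 25 lie in [16, 24].
print(processors_for_range(16, 24, split_points))

# The equality selection age = 20 touches a single processor.
print(processors_for_range(20, 20, split_points))
```

With round-robin partitioning no such pruning is possible, so every processor would have to scan its local tuples.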
22.4.2 Sorting

A simple idea is to let each CPU sort the part of the relation that is on its local disk and then merge these sorted sets of tuples. The degree of parallelism is likely to be limited by the merging phase.

A better idea is to first redistribute all tuples in the relation using range partitioning. For example, if we want to sort a collection of employee tuples by salary, salary values range from 10 to 210, and we have 20 processors, we could send all tuples with salary values in the range 10 to 20 to the first processor, all in the range 21 to 30 to the second processor, and so on. (Prior to the redistribution, while tuples are distributed across the processors, we cannot assume that they are distributed according to salary ranges.)

Each processor then sorts the tuples assigned to it, using some sequential sorting algorithm. For example, a processor can collect tuples until its memory is full, then sort these tuples and write out a run, until all incoming tuples have been written to such sorted runs on the local disk. These runs can then be merged to create the sorted version of the set of tuples assigned to this processor. The entire sorted relation can be retrieved by visiting the processors in an order corresponding to the ranges assigned to them and simply scanning the tuples.

The basic challenge in parallel sorting is to do the range partitioning so that each processor receives roughly the same number of tuples; otherwise, a processor that receives a disproportionately large number of tuples to sort becomes a bottleneck and limits the scalability of the parallel sort. One good approach to range partitioning is to obtain a sample of the entire relation by taking samples at each processor that initially contains part of the relation. The (relatively small) sample is sorted and used to identify ranges with equal numbers of tuples.
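The sampling approach to range partitioning can be sketched as follows. This is a single-process simulation under stated assumptions, not a parallel implementation: each "processor" is just a Python list, the sample size and random seed are arbitrary, and the function names `choose_split_points` and `simulated_parallel_sort` are hypothetical.

```python
import random
from bisect import bisect_right


def choose_split_points(local_partitions, n_ranges, sample_size=50, seed=0):
    """Sample each processor's local data, sort the combined sample, and
    pick n_ranges - 1 boundaries at equally spaced positions in it."""
    rng = random.Random(seed)
    sample = []
    for part in local_partitions:
        sample.extend(rng.sample(part, min(sample_size, len(part))))
    if not sample:
        return []
    sample.sort()
    return [sample[(j * len(sample)) // n_ranges] for j in range(1, n_ranges)]


def simulated_parallel_sort(local_partitions, n_ranges):
    """Range-partition all keys with the chosen boundaries, sort each
    range locally, then read the ranges back in order."""
    split_points = choose_split_points(local_partitions, n_ranges)
    ranges = [[] for _ in range(n_ranges)]
    for part in local_partitions:
        for key in part:
            ranges[bisect_right(split_points, key)].append(key)
    for r in ranges:
        r.sort()  # stands in for the sequential sort at each processor
    sizes = [len(r) for r in ranges]
    return [key for r in ranges for key in r], sizes


# Salaries between 10 and 210, initially spread over 4 processors.
gen = random.Random(1)
partitions = [[gen.randint(10, 210) for _ in range(250)] for _ in range(4)]
sorted_salaries, sizes = simulated_parallel_sort(partitions, n_ranges=4)
print(sizes)
```

The printed range sizes give a rough sense of how evenly the sampled boundaries spread the tuples; how close they come to equal depends on the sample size and the data.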
This set of range values, called a splitting vector, is then distributed to all processors and used to range partition the entire relation.

A particularly important application of parallel sorting is sorting the data entries in tree-structured indexes. Sorting data entries can significantly speed up the process of bulk-loading an index.

22.4.3 Joins

In this section, we consider how the join operation can be parallelized. We present the basic idea behind the parallelization and illustrate the use of the merge and split operators described in Section 22.3.2. We focus on parallel hash join, which is widely used, and briefly outline how sort-merge join can be similarly parallelized. Other join algorithms can be parallelized as well, although not as effectively as these two algorithms.

Suppose that we want to join two relations, say, A and B, on the age attribute. We assume that they are initially distributed across several disks in some way that is not useful for the join operation; that is, the initial partitioning is not based on the join attribute. The basic idea for joining A and B in parallel is to decompose the join into a collection of k smaller joins. We can decompose the join by partitioning both A and B into a collection of k logical buckets or partitions. By using the same partitioning function for both A and B, we ensure that the union of the k smaller joins computes the join of A and B; this idea is similar to the intuition behind the partitioning phase of a sequential hash join, described in Section 14.4.3.

Because A and B are initially distributed across several processors, the partitioning step itself can be done in parallel at these processors. At each processor, all local tuples are retrieved and hashed into one of k partitions, with the same hash function used at all sites, of course.
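The claim that the union of the k smaller joins computes the join of A and B, provided both relations are partitioned with the same function, can be checked with a small simulation. The sketch below is an illustration under assumptions: tuples are plain Python tuples of the form (id, age), the partitioning function is age mod k, and each smaller join is a simple in-memory hash join; the function names are hypothetical.

```python
from collections import defaultdict

AGE = 1  # position of the join attribute in each tuple (assumed layout)


def local_hash_join(a_tuples, b_tuples):
    """Sequential in-memory hash join on age: build on A, probe with B."""
    table = defaultdict(list)
    for a in a_tuples:
        table[a[AGE]].append(a)
    return [(a, b) for b in b_tuples for a in table.get(b[AGE], [])]


def partitioned_join(a_sites, b_sites, k):
    """Partition A and B with the SAME function at every site, then
    join partition i of A only with partition i of B."""
    a_parts = [[] for _ in range(k)]
    b_parts = [[] for _ in range(k)]
    for site in a_sites:
        for t in site:
            a_parts[t[AGE] % k].append(t)
    for site in b_sites:
        for t in site:
            b_parts[t[AGE] % k].append(t)
    result = []
    for i in range(k):  # each of these k joins could run at its own processor
        result.extend(local_hash_join(a_parts[i], b_parts[i]))
    return result


# A and B are initially spread over two sites, not partitioned on age.
a_sites = [[(1, 20), (2, 31)], [(3, 20), (4, 45)]]
b_sites = [[(10, 20)], [(11, 45), (12, 99)]]
for pair in sorted(partitioned_join(a_sites, b_sites, k=3)):
    print(pair)
```

Tuples with equal age values always land in the same partition number, so no matching pair is split across two of the smaller joins.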
Alternatively, we can partition A and B by dividing the range of the join attribute age into k disjoint subranges and placing A and B tuples into partitions according to the subrange to which their age values belong. For example, suppose that we have 10 processors, the join attribute is age, with values from 0 to 100. Assuming uniform distribution, A and B tuples with 0 ≤ age < 10 go to processor 1, 10 ≤ age < 20 go to processor 2, and so on. This approach is likely to be more susceptible than hash partitioning to data skew (i.e., the number of tuples to be joined can vary widely across partitions), unless the subranges are carefully determined; we do not discuss how good subrange boundaries can be identified.

Having decided on a partitioning strategy, we can assign each partition to a processor and carry out a local join, using any join algorithm we want, at each processor. In this case, the number of partitions k is chosen to be equal to the number of processors n available for carrying out the join, and during partitioning, each processor sends tuples in the ith partition to processor i. After partitioning, each processor joins the A and B tuples assigned to it. Each join process executes sequential join code and receives input A and B tuples from several processors; a merge operator merges all incoming A tuples, and another merge operator merges all incoming B tuples. Depending on how we want to distribute the result of the join of A and B, the output of the join process may be split into several data streams. The network of operators for parallel join is shown in Figure 22.3. To simplify the figure, we assume that the processors doing the join are distinct from the processors that initially contain tuples of A and B and show only four processors.
[Figure 22.3 Dataflow Network of Operators for Parallel Join. Operators shown: SCAN, SPLIT, MERGE, JOIN. Streams labeled: A, B, Ai, Aj, Bi, Bj, A', B', A", B"]

If range partitioning is used, this algorithm leads to a parallel version of a sort-merge join, with the advantage that the output is available in sorted order. If hash partitioning is used, we obtain a parallel version of a hash join.

Improved Parallel Hash Join

A hash-based refinement of the approach offers improved performance. The main observation is that, if A and B are very large and the number of partitions k is chosen to be equal to the number of processors n, the size of each partition may still be large, leading to a high cost for each local join at the n processors.

An alternative is to execute the smaller joins Ai ⋈ Bi, for i = 1...k, one after the other, but with each join executed in parallel using all processors. This approach allows us to utilize the total available main memory at all n processors in each join Ai ⋈ Bi and is described in more detail as follows:

1. At each site, apply a hash function h1 to partition the A and B tuples at this site into partitions i = 1...k. Let A be the smaller relation. The number of partitions k is chosen such that each partition of A fits into the aggregate or combined memory of all n processors.
2. For i = 1...k, process the join of the ith partitions of A and B. To compute Ai ⋈ Bi, do the following at every site:
(a) Apply a second hash function h2 to all Ai tuples to determine where they should be joined and send tuple t to site h2(t).
(b) As Ai tuples arrive to be joined, add them to an in-memory hash table.
(c) After all Ai tuples have been distributed, apply h2 to Bi tuples to determine where they should be joined and send tuple t to site h2(t).
(d) As Bi tuples arrive to be joined, probe the in-memory table of Ai tuples and output result tuples.

The use of the second hash function h2 ensures that tuples are (more or less) uniformly distributed across all n processors participating in the join. This approach greatly reduces the cost for each of the smaller joins and therefore reduces the overall join cost. Observe that all available processors are fully utilized, even though the smaller joins are carried out one after the other.

The reader is invited to adapt the network of operators shown in Figure 22.3 to reflect the improved parallel join algorithm.

22.5 PARALLEL QUERY OPTIMIZATION

In addition to parallelizing individual operations, we can obviously execute different operations in a query in parallel and execute multiple queries in parallel. Optimizing a single query for parallel execution has received more attention; systems typically optimize queries without regard to other queries that might be executing at the same time.

Two kinds of interoperation parallelism can be exploited within a query:

The result of one operator can be pipelined into another. For example, consider a left-deep plan in which all the joins use index nested loops. The result of the first (i.e., the bottommost) join is the outer relation tuples for the next join node. As tuples are produced by the first join, they can be used to probe the inner relation in the second join. The result of the second join can similarly be pipelined into the next join, and so on.
Multiple independent operations can be executed concurrently. For example, consider a (bushy) plan in which relations A and B are joined, relations C and D are joined, and the results of these two joins are finally joined. Clearly, the join of A and B can be executed concurrently with the join of C and D.

An optimizer that seeks to parallelize query evaluation has to consider several issues, and we only outline the main points.
The cost of executing individual operations in parallel (e.g., parallel sorting) obviously differs from executing them sequentially, and the optimizer should estimate operation costs accordingly.

Next, the plan that returns answers quickest may not be the plan with the least cost. For example, the cost of A ⋈ B plus the cost of C ⋈ D plus the cost of joining their results may be more than the cost of the cheapest left-deep plan. However, the time taken is the time for the more expensive of A ⋈ B and C ⋈ D plus the time to join their results. This time may be less than the time taken by the cheapest left-deep plan. This observation suggests that a parallelizing optimizer should not restrict itself to left-deep trees and should also consider bushy trees, which significantly enlarge the space of plans to be considered.

Finally, a number of parameters, such as available buffer space and the number of free processors, are known only at run-time. This comment holds in a multiuser environment even if only sequential plans are considered; a multiuser environment is a simple instance of interquery parallelism.

22.6 INTRODUCTION TO DISTRIBUTED DATABASES

As we observed earlier, data in a distributed database system is stored across several sites, and each site is typically managed by a DBMS that can run independent of the other sites. The classical view of a distributed database system is that the system should make the impact of data distribution transparent. In particular, the following properties are considered desirable:

Distributed Data Independence: Users should be able to ask queries without specifying where the referenced relations, or copies or fragments of the relations, are located. This principle is a natural extension of physical and logical data independence; we discuss it in Section 22.8.
Further, queries that span multiple sites should be optimized systematically in a cost-based manner, taking into account communication costs and differences in local computation costs. We discuss distributed query optimization in Section 22.10.
Distributed Transaction Atomicity: Users should be able to write transactions that access and update data at several sites just as they would write transactions over purely local data. In particular, the effects of a transaction across sites should continue to be atomic; that is, all changes persist if the transaction commits and none persist if it aborts. We discuss this distributed transaction processing in Sections 22.11, 22.13, and 22.14.

Although most people would agree that these properties are in general desirable, in certain situations, such as when sites are connected by a slow long-distance network, these properties are not efficiently achievable. Indeed, it has been argued that, when sites are globally distributed, these properties are not even desirable. The argument essentially is that the administrative overhead of supporting a system with distributed data independence and transaction atomicity (in effect, coordinating all activities across all sites to support the view of the whole as a unified collection of data) is prohibitive, over and above DBMS performance considerations.

Keep these remarks about distributed databases in mind as we cover the topic in more detail in the rest of this chapter. There is no real consensus on what the design objectives of distributed databases should be, and the field is evolving in response to users' needs.

22.6.1 Types of Distributed Databases

If data is distributed but all servers run the same DBMS software, we have a homogeneous distributed database system.
If different sites run under the control of different DBMSs, essentially autonomously, and are connected somehow to enable access to data from multiple sites, we have a heterogeneous distributed database system, also referred to as a multidatabase system.

The key to building heterogeneous systems is to have well-accepted standards for gateway protocols. A gateway protocol is an API that exposes DBMS functionality to external applications. Examples include ODBC and JDBC (see Section 6.2). By accessing database servers through gateway protocols, their differences (in capability, data format, etc.) are masked, and the differences between the different servers in a distributed system are bridged to a large degree.

Gateways are not a panacea, however. They add a layer of processing that can be expensive, and they do not completely mask the differences among servers. For example, a server may not be capable of providing the services required for distributed transaction management (see Sections 22.13 and 22.14), and even if it is capable, standardizing gateway protocols all the way down to this level of interaction poses challenges that have not yet been resolved satisfactorily.

Distributed data management, in the final analysis, comes at a significant cost in terms of performance, software complexity, and administration difficulty. This observation is especially true of heterogeneous systems.

22.7 DISTRIBUTED DBMS ARCHITECTURES

Three alternative approaches are used to separate functionality across different DBMS-related processes; these alternative distributed DBMS architectures are called Client-Server, Collaborating Server, and Middleware.

22.7.1 Client-Server Systems

A Client-Server system has one or more client processes and one or more server processes, and a client process can send a query to any one server process. Clients are responsible for user-interface issues, and servers manage data and execute transactions.
Thus, a client process could run on a personal computer and send queries to a server running on a mainframe.

This architecture has become very popular for several reasons. First, it is relatively simple to implement due to its clean separation of functionality and because the server is centralized. Second, expensive server machines are not underutilized by dealing with mundane user-interactions, which are now relegated to inexpensive client machines. Third, users can run a graphical user interface that they are familiar with, rather than the (possibly unfamiliar and unfriendly) user interface on the server.

While writing Client-Server applications, it is important to remember the boundary between the client and the server and keep the communication between them as set-oriented as possible. In particular, opening a cursor and fetching tuples one at a time generates many messages and should be avoided. (Even if we fetch several tuples and cache them at the client, messages must be exchanged when the cursor is advanced to ensure that the current row is locked.) Techniques to exploit client-side caching to reduce communication overhead have been studied extensively, although we do not discuss them further.

22.7.2 Collaborating Server Systems

The Client-Server architecture does not allow a single query to span multiple servers because the client process would have to be capable of breaking such a query into appropriate subqueries to be executed at different sites and then piecing together the answers to the subqueries. The client process would therefore be quite complex, and its capabilities would begin to overlap with the server; distinguishing between clients and servers becomes harder. Eliminating this distinction leads us to an alternative to the Client-Server architecture: a Collaborating Server system.
We can have a collection of database servers, each capable of running transactions against local data, which cooperatively execute transactions spanning multiple servers.

When a server receives a query that requires access to data at other servers, it generates appropriate subqueries to be executed by other servers and puts the results together to compute answers to the original query. Ideally, the decomposition of the query should be done using cost-based optimization, taking into account the cost of network communication as well as local processing costs.

22.7.3 Middleware Systems

The Middleware architecture is designed to allow a single query to span multiple servers, without requiring all database servers to be capable of managing such multi-site execution strategies. It is especially attractive when trying to integrate several legacy systems, whose basic capabilities cannot be extended.

The idea is that we need just one database server capable of managing queries and transactions spanning multiple servers; the remaining servers need to handle only local queries and transactions. We can think of this special server as a layer of software that coordinates the execution of queries and transactions across one or more independent database servers; such software is often called middleware. The middleware layer is capable of executing joins and other relational operations on data obtained from the other servers but, typically, does not itself maintain any data.

22.8 STORING DATA IN A DISTRIBUTED DBMS

In a distributed DBMS, relations are stored across several sites. Accessing a relation stored at a remote site incurs message-passing costs and, to reduce this overhead, a single relation may be partitioned or fragmented across several sites, with fragments stored at the sites where they are most often accessed or replicated at each site where the relation is in high demand.
22.8.1 Fragmentation

Fragmentation consists of breaking a relation into smaller relations or fragments and storing the fragments (instead of the relation itself), possibly at different sites. In horizontal fragmentation, each fragment consists of a subset of rows of the original relation. In vertical fragmentation, each fragment consists of a subset of columns of the original relation. Horizontal and vertical fragments are illustrated in Figure 22.4.

    TID   eid     name      city      age   sal
    t1    53666   Jones     Madras    18    35
    t2    53688   Smith     Chicago   18    32
    t3    53650   Smith     Chicago   19    48
    t4    53831   Madayan   Bombay    11    20
    t5    53832   Guldu     Bombay    12    20

    Figure 22.4 Horizontal and Vertical Fragmentation
    (the figure marks one vertical fragment and one horizontal fragment on this table)

Typically, the tuples that belong to a given horizontal fragment are identified by a selection query; for example, employee tuples might be organized into fragments by city, with all employees in a given city assigned to the same fragment. The horizontal fragment shown in Figure 22.4 corresponds to Chicago. By storing fragments in the database site at the corresponding city, we achieve locality of reference: Chicago data is most likely to be updated and queried from Chicago, and storing this data in Chicago makes it local (and reduces communication costs) for most queries. Similarly, the tuples in a given vertical fragment are identified by a projection query. The vertical fragment in the figure results from projection on the first two columns of the employees relation.

When a relation is fragmented, we must be able to recover the original relation from the fragments:

Horizontal Fragmentation: The union of the horizontal fragments must be equal to the original relation. Fragments are usually also required to be disjoint.

Vertical Fragmentation: The collection of vertical fragments should be a lossless-join decomposition, as per the definition in Chapter 19.
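The two recovery conditions can be checked on the small employees relation of Figure 22.4. The following is only a minimal sketch in Python, not how a DBMS stores fragments: rows are plain dictionaries, every function name is hypothetical, and it assumes each tuple carries the tuple id from the TID column so that vertical fragments can be matched up again.

```python
# Employee tuples from Figure 22.4; "tid" is the system-assigned tuple id.
EMPLOYEES = [
    {"tid": "t1", "eid": 53666, "name": "Jones", "city": "Madras", "age": 18, "sal": 35},
    {"tid": "t2", "eid": 53688, "name": "Smith", "city": "Chicago", "age": 18, "sal": 32},
    {"tid": "t3", "eid": 53650, "name": "Smith", "city": "Chicago", "age": 19, "sal": 48},
    {"tid": "t4", "eid": 53831, "name": "Madayan", "city": "Bombay", "age": 11, "sal": 20},
    {"tid": "t5", "eid": 53832, "name": "Guldu", "city": "Bombay", "age": 12, "sal": 20},
]


def horizontal_fragment(rows, city):
    """Selection: keep only the rows for one city."""
    return [row for row in rows if row["city"] == city]


def vertical_fragment(rows, columns):
    """Projection onto `columns`, always carrying the tuple id along."""
    return [{"tid": row["tid"], **{c: row[c] for c in columns}} for row in rows]


def union_of(fragments):
    """Recombine disjoint horizontal fragments, ordered by tuple id."""
    combined = [row for fragment in fragments for row in fragment]
    return sorted(combined, key=lambda row: row["tid"])


def join_on_tid(left, right):
    """Recombine two vertical fragments by matching tuple ids."""
    right_by_tid = {row["tid"]: row for row in right}
    return [
        {**row, **right_by_tid[row["tid"]]}
        for row in left
        if row["tid"] in right_by_tid
    ]


# One horizontal fragment per city, and two vertical fragments.
cities = sorted({row["city"] for row in EMPLOYEES})
by_city = [horizontal_fragment(EMPLOYEES, city) for city in cities]
left = vertical_fragment(EMPLOYEES, ["eid", "name"])
right = vertical_fragment(EMPLOYEES, ["city", "age", "sal"])

print(union_of(by_city) == EMPLOYEES)        # union recovers the relation
print(join_on_tid(left, right) == EMPLOYEES)  # join on tid recovers the relation
```

Because the tuple id is unique per tuple, the join on it pairs each left row with exactly one right row, which is why this particular decomposition loses nothing.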
To ensure that a vertical fragmentation is lossless-join, systems often assign a unique tuple id to each tuple in the original relation, as shown in Figure 22.4, and attach this id to the projection of the tuple in each fragment. If we think of the original relation as containing an additional tuple-id field that is a key, this field is added to each vertical fragment. Such a decomposition is guaranteed to be lossless-join.

In general, a relation can be (horizontally or vertically) fragmented, and each resulting fragment can be further fragmented. For simplicity of exposition, in the rest of this chapter, we assume that fragments are not recursively partitioned in this manner.

22.8.2 Replication

Replication means that we store several copies of a relation or relation fragment. An entire relation can be replicated at one or more sites. Similarly, one or more fragments of a relation can be replicated at other sites. For example, if a relation R is fragmented into R1, R2, and R3, there might be just one copy of R1, whereas R2 is replicated at two other sites and R3 is replicated at all sites.

The motivation for replication is twofold:

Increased Availability of Data: If a site that contains a replica goes down, we can find the same data at other sites. Similarly, if local copies of remote relations are available, we are less vulnerable to failure of communication links.

Faster Query Evaluation: Queries can execute faster by using a local copy of a relation instead of going to a remote site.

The two kinds of replication, called synchronous and asynchronous replication, differ primarily in how replicas are kept current when the relation is modified (see Section 22.11).

22.9 DISTRIBUTED CATALOG MANAGEMENT

Keeping track of data distributed across several sites can get complicated.
We must keep track of how relations are fragmented and replicated (that is, how relation fragments are distributed across several sites and where copies of fragments are stored), in addition to the usual schema, authorization, and statistical information.

22.9.1 Naming Objects

If a relation is fragmented and replicated, we must be able to uniquely identify each replica of each fragment. Generating such unique names requires some care. If we use a global name-server to assign globally unique names, local autonomy is compromised; we want (users at) each site to be able to assign names to local objects without reference to names systemwide.

The usual solution to the naming problem is to use names consisting of several fields. For example, we could have:

A local name field, which is the name assigned locally at the site where the relation is created. Two objects at different sites could have the same local name, but two objects at a given site cannot have the same local name.

A birth site field, which identifies the site where the relation was created, and where information is maintained about all fragments and replicas of the relation.

These two fields identify a relation uniquely; we call the combination a global relation name. To identify a replica (of a relation or a relation fragment), we take the global relation name and add a replica-id field; we call the combination a global replica name.

22.9.2 Catalog Structure

A centralized system catalog can be used but is vulnerable to failure of the site containing the catalog. An alternative is to maintain a copy of a global system catalog, which describes all the data, at every site. Although this approach is not vulnerable to a single-site failure, it compromises site autonomy, just like the first solution, because every change to a local catalog must now be broadcast to all sites.
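The multi-field naming scheme of Section 22.9.1 can be made concrete with a small data-structure sketch. This is an illustration only: the class names, the field types, and the example sites and local names are assumptions made here, not part of any particular system's catalog.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class GlobalRelationName:
    local_name: str   # assigned locally; unique only within the birth site
    birth_site: str   # site where the relation was created


@dataclass(frozen=True)
class GlobalReplicaName:
    relation: GlobalRelationName
    replica_id: int   # distinguishes copies of the same relation or fragment


# Two sites may pick the same local name without consulting each other
# (site names here are hypothetical).
london_emp = GlobalRelationName("Employees", "London")
paris_emp = GlobalRelationName("Employees", "Paris")

# Two replicas of the London relation, stored wherever the catalog says.
replica_a = GlobalReplicaName(london_emp, 1)
replica_b = GlobalReplicaName(london_emp, 2)

print(london_emp != paris_emp)                   # same local name, distinct relations
print(replica_a != replica_b)                    # same relation, distinct replicas
print(replica_a.relation == replica_b.relation)  # both trace back to one birth site
```

The point of the design is that uniqueness is obtained by composition: no site needs a global name-server, because the birth site field disambiguates whatever local name a site chooses.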
A better approach, which preserves local autonomy and is not vulnerable to a single-site failure, was developed in the R* distributed database project, which was a successor to the System R project at IBM. Each site maintains a local catalog that describes all copies of data stored at that site. In addition, the catalog at the birth site for a relation is responsible for keeping track of where replicas of the relation (in general, of fragments of the relation) are stored. In particular, a precise description of each replica's contents (a list of columns for a vertical fragment or a selection condition for a horizontal fragment) is stored in the birth site catalog. Whenever a new replica is created or a replica is moved across sites, the information in the birth site catalog for the relation must be updated.

To locate a relation, the catalog at its birth site must be looked up. This catalog information can be cached at other sites for quicker access, but the cached information may become out of date if, for example, a fragment is moved. We would discover that the locally cached information is out of date when we use it to access the relation, and at that point, we must update the cache by looking up the catalog at the birth site of the relation. (The birth site of a relation is recorded in each local cache that describes the relation, and the birth site never changes, even if the relation is moved.)

22.9.3 Distributed Data Independence

Distributed data independence means that users should be able to write queries without regard to how a relation is fragmented or replicated; it is the responsibility of the DBMS to compute the relation as needed (by locating suitable copies of fragments, joining the vertical fragments, and taking the union of horizontal fragments). In particular, this property implies that users should not have to specify the full name for the data objects accessed while evaluating a query.
Let us see how users can be enabled to access relations without considering how the relations are distributed. The local name of a relation in the system catalog (Section 22.9.1) is really a combination of a user name and a user-defined relation name. Users can give whatever names they wish to their relations, without regard to the relations created by other users. When a user writes a program or SQL statement that refers to a relation, he or she simply uses the relation name. The DBMS adds the user name to the relation name to get a local name, then adds the user's site-id as the (default) birth site to obtain a global relation name. By looking up the global relation name (in the local catalog if it is cached there, or in the catalog at the birth site), the DBMS can locate replicas of the relation.

A user may want to create objects at several sites or refer to relations created by other users. To do this, a user can create a synonym for a global relation name, using an SQL-style command (although such a command is not currently part of the SQL:1999 standard) and subsequently refer to the relation using the synonym. For each user known at a site, the DBMS maintains a table of synonyms as part of the system catalog at that site and uses this table to find the global relation name. Note that a user's program runs unchanged even if replicas of the relation are moved, because the global relation name is never changed until the relation itself is destroyed.

Users may want to run queries against specific replicas, especially if asynchronous replication is used. To support this, the synonym mechanism can be adapted to also allow users to create synonyms for global replica names.

22.10 DISTRIBUTED QUERY PROCESSING

We first discuss the issues involved in evaluating relational algebra operations in a distributed database through examples and then outline distributed query optimization.
Consider the following two relations:

    Sailors(sid: integer, sname: string, rating: integer, age: real)
    Reserves(sid: integer, bid: integer, day: date, rname: string)

As in Chapter 14, assume that each tuple of Reserves is 40 bytes long, that a page can hold 100 Reserves tuples, and that we have 1000 pages of such tuples. Similarly, assume that each tuple of Sailors is 50 bytes long, that a page can hold 80 Sailors tuples, and that we have 500 pages of such tuples.

To estimate the cost of an evaluation strategy, in addition to counting the number of page I/Os, we must count the number of pages sent from one site to another because communication costs are a significant component of overall cost in a distributed database. We must also change our cost model to count the cost of shipping the result tuples to the site where the query is posed from the site where the result is assembled! In this chapter, we denote the time taken to read one page from disk (or to write one page to disk) as td and the time taken to ship one page (from any site to another site) as ts.

22.10.1 Nonjoin Queries in a Distributed DBMS

Even simple operations such as scanning a relation, selection, and projection are affected by fragmentation and replication. Consider the following query:

    SELECT S.age
    FROM Sailors S
    WHERE S.rating > 3 AND S.rating < 7

Suppose that the Sailors relation is horizontally fragmented, with all tuples having a rating less than 5 at Shanghai and all tuples having a rating greater than 5 at Tokyo.

The DBMS must answer this query by evaluating it at both sites and taking the union of the answers. If the SELECT clause contained AVG (S.age), combining the answers could not be done by simply taking the union; the DBMS must compute the sum and count of age values at the two sites and use this information to compute the average age of all sailors.
If the WHERE clause contained just the condition S.rating > 6, on the other hand, the DBMS should recognize that this query could be answered by just executing it at Tokyo.

As another example, suppose that the Sailors relation were vertically fragmented, with the sid and rating fields at Shanghai and the sname and age fields at Tokyo. No field is stored at both sites. This vertical fragmentation would therefore be a lossy decomposition, except that a field containing the id of the corresponding Sailors tuple is included by the DBMS in both fragments! Now, the DBMS has to reconstruct the Sailors relation by joining the two fragments on the common tuple-id field and execute the query over this reconstructed relation.

Finally, suppose that the entire Sailors relation were stored at both Shanghai and Tokyo. We could answer any of the previous queries by executing it at either Shanghai or Tokyo. Where should the query be executed? This depends on the cost of shipping the answer to the query site (which may be Shanghai, Tokyo, or some other site) as well as the cost of executing the query at Shanghai and at Tokyo; the local processing costs may differ depending on what indexes are available on Sailors at the two sites, for example.

22.10.2 Joins in a Distributed DBMS

Joins of relations at different sites can be very expensive, and we now consider the evaluation options that must be considered in a distributed environment. Suppose that the Sailors relation were stored at London, and the Reserves relation were stored at Paris. We consider the cost of various strategies for computing Sailors ⋈ Reserves.

Fetch As Needed

We could do a page-oriented nested loops join in London with Sailors as the outer, and for each Sailors page, fetch all Reserves pages from Paris.
If we cache the fetched Reserves pages in London until the join is complete, pages are fetched only once, but assume that Reserves pages are not cached, just to see how bad things can get. (The situation can get much worse if we use a tuple-oriented nested loops join!)

The cost is 500td to scan Sailors plus, for each Sailors page, the cost of scanning and shipping all of Reserves, which is 1000(td + ts). The total cost is therefore 500td + 500,000(td + ts).

In addition, if the query was not submitted at the London site, we must add the cost of shipping the result to the query site; this cost depends on the size of the result. Because sid is a key for Sailors, the number of tuples in the result is 100,000 (the number of tuples in Reserves) and each tuple is 40 + 50 = 90 bytes long; thus 4000/90 = 44 result tuples fit on a page, and the result size is 100,000/44 = 2273 pages. The cost of shipping the answer to another site, if necessary, is 2273ts. In the rest of this section, we assume that the query is posed at the site where the result is computed; if not, the cost of shipping the result to the query site must be added to the cost.

In this example, observe that, if the query site is not London or Paris, the cost of shipping the result is greater than the cost of shipping both Sailors and Reserves to the query site! Therefore, it would be cheaper to ship both relations to the query site and compute the join there.

Alternatively, we could do an index nested loops join in London, fetching all matching Reserves tuples for each Sailors tuple. Suppose we have an unclustered hash index on the sid column of Reserves. Because there are 100,000 Reserves tuples and 40,000 Sailors tuples, each sailor has on average 2.5 reservations. The cost of finding the 2.5 Reserves tuples that match a given Sailors tuple is (1.2 + 2.5)td, assuming 1.2 I/Os to locate the appropriate bucket in the index.
The total cost is the cost of scanning Sailors plus the cost of finding and fetching matching Reserves tuples for each Sailors tuple, 500td + 40,000(3.7td + 2.5ts).

Both algorithms fetch required Reserves tuples from a remote site as needed. Clearly, this is not a good idea; the cost of shipping tuples dominates the total cost even for a fast network.

Ship to One Site

We can ship Sailors from London to Paris and carry out the join there, ship Reserves to London and carry out the join there, or ship both to the site where the query was posed and compute the join there. Note again that the query could have been posed in London, Paris, or perhaps a third site, say, Timbuktu!

The cost of scanning and shipping Sailors, saving it at Paris, then doing the join at Paris is 500(2td + ts) + 4500td, assuming that the version of the sort-merge join described in Section 14.10 is used and we have an adequate number of buffer pages. In the rest of this section we assume that sort-merge join is the join method used when both relations are at the same site.

The cost of shipping Reserves and doing the join at London is 1000(2td + ts) + 4500td.

Semijoins and Bloomjoins

Consider the strategy of shipping Reserves to London and computing the join at London. Some tuples in (the current instance of) Reserves do not join with any tuple in (the current instance of) Sailors. If we could somehow identify Reserves tuples that are guaranteed not to join with any Sailors tuples, we could avoid shipping them.

Two techniques, Semijoin and Bloomjoin, have been proposed for reducing the number of Reserves tuples to be shipped. The first technique is called Semijoin. The idea is to proceed in three steps:

1. At London, compute the projection of Sailors onto the join columns (in this case just the sid field) and ship this projection to Paris.

2. At Paris, compute the natural join of the projection received from the first site with the Reserves relation.
The result of this join is called the reduction of Reserves with respect to Sailors. Clearly, only those Reserves tuples in the reduction will join with tuples in the Sailors relation. Therefore, ship the reduction of Reserves to London, rather than the entire Reserves relation.

3. At London, compute the join of the reduction of Reserves with Sailors.

Let us compute the cost of using this technique for our example join query. Suppose we have a straightforward implementation of projection based on first scanning Sailors and creating a temporary relation with tuples that have only an sid field, then sorting the temporary and scanning the sorted temporary to eliminate duplicates. If we assume that the size of the sid field is 10 bytes, the cost of projection is 500td for scanning Sailors, plus 100td for creating the temporary, plus 400td for sorting it (in two passes), plus 100td for the final scan, plus 100td for writing the result into another temporary relation; a total of 1200td. (Because sid is a key, no duplicates need be eliminated; if the optimizer is good enough to recognize this, the cost of projection is just (500 + 100)td.)

The cost of computing the projection and shipping it to Paris is therefore 1200td + 100ts. The cost of computing the reduction of Reserves is 3 · (100 + 1000)td = 3300td, assuming that sort-merge join is used. (The cost does not reflect that the projection of Sailors is already sorted; the cost would decrease slightly if the refined sort-merge join exploited this.)

What is the size of the reduction? If every sailor holds at least one reservation, the reduction includes every tuple of Reserves! The effort invested in shipping the projection and reducing Reserves is a total waste. Indeed, because of this observation, we note that Semijoin is especially useful in conjunction with a selection on one of the relations.
For example, if we want to compute the join of Sailors tuples with a rating greater than 8 with the Reserves relation, the size of the projection on sid for tuples that satisfy the selection would be just 20 percent of the original projection, that is, 20 pages.

Let us now continue the example join, with the assumption that we have the additional selection on rating. (The cost of computing the projection of Sailors goes down a bit, the cost of shipping it goes down to 20ts, and the cost of the reduction of Reserves also goes down a little, but we ignore these reductions for simplicity.) We assume that only 20 percent of the Reserves tuples are included in the reduction, thanks to the selection. Hence, the reduction contains 200 pages, and the cost of shipping it is 200ts.

Finally, at London, the reduction of Reserves is joined with Sailors, at a cost of 3 · (200 + 500)td = 2100td. Observe that there are over 6500 page I/Os (1200 + 3300 + 2100 = 6600) versus about 200 pages shipped, using this join technique. In contrast, to ship Reserves to London and do the join there costs 1000ts plus 4500td. With a high-speed network, the cost of Semijoin may be more than the cost of shipping Reserves in its entirety, even though the shipping cost itself is much less (200ts versus 1000ts).

The second technique, called Bloomjoin, is quite similar. The main difference is that a bit-vector is shipped in the first step, instead of the projection of Sailors. A bit-vector of (some chosen) size k is computed by hashing each tuple of Sailors into the range 0 to k − 1 and setting bit i to 1 if some tuple hashes to i, and 0 otherwise. In the second step, the reduction of Reserves is computed by hashing each tuple of Reserves (using the sid field) into the range 0 to k − 1, using the same hash function used to construct the bit-vector and discarding tuples whose hash value i corresponds to a 0 bit.
Because no Sailors tuples hash to such an i, no Sailors tuple can join with any Reserves tuple that is not in the reduction.

The costs of shipping a bit-vector and reducing Reserves using the vector are less than the corresponding costs in Semijoin. On the other hand, the size of the reduction of Reserves is likely to be larger than in Semijoin; so, the costs of shipping the reduction and joining it with Sailors are likely to be higher.

Let us estimate the cost of this approach. The cost of computing the bit-vector is essentially the cost of scanning Sailors, which is 500td. The cost of sending the bit-vector depends on the size we choose for the bit-vector, which is certainly smaller than the size of the projection; we take this cost to be 20ts, for concreteness. The cost of reducing Reserves is just the cost of scanning Reserves, 1000td. The size of the reduction of Reserves is likely to be about the same as or a little larger than the size of the reduction in the Semijoin approach; instead of 200, we will take this size to be 220 pages. (We assume that the selection on Sailors is included, to permit a direct comparison with the cost of Semijoin.) The cost of shipping the reduction is therefore 220ts. The cost of the final join at London is 3 · (500 + 220)td = 2160td.

Thus, in comparison to Semijoin, the shipping cost of this approach is about the same, although it could be higher if the bit-vector were not as selective as the projection of Sailors in terms of reducing Reserves. Typically, though, the reduction of Reserves is no more than 10 to 20 percent larger than the size of the reduction in Semijoin. In exchange for this slightly higher shipping cost, Bloomjoin achieves a significantly lower processing cost: less than 3700td versus more than 6500td for Semijoin. Indeed, Bloomjoin has a lower I/O cost and a lower shipping cost than the strategy of shipping all of Reserves to London!
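The arithmetic behind this comparison can be tallied in a few lines of Python. The sketch below is only a bookkeeping aid: the function names are made up for illustration, the page counts are the ones assumed in the running example (with the selection on rating), the sort-merge join is charged 3(M + N) page I/Os as in the text, and the ship-everything strategy is charged only the 4500td join cost that the text uses in this comparison.

```python
SAILORS_PAGES = 500
RESERVES_PAGES = 1000


def sort_merge_io(m, n):
    """Page I/Os of the sort-merge join assumed in the text: 3(M + N)."""
    return 3 * (m + n)


def ship_reserves():
    """Ship all of Reserves to London and join there: (I/O pages, shipped pages)."""
    return sort_merge_io(SAILORS_PAGES, RESERVES_PAGES), RESERVES_PAGES


def semijoin(projection_pages=100, shipped_projection=20, reduction_pages=200):
    """Semijoin with the selection on rating: (I/O pages, shipped pages)."""
    io = 1200                                              # sort-based projection of Sailors
    io += sort_merge_io(projection_pages, RESERVES_PAGES)  # reduce Reserves at Paris
    io += sort_merge_io(reduction_pages, SAILORS_PAGES)    # final join at London
    return io, shipped_projection + reduction_pages


def bloomjoin(bit_vector_pages=20, reduction_pages=220):
    """Bloomjoin with the selection on rating: (I/O pages, shipped pages)."""
    io = SAILORS_PAGES                                     # scan Sailors, build bit-vector
    io += RESERVES_PAGES                                   # scan Reserves, apply bit-vector
    io += sort_merge_io(SAILORS_PAGES, reduction_pages)    # final join at London
    return io, bit_vector_pages + reduction_pages


def total_time(cost, td, ts):
    """Combine page counts with per-page disk time td and shipping time ts."""
    io_pages, shipped_pages = cost
    return io_pages * td + shipped_pages * ts


for name, cost in [("ship Reserves", ship_reserves()),
                   ("Semijoin", semijoin()),
                   ("Bloomjoin", bloomjoin())]:
    print(f"{name}: {cost[0]} td + {cost[1]} ts")
```

Plugging in different ratios of ts to td with total_time shows how the ranking of Semijoin against shipping all of Reserves depends on network speed, whereas Bloomjoin stays ahead of both under these assumed page counts.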
These numbers indicate why Bloomjoin is an attractive distributed join method; but the sensitivity of the method to the effectiveness of bit-vector hashing (in reducing Reserves) should be kept in mind.

22.10.3 Cost-Based Query Optimization

We have seen how data distribution can affect the implementation of individual operations, such as selection, projection, aggregation, and join. In general, of course, a query involves several operations, and optimizing queries in a distributed database poses the following additional challenges:

Communication costs must be considered. If we have several copies of a relation, we must also decide which copy to use.

If individual sites are run under the control of different DBMSs, the autonomy of each site must be respected while doing global query planning.

Query optimization proceeds essentially as in a centralized DBMS, as described in Chapter 12, with information about relations at remote sites obtained from the system catalogs. Of course, there are more alternative methods to consider for each operation (e.g., consider the new options for distributed joins), and the cost metric must account for communication costs as well, but the overall planning process is essentially unchanged if we take the cost metric to be the total cost of all operations. (If we consider response time, the fact that certain subqueries can be carried out in parallel at different sites would require us to change the optimizer as per the discussion in Section 22.5.)

In the overall plan, local manipulation of relations at the site where they are stored (to compute an intermediate relation to be shipped elsewhere) is encapsulated into a suggested local plan. The overall plan includes several such local plans, which we can think of as subqueries executing at different sites.
While generating the global plan, the suggested local plans provide realistic cost estimates for the computation of the intermediate relations; the suggested local plans are constructed by the optimizer mainly to provide these local cost estimates. A site is free to ignore the local plan suggested to it if it is able to find a cheaper plan by using more current information in the local catalogs. Thus, site autonomy is respected in the optimization and evaluation of distributed queries.

22.11 UPDATING DISTRIBUTED DATA

The classical view of a distributed DBMS is that it should behave just like a centralized DBMS from the point of view of a user; issues arising from distribution of data should be transparent to the user, although, of course, they must be addressed at the implementation level.

With respect to queries, this view of a distributed DBMS means that users should be able to ask queries without worrying about how and where relations are stored; we have already seen the implications of this requirement on query evaluation.

With respect to updates, this view means that transactions should continue to be atomic actions, regardless of data fragmentation and replication. In particular, all copies of a modified relation must be updated before the modifying transaction commits. We refer to replication with this semantics as synchronous replication; before an update transaction commits, it synchronizes all copies of modified data.

An alternative approach to replication, called asynchronous replication, has come to be widely used in commercial distributed DBMSs. Copies of a modified relation are updated only periodically in this approach, and a transaction that reads different copies of the same relation may see different values. Thus, asynchronous replication compromises distributed data independence, but it can be implemented more efficiently than synchronous replication.
22.11.1 Synchronous Replication

There are two basic techniques for ensuring that transactions see the same value regardless of which copy of an object they access. In the first technique, called voting, a transaction must write a majority of copies to modify an object and read at least enough copies to make sure that one of the copies is current. For example, if there are 10 copies and 7 copies are written by update transactions, then at least 4 copies must be read. Each copy has a version number, and the copy with the highest version number is current. This technique is not attractive in most situations because reading an object requires reading multiple copies; in most applications, objects are read much more frequently than they are updated, and efficient performance on reads is very important.

In the second technique, called read-any write-all, to read an object, a transaction can read any one copy, but to write an object, it must write all copies. Reads are fast, especially if we have a local copy, but writes are slower, relative to the first technique. This technique is attractive when reads are much more frequent than writes, and it is usually adopted for implementing synchronous replication.

22.11.2 Asynchronous Replication

Synchronous replication comes at a significant cost. Before an update transaction can commit, it must obtain exclusive locks on all copies of modified data (assuming that the read-any write-all technique is used). The transaction may have to send lock requests to remote sites and wait for the locks to be granted, and during this potentially long period, it continues to hold all its other locks. If sites or communication links fail, the transaction cannot commit until all the sites at which it has modified data recover and are reachable.
Finally, even if locks are obtained readily and there are no failures, committing a transaction requires several additional messages to be sent as part of a commit protocol (Section 22.14.1).

For these reasons, synchronous replication is undesirable or even unachievable in many situations. Asynchronous replication is gaining in popularity, even though it allows different copies of the same object to have different values for short periods of time. This situation violates the principle of distributed data independence; users must be aware of which copy they are accessing, recognize that copies are brought up-to-date only periodically, and live with this reduced level of data consistency. Nonetheless, this seems to be a practical compromise that is acceptable in many situations.

Primary Site versus Peer-to-Peer Replication

Asynchronous replication comes in two flavors. In primary site asynchronous replication, one copy of a relation is designated the primary or master copy. Replicas of the entire relation or fragments of the relation can be created at other sites; these are secondary copies, and unlike the primary copy, they cannot be updated. A common mechanism for setting up primary and secondary copies is that users first register or publish the relation at the primary site and subsequently subscribe to a fragment of a registered relation from another (secondary) site.

In peer-to-peer asynchronous replication, more than one copy (although perhaps not all) can be designated as updatable, that is, a master copy. In addition to propagating changes, a conflict resolution strategy must be used to deal with conflicting changes made at different sites. For example, Joe’s age may be changed to 35 at one site and to 38 at another. Which value is ‘correct’? Many more subtle kinds of conflicts can arise in peer-to-peer replication, and in general peer-to-peer replication leads to ad hoc conflict resolution.
Some special situations in which peer-to-peer replication does not lead to conflicts arise quite often, and in such situations peer-to-peer replication is best utilized. For example:

Each master is allowed to update only a fragment (typically a horizontal fragment) of the relation, and any two fragments updatable by different masters are disjoint. For example, it may be that salaries of German employees are updated only in Frankfurt, and salaries of Indian employees are updated only in Madras, even though the entire relation is stored at both Frankfurt and Madras.

Updating rights are held by only one master at a time. For example, one site is designated a backup to another site. Changes at the master site are propagated to other sites and updates are not allowed at other sites (including the backup). But, if the master site fails, the backup site takes over and updates are now permitted at (only) the backup site.

We will not discuss peer-to-peer replication further.

Implementing Primary Site Asynchronous Replication

The main issue in implementing primary site replication is determining how changes to the primary copy are propagated to the secondary copies. Changes are usually propagated in two steps, called Capture and Apply. Changes made by committed transactions to the primary copy are somehow identified during the Capture step and subsequently propagated to secondary copies during the Apply step.

In contrast to synchronous replication, a transaction that modifies a replicated relation directly locks and changes only the primary copy. It is typically committed long before the Apply step is carried out. Systems vary considerably in their implementation of these steps. We present an overview of some of the alternatives.

Capture

The Capture step is implemented using one of two approaches. In log-based Capture, the log maintained for recovery purposes is used to generate a record of updates.
Basically, when the log tail is written to stable storage, all log records that affect replicated relations are also written to a separate change data table (CDT). Since the transaction that generated the update log record may still be active when the record is written to the CDT, it may subsequently abort. Update log records written by transactions that subsequently abort must be removed from the CDT to obtain a stream of updates due (only) to committed transactions. This stream can be obtained as part of the Capture step or subsequently in the Apply step if commit log records are added to the CDT; for concreteness, we assume that the committed update stream is obtained as part of the Capture step and that the CDT sent to the Apply step contains only update log records of committed transactions.

In procedural Capture, a procedure automatically invoked by the DBMS or an application program initiates the Capture process, which consists typically of taking a snapshot of the primary copy. A snapshot is just a copy of the relation as it existed at some instant in time. (A procedure that is automatically invoked by the DBMS, such as the one that initiates Capture, is called a trigger. We covered triggers in Chapter 5.)

Log-based Capture has a smaller overhead than procedural Capture and, because it is driven by changes to the data, results in a smaller delay between the time the primary copy is changed and the time that the change is propagated to the secondary copies. (Of course, this delay also depends on how the Apply step is implemented.) In particular, only changes are propagated, and related changes (e.g., updates to two tables with a referential integrity constraint between them) are propagated together. The disadvantage is that implementing log-based Capture requires a detailed understanding of the structure of the log, which is quite system specific.
Therefore, a vendor cannot easily implement a log-based Capture mechanism that will capture changes made to data in another vendor’s DBMS.

Apply

The Apply step takes the changes collected by the Capture step, which are in the CDT or a snapshot, and propagates them to the secondary copies. This can be done by having the primary site continuously send the CDT, or by periodically requesting (the latest portion of) the CDT or a snapshot from the primary site. Typically, each secondary site runs a copy of the Apply process and ‘pulls’ the changes in the CDT from the primary site using periodic requests. The interval between such requests can be controlled by a timer or a user’s application program. Once the changes are available at the secondary site, they can be applied directly to the replica.

In some systems, the replica need not be just a fragment of the original relation; it can be a view defined using SQL, and the replication mechanism is sufficiently sophisticated to maintain such a view at a remote site incrementally (by reevaluating only the part of the view affected by changes recorded in the CDT).

Log-based Capture in conjunction with continuous Apply minimizes the delay in propagating changes. It is the best combination in situations where the primary and secondary copies are both used as part of an operational DBMS and replicas must be as closely synchronized with the primary copy as possible. Log-based Capture with continuous Apply is essentially a less expensive substitute for synchronous replication. Procedural Capture and application-driven Apply offer the most flexibility in processing source data and changes before altering the replica; this flexibility is often useful in data warehousing applications where the ability to ‘clean’ and filter the retrieved data is more important than the currency of the replica.
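The Capture and Apply steps described above can be illustrated with a minimal sketch. It assumes a hypothetical, heavily simplified in-memory log; the names LogRecord, capture_committed, and apply_changes are illustrative only and do not correspond to any particular system, whose log structure would be far more detailed.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class LogRecord:
    """Hypothetical, simplified log record; real log formats are system specific."""
    txn: str                     # transaction id
    kind: str                    # "update", "commit", or "abort"
    key: Optional[str] = None    # object that was updated (updates only)
    value: Optional[int] = None  # new value (updates only)


def capture_committed(log: List[LogRecord]) -> List[LogRecord]:
    """Log-based Capture: keep only updates made by committed transactions."""
    committed = {r.txn for r in log if r.kind == "commit"}
    return [r for r in log if r.kind == "update" and r.txn in committed]


def apply_changes(replica: Dict[str, int], cdt: List[LogRecord]) -> None:
    """Apply: replay the captured updates, in log order, on a secondary copy."""
    for r in cdt:
        replica[r.key] = r.value


log = [
    LogRecord("T1", "update", "joe.age", 35),
    LogRecord("T2", "update", "sue.age", 41),
    LogRecord("T1", "commit"),
    LogRecord("T2", "abort"),
]
cdt = capture_committed(log)        # only T1's update survives
replica: Dict[str, int] = {}
apply_changes(replica, cdt)
print(replica)                      # {'joe.age': 35}
```

In this sketch the committed update stream is produced during Capture, matching the assumption made in the text; updates of aborted or still-active transactions never reach the replica.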
Data Warehousing: An Example of Replication

Complex decision support queries that look at data from multiple sites are becoming very important. The paradigm of executing queries that span multiple sites is simply inadequate for performance reasons. One way to provide such complex query support over data from multiple sources is to create a copy of all the data at some one location and use the copy rather than going to the individual sources. Such a copied collection of data is called a data warehouse. Specialized systems for building, maintaining, and querying data warehouses have become important tools in the marketplace.

Data warehouses can be seen as one instance of asynchronous replication, in which copies are updated relatively infrequently. When we talk of replication, we typically mean copies maintained under the control of a single DBMS, whereas with data warehousing, the original data may be on different software platforms (including database systems and OS file systems) and even belong to different organizations. This distinction, however, is likely to become blurred as vendors adopt more ‘open’ strategies to replication. For example, some products already support the maintenance of replicas of relations stored in one vendor’s DBMS in another vendor’s DBMS.

We note that data warehousing involves more than just replication. We discuss other aspects of data warehousing in Chapter 25.

22.12 DISTRIBUTED TRANSACTIONS

In a distributed DBMS, a given transaction is submitted at some one site, but it can access data at other sites as well. In this chapter we refer to the activity of a transaction at a given site as a subtransaction. When a transaction is submitted at some site, the transaction manager at that site breaks it up into a collection of one or more subtransactions that execute at different sites, submits them to transaction managers at the other sites, and coordinates their activity.
We now consider aspects of concurrency control and recovery that require additional attention because of data distribution. As we saw in Chapter 16, there are many concurrency control protocols; in this chapter, for concreteness, we assume that Strict 2PL with deadlock detection is used. We discuss the following issues in subsequent sections:

Distributed Concurrency Control: How can locks for objects stored across several sites be managed? How can deadlocks be detected in a distributed database?

Distributed Recovery: Transaction atomicity must be ensured; when a transaction commits, all its actions, across all the sites at which it executes, must persist. Similarly, when a transaction aborts, none of its actions must be allowed to persist.

22.13 DISTRIBUTED CONCURRENCY CONTROL

In Section 22.11.1, we described two techniques for implementing synchronous replication, and in Section 22.11.2, we discussed various techniques for implementing asynchronous replication. The choice of technique determines which objects are to be locked. When locks are obtained and released is determined by the concurrency control protocol. We now consider how lock and unlock requests are implemented in a distributed environment.

Lock management can be distributed across sites in many ways:

Centralized: A single site is in charge of handling lock and unlock requests for all objects.

Primary Copy: One copy of each object is designated the primary copy. All requests to lock or unlock a copy of this object are handled by the lock manager at the site where the primary copy is stored, regardless of where the copy itself is stored.

Fully Distributed: Requests to lock or unlock a copy of an object stored at a site are handled by the lock manager at the site where the copy is stored.

The centralized scheme is vulnerable to failure of the single site that controls locking.
The primary copy scheme avoids this problem, but in general, reading an object requires communication with two sites: the site where the primary copy resides and the site where the copy to be read resides. This problem is avoided in the fully distributed scheme, because locking is done at the site where the copy to be read resides. However, while writing, locks must be set at all sites where copies are modified in the fully distributed scheme, whereas locks need be set only at one site in the other two schemes. Clearly, the fully distributed locking scheme is the most attractive scheme if reads are much more frequent than writes, as is usually the case.

22.13.1 Distributed Deadlock

One issue that requires special attention when using either primary copy or fully distributed locking is deadlock detection. (Of course, a deadlock prevention scheme can be used instead, but we focus on deadlock detection, which is widely used.) As in a centralized DBMS, deadlocks must be detected and resolved (by aborting some deadlocked transaction).

Each site maintains a local waits-for graph, and a cycle in a local graph indicates a deadlock. However, there can be a deadlock even if no local graph contains a cycle. For example, suppose that two sites, A and B, both contain copies of objects O1 and O2, and that the read-any write-all technique is used. T1, which wants to read O1 and write O2, obtains an S lock on O1 and an X lock on O2 at Site A, then requests an X lock on O2 at Site B. T2, which wants to read O2 and write O1, meanwhile, obtains an S lock on O2 and an X lock on O1 at Site B, then requests an X lock on O1 at Site A. As Figure 22.5 illustrates, T2 is waiting for T1 at Site A and T1 is waiting for T2 at Site B; thus, we have a deadlock, which neither site can detect based solely on its local waits-for graph.

To detect such deadlocks, a distributed deadlock detection algorithm must be used. We describe three such algorithms.
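Before turning to those algorithms, the two-site example above can be checked with a short sketch. It is only an illustration under simple assumptions: each waits-for graph is a hypothetical dictionary mapping a transaction to the set of transactions it waits for, and the helper names has_cycle and union are invented here.

```python
from typing import Dict, List, Set

Graph = Dict[str, Set[str]]  # an edge Ti -> Tj means "Ti waits for Tj"


def has_cycle(graph: Graph) -> bool:
    """Depth-first search for a cycle in a waits-for graph."""
    visiting: Set[str] = set()   # nodes on the current search path
    done: Set[str] = set()       # nodes fully explored without finding a cycle

    def visit(node: str) -> bool:
        if node in visiting:
            return True
        if node in done:
            return False
        visiting.add(node)
        found = any(visit(nxt) for nxt in graph.get(node, set()))
        visiting.discard(node)
        done.add(node)
        return found

    return any(visit(n) for n in list(graph))


def union(graphs: List[Graph]) -> Graph:
    """Combine graphs: keep an edge if it appears in any of the inputs."""
    combined: Graph = {}
    for g in graphs:
        for src, dsts in g.items():
            combined.setdefault(src, set()).update(dsts)
    return combined


site_a: Graph = {"T2": {"T1"}}   # at Site A, T2 waits for T1
site_b: Graph = {"T1": {"T2"}}   # at Site B, T1 waits for T2

print(has_cycle(site_a), has_cycle(site_b))   # False False
print(has_cycle(union([site_a, site_b])))     # True
```

Neither local graph contains a cycle, yet the combined graph does, which is why the deadlock is invisible to each site on its own.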
The first algorithm, which is centralized, consists of periodically sending all local waits-for graphs to one site that is responsible for global deadlock detection. At this site, the global waits-for graph is generated by combining all the local graphs; the set of nodes is the union of nodes in the local graphs, and there is an edge from one node to another if there is such an edge in any of the local graphs.

Figure 22.5 Distributed Deadlock (waits-for graphs over T1 and T2 at Site A and at Site B, and the global waits-for graph)

The second algorithm, which is hierarchical, groups sites into a hierarchy. For instance, sites might be grouped by state, then by country, and finally into a single group that contains all sites. Every node in this hierarchy constructs a waits-for graph that reveals deadlocks involving only sites contained in (the subtree rooted at) this node. All sites periodically (e.g., every 10 seconds) send their local waits-for graph to the site responsible for constructing the waits-for graph for their state. The sites constructing waits-for graphs at the state level periodically (e.g., every minute) send the state waits-for graph to the site constructing the waits-for graph for their country. The sites constructing waits-for graphs at the country level periodically (e.g., every 10 minutes) send the country waits-for graph to the site constructing the global waits-for graph. This scheme is based on the observation that more deadlocks are likely across closely related sites than across unrelated sites, and it puts more effort into detecting deadlocks across related sites. All deadlocks are eventually detected, but a deadlock involving two different countries may take a while to detect.

The third algorithm is simple: If a transaction waits longer than some chosen time-out interval, it is aborted. Although this algorithm may cause many unnecessary restarts, the overhead of deadlock detection is (obviously!) low, and in a heterogeneous distributed database, if the participating sites cannot cooperate to the extent of sharing their waits-for graphs, it may be the only option.

A subtle point to note with respect to distributed deadlock detection is that delays in propagating local information might cause the deadlock detection algorithm to identify ‘deadlocks’ that do not really exist. Such situations, called phantom deadlocks, lead to unnecessary aborts. For concreteness, we discuss the centralized algorithm, although the hierarchical algorithm suffers from the same problem.

Consider a modification of the previous example. As before, the two transactions wait for each other, generating the local waits-for graphs shown in Figure 22.5, and the local waits-for graphs are sent to the global deadlock-detection site. However, T2 is now aborted for reasons other than deadlock. (For example, T2 may also be executing at a third site, where it reads an unexpected data value and decides to abort.) At this point, the local waits-for graphs have changed so that there is no cycle in the ‘true’ global waits-for graph. However, the constructed global waits-for graph will contain a cycle, and T1 may well be picked as the victim!

22.14 DISTRIBUTED RECOVERY

Recovery in a distributed DBMS is more complicated than in a centralized DBMS for the following reasons:

New kinds of failure can arise: failure of communication links and failure of a remote site at which a subtransaction is executing.

Either all subtransactions of a given transaction must commit or none must commit, and this property must be guaranteed despite any combination of site and link failures. This guarantee is achieved using a commit protocol.

As in a centralized DBMS, certain actions are carried out as part of normal execution to provide the necessary information to recover from failures.
A log is maintained at each site, and in addition to the kinds of information maintained in a centralized DBMS, actions taken as part of the commit protocol are also logged.

The most widely used commit protocol is called Two-Phase Commit (2PC). A variant called 2PC with Presumed Abort, which we discuss in Section 22.14.3, has been adopted as an industry standard.

In this section, we first describe the steps taken during normal execution, concentrating on the commit protocol, and then discuss recovery from failures.

22.14.1 Normal Execution and Commit Protocols

During normal execution, each site maintains a log, and the actions of a subtransaction are logged at the site where it executes. The regular logging activity described in Chapter 18 is carried out and, in addition, a commit protocol is followed to ensure that all subtransactions of a given transaction either commit or abort uniformly. The transaction manager at the site where the transaction originated is called the coordinator for the transaction; transaction managers at sites where its subtransactions execute are called subordinates (with respect to the coordination of this transaction).

We now describe the Two-Phase Commit (2PC) protocol, in terms of the messages exchanged and the log records written. When the user decides to commit a transaction, the commit command is sent to the coordinator for the transaction. This initiates the 2PC protocol:

1. The coordinator sends a prepare message to each subordinate.

2. When a subordinate receives a prepare message, it decides whether to abort or commit its subtransaction. It force-writes an abort or prepare log record, and then sends a no or yes message to the coordinator. Note that a prepare log record is not used in a centralized DBMS; it is unique to the distributed commit protocol.

3. If the coordinator receives yes messages from all subordinates, it force-writes a commit log record and then sends a commit message to all subordinates. If it receives even one no message or receives no response from some subordinate for a specified time-out interval, it force-writes an abort log record, and then sends an abort message to all subordinates. (As an optimization, the coordinator need not send abort messages to subordinates who voted no.)

4. When a subordinate receives an abort message, it force-writes an abort log record, sends an ack message to the coordinator, and aborts the subtransaction. When a subordinate receives a commit message, it force-writes a commit log record, sends an ack message to the coordinator, and commits the subtransaction.

5. After the coordinator has received ack messages from all subordinates, it writes an end log record for the transaction.

The name Two-Phase Commit reflects the fact that two rounds of messages are exchanged: first a voting phase, then a termination phase, both initiated by the coordinator. The basic principle is that any of the transaction managers involved (including the coordinator) can unilaterally abort a transaction, whereas there must be unanimity to commit a transaction.

When a message is sent in 2PC, it signals a decision by the sender. To ensure that this decision survives a crash at the sender’s site, the log record describing the decision is always forced to stable storage before the message is sent.

A transaction is officially committed at the time the coordinator’s commit log record reaches stable storage. Subsequent failures cannot affect the outcome of the transaction; it is irrevocably committed. Log records written to record the commit protocol actions contain the type of the record, the transaction id, and the identity of the coordinator. A coordinator’s commit or abort log record also contains the identities of the subordinates.
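The five steps of 2PC can be traced with a small single-process sketch. It is an illustration under strong simplifying assumptions: messages are plain method calls, each log is a Python list standing in for force-written records, time-outs and failures are not modeled, and the names Subordinate and two_phase_commit are hypothetical.

```python
from typing import Dict, List


class Subordinate:
    def __init__(self, name: str, can_commit: bool) -> None:
        self.name = name
        self.can_commit = can_commit   # whether the subtransaction is able to commit
        self.log: List[str] = []       # stands in for force-written log records

    def on_prepare(self) -> str:
        # Step 2: force-write a prepare or abort record, then vote.
        if self.can_commit:
            self.log.append("prepare")
            return "yes"
        self.log.append("abort")
        return "no"

    def on_decision(self, decision: str) -> str:
        # Step 4: force-write the decision record, then acknowledge.
        self.log.append(decision)
        return "ack"


def two_phase_commit(subs: List[Subordinate]) -> List[str]:
    """Run 2PC from the coordinator's side; returns the coordinator's log."""
    coord_log: List[str] = []
    # Phase 1 (voting): steps 1 and 2.
    votes: Dict[str, str] = {s.name: s.on_prepare() for s in subs}
    # Step 3: unanimity is required to commit.
    decision = "commit" if all(v == "yes" for v in votes.values()) else "abort"
    coord_log.append(decision)   # forced before any decision message is sent
    # Phase 2 (termination): step 4. Subordinates that voted no are skipped,
    # using the optimization noted above for abort messages.
    acks = [s.on_decision(decision) for s in subs if votes[s.name] == "yes"]
    # Step 5: write end once every contacted subordinate has acknowledged.
    if all(a == "ack" for a in acks):
        coord_log.append("end")
    return coord_log


a, b = Subordinate("A", True), Subordinate("B", True)
print(two_phase_commit([a, b]), a.log)          # ['commit', 'end'] ['prepare', 'commit']
c, d = Subordinate("C", True), Subordinate("D", False)
print(two_phase_commit([c, d]), c.log, d.log)   # ['abort', 'end'] ['prepare', 'abort'] ['abort']
```

A single no vote is enough to abort, while commit requires a yes from every subordinate, which mirrors the unanimity principle stated in the text.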
22.14.2 Restart after a Failure

When a site comes back up after a crash, we invoke a recovery process that reads the log and processes all transactions executing the commit protocol at the time of the crash. The transaction manager at this site could have been the coordinator for some of these transactions and a subordinate for others. We do the following in the recovery process:

If we have a commit or abort log record for transaction T, its status is clear; we redo or undo T, respectively. If this site is the coordinator, which can be determined from the commit or abort log record, we must periodically resend a commit or abort message to each subordinate until we receive an ack (periodically, because there may be other link or site failures in the system). After we have received acks from all subordinates, we write an end log record for T.

If we have a prepare log record for T but no commit or abort log record, this site is a subordinate, and the coordinator can be determined from the prepare record. We must repeatedly contact the coordinator site to determine the status of T. Once the coordinator responds with either commit or abort, we write a corresponding log record, redo or undo the transaction, and then write an end log record for T.

If we have no prepare, commit, or abort log record for transaction T, T certainly could not have voted to commit before the crash; so we can unilaterally abort and undo T and write an end log record. In this case, we have no way to determine whether the current site is the coordinator or a subordinate for T. However, if this site is the coordinator, it might have sent a prepare message prior to the crash, and if so, other sites may have voted yes. If such a subordinate site contacts the recovery process at the current site, we now know that the current site is the coordinator for T, and given that there is no commit or abort log record, the response to the subordinate should be to abort T.
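The three restart cases above amount to a small decision table, sketched below. The function restart_action and its return strings are hypothetical names chosen for illustration; the sketch only classifies a transaction from the record types found in the local log and does not model resending messages or writing end records.

```python
from typing import Set


def restart_action(records: Set[str]) -> str:
    """Pick the recovery action for T from the log record types found for T."""
    if "commit" in records:
        return "redo"                 # status is clear: T committed
    if "abort" in records:
        return "undo"                 # status is clear: T aborted
    if "prepare" in records:
        return "ask coordinator"      # subordinate that voted yes: must wait
    return "abort unilaterally"       # T cannot have voted to commit


print(restart_action({"prepare", "commit"}))   # redo
print(restart_action({"prepare"}))             # ask coordinator
print(restart_action(set()))                   # abort unilaterally
```

The only case in which the site cannot decide on its own is the one with a prepare record and nothing else.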
Observe that, if the coordinator site for a transaction T fails, subordinates who voted yes cannot decide whether to commit or abort T until the coordinator site recovers; we say that T is blocked. In principle, the active subordinate sites could communicate among themselves, and if at least one of them contains an abort or commit log record for T, its status becomes globally known. To communicate among themselves, all subordinates must be told the identity of the other subordinates at the time they are sent the prepare message. However, 2PC is still vulnerable to coordinator failure during recovery because even if all subordinates voted yes, the coordinator (who also has a vote!) may have decided to abort T, and this decision cannot be determined until the coordinator site recovers.

We covered how a site recovers from a crash, but what should a site that is involved in the commit protocol do if a site that it is communicating with fails? If the current site is the coordinator, it should simply abort the transaction. If the current site is a subordinate, and it has not yet responded to the coordinator’s prepare message, it can (and should) abort the transaction. If it is a subordinate and has voted yes, then it cannot unilaterally abort the transaction, and it cannot commit either; it is blocked. It must periodically contact the coordinator until it receives a reply.

Failures of communication links are seen by active sites as failure of other sites that they are communicating with, and therefore the solutions just outlined apply to this case as well.

22.14.3 Two-Phase Commit Revisited

Now that we have examined how a site recovers from a failure, and seen the interaction between the 2PC protocol and the recovery process, it is instructive to consider how 2PC can be refined further.
In doing so, we arrive at a more efficient version of 2PC, but equally important perhaps, we understand the role of the various steps of 2PC more clearly. Consider three basic observations:

1. The ack messages in 2PC are used to determine when a coordinator (or the recovery process at a coordinator site following a crash) can ‘forget’ about a transaction T. Until the coordinator knows that all subordinates are aware of the commit or abort decision for T, it must keep information about T in the transaction table.

2. If the coordinator site fails after sending out prepare messages but before writing a commit or abort log record, when it comes back up, it has no information about the transaction’s commit status prior to the crash. However, it is still free to abort the transaction unilaterally (because it has not written a commit record, it can still cast a no vote itself). If another site inquires about the status of the transaction, the recovery process, as we have seen, responds with an abort message. Therefore, in the absence of information, a transaction is presumed to have aborted.

3. If a subtransaction does no updates, it has no changes to either redo or undo; in other words, its commit or abort status is irrelevant.

The first two observations suggest several refinements:

When a coordinator aborts a transaction T, it can undo T and remove it from the transaction table immediately. After all, removing T from the table results in a ‘no information’ state with respect to T, and the default response (to an enquiry about T) in this state, which is abort, is the correct response for an aborted transaction.

By the same token, if a subordinate receives an abort message, it need not send an ack message. The coordinator is not waiting to hear from subordinates after sending an abort message!
If, for some reason, a subordinate that receives a prepare message (and voted yes) does not receive an abort or commit message for a specified time-out interval, it contacts the coordinator again. If the coordinator decided to abort, there may no longer be an entry in the transaction table for this transaction, but the subordinate receives the default abort message, which is the correct response.

Because the coordinator is not waiting to hear from subordinates after deciding to abort a transaction, the names of subordinates need not be recorded in the abort log record for the coordinator.

All abort log records (for the coordinator as well as subordinates) can simply be appended to the log tail, instead of doing a force-write. After all, if they are not written to stable storage before a crash, the default decision is to abort the transaction.

The third basic observation suggests some additional refinements:

If a subtransaction does no updates (which can be easily detected by keeping a count of update log records), the subordinate can respond to a prepare message from the coordinator with a reader message, instead of yes or no. The subordinate writes no log records in this case.

When a coordinator receives a reader message, it treats the message as a yes vote, but with the optimization that it does not send any more messages to the subordinate, because the subordinate’s commit or abort status is irrelevant.

If all subtransactions, including the subtransaction at the coordinator site, send a reader message, we do not need the second phase of the commit protocol. Indeed, we can simply remove the transaction from the transaction table, without writing any log records at any site for this transaction.

The Two-Phase Commit protocol with the refinements discussed in this section is called Two-Phase Commit with Presumed Abort.
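The effect of these refinements on the coordinator can be sketched as follows. This is a minimal illustration, not a full protocol: the class Coordinator, its method names, and the returned strings are hypothetical, the transaction table is a plain dictionary, and the list of votes is assumed to include the coordinator's own subtransaction.

```python
from typing import Dict, List


class Coordinator:
    def __init__(self) -> None:
        # Transaction table: only committing transactions need an entry.
        self.table: Dict[str, str] = {}

    def decide(self, txn: str, votes: List[str]) -> str:
        """votes holds 'yes', 'no', or 'reader', one per subtransaction."""
        if all(v == "reader" for v in votes):
            return "no second phase"   # read-only everywhere: nothing to log
        if "no" in votes:
            return "abort"             # forgotten at once; no acks are awaited
        self.table[txn] = "commit"     # a reader vote counts as yes
        return "commit"

    def status(self, txn: str) -> str:
        # No information means the transaction is presumed to have aborted.
        return self.table.get(txn, "abort")


coord = Coordinator()
print(coord.decide("T1", ["yes", "reader"]), coord.status("T1"))   # commit commit
print(coord.decide("T2", ["yes", "no"]), coord.status("T2"))       # abort abort
print(coord.decide("T3", ["reader", "reader"]))                    # no second phase
```

An aborted transaction leaves no entry in the table, so a later inquiry about it falls through to the default answer, abort, which is exactly the behavior the name Presumed Abort describes.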
22.14.4 Three-Phase Commit

A commit protocol called Three-Phase Commit (3PC) can avoid blocking even if the coordinator site fails during recovery. The basic idea is that, when the coordinator sends out prepare messages and receives yes votes from all subordinates, it sends all sites a precommit message, rather than a commit message. When a sufficient number of acks (more than the maximum number of failures that must be handled) have been received, the coordinator force-writes a commit log record and sends a commit message to all subordinates. In 3PC, the coordinator effectively postpones the decision to commit until it is sure that enough sites know about the decision to commit; if the coordinator subsequently fails, these sites can communicate with each other and detect that the transaction must be committed (or, conversely, aborted, if none of them has received a precommit message) without waiting for the coordinator to recover.

The 3PC protocol imposes a significant additional cost during normal execution and requires that communication link failures do not lead to a network partition (wherein some sites cannot reach some other sites through any path) to ensure freedom from blocking. For these reasons, it is not used in practice.

22.15 REVIEW QUESTIONS

Answers to the review questions can be found in the listed sections.

Discuss the different motivations behind parallel and distributed databases. (Section 22.1)

Describe the three main architectures for parallel DBMSs. Explain why the shared-memory and shared-disk approaches suffer from interference. What can you say about the speed-up and scale-up of the shared-nothing architecture? (Section 22.2)

Describe and differentiate pipelined parallelism and data-partitioned parallelism. (Section 22.3)

Discuss the following techniques for partitioning dat
