Cloud-Based Data Processing: OLAP in the Cloud

Jana Giceva
Summary
This document by Jana Giceva presents a comprehensive lecture on cloud-based data processing, focusing on OLAP in the cloud. The topics cover data warehouse system architecture, storage, query execution, and the transition to cloud environments. The document discusses various architectures to support scalability, performance, and elasticity.
Full Transcript
Cloud-Based Data Processing: OLAP in the cloud
Jana Giceva

Data warehouse system architecture
▪ Shared-nothing architectures are the dominant architecture for high-performance data warehousing.
▪ Important architectural dimensions and methods:
− Storage: data layout, storage format, partitioning, distribution?
− Query engine: query optimization, execution models, parallelism, multi-tenancy?
− Cluster: (meta-)data sharing, resource allocation and management

Data warehouse storage
▪ Data layout: column-store vs. row-store
− Column-stores read only relevant data (they skip irrelevant data by skipping unrelated columns).
− Suitable for analytical workloads (better use of CPU caches, SIMD registers, lightweight compression).
− e.g., Apache Parquet, ORC, etc.
▪ Storage format: compression is key!
− Trades I/O for CPU; a good fit for large datasets (storage) and I/O-intensive workloads.
− Excellent synergy with column-stores.
− e.g., RLE, gzip, LZ4, etc.
▪ Pruning: skip irrelevant data
− Data is usually ordered → can maintain a sparse MinMax index.
− Allows skipping irrelevant data horizontally (rows).

Table partitioning and distribution
▪ Data is spread based on a key
− Functions: hash, range, list
▪ Distribution (system-driven)
− Goal: parallelism
− Each compute node gets a piece of the data.
− Each query has work to do on every piece (keeps everyone busy).
▪ Partitioning (user-specified)
− Goal 1: data lifecycle management
− A data warehouse, e.g., keeps the last six months.
− Every night: load one new day, drop the oldest partition.
− Goal 2: improve access patterns with partition pruning
− When querying for month "May", drop partitions P1, P3, P4.
img src: P. Boncz (CWI)

Query Execution
Scalability alone is not enough: it matters little unless you also make the most of the underlying hardware.

Scalable system    Cores   Time [s]
Spark                128        857
Giraph               128        596
GraphX               128        419
Single-threaded        1        275
Src: McSherry et al. Scalability! But at what COST? HotOS'15

▪ Vectorized execution
− Data is pipelined in batches (of a few thousand rows) to save I/O and greatly improve cache efficiency.
− e.g., Actian Vortex, Hive, Drill, Snowflake, MySQL's HeatWave accelerator, etc.
▪ And/or JIT code generation
− Generate a program that executes the exact query plan, JIT-compile it, and run it on your data.
− e.g., Tableau/HyPer/Umbra, SingleStore, AWS Redshift, etc.

Moving to the cloud

Cloud-native warehouses
▪ Design considerations for good performance, scalability, elasticity, fault-tolerance, etc.
▪ Challenges at cloud scale for cloud data:
− Storage:
− Abstracting from the storage format
− Distribution and partitioning of data, even more relevant at cloud scale
− Data caching across the (deep) storage hierarchy (cost/performance)
− Query execution:
− Distributed query execution: combine scale-out and scale-up
− Distributed query optimization
− Global resource-aware scheduling
▪ Service form factor
− Reserved-capacity services vs. serverless instances vs. auto-scaling

Shared-nothing architecture
▪ Scales well for star-schema queries, as very little bandwidth is required to join
− a small (broadcast) dimension table with
− a large (partitioned) fact table.
▪ Elegant design with homogeneous nodes: every query processor node (DB) has its own locally attached storage (disk).
▪ Data is horizontally partitioned across the processor nodes; each node is only responsible for the rows on its own local disks (see the join sketch below).
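To make the shared-nothing star-schema join concrete, here is a minimal Python sketch with made-up tables and hypothetical helper names: the fact table is hash-distributed across nodes, the small dimension table is broadcast to every node, and each node joins only its local rows, so almost no network bandwidth is needed.

```python
# Sketch: star-schema join on a shared-nothing cluster (illustrative only).
# The fact table is hash-partitioned across nodes; the small dimension
# table is broadcast, so the join needs almost no network bandwidth.

NUM_NODES = 4

def hash_partition(rows, key, num_nodes):
    """Distribute rows to nodes by hashing the distribution key."""
    partitions = [[] for _ in range(num_nodes)]
    for row in rows:
        partitions[hash(row[key]) % num_nodes].append(row)
    return partitions

def local_join(fact_rows, dim_by_key, key):
    """Each node joins its local fact rows against the broadcast dimension table."""
    return [{**f, **dim_by_key[f[key]]} for f in fact_rows if f[key] in dim_by_key]

# Hypothetical tables.
fact = [{"order_id": i, "cust_id": i % 10, "amount": i * 1.5} for i in range(1000)]
dim  = [{"cust_id": c, "region": "EU" if c % 2 else "US"} for c in range(10)]

fact_parts = hash_partition(fact, "cust_id", NUM_NODES)   # system-driven distribution
dim_broadcast = {d["cust_id"]: d for d in dim}            # broadcast to every node

result = []
for node_id in range(NUM_NODES):                          # conceptually runs in parallel
    result.extend(local_join(fact_parts[node_id], dim_broadcast, "cust_id"))
print(len(result))
```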
A worker's instance
▪ Each compute node has dedicated CPU, memory, and locally attached disk storage.
▪ Memory, storage, and data are partitioned among the node's slices; hash and round-robin table partitioning / distribution.
▪ The leader distributes data to the slices and assigns workload to them.
▪ The number of slices per node depends on the node size.
▪ Within a node, the leader decides how to distribute data between the slices (or the user can specify the distribution key, to better match the query's joins and aggregations).

Within a slice
▪ Data is stored in columns (e.g., ID, NAME, AGE, DATE), with multi-dimensional sorting.
− Q: Why was sorting important?
▪ Rich collection of compression options (RLE, dictionary, gzip, etc.)

Fault tolerance
▪ Make blocks small, e.g., 1 MB.
▪ Each block is replicated on a different compute node.
▪ Blocks are also stored on S3.
▪ Internally, S3 triply replicates each block.

Handling node failures
▪ Assume node 1 fails:
− Option #1: node 2 processes the load until node 1 is restored.
− Option #2: a new node is instantiated;
− node 3 processes the workload using data in S3
− until its local disks are restored.

Within a slice, cont.
▪ Data is stored in columns, with multi-dimensional sorting.
− Q: Why was sorting important?
▪ Rich collection of compression options (RLE, dictionary, gzip, etc.)
▪ Columns are stored in blocks; the min and max value of each block is retained in a zone map (see the pruning sketch below).

Example: old version of Amazon (AWS) Redshift
▪ Classic shared-nothing design with locally attached storage.
▪ The execution engine is the ParAccel DBMS
− Classic MPP, JIT-compiled C++
▪ Leverages standard AWS services:
− EC2 + EBS + S3, Virtual Private Cloud
▪ Redshift cluster: Leader + Compute nodes
− The leader parses a query and builds an optimal execution plan.
− It creates compiled code and distributes it to the compute nodes for processing.
− It aggregates the results before returning the result to the client.

Drawbacks of shared-nothing architecture
▪ Tightly couples compute and storage resources.
▪ Heterogeneous workloads
− A system configuration that is ideal for bulk loading (high I/O bandwidth, light compute) is a poor fit for complex queries (low I/O bandwidth, heavy compute).
▪ Membership changes
− If the set of nodes changes, potentially a large volume of data needs to be reshuffled.
▪ Online upgrades
− Possible, but very hard when everything is coupled and expected to be homogeneous.
▪ This makes it problematic to use in the cloud setting.
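A minimal sketch of the MinMax zone-map pruning from the "Within a slice" slides above, assuming illustrative block sizes and values: each block of a sorted column records its min and max, and a scan skips blocks whose range cannot contain the predicate value. This is also why sorting matters: it keeps block ranges narrow and non-overlapping, so pruning is effective.

```python
# Sketch: zone-map (MinMax) pruning over a sorted column, as on the
# "Within a slice" slides above. Values and block size are illustrative.

BLOCK_SIZE = 4

def build_zone_map(values, block_size=BLOCK_SIZE):
    """Split a (sorted) column into blocks and record min/max per block."""
    blocks = [values[i:i + block_size] for i in range(0, len(values), block_size)]
    return blocks, [(min(b), max(b)) for b in blocks]

def scan_eq(blocks, zone_map, needle):
    """Scan only the blocks whose [min, max] range can contain the needle."""
    hits, scanned = [], 0
    for block, (lo, hi) in zip(blocks, zone_map):
        if lo <= needle <= hi:          # otherwise the whole block is skipped
            scanned += 1
            hits.extend(v for v in block if v == needle)
    return hits, scanned

dates = sorted([20240101, 20240102, 20240103, 20240104,
                20240201, 20240202, 20240203, 20240204,
                20240301, 20240302, 20240303, 20240304])
blocks, zmap = build_zone_map(dates)
hits, scanned = scan_eq(blocks, zmap, 20240203)
print(hits, f"scanned {scanned}/{len(blocks)} blocks")   # sorting keeps ranges tight
```

The per-file MinMax catalog in Snowflake's table storage (later slides) applies the same idea at file granularity.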
Shared-storage architectures
Separating compute and storage

Separating Compute and Storage
▪ Evolution of cloud data warehouse architectures over the years.
▪ Engines maintain state comprised of: caches, metadata, transaction log, and data.
(Figure: on-premise, storage-separate, and state-separate architectures, each showing caches, metadata, transaction log, and data.)
▪ The first step is the decoupling of storage and compute – more flexible scaling:
− Both layers can scale up or down independently.
− Storage is abundant and cheaper than compute.
− The user only pays for the compute needed to query a working subset of the data.

Separate compute and storage
▪ Queries against the same data can be given the resources (virtual warehouses) to meet their needs.
− This flexibility is not feasible with the shared-nothing approach (e.g., in old Redshift).

Logic behind virtual warehouses
▪ A virtual warehouse is a dynamically created cluster of compute instances:
− Pure compute resources
− Can be created, destroyed, and resized at any time
▪ Local disks cache file headers and table columns (the local data cache layer; see the cache sketch below).
▪ Sizing mechanisms:
− Number of EC2 instances
− Size of each instance (#cores, I/O capacity)
▪ Each query is mapped to exactly one virtual warehouse (VW).
▪ Each VW can run multiple queries in parallel.
▪ Every VW has access to the same shared data without needing to copy it.

Disaggregated Compute-Storage Architectures
▪ Advantages:
− Elasticity: storage and compute resources can be scaled independently.
− Availability: tolerate cluster and node failures.
− Heterogeneous workloads: high I/O bandwidth or heavy compute.
▪ Key features:
− Disaggregation of compute and storage
− Multi-tenancy
− Elastic data warehouses
− Local SSD caching
− Cloud storage service, e.g., AWS S3, Google Cloud Storage, Azure Blob Storage
▪ Examples:
− Snowflake, new AWS Redshift
Src: Li et al. Cloud-Native Databases, VLDB'22

Example: Snowflake
▪ Snowflake was the pioneering system to separate storage and compute in the cloud setting:
− two loosely coupled, independently scalable services.
▪ Compute
− handled by a proprietary shared-nothing execution engine.
− highly elastic.
▪ Storage
− handled by Amazon S3, Azure Blob Storage, or Google Cloud Storage.
− dynamically cached on the local storage of the clusters used to execute the queries.

Example: Snowflake
▪ The Brain: key data management services.
▪ The Muscle: shared-nothing execution engine (virtual warehouse).
▪ The Storage: shared storage for data and query results.
img src: Dageville et al. (2016) The Snowflake Elastic Data Warehouse. SIGMOD

Snowflake's table storage
▪ Tables are horizontally partitioned into large immutable files
− similar to blocks or pages in a traditional database system.
▪ Within each file:
− The values of each attribute (column) are grouped together.
− Heavily compressed (e.g., gzip, RLE, etc.)
▪ For accelerated query processing:
− The MinMax values of each column of each file of each table are kept in a catalog
− and used for pruning at runtime.
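A minimal sketch of the local-disk caching idea behind virtual warehouses described above: compute nodes keep a small read-through LRU cache of immutable files fetched from the shared object store, so repeated scans avoid remote reads. The RemoteStore stand-in and the file names are hypothetical, not a real cloud storage API.

```python
# Sketch: read-through local cache in front of shared cloud storage,
# as used by a virtual warehouse's local-disk cache layer.
# The RemoteStore class and file names are hypothetical.
from collections import OrderedDict

class RemoteStore:
    """Stands in for an object store such as S3 (immutable files)."""
    def __init__(self, files):
        self.files, self.reads = files, 0
    def get(self, name):
        self.reads += 1                            # count expensive remote reads
        return self.files[name]

class LocalCache:
    """LRU cache of immutable files on a compute node's local disk."""
    def __init__(self, remote, capacity=2):
        self.remote, self.capacity = remote, capacity
        self.cache = OrderedDict()
    def read(self, name):
        if name in self.cache:
            self.cache.move_to_end(name)           # cache hit
            return self.cache[name]
        data = self.remote.get(name)               # cache miss: fetch remotely
        self.cache[name] = data
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)         # evict least recently used
        return data

store = RemoteStore({"part_0": b"...", "part_1": b"...", "part_2": b"..."})
node = LocalCache(store)
for f in ["part_0", "part_1", "part_0", "part_1", "part_0"]:
    node.read(f)
print(store.reads)   # 2 remote reads instead of 5
```

Because the cached files are immutable, the cache never needs invalidation, only eviction.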
Disaggregated Compute-Memory-Storage Architectures
▪ Advantages:
− Elasticity: storage, compute, and memory can be scaled independently.
− Resources can be scheduled for better utilization.
− Complex workloads: copes with large intermediate results.
▪ Key features:
− A shuffle-memory layer for speeding up joins.
− Reduces I/O cost by avoiding writing intermediate results to disk.
▪ Examples:
− BigQuery
Src: Li et al. Cloud-Native Databases, VLDB'22

Example: BigQuery
▪ Compute (clusters) + shuffle tier (Colossus DFS) + storage
▪ Dremel query engine
▪ Distributed memory shuffle tier for query optimization:
− Reduced shuffle latency by 10x
− Enabled 10x larger shuffles
− Reduced resource cost by more than 20%
Src: Melnik et al. Dremel: A Decade of Interactive SQL Analysis at Web Scale

BigQuery's Shuffle Workflow
▪ A producer in each worker:
− generates partitions and
− sends them to the shuffling layer.
▪ A consumer:
− combines the partitions and
− performs the operations locally.
▪ Large intermediate results can be spilled to local disks.
Src: Melnik et al. Dremel: A Decade of Interactive SQL Analysis at Web Scale

Stateless shared-storage architectures
Separating compute and state

Separating Compute and Storage / State
▪ In stateful architectures, the state of an in-flight transaction is stored in the compute node and is not hardened into persistent storage until the transaction commits.
(Figure: on-premise, storage-separate, and state-separate architectures, each showing caches, metadata, transaction log, and data.)
▪ When a compute node fails, the state of a non-committed transaction is lost → the transaction must be failed.
▪ Resilience to compute node failures and elastic assignment of data to compute are not possible in stateful architectures → the need to move to stateless architectures.

Stateless compute architectures
▪ Compute nodes should not hold any state information.
▪ Caches need to be as close to the compute as possible:
− They can be lazily reconstructed from persistent storage.
− No need to be decoupled from compute.
▪ All data, transactional logs, and metadata need to be externalized.
▪ Enables partial restart of query execution in the event of compute node failures and online changes of the cluster topology.
▪ Examples: BigQuery using the shuffle tier and dynamic scheduler, POLARIS

Example: POLARIS
▪ As before:
− Separation of storage and compute.
− Compute is done by Polaris pools.
− Shared centralized services for metadata and transactions.
▪ Stateless architecture within a pool:
− Data is stored durably in remote storage.
− Metadata and the transactional log are offloaded to centralized services (built for high availability and performance).
▪ This enables POLARIS to support any cloud form factor (reserved instances, auto-scaling, or serverless).
img src: Aguilar-Saborit et al. (2020) POLARIS: The Distributed SQL Engine in Azure Synapse. VLDB

Storage layer considerations
▪ Data cells – an abstraction from the underlying data format and storage system.
− Converging data lakes and warehouses.
▪ Hash-based distribution
− To enable easy and balanced distribution.
− The hash-distribution h(r) is a system-defined function that returns the hash bucket (distribution) that r belongs to – mapping cells to compute nodes.
▪ The partitioning function p(r) is used for range pruning when range or equality predicates are defined over r.
img src: Aguilar-Saborit et al. (2020) POLARIS: The Distributed SQL Engine in Azure Synapse. VLDB
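A minimal sketch of how the hash-distribution h(r) and partitioning function p(r) above can address cells, with made-up bucket counts and month-range partitions (not POLARIS internals): h(r) decides which compute node owns a record, while p(r) lets a range or equality predicate prune whole cells.

```python
# Sketch: cells addressed by (distribution, partition), following the
# hash-distribution h(r) and partitioning function p(r) described above.
# Bucket counts and date ranges are illustrative, not POLARIS internals.
from collections import defaultdict

NUM_DISTRIBUTIONS = 8            # hash buckets, mapped to compute nodes
PARTITION_BOUNDS = [202401, 202402, 202403, 202404]   # monthly range partitions

def h(r):
    """System-defined hash distribution: which bucket (node) owns record r."""
    return hash(r["cust_id"]) % NUM_DISTRIBUTIONS

def p(r):
    """Partitioning function: which range partition record r falls into."""
    for i, upper in enumerate(PARTITION_BOUNDS):
        if r["month"] <= upper:
            return i
    return len(PARTITION_BOUNDS)

cells = defaultdict(list)        # (distribution, partition) -> records
rows = [{"cust_id": c, "month": m} for c in range(100)
        for m in (202401, 202402, 202403)]
for r in rows:
    cells[(h(r), p(r))].append(r)

# Range predicate "month = 202402": prune cells whose partition cannot match.
wanted = {p({"cust_id": 0, "month": 202402})}
touched = [k for k in cells if k[1] in wanted]
print(f"touched {len(touched)} of {len(cells)} cells")
```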
Distributed query processing – part 1
▪ All incoming queries are compiled in two phases:
− Stage 1 uses the SQL Server query optimizer to generate all logically equivalent plans for a query,
− using metadata about the collection of files/tables, partitions, and distributions.
− Stage 2 performs distributed cost-based query optimization to enumerate all physical distributed implementations of these logical query plans
− and picks the one with the least estimated cost (taking data-movement cost into account).

Distributed query processing – part 2
▪ Task T_i – the physical execution of an operator E on the i-th hash-distribution of its inputs.
▪ Tasks are instantiated templates of (the code executing) expression E that run in parallel across the N hash-distributions of the inputs.
▪ A task has three components:
− Inputs: a collection of cells for each input's data partition, stored either in local or remote storage.
− Task template: the code to execute on the compute nodes, representing the operator expression E.
− Output: the collection of cells produced by the task, used either as input for another task or as the final result.

Task organization
▪ Model the distributed execution of queries via hierarchical state machines.
▪ Execution of the query task DAG is top-down, in topological sort order.
▪ State machines give fine-grained control at the task level and define a predictable model for recovering from failures.
▪ States and transitions are logged at each step – necessary for debugging and for resuming after failover.
▪ Low resource overhead for tracking the concurrent execution of many queries.

POLARIS Summary
▪ The separation of state and compute enables offering different service form factors:
− Serverless, capacity reservations, multiple pools.
▪ The data cell abstraction enables efficient processing of a diverse collection of data formats and storage systems.
▪ Combines scale-up and scale-out:
− Scale-up: intra-partition parallelism, vectorized processing, columnar storage, careful control flow, cache-hierarchy optimizations, deep enhancements to query optimization, etc.
− Fine-grained scale-out: distributed query processing inspired by big-data query execution frameworks.
▪ Elastic query processing via:
− Separation of state and compute
− Flexible abstraction of datasets as cells
− Task inputs defined in terms of cells
− Fine-grained orchestration of tasks using state machines

Serverless

Serverless Computing for Queries
▪ Issue queries in the cloud without worrying about resource provisioning, and pay at per-query granularity.
▪ Two main approaches:
− Serverless databases:
− Rely on the cloud SQL engine and storage to execute the queries with dynamic resource provisioning.
− The DB service can pause when idle and resume when a query comes in.
− Serverless functions + cloud storage:
− Rely on Function-as-a-Service (FaaS) and cloud storage to run queries with on-demand resources (see the worker sketch below).

Function-as-a-Service (FaaS)
▪ FaaS
− A serverless compute model where developers write and deploy small, self-contained functions.
− Functions are event-driven and run only in response to triggers (e.g., API calls, file uploads).
− Pay only for execution time and resources used.
▪ Function lifetime
1. Trigger: an event (e.g., an HTTP request) invokes a function.
2. Execution: the function runs on a stateless server instance and automatically scales to handle demand.
3. Termination: compute resources are deprovisioned after execution, eliminating idle costs.
▪ Event-driven, stateless, autoscaling, pay-per-use.
Src: Hellerstein et al. "Serverless Computing: One Step Forward, Two Steps Back". In CIDR, 2019
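A minimal sketch of a stateless FaaS-style worker in the "serverless functions + cloud storage" approach: the handler is triggered with a task description, reads its input partition from an object store, computes a partial aggregate, and writes the intermediate result back for a later stage. The in-memory object_store and the event fields are hypothetical stand-ins, not any real provider's API.

```python
# Sketch: a stateless function-as-a-service worker for query processing.
# An in-memory dict stands in for cloud object storage; the event fields
# are hypothetical and do not follow any particular provider's API.
import json

object_store = {}                                   # stands in for S3/GCS/Blob storage

def handler(event):
    """Triggered per task: read input partition, aggregate, write result."""
    rows = json.loads(object_store[event["input_key"]])
    total = sum(r["amount"] for r in rows)          # partial aggregate for this partition
    result_key = f"stage1/part-{event['task_id']}.json"
    object_store[result_key] = json.dumps({"partial_sum": total})
    return result_key                               # all state lives in storage, not the function

# Driver: stage the inputs, invoke one function per partition, merge results.
for i in range(4):
    object_store[f"input/part-{i}.json"] = json.dumps(
        [{"amount": i * 10 + j} for j in range(5)])
keys = [handler({"task_id": i, "input_key": f"input/part-{i}.json"}) for i in range(4)]
print(sum(json.loads(object_store[k])["partial_sum"] for k in keys))
```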
Serverless Functions + Cloud storage
▪ Two challenges:
− Functions are stateless.
− Stragglers increase the overall latency of parallel query processing.
▪ Approach:
− Use cloud storage to exchange state → similar to state-separate query processing.
− Use tuned models to detect stragglers and invoke functions with duplicated computation.

Serverless Functions + Cloud storage
▪ Query processing using lambda functions:
− Invoke many tasks in each stage.
− Each task writes its intermediate results to a single object file.
− Combiners can be used to reduce the read cost of a large shuffle.
▪ Trade-off between the number of invoked tasks (performance) and cost.
Src: Starling: A Scalable Query Engine on Cloud Functions. SIGMOD'20

Serverless Functions + Cold Starts
▪ What is a cold start?
− When a function is invoked for the first time, or after a period of inactivity,
− the platform provisions a new container or runtime environment, leading to initial latency.
▪ Impact: the added latency can range from 10 ms to several seconds, depending on platform and runtime.
▪ Approach:
− Keep pre-warmed instances available.
− Reduce function size for faster initialization.
− Predictive function instantiation for repetitive tasks.
Src: Said Kazimov. "Limitations of serverless computing", 2019

Summary of Serverless Computing for Queries

Category                     | Example systems                          | Approach                                    | Scaling                             | Pricing model
Serverless Database          | Azure SQL Database, AWS Athena, BigQuery | Stateful SQL engine + auto-pausing/resuming | More CPU, memory, or stand-by nodes | Pay for active service with min–max bounds
Function as a Service (FaaS) | Starling, Lambada                        | Lambda functions + cloud storage            | Invoke more functions               | Pay for used functions and storage

Summary of covered architectures
▪ Shared-nothing architecture (e.g., old Redshift)
− Independent nodes, each with its own compute, memory, and storage.
▪ Shared-storage architecture
− Disaggregated compute-storage (e.g., Snowflake): complete separation of compute and storage layers.
− Disaggregated compute-memory-storage (e.g., BigQuery): uses a disaggregated shared memory pool to speed up shuffle operations.
▪ Stateless shared-storage architecture (e.g., POLARIS)
− Compute nodes are stateless beyond temporary caching.
− Metadata and transactional logs are externalized to centralized storage.
▪ Serverless (e.g., Lambada)
− Relies on Function-as-a-Service (FaaS) and cloud storage to run queries with on-demand resources.

OLAP in the Cloud Building Blocks

I. Shuffling Primitive
▪ Goal: enable scale-out for all query plans.
− Difficult to achieve without robust shuffling in place.
▪ A network-based exchange operator used in distributed SQL engines.
▪ Can repartition data across the cluster (see the sketch below):
− Hash-based (e.g., on GROUP BY keys)
− Range-based (sort, or distribution on sorted columns)
− Random (simple load balancing to avoid skew)
▪ The design space for shuffle is huge.

I. Shuffling Primitive

Dimension       | Examples
Deployment      | In service / separate service
Topology        | N:N / N:M / multi-level
Storage         | Main memory / local disk / networked storage
Partitioning    | Static / dynamic / overpartitioned
Execution mode  | Streaming across stages / blocking
Fault tolerance | Can restart within query / need to restart whole query
Networking      | Implicit (distributed file system) / explicit (TCP, …) / specialized (RDMA)
Data layout     | Row-based / column-based / compressed / uncompressed
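A minimal sketch of the hash-based variant of the shuffle primitive, with made-up data and cluster sizes: each producer splits its local rows by a hash of the grouping key into one bucket per consumer, and each consumer gathers its bucket from every producer before aggregating locally.

```python
# Sketch: hash-based shuffle (exchange) between N producers and M consumers.
# Every producer writes one bucket per consumer; every consumer gathers its
# bucket from all producers. Data and cluster sizes are illustrative.
from collections import defaultdict

NUM_CONSUMERS = 3

def producer(local_rows, key):
    """Split a producer's local rows into one bucket per consumer."""
    buckets = defaultdict(list)
    for row in local_rows:
        buckets[hash(row[key]) % NUM_CONSUMERS].append(row)
    return buckets

def consumer(consumer_id, all_buckets):
    """Gather this consumer's bucket from every producer, then aggregate locally."""
    rows = [r for buckets in all_buckets for r in buckets.get(consumer_id, [])]
    counts = defaultdict(int)
    for r in rows:
        counts[r["group"]] += 1                     # e.g., COUNT(*) GROUP BY group
    return dict(counts)

producer_inputs = [
    [{"group": g} for g in ("a", "b", "a", "c")],
    [{"group": g} for g in ("b", "b", "c")],
]
shuffled = [producer(rows, "group") for rows in producer_inputs]   # N:M repartition
partials = [consumer(c, shuffled) for c in range(NUM_CONSUMERS)]
print(partials)   # each group's rows end up at exactly one consumer
```

In the query-orchestration slides below, stage boundaries are cut exactly at these shuffle edges.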
II. Query Optimization
▪ Phase 1: the query undergoes logical optimization.
− Rule-based: predicate pushdown, redundant join removal.
− Cost-based: join reordering based on estimated cardinalities.
− Goal: create an efficient, logically equivalent query plan before considering distribution.
▪ Phase 2: translate to a "canonical" distributed plan.
− Insert shuffles to create a correct distributed plan.
− Can be run right away and returns correct results, but might be slow.
▪ Phase 3: the query undergoes distributed optimization.
− Selecting broadcast joins
− Eliminating redundant shuffles
− Sideways information passing
(Four further "II. Query Optimization" slides contain figures only.)

III. Query Orchestration (the case of Firebolt)
▪ The distributed query plan is broken into "stages"
− a stage is a maximal connected subgraph between shuffles.
▪ Every stage is executed on a subset of the nodes.
▪ The scheduler performs a topological sort on the stage graph to identify the stage ordering.
▪ Stages are sent out to the nodes for processing.

III. Query Orchestration – Local Runtime
▪ The node receives a stage plan.
▪ The stage plan is turned into an execution plan for the single-node runtime.
▪ Stage plan operators map to primitives in the single-node runtime:
− Sources: Table Scan, Read from Shuffle
− Compute: Expression, Join Build, Join Probe, Pre-Aggregation, Aggregate Merge
− Sinks: Write to Shuffle, Return to User
▪ The stage is split into pipelines that use morsel-driven parallelism.
▪ Only the shuffle needs to reason about the cluster topology.

IV. Resource Management Challenges
▪ How can you estimate the resource consumption of a stage?
▪ What stages should overlap during execution?
▪ How do you decide whether to queue a query?
▪ How do you scale for concurrency?
▪ How do you minimize the cost of a workload?

References
The material covered in this class is mainly based on:
▪ Slides from "Big Data for Data Science" by Prof. Peter Boncz, CWI (link)
▪ Slides from the "Tutorial on Cloud-native Databases" by Li, Dong, and Zhang, VLDB'22 (link)
Papers:
▪ Dageville et al. The Snowflake Elastic Data Warehouse. SIGMOD 2016
▪ Aguilar-Saborit et al. POLARIS: The Distributed SQL Engine in Azure Synapse. VLDB 2020
▪ Pavlo et al. A Comparison of Approaches to Large-Scale Data Analysis. SIGMOD 2009
▪ Vuppalapati et al. Building an Elastic Query Engine on Disaggregated Storage. NSDI 2020
▪ Gupta et al. Amazon Redshift and the Case for Simpler Data Warehouses. SIGMOD 2015
▪ Amazon Redshift Re-invented. SIGMOD 2022
Further reading:
▪ Tan et al. Choosing a Cloud DBMS: Architectures and Tradeoffs. VLDB 2019
▪ Melnik et al. Dremel: A Decade of Interactive SQL Analysis at Web-Scale. VLDB 2019
▪ Perron et al. Starling: A Scalable Query Engine on Cloud Functions. SIGMOD 2020
▪ Mueller et al. Lambada: Interactive Data Analytics on Cold Data Using Serverless Cloud Infrastructure. SIGMOD 2020
▪ Winter et al. On-Demand State Separation for Cloud Data Warehousing. VLDB 2022