Message Oriented Middleware (MOM)
Summary
This document provides an overview of Message Oriented Middleware (MOM). It details the concept of asynchronous communication, focusing on Java Message Service and its implementation. The document also touches upon distributed systems, explaining the concept of message-based communication and its role in networking.
Full Transcript
Message Oriented Middleware (MOM)
October 1, 2024

Roadmap
▶ Message-based communication
▶ Asynchronous Communication (MOM): Concept; Java Message Service; Implementation
▶ Further Reading

Distributed System
Definition A distributed system consists of a collection of distinct processes which are spatially separated and which communicate with one another by exchanging messages. (L. Lamport, "Time, Clocks, and the Ordering of Events in a Distributed System", CACM)
▶ "A system is distributed if the message transmission delay is not negligible compared to the time between events in a single process."
What is a message? It is an atomic bit string.
▶ Its format and its meaning are specified by a communications protocol.

Message-based Communication and Networking
▶ The transport of a message from its source to its destination is performed by a computer network.
[Figure: applications on two hosts communicating over a channel through intermediate hosts]

Internet Protocols
Application specific communication services
Transport communication between 2 (or more) processes
Network communication between 2 computers not directly connected with each other
Interface communication between 2 computers directly connected
▶ On the Internet, the properties of the communication channel provided to an application depend on the transport protocol used (UDP or TCP).
▶ The design of a distributed application depends on the properties provided by the chosen transport protocol.

Summary of the Properties of the Internet Transport Protocols

Property                          UDP      TCP
Abstraction                       Message  Stream
Connection-based                  N        Y
Reliability (loss & duplication)  N        Y
Order                             N        Y
Flow control                      N        Y
Number of recipients              1|n      1

▶ Does the abstraction provided by TCP stem from the API, or is it intrinsic to the protocol?

TCP Reliability (Message loss)
What does this mean? Can we assume all data sent through a TCP connection will be delivered to the remote end?
What if bad things happen? E.g.:
▶ Networking hardware misconfiguration or failures
▶ Unplugged cables
▶ Damaged cables, e.g. by road works or shark bites
What TCP guarantees is that the application will be notified if the local end is unable to communicate with the remote end.
▶ Typically, send()/write() or recv()/read() will return an error code, e.g. ENOTCONN (and the connection will be closed).
▶ TCP cannot guarantee that there is no data loss; it is up to the application to deal with this:
▶ a web browser may just report the problem to the user;
▶ if the application does not interface with the user, trying to connect again is a possibility. But TCP does not re-transmit data that was lost in other connections.
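As a minimal sketch of this guarantee in Java (host and port are hypothetical): a sender only learns that communication broke because some write, possibly a later one, fails with an IOException; it cannot tell how much of the previously written data actually reached the remote end.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

public class TcpSender {
    public static void main(String[] args) {
        // Hypothetical endpoint; replace with a real host/port.
        try (Socket sock = new Socket("example.com", 9999)) {
            OutputStream out = sock.getOutputStream();
            out.write("some request".getBytes());
            out.flush();
            // If the connection breaks, a *later* write (or read) fails:
            out.write("another request".getBytes());
        } catch (IOException e) {
            // TCP only tells us that communication failed; it cannot tell
            // us how much of the already-written data reached the remote
            // end. It is up to the application to recover, e.g. reconnect
            // and resynchronize with the peer.
            System.err.println("Connection problem: " + e.getMessage());
        }
    }
}
```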
TCP Reliability (Message duplication)
Why not always re-transmit messages that might not have been delivered?
Issue The re-transmitted message may have been delivered before the connection was closed.
▶ TCP is not able to filter data duplicated by the application, only duplicated TCP segments.
▶ This may be an issue, e.g. if the duplicated data is a request for a non-idempotent operation such as:
▶ a credit/debit operation
▶ a purchase order
In this case the application may need to synchronize with the remote end to learn if there was some data loss in either direction.

RPC: the Idea
Local procedure call: [figure: the main program calls procedures A and B; each call returns to the caller]
Remote procedure call: [figure: the main program on machine 1 (the client) calls remote procedure A on machine 2 and remote procedure B on machine 3 (the servers); each responds to the caller]

Typical Architecture of an RPC System
[Figure: in the client process, the client program makes an ordinary call into the client stub, which passes a marshalled request to the network services in the client kernel; in the server process, the server stub unmarshals the request, calls the server functions, and sends back a marshalled return]
Obs. RPC is typically implemented on top of the transport layer (TCP/IP).

Client Stub
Request
1. Assembles the message: parameter marshalling
2. Sends the message to the server, via write()/sendto()
3. Blocks waiting for the response, via read()/recvfrom() ▶ Not in the case of asynchronous RPC
Response
1. Receives the response
2. Extracts the results (unmarshalling)
3. Returns to the client ▶ Assuming synchronous RPC

Server Stub
Request
1. Receives the message with the request, via read()/recvfrom()
2. Parses the message to determine the arguments (unmarshalling)
3. Calls the function
Response
1. Assembles the message with the return value of the function
2. Sends the message, via write()/sendto()
3. Blocks waiting for a new request

RPC
▶ RPC is a very useful communications paradigm.
▶ Programming distributed applications with (non-asynchronous) RPC would be almost as simple as programming non-distributed applications, if it were not for failures. ▶ How would failures affect the above time-diagrams?
▶ There are several more or less recent implementations:
RPC libraries gRPC (Google), Avro (Apache Software Foundation (ASF)), Thrift (originally Facebook, now ASF)
Languages supporting RPC Java, Go, Erlang
▶ However, it has its own limitations. ▶ RPC is not always the best approach. ▶ It is great for request-reply communication patterns, but even then there may be better alternatives.
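To make the stub steps concrete, here is a minimal hand-written client stub for a hypothetical remote procedure int add(int a, int b). The wire format (a UTF-encoded procedure name followed by two ints) is invented for illustration; real systems such as gRPC or Thrift generate stubs like this from an interface description.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

// Hand-written client stub for a hypothetical remote procedure:
//   int add(int a, int b)
public class AddClientStub {
    private final String host;
    private final int port;

    public AddClientStub(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public int add(int a, int b) throws IOException {
        try (Socket sock = new Socket(host, port)) {
            DataOutputStream out = new DataOutputStream(sock.getOutputStream());
            DataInputStream in = new DataInputStream(sock.getInputStream());
            out.writeUTF("add"); // 1. marshal: procedure name...
            out.writeInt(a);     //    ...and arguments
            out.writeInt(b);
            out.flush();         // 2. send the request message
            return in.readInt(); // 3. block for the reply and unmarshal it
        }
    }
}
```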
Asynchronous Communication
Problem Communication using TCP/UDP (explicitly using messages or via RPC) requires synchronization between sender and receiver.
▶ Synchronization requires waiting, leading to wasted time.
Solution Use asynchronous communication.
▶ The communicating parties need not be active simultaneously.
[Figure: the four combinations of sender/receiver activity: (a) both running, (b) sender running/receiver passive, (c) sender passive/receiver running, (d) both passive]

Message Oriented Middleware (MOM)
▶ Asynchronous message-based communication: sender and receiver need not synchronize with one another to exchange messages.
▶ The communication service (middleware) stores the messages as long as needed to deliver them.
▶ The service is close to that of the (snail) mail service. ▶ The service guarantees may vary: order; reliability.
▶ Some MOMs also provide an abstraction similar to discussion fora/newsgroups: ▶ publishers may send messages ▶ subscribers may receive messages.

MOM: Basic Patterns
[Figure: producers P1-P3 and consumers C1-C2 attached to a queue and to a topic]
Point-to-point The model is that of a queue.
▶ Several senders can put messages in a queue.
▶ Several receivers can get messages from a queue. But each message is delivered to at most one process (receiver).
Publish-subscribe This is more like a discussion forum. Instead of queues we talk about topics.
▶ Several publishers can put messages in a topic.
▶ Several subscribers can get messages from a topic. Unlike in queues, a message may be delivered to more than one process (subscriber).

Messaging Service Implementation
▶ Asynchronous communication is provided by a messaging service.
[Figure: the application on the sending host talks, through a messaging interface and a local buffer, to a communication server, which routes messages over an internetwork to the communication server of the receiving host; buffering at the servers makes the communicating hosts independent of one another]
▶ Other deployments, e.g. with a single communication server, are possible.
▶ At the lowest communication level, there must be synchronization between sender and receiver.

Asynchronous Communication Applications
▶ This type of communication is appropriate for applications in which the sender and receiver are loosely coupled. Some examples:
▶ Enterprise Application Integration
▶ Workflow applications
▶ Microservices
▶ Message-based communication between people ▶ email, SMS ▶ instant (real-time) messaging

Java Message Service (JMS)
▶ JMS is an API for MOM, originally specified for J2EE (now Jakarta EE, because Oracle holds the Java trademark):
▶ it allows Java applications to access MOM in a portable way;
▶ it provides a greatest common divisor of the functionality provided by well-known MOM providers (IBM MQSeries, TIBCO).
▶ JMS is representative of the MOM functionality that may be useful for developing enterprise applications.
▶ JMS can be integrated with the Java Transaction Service, and can therefore take advantage of transactions.
Note JMS has been replaced by Jakarta Messaging, but in these transparencies we still use JMS. ▶ Actually, the API still uses the JMS name. ▶ The Jakarta Messaging 3.1 Specification is mostly identical to the JMS Specification v2.0 rev. A.

JMS Architecture and Model
▶ JMS supports two types of destinations:
▶ queues (for single-destination communication);
▶ topics (for multi-destination communication).
▶ JMS defines 2 fundamental components:
JMS Provider i.e. the MOM service implementation ▶ it includes client-side libraries
JMS Client i.e. an application that sends/receives messages to/from a destination via a JMS provider
▶ JMS specifies the API, and its semantics, that a JMS provider offers to a client.
▶ To use JMS, a client must first set up a connection to the provider. ▶ This is not a TCP connection, but it may be built on top of TCP.
▶ Clients send/receive messages to/from destinations in the context of a session. ▶ Sessions are created in the context of a connection.

JMS Model (Source: Sun)
[Figure: the JMS object model; each "box" (connection factory, connection, session, message producer/consumer, destination) corresponds to a Java type, i.e. a class or interface, defined in JMS's classic API (specified in JMS 1.1)]
▶ Jakarta Messaging 3.1 provides a simplified API, which was introduced in JMS 2.0.
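A minimal sketch of the simplified API: a JMSContext bundles a connection and a session. How the ConnectionFactory and the Queue are obtained is provider-specific (typically a JNDI lookup), so they are assumed to be already available here; depending on the provider the package is javax.jms (JMS up to 2.0) or jakarta.jms (Jakarta Messaging), and the sketch uses the latter.

```java
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSContext;
import jakarta.jms.Queue;

public class JmsQuickstart {
    // factory and queue are assumed to have been obtained in a
    // provider-specific way, typically via a JNDI lookup.
    static void sendAndReceive(ConnectionFactory factory, Queue queue) {
        // A JMSContext bundles a Connection and a Session (simplified API).
        try (JMSContext ctx = factory.createContext()) {
            ctx.createProducer().send(queue, "hello");          // enqueue
            String body = ctx.createConsumer(queue)
                             .receiveBody(String.class, 1000);  // 1 s timeout
            System.out.println("received: " + body);
        }
    }
}
```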
JMS Messages
▶ JMS messages have 3 parts:
Header a set of fields necessary for identifying and routing messages.
▶ This set is defined in the JMS specification.
▶ JMSDeliveryMode, JMSMessageId, JMSExpiration, JMSRedelivered and JMSPriority are some of the 11 header fields.
Properties optional fields that logically belong to the header, i.e. they are meta-data.
▶ A property is a (key, value) pair: the key is a string, which must obey some rules; the value can be of one of several primitive types, as well as of the String or Object classes.
▶ Properties are defined by the applications. ▶ Essentially they are an extension mechanism, allowing a client to add fields to the header.
Body the data to exchange. Can be typed.
▶ JMS does not specify the format of the messages on the wire. ▶ JMS specifies an API, not a protocol.

JMS Queues
[Figure: producers P1-P3, a queue, and consumers C1-C2]
▶ Queues match the point-to-point model described above:
▶ several senders/producers can put messages in a queue;
▶ several receivers/consumers can get messages from a queue. But each message is delivered to at most one receiver/consumer. This helps improve scalability.
▶ Queues are long-lived:
▶ they are created by an administrator, not by the clients;
▶ they are always available to receive messages, even if there are no active receivers ▶ this is critical for decoupling senders from receivers;
with the exception of temporary queues (each JMS connection may have one temporary queue).

JMS Queues: Communication Semantics (1/2)

           Blocking  Non-Blocking  Asynchronous
send()     Y         –             via callback
receive()  Y         via timeout   via callback

send()
Blocking send() blocks until the message is sent ▶ the client may have to synchronize with the JMS server (see below)
Asynchronous the callback is executed after sending the message, or after synchronization with the JMS server
receive()
Blocking may take a timeout argument
Non-blocking with a 0-valued timeout (or via receiveNoWait())
Asynchronous the callback is executed upon message reception

JMS Queues: Communication Semantics (2/2)
Reliability depends mostly on the delivery mode, which may be set per message, e.g. in the send() call:
PERSISTENT ensures once-and-only-once semantics, i.e. the failure (crash) of the JMS provider must not cause a message to be lost or to be delivered twice.
▶ Requires the JMS server to store the message in non-volatile storage.
▶ Requires the client to synchronize with the JMS server.
NON_PERSISTENT ensures at-most-once semantics.
▶ The message need not survive a JMS server crash.
▶ But JMS is expected to tolerate common network failures.
Essentially, these alternatives provide different trade-offs between reliability and performance.
▶ If multiple clients consume messages from a given queue, then a client may not receive all messages.
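A sketch of how these choices look in the simplified API (factory and queue assumed as before): the delivery mode is set per producer/message, and a send can be made asynchronous with a completion callback.

```java
import jakarta.jms.CompletionListener;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import jakarta.jms.JMSContext;
import jakarta.jms.Message;
import jakarta.jms.Queue;

public class DeliveryModes {
    static void send(ConnectionFactory factory, Queue queue) {
        try (JMSContext ctx = factory.createContext()) {
            // Blocking PERSISTENT send: returns only after the provider
            // has taken responsibility for the message.
            ctx.createProducer()
               .setDeliveryMode(DeliveryMode.PERSISTENT)
               .send(queue, "must not be lost");

            // Asynchronous send: the callback runs after synchronization
            // with the JMS server, or when the send fails.
            ctx.createProducer()
               .setDeliveryMode(DeliveryMode.NON_PERSISTENT)
               .setAsync(new CompletionListener() {
                   public void onCompletion(Message msg) { /* sent OK */ }
                   public void onException(Message msg, Exception e) {
                       System.err.println("send failed: " + e);
                   }
               })
               .send(queue, "may be lost on a crash");
        }
    }
}
```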
JMS Queues: PERSISTENT Delivery Implementation (1/5)
Implementing PERSISTENT delivery is not trivial ▶ a distributed system is characterized by partial failures.
Let's assume, for simplicity's sake, that there is just one server between producer P and consumer C.
The channel between a client (either producer or consumer) and the JMS server can lose messages, even if the provider uses TCP for communication with the server. E.g.:
▶ The message producer sends the message, but communication problems cause the message to be lost, and so... ▶ the producer must receive a confirmation from the JMS server.
▶ A similar scenario may happen in the channel between the JMS server and the message consumer. ▶ The consumer must acknowledge the reception before the JMS server can dispose of the message.

JMS Queues: PERSISTENT Delivery Implementation (2/5)
Consumer acknowledgment behavior is set per session.
▶ Consumer acknowledgment is used to ensure that a message is delivered to one consumer.
There are 3 (+1, as we'll explain below) modes:
AUTO_ACKNOWLEDGE the JMS session, i.e. the provider, automatically acknowledges upon a successful return from either receive() or the reception callback.
[Figure: message-sequence diagram between server, JMS provider and JMS client; the client calls recv(), the provider sends msg.req to the server, the server delivers msg, and the provider sends msg.ack]
DUPS_OK_ACKNOWLEDGE the JMS session lazily acknowledges the delivery of messages.
▶ The provider may deliver a message without sending an ACK to the server.
CLIENT_ACKNOWLEDGE it is up to the client to acknowledge the delivery of messages.
▶ The provider does not send the ACK; the client does it whenever it sees fit, by calling the acknowledge method of Message.
▶ Acknowledgment of a message implicitly acknowledges all previously received messages.
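A consumer sketch using CLIENT_ACKNOWLEDGE (factory and queue assumed as before), which also shows the duplicate-detection hooks discussed below, the JMSRedelivered header field and the JMSXDeliveryCount property:

```java
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSContext;
import jakarta.jms.Message;
import jakarta.jms.Queue;

public class ClientAckConsumer {
    static void consume(ConnectionFactory factory, Queue queue) throws Exception {
        try (JMSContext ctx = factory.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) {
            Message msg = ctx.createConsumer(queue).receive(5000);
            if (msg == null) return;                 // timed out
            if (msg.getJMSRedelivered()) {
                // This may be a duplicate: the provider was uncertain
                // whether a previous delivery was acknowledged.
                int count = msg.getIntProperty("JMSXDeliveryCount");
                System.out.println("possible duplicate, delivery #" + count);
            }
            process(msg);
            // Acknowledging this message implicitly acknowledges all
            // previously received, unacknowledged messages in the session.
            msg.acknowledge();
        }
    }

    static void process(Message msg) { /* application logic */ }
}
```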
JMS Queues: PERSISTENT Delivery Implementation (3/5)
Issue Upon recovery of the server there may be uncertainty w.r.t. message delivery. Before the failure, some messages
▶ may not have been sent,
▶ or their acknowledgment may have been lost.
Example Assumptions: the client uses receive() without a callback and AUTO_ACKNOWLEDGE; the server uses non-volatile storage only to save in-transit messages.
[Figure: two message-sequence diagrams; in one the server fails before sending msg, in the other before receiving msg.ack]
What if the provider saves other state in non-volatile storage?

JMS Queues: PERSISTENT Delivery Implementation (4/5)
Upon failure of the consumer, the provider may be uncertain about message delivery. Some messages
▶ may not have been delivered,
▶ or their acknowledgment may have been lost.
[Figure: two message-sequence diagrams; in one the consumer fails before receiving msg, in the other before its msg.ack reaches the server]
▶ Also possible (with AUTO_ACKNOWLEDGE): the JMS server receives the ACK, but the consumer crashes before processing the message.

JMS Queues: PERSISTENT Delivery Implementation (5/5)
Upon recovery, the provider may be uncertain about message delivery. Some messages
▶ may not have been delivered,
▶ or their acknowledgment may have been lost.
Solution the provider resends the messages whose delivery it is uncertain about, with:
▶ the JMSRedelivered header field set;
▶ the JMSXDeliveryCount property with the appropriate value ▶ it should be incremented every time the provider may have delivered the message ▶ for a redelivered message its value must be larger than or equal to 2.
The consumer thus learns about messages that may have been delivered before the failure.
▶ The application may have more information than the JMS provider to deal with this uncertainty.

JMS Queues: Once-and-only-once Guarantees
▶ JMS's once-and-only-once delivery is not exactly-once. ▶ If a message may be a duplicate, the consumer is notified.
▶ Apache's Kafka claims to support exactly-once delivery. ▶ Neha Narkhede, "Exactly-Once Semantics Are Possible: Here's How Kafka Does It".
▶ But Kafka has the advantage that its topics are logs (implemented on non-volatile storage) whose records (messages/events) may persist for a long time (configurable):
▶ a consumer can read a record even if it subscribes to a topic long after that record was added to the topic;
▶ each consumer has an offset pointing to the next message in a topic (more or less), and it can reset it to another value.
▶ Exactly-once delivery on Kafka is built on top of:
Idempotent send operations with no duplication on one partition ▶ a Kafka topic may have several partitions, each of which is a log
Transactions to write atomically to different partitions ▶ writing happens not only on a send, but also on a receive (as it modifies the partition's offset)

JMS Queues: Session-based Transactions
▶ Session-based transactions are the fourth session mode, i.e. it (SESSION_TRANSACTED) cannot be used with the other session modes, e.g. AUTO_ACKNOWLEDGE.
▶ All messages sent/received within a session execute in the scope of a transaction.
▶ Essentially, the operations in a transaction are the send()/receive() calls, or related callbacks, of the JMS API.
▶ The key property here is atomicity.
▶ To terminate the current transaction (and start the next one), a JMS client must call:
commit() so that all messages sent are added to the destination queue(s), and all messages received are acknowledged;
rollback() to cancel the sending of all messages sent in the scope of the current transaction, as well as the delivery of all messages received (what does this mean?).
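A sketch of a transacted session (the queue names in and out are illustrative): a receive and a send are made atomic, so either the receive is acknowledged and the send takes effect, or neither does.

```java
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSContext;
import jakarta.jms.Message;
import jakarta.jms.Queue;

public class TransactedForwarder {
    // Receives from `in` and sends to `out` atomically.
    static void forwardOne(ConnectionFactory factory, Queue in, Queue out) {
        try (JMSContext ctx = factory.createContext(JMSContext.SESSION_TRANSACTED)) {
            Message msg = ctx.createConsumer(in).receive(1000);
            if (msg == null) return;
            try {
                ctx.createProducer().send(out, msg);
                ctx.commit();   // ack the receive + make the send visible
            } catch (Exception e) {
                ctx.rollback(); // the received message will be redelivered
            }
        }
    }
}
```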
JMS Queues: Distributed Transactions
Session-based transactions just ensure atomicity of the send/receive operations of the JMS API.
▶ The uncertainty regarding the delivery of messages may still occur ▶ e.g., if there is an error in commit().
Distributed transactions provide stronger guarantees, especially w.r.t. processing.
▶ They do not avoid uncertainty about message delivery in some failure scenarios.
▶ They require the use of the Java Transaction API.
▶ A JMS server should support the XAResource API, i.e. play a role similar to that of a DB.

JMS Queues: Message Order
JMS also provides some order guarantees.
▶ Messages sent in the context of a session to a queue are delivered in the sending order.
▶ This applies only to messages with the same delivery mode, e.g. a NON_PERSISTENT message may be delivered ahead of an earlier PERSISTENT message.
▶ However, JMS makes no guarantees w.r.t. messages sent by different sessions.
In addition, these guarantees are affected by other JMS features:
▶ higher-priority messages may jump ahead of lower-priority ones;
▶ messages with a later delivery time may be delivered after messages with an earlier delivery time;
▶ message selectors (provided as arguments of receive()) may also affect the order in which messages are delivered.
Also note that: ▶ if multiple clients consume messages from a queue, then a client may not receive all messages.

JMS Topics
[Figure: publishers P1-P3, a topic, and subscribers C1-C2]
▶ Topics support the publish-subscribe pattern, as defined above:
▶ several publishers can put messages in a topic;
▶ several subscribers can get messages from a topic. Unlike in queues, a message is delivered to more than one process (subscriber).
▶ Topics are long-lived (like queues):
▶ they are created by an administrator, not the clients;
▶ they are always available to receive messages, even if there are no active receivers ▶ this is critical for decoupling senders from receivers;
with the exception of temporary topics (each JMS connection may have one temporary topic).

JMS Topics: sending and receiving messages
▶ Sending/receiving messages to/from a topic uses the same API as that used for queues (same blocking/non-blocking/asynchronous semantics as the table above).
send() ▶ The sender may specify (both for topics and for queues):
Earliest delivery time the message cannot be delivered before it (via a MessageProducer's setDeliveryDelay method)
Latest delivery time the message should be dropped after it (via send()'s TTL argument)
receive() ▶ The receiver may filter messages (both from topics and from queues) using a
Message selector a condition on the values of header fields and message properties, passed as an argument when a MessageConsumer is created

JMS Topic Subscription
▶ Consumers use subscriptions to receive (all) messages sent to the respective topics. ▶ If a message selector is specified, some messages may be skipped.
▶ A subscription may be:
Unshared can have only one active consumer at a time
Shared can have more than one active consumer ▶ each message is delivered to only one consumer ▶ this helps improve scalability
▶ Furthermore, a subscription can be:
Durable once created, it exists until explicitly deleted
Non-Durable exists only while there is an active consumer ▶ but the topic continues to exist
▶ Subscription identification (different from topic identification):

             Unshared           Shared
Non-durable  –                  Name [+ Client id]
Durable      Name + Client id   Name [+ Client id]

JMS Topic Subscription and Reliability
▶ Message reliability depends both on the message's delivery mode and on the durability of the subscription:

                Non-durable                              Durable
NON_PERSISTENT  at-most-once (missed if inactive)        at-most-once
PERSISTENT      once-and-only-once (missed if inactive)  once-and-only-once

▶ Durable subscriptions provide the same guarantees as queues.
▶ As for queues, the no-duplication guarantees do not hold on session recovery (JMS 2.0 Rev. A, Sec. 6.2.11 & Sec. 6.2.12). ▶ It is up to the client/application to filter duplicates.
▶ Because of the asynchronism of subscribers and publishers and communication latency (JMS 2.0 Rev. A, Sec. 4.2.3):
▶ a message sent after a subscription may not be delivered;
▶ a message sent before a subscription may be delivered.
Note Section numbers in Jakarta Messaging 3.1 are the same.
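A sketch of how the subscription variants map onto the API (factory and topic assumed as before; the subscription name "audit-subscription" is illustrative):

```java
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.Topic;

public class Subscriptions {
    static void subscribe(ConnectionFactory factory, Topic topic) {
        try (JMSContext ctx = factory.createContext()) {
            // Durable + shared: identified by name; it survives consumer
            // restarts, and messages are spread across the consumers
            // sharing it.
            JMSConsumer durableShared =
                ctx.createSharedDurableConsumer(topic, "audit-subscription");

            // Non-durable + unshared: exists only while this consumer is
            // active; messages published while it is closed are missed.
            JMSConsumer nonDurable = ctx.createConsumer(topic);

            System.out.println(durableShared.receiveBody(String.class, 1000));
            nonDurable.close();
        }
    }
}
```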
JMS Topic Message Consumption Order
▶ The general guarantees for message delivery order are similar to those for queues:
▶ messages sent by a session to a topic are delivered in the sending order;
▶ remember, this applies only to messages with the same delivery mode.
▶ But these guarantees are affected by other JMS features:
▶ a message may jump ahead of another with lower priority;
▶ the delivery time of a message may also change the delivery order;
▶ and so may message selectors.
▶ Furthermore, the order of delivery in a subscription may not match that in another subscription of the same topic (JMS 2.0 Rev. A, Sec. 6.2.9.1).
▶ Message delivery order is time-dependent, and is outside the control of the client application.

JMS...
JMS is not a service but an API.
▶ Oracle's J2EE implementation comprises a JMS provider.
▶ Open Message Queue is a reference implementation of a JMS provider.
▶ There are several other MOMs that support JMS, e.g. IBM MQSeries and Amazon's SQS.
JMS does not support:
▶ fault-tolerance/load balancing, i.e. it does not specify how clients implementing a critical service cooperate;
▶ error notification, i.e. messages for reporting problems or system events to clients;
▶ JMS provider administration;
▶ security, i.e. it does not offer an API to manage the security attributes of exchanged messages.
JMS is not a protocol, nor does it specify one.
▶ One needs to be careful to avoid deployment problems when different clients use different JMS providers.

JMS: Portability vs. Interoperability
▶ JMS is an API:
▶ it promotes the portability of Java applications that use MOM;
▶ a client that uses the JMS API can use any JMS provider.
▶ JMS is not a protocol:
▶ JMS does not guarantee interoperability, i.e. that a JMS provider can communicate with another JMS provider;
▶ this may be a limitation when we need to integrate different JMS providers.
[Figure: the messaging-service deployment shown earlier, with communication servers between the sending and receiving hosts]

Message Queuing Protocols
AMQP Advanced Message Queuing Protocol, an open-standard protocol first approved by OASIS and later by ISO/IEC
MQTT at some point the acronym of MQ Telemetry Transport, i.e. a protocol designed for industrial applications; it is also an OASIS standard ▶ nowadays it is being proposed for IoT applications
OpenWire a public protocol used by Apache ActiveMQ (which provides a JMS API) ▶ but ActiveMQ also supports AMQP, MQTT and other protocols

Architecture
▶ Larger-scale systems may use message relays to route messages to their destinations ▶ e.g. if applications/services run in different data centers.
[Figure: sender application A with a send queue, routers R1 and R2, and receiver application B with a receive queue]
▶ This architecture is very similar to that of SMTP, although nowadays almost every e-mail message traverses just two servers.
Message Brokers
▶ MOM is often used for enterprise application integration. Sometimes:
▶ these applications may have been designed independently;
▶ the syntax of the messages used by each of them may differ from one another.
▶ Message brokers convert the format of the messages used by one application to the format used by another application.
▶ Strictly, they are not part of the communication service.
[Figure: a source client sends messages through a queuing layer to a message broker, whose broker program applies a repository of conversion rules and programs before forwarding to the destination client]

Further Reading
▶ van Steen and Tanenbaum, Distributed Systems, 3rd Ed. ▶ Section 4.3 Message-oriented communication
▶ Jakarta, Jakarta Messaging v3.1 ▶ essentially identical to: ▶ Oracle, JMS Specification v2.0 rev. A

Replication and Consistency Models
Pedro F. Souto ([email protected])
October 1, 2024

Data Replication
Replicate data at many nodes:
Performance local reads
Reliability no data loss unless data is lost in all replicas
Availability data available unless all replicas fail or become unreachable
Scalability balance load across nodes for reads

Data Replication: Challenge
Issue Upon an update, the values of the different replicas will be different.
[Figure: replicas R1, R2, R3 all hold 5; an update writes 3 to R1 only]
Solution Run some protocol that pushes the data to all replicas.
Challenge Ensure data consistency.
Note Right now, we will focus on concurrency. ▶ Failures make this even more challenging.

Conflicts
Observation Updating at different replicas may lead to different results, i.e. inconsistent data.
[Figure: R1 and R2 apply the updates 3 then 7, while R3 applies 7 then 3]

Strong Consistency
All replicas execute/apply updates in the same order.
▶ Deterministic updates: the same initial state leads to the same result.
[Figure: the replicas coordinate so that all apply 3 before 7]
Actually, total order is not enough: it must be sensible.

Strong Consistency Models
▶ Sequential Consistency
▶ Serializability
▶ Linearizability

Sequential Consistency Model (Lamport 79)
Definition An execution is sequentially consistent iff it is identical to a sequential execution of all the operations in that execution such that
▶ all operations executed by any thread appear in the order in which they were executed by the corresponding thread.
Observation This is the model provided by a multi-threaded system on a uniprocessor.
Counter-example Consider the following operations executed on two replicas of variables x and y, whose initial values are 2 and 3, respectively:

Repl. 1     Repl. 2
(2, 3)      (2, 3)
x = y + 2;  y = x + 3;
(5, 5)      (5, 5)

If the two operations are executed sequentially, the final result cannot be (5, 5): x = y + 2 followed by y = x + 3 yields (5, 8), and the reverse order yields (7, 5).

Sequentially Consistent Execution
Array Data type
Read(a) read the value of the array's element/index a
Write(a, v) write value v to the array's element/index a
Snapshot() read all values stored in the array
Execution for an array with 4 elements (initial value [0, 0, 0, 0]):
▶ reads/snapshots access only one replica;
▶ writes access more than one replica, but return immediately.
[Figure: clients C1 and C2 operate on replicas R1..Rn; C1 issues write(1,5), write(2,7), write(3,2); C2 issues read(1) returning 5, read(2) returning 0, and snapshot() returning [0, 5, 7, 0]; src: Fekete and Ramamritham 09]
An equivalent sequential order: 1. write(1, 5); 2. read(1); 3. read(2); 4. write(2, 7); 5. snapshot(); 6. write(3, 2).
▶ What other ordering would be possible?

Sequential Consistency Is Not Composable
▶ Consider two arrays, u and v, each with 2 elements.
▶ Assume that we use a sequentially consistent protocol to replicate each of the arrays, so that each array is sequentially consistent.
[Figure: on array u, C1 issues u.write(1,5) while C2 issues u.write(0,8) and u.snapshot(), which returns [8, 0]; an equivalent sequential order is 1. u.write(0, 8); 2. u.snapshot(); 3. u.write(1, 5); src: Fekete and Ramamritham 09]
[Figure: on array v, C1 issues v.write(0,7) while C2 issues v.write(1,2) and v.snapshot(), which returns [0, 2]; an equivalent sequential order is 1. v.write(1, 2); 2. v.snapshot(); 3. v.write(0, 7)]
▶ The combined execution may not be sequentially consistent.
▶ We can show that it is not possible to merge these two sequences into a single one that is sequentially consistent.
Linearizability (Herlihy & Wing 90)
Def. An execution is linearizable iff it is sequentially consistent and
▶ if op1 occurs before op2, according to one omniscient observer, then op1 appears before op2.
Assumption Operations have a start time and a finish time, measured on some clock accessible to the omniscient observer.
▶ op1 occurs before op2 if op1's finish time is smaller than op2's start time.
▶ If op1 and op2 overlap in time, their relative order may be any.
Important Even though the specification uses the concept of an omniscient observer, which does not exist in a distributed system
▶ communication delay is different from 0 and depends on distance (think speed of light and relativity)
this does not mean that we cannot implement linearizability. ▶ Usually, we add synchronization.

Linearizable Execution of an Array
(Same array data type, and the same execution as before, for an array with 4 elements and initial value [0, 0, 0, 0].)
[Figure: the execution shown earlier, with equivalent sequential order 1. write(1, 5); 2. read(1); 3. read(2); 4. write(2, 7); 5. snapshot(); 6. write(3, 2); src: Fekete and Ramamritham 09]
▶ This execution is not linearizable:
▶ write(3,2) on C2 occurs before snapshot() on C1, yet the snapshot's result [0, 5, 7, 0] does not reflect it.

Linearizable Execution
▶ To ensure linearizability we need additional synchronization ▶ e.g. add an "ack" to the write operation.
[Figure: the same execution, with each write now waiting for an acknowledgment]
▶ write(3,2) on C2 is now concurrent with snapshot() on C1 ▶ even though its finish time is earlier than that of snapshot().

Linearizability is composable
▶ We will not demonstrate it here; you will have to accept it.
▶ Why doesn't the execution we presented earlier show that linearizability is not composable?
▶ Even if the writes took longer, supposedly because of the additional synchronization.
[Figure: the u and v executions from before; src: Fekete and Ramamritham 09]
One-copy Serializability (Transaction-based Systems)
Definition The execution of a set of transactions is one-copy serializable iff its outcome is similar to the execution of those transactions on a single copy.
Observation 1 Serializability used to be the most common consistency model used in transaction-based systems.
▶ DB systems nowadays provide weaker consistency models to achieve higher performance.
Observation 2 This is essentially the sequential consistency model, when the operations executed by all processors are transactions.
▶ The isolation property ensures that the outcome of the concurrent execution of a set of transactions is equal to that of some sequential execution of those transactions.
Observation 3 (Herlihy... sort of) Whereas
Serializability was proposed for databases, where there is a need to preserve complex application-specific invariants,
Sequential consistency was proposed for multiprocessing, where programmers are expected to reason about concurrency.

Weak Consistency
▶ Strong consistency models usually make it easy to reason about replicated systems.
▶ Essentially, they strive to give the illusion of a non-replicated system.
▶ However, their implementation usually requires tight synchronization among replicas, and this adversely affects:
▶ scalability (performance);
▶ availability.
▶ Weak consistency models strive to:
▶ improve scalability and availability,
▶ while providing a set of guarantees that is useful for their users.
▶ Weak consistency models are usually application-domain dependent.
▶ We (Prof. Carlos Baquero) will discuss some weak consistency models later in the course.

Further Reading
▶ Fekete A.D., Ramamritham K. (2010) Consistency Models for Replicated Data. In: Charron-Bost B., Pedone F., Schiper A. (eds) Replication. Lecture Notes in Computer Science, vol 5959, pp. 1-17.
▶ van Steen and Tanenbaum, Distributed Systems, 3rd Ed. ▶ Section 7.2 Data-centric consistency models ▶ Section 7.3 Client-centric consistency models

Replication for Fault Tolerance: Quorum Consensus
Pedro F. Souto ([email protected])
October 2, 2024

Roadmap
▶ Quorums and Quorum Consensus Replication
▶ Ensuring Consistency with Transactions
▶ Playing with Quorums
▶ Dynamo Quorums
▶ Further Reading

Quorum Consensus Protocols
▶ Each (replicated) operation (e.g. read/write) requires a quorum ▶ this is a set of replicas.
▶ The fundamental property of these quorums is that,
▶ if the result of one operation depends on the result of another, then their quorums must overlap, i.e. have common replicas.
▶ A simple way to define quorums is to consider all replicas as peers.
▶ In this case quorums are determined by their size, i.e. the number of replicas in the quorum.
▶ This is equivalent to assigning 1 vote to each replica.
▶ In his work, Gifford proposed the use of weighted voting, i.e. the assignment of a different number of votes to each replica, so as to obtain different trade-offs between the performance and availability of the different operations.
Read/Write Quorums Must Overlap
▶ The replicas provide only read and write operations. ▶ These operations apply to the whole object.
▶ Because the output of a read operation depends on previous write operations, the read quorum must overlap the write quorum:

N_R + N_W > N

where N_R is the size of the read quorum, N_W is the size of the write quorum, and N is the number of replicas.
[Figure: three quorum layouts over 12 replicas A-L: (a) N_R = 3, N_W = 10; (b) N_R = 7, N_W = 6; (c) N_R = 1, N_W = 12]

Quorum Consensus Implementation
IMP Each object replica has a version number.
Read The client:
1. polls a read quorum, to find out the current version ▶ a server replies with its current version;
2. reads the object value from an up-to-date replica ▶ if the size of the object is small, it can be read as the read quorum is assembled.
Write The client:
1. polls a write quorum, to find out the current version ▶ a server replies with its current version;
2. writes the new value with the new version to a write quorum ▶ we assume that writes modify the entire object, not parts of it.
IMP A write operation depends on previous write operations (via the version), and therefore write quorums must overlap: N_W + N_W > N.
▶ Quorum (b) above (N_R = 7, N_W = 6, N = 12) violates this requirement.
▶ This is not needed if, in step 1, the client polls a read quorum instead.
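A minimal sketch of this version-based protocol, assuming a hypothetical Replica interface and, as a simplification, that the quorum is always the first N_R (or N_W) replicas of the list; there are no transactions here, so the sketch exhibits exactly the fault and concurrency anomalies discussed next.

```java
import java.util.Comparator;
import java.util.List;

// A sketch of Gifford-style version-based quorum reads/writes over a
// hypothetical Replica interface.
interface Replica {
    int getVersion();
    Object read();                        // returns the replica's value
    void write(int version, Object value);
}

class QuorumClient {
    private final List<Replica> replicas; // size N
    private final int nR, nW;             // require nR + nW > N, 2*nW > N

    QuorumClient(List<Replica> replicas, int nR, int nW) {
        this.replicas = replicas;
        this.nR = nR;
        this.nW = nW;
    }

    Object read() {
        // 1. Poll a read quorum for versions; 2. read from an up-to-date
        // replica, i.e. one holding the highest version seen.
        Replica newest = replicas.subList(0, nR).stream()
                .max(Comparator.comparingInt(Replica::getVersion))
                .orElseThrow();
        return newest.read();
    }

    void write(Object value) {
        // 1. Poll a write quorum to learn the current version;
        // 2. write the value with version+1 to a write quorum.
        int current = replicas.subList(0, nW).stream()
                .mapToInt(Replica::getVersion).max().orElseThrow();
        for (Replica r : replicas.subList(0, nW)) {
            r.write(current + 1, value);
        }
    }
}
```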
Naïve Implementation with Faults
▶ N = 3, N_R = 2, N_W = 2. Replicas A, B and C all start with (version, value) = (1, 2.3).
▶ The first (left) client attempts to write but, because of a partition, it updates only one replica (A), which becomes (2, 5.4).
▶ The second (right) client, in a different partition, attempts to write and succeeds: B and C become (2, 1.7).
▶ The variable has different values for the same version.
▶ The partition heals and each client does a read.
▶ Each client gets a value different from the one it wrote.
▶ I.e. the protocol does not ensure read-your-writes.
[Figure: message-sequence diagram of the two get_version/write rounds and the subsequent reads]

Naïve Implementation with Concurrent Writes
▶ N = 3, N_R = 2, N_W = 2. Replicas A, B and C all start with (1, 2.3).
▶ Two clients attempt to write the replicas at more or less the same time.
▶ The two write quorums are not equal, even though they overlap: one write installs (2, 5.4), the other (2, 1.7).
▶ Again, the replicas end up in an inconsistent state.
▶ Soon after, each client does a read.
▶ Each client gets a value different from the one it wrote.
[Figure: message-sequence diagram of the concurrent writes and the subsequent reads]

Ensuring Consistency with Transactions (1/2)
▶ Gifford assumes the use of transactions, which use two-phase commit, or some variant of it.
▶ The write (or read) of each replica is an operation of a distributed transaction.
▶ We can view the sequence of operations in a replica on behalf of a distributed transaction as a sub-transaction on that replica.
▶ If the write is not accepted by at least a write quorum, the transaction aborts.
In the partitioned scenario above:
▶ The left client will not get the vote from replica B, and therefore it will abort transaction τ1. ▶ The state of replica A will not be changed.
▶ On the other hand, transaction τ2 commits, and its write will be effective: the replicas end up as (1, 2.3), (2, 1.7), (2, 1.7).
[Figure: message-sequence diagram with the get_version and write messages tagged with τ1 and τ2, followed by the two-phase commits]

Ensuring Consistency with Transactions (2/2)
▶ Transactions also prevent inconsistencies in the case of concurrent writes.
▶ Transactions ensure isolation by using concurrency control.
▶ Let's assume the use of locks. ▶ Most likely, version-based (optimistic) CC is a better match.
▶ Server B processes the LHS client's write request first, and tries to acquire a write lock on behalf of τ1, but τ2 is holding a read lock.
▶ Likewise for the write request from the RHS client.
▶ Upon commit of τ1, server B detects the deadlock and (locally) aborts τ2.
▶ The outcome of the two-phase commit of τ2 will be abort, because server B has aborted τ2 in order to commit τ1.
[Figure: message-sequence diagram of the lock conflict and the two two-phase commits, ending with all replicas at (2, 5.4)]

XA-based Quorum Consensus Implementation
IMP Each object access is performed in the context of a transaction.
Read The client:
1. polls a read quorum, to find out the current version ▶ there is no need to read the object's state ▶ only the first time the transaction reads the object;
2. reads the object state from an up-to-date replica ▶ only the first time the transaction reads the object.
Write (supporting partial writes) The client:
1. polls a write quorum, to find out the current version and which replicas are up-to-date ▶ on the first time the transaction writes the object ▶ the object state may have to be read from an up-to-date replica ▶ replicas may have to be updated;
2. writes the new value with the new version ▶ all writes by a transaction are applied to the same replicas ▶ because these will be the only ones with an up-to-date version.

Transaction-based Quorum Consensus Replication
▶ Transactions solve both the problem of failures and that of concurrency.
▶ Transactions can also support more complex computations: ▶ e.g. with multiple operations and/or multiple replicated objects.
▶ But transactions also have problems of their own:
Deadlocks are possible, if transactions use locks.
▶ Can deadlock also occur when a transaction comprises a single operation on one object?
▶ Other concurrency control approaches, e.g. optimistic CC based on timestamps (or versions), may be used. ▶ These also have their trade-offs.
Blocking, if transactions use two-phase commit.
▶ If the coordinator fails at the wrong time, the participants, i.e. the servers, may have to wait for the coordinator to recover.
▶ Meanwhile, the objects accessed by such a transaction may become inaccessible, causing aborts of other transactions.
▶ It may be a good idea to use proxy servers as coordinators instead of clients, because the latter are failure-prone. ▶ But this may reduce availability.
Playing with Quorums (1/2)
[Figure: the three quorum layouts over 12 replicas shown earlier: (a) N_R = 3, N_W = 10; (b) N_R = 7, N_W = 6; (c) N_R = 1, N_W = 12]
▶ By choosing N_R and N_W appropriately we can get different trade-offs between the performance/availability of the different operations. E.g.:
▶ the quorum in (c) corresponds to a protocol known as read-one/write-all.

Playing with Quorums (2/2)
▶ By assigning each replica its own number of votes, which may be different from one, weighted voting provides extra flexibility. E.g., assuming the crash probability of each replica to be 0.01:
[Table of example weighted-voting configurations omitted in the transcript; source: Gifford79]
▶ In Example 1, the quorums are designed for performance rather than availability.
Question What is the advantage of a replica with 0 votes?

Quorum Consensus Fault Tolerance
▶ Quorum consensus tolerates unavailability of replicas.
▶ This includes unavailability caused both by process (replica) failures and by communication failures, including partitions.
▶ Actually, quorum consensus replication does not require distinguishing between the two types of failure.
▶ The availability analysis by Gifford relies on the probability of a replica/server crashing.
▶ But we can follow the standard approach to evaluate the resiliency of a fault-tolerant protocol in a distributed system.
Question Let f be the maximum number of replicas that may crash simultaneously.
▶ What is the minimum number of replicas that we need?
▶ Do we need to change the quorum constraints? (Assume 1 replica, 1 vote.)

Dynamo
▶ Dynamo is a replicated key-value storage system developed at Amazon. ▶ It uses quorums to provide high availability.
▶ Whereas Gifford's quorums support a simple read/write memory abstraction, Dynamo supports an associative memory abstraction, essentially a put(key, value)/get(key) API.
▶ Rather than a simple version number, each replica of a (key, value) pair has a version vector.
▶ Dynamo further enhances high availability by using multi-version objects. ▶ It thus sacrifices strong consistency under certain failure scenarios.

Dynamo's Quorums
▶ Each key is associated with a set of servers, the preference list.
▶ The first N servers in this list are the main replicas.
▶ The remaining servers are backup replicas and are used only in the case of failures.
▶ Each operation (get()/put()) has a coordinator, which is one of the first N servers in the preference list.
▶ The coordinator is the process that executes the actions typically executed by the client in Gifford's quorums ▶ as well as the actions required from a replica.
▶ As in Gifford's quorums:
put() requires a quorum of W replicas
get() requires a quorum of R replicas
such that: R + W > N.
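Since a get() must return all maximal versions, the coordinator needs to compare version vectors. A minimal sketch of that comparison, representing a version vector as a plain map from node id to counter (an illustration, not Dynamo's actual encoding):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// v1 dominates v2 if it is at least as large on every coordinate. Two
// vectors where neither dominates are concurrent: both versions are
// maximal and must be returned for the application to reconcile.
public class VersionVectors {
    static boolean dominates(Map<String, Integer> v1, Map<String, Integer> v2) {
        Set<String> nodes = new HashSet<>(v1.keySet());
        nodes.addAll(v2.keySet());
        for (String node : nodes) {
            if (v1.getOrDefault(node, 0) < v2.getOrDefault(node, 0))
                return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> a = Map.of("X", 2, "Y", 1);
        Map<String, Integer> b = Map.of("X", 1, "Y", 3);
        // Neither dominates: concurrent updates, both versions are maximal.
        System.out.println(dominates(a, b)); // false
        System.out.println(dominates(b, a)); // false
    }
}
```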
Dynamo's Quorums
put(key, value, context) The coordinator:
1. generates the version vector for the new version and writes the new value locally ▶ the new version vector is determined by the coordinator from the context, a set of version vectors;
2. sends the (key, value) pair and its version vector to the first N servers in the key's preference list ▶ the put() is deemed successful if at least W−1 replicas respond.
get(key) The coordinator:
▶ requests all versions of the (key, value) pair, including the respective version vectors, from the remaining first N servers in the preference list;
▶ on receiving responses from at least R−1 replicas, it returns all the (key, value) pairs whose version vectors are maximal;
▶ if there are multiple pairs, the application that executed the get() is supposed to reconcile the different versions and write back the reconciled pair using put().
Without failures, Dynamo provides strong consistency.

Dynamo's "Sloppy" Quorums and Hinted Handoff
In the case of failures, the coordinator may not be able to get a quorum from the first N replicas in the preference list.
To ensure availability, the coordinator will try to get a sloppy quorum by enlisting the backup replicas in the preference list.
▶ The copy of the (key, value) pair sent to a backup server has a hint in its metadata identifying the server that was supposed to keep that copy.
▶ The backup server periodically scans the servers it is substituting for.
▶ Upon detecting the recovery of a server, it will attempt to transfer its copy of the (key, value) pair. ▶ If it succeeds, the backup server will delete its local copy.
At the cost of consistency, sloppy quorums do not ensure that every quorum of a get() overlaps every quorum of a put().
Sloppy quorums are intended as a solution to temporary failures.
▶ To handle failures of longer duration, Dynamo uses an anti-entropy approach for replica synchronization.

Further Reading
▶ David K. Gifford, Weighted Voting for Replicated Data. In Proceedings of the 7th ACM Symposium on Operating Systems Principles (SOSP '79), 1979, pp. 150-162. ▶ Section 4 describes several refinements of the basic idea (weighted voting) that improve reliability or performance.
▶ van Steen and Tanenbaum, Distributed Systems, 3rd Ed. ▶ Section 7.5.3: Replicated-Write Protocols.
▶ Michael Whittaker, Aleksey Charapko, Joseph M. Hellerstein, Heidi Howard, Ion Stoica. Read-Write Quorum Systems Made Practical. In PaPoC '21: Proceedings of the 8th Workshop on Principles and Practice of Consistency for Distributed Data, pp. 1-8.
▶ Giuseppe DeCandia, Deniz Hastorun, Madan Jampani, Gunavardhan Kakulapati, Avinash Lakshman, Alex Pilchin, Swaminathan Sivasubramanian, Peter Vosshall, and Werner Vogels. Dynamo: Amazon's highly available key-value store. In Proceedings of the 21st ACM SIGOPS Symposium on Operating Systems Principles (SOSP '07), 2007, pp. 205-220.

Replication for Fault Tolerance: Quorum-Consensus Replicated ADTs
Pedro F. Souto ([email protected])
October 8, 2024

Roadmap
▶ Initial and Final Quorums
▶ (Herlihy's) Replicated ADTs
▶ Replicated Queue: an example of a replicated ADT
▶ Critical Evaluation
Quorum Consensus and Replicated Abstract Data Types
▶ Herlihy proposed a generalization of quorum consensus to replicated abstract data types, such as queues.
Quorum for an operation any set of replicas whose cooperation is sufficient to execute the operation
▶ When executing an operation, a client:
▶ reads from an initial quorum;
▶ writes to a final quorum.
▶ For example, in the read operation, a client must read from some set of replicas, but its final quorum is empty.
▶ A quorum for an operation is any set of replicas that includes both an initial and a final quorum.
▶ Assuming that all replicas are considered equal, a quorum may be represented by a pair (m, n), whose elements are the sizes of its initial quorum, m, and of its final quorum, n.
▶ Quorum intersection constraints are defined between the final quorum of one operation and the initial quorum of another.

Example: Initial/Final Quorums for Read/Write Operations
▶ Object (e.g. file) read/write operations are subject to two constraints:
1. Each final quorum for write must intersect each initial quorum for read.
2. Each final quorum for write must intersect each initial quorum for write ▶ to ensure that versions are updated properly.
▶ These constraints can be represented by the following quorum intersection graph: write → read and write → write. An edge from A to B means that A's final quorum must overlap B's initial quorum.
▶ Choices of minimal (size) quorums for an object with 5 replicas:

Operation  quorum choices        non-minimal
read       (1,0) (2,0) (3,0)     (4,0) (5,0)
write      (1,5) (2,4) (3,3)     (4,2) (5,1)

Read/Write-based Replicated Queue (1/2)
▶ Read/write quorums can be used to implement arbitrary data types.
▶ Any data type can be built on top of memory read/write operations.
▶ A queue has two basic operations:
Enq adds an item to the queue
Deq removes the least recent item from the queue, raising an exception if the queue is empty
Deq implementation:
1. Read an initial read quorum to determine the current version of the queue.
2. Read the state from an updated replica.
3. If the queue is not empty (normal deq):
3.1 remove the item at the head of the queue;
3.2 write the new queue state to a final write quorum;
3.3 return the item removed in 3.1.
4. If the queue is empty (abnormal deq), raise an exception.

Read/Write-based Replicated Queue (2/2)
▶ From the minimal quorum choices for the read/write operations:

Operation  quorum choices
read       (1,0) (2,0) (3,0)
write      (1,5) (2,4) (3,3)

we can derive the following minimal quorum choices for the operations on a replicated queue using read/write quorums:

Operation     quorum choices
Enq           (1,5) (2,4) (3,3)
Normal Deq    (1,5) (2,4) (3,3)
Abnormal Deq  (1,0) (2,0) (3,0)

▶ Only the quorum choice in the last column makes sense.
▶ The other choices would favor Abnormal Deq over both Normal Deq and Enq.
▶ Remember, a quorum must include both an initial quorum and a final quorum.
Herlihy's Replication Method
Timestamps instead of version numbers ▶ reduce quorum intersection constraints ▶ reduce messages
Logs instead of (state) versions
▶ These changes allow for more flexible replication quorums.
Assumption Clients are able to generate timestamps that can be totally ordered.
▶ This order is consistent with that seen by an omniscient observer, i.e. consistent with linearizability.

Replicated Read/Write Objects with Timestamps
Read similar to the version-based implementation, except that a client uses the timestamp instead of the version to identify a replica that is up-to-date
Write there is no need to read the versions from an initial quorum:
▶ timestamp generation guarantees a total order consistent with the order seen by an omniscient observer;
▶ no need for an initial message round;
▶ the client needs only to write the new state to a final quorum ▶ only for whole-state changes.
Quorum intersection graph: write → read.
Minimal quorum choices for 5 replicas (treated as equals):

Operation  Minimal Quorum Choices
Read       (1,0) (2,0) (3,0) (4,0) (5,0)
Write      (0,5) (0,4) (0,3) (0,2) (0,1)

Replicated Event Logs vs Replicated State
Idea Rather than replicating state, replicate event (operation) logs ▶ an event log subsumes the state.
Event a state change, represented as a pair of:
Operation with the respective arguments, e.g. Read() or Write(x)
Outcome a termination condition and returned results, e.g. Ok(x) or Ok()
E.g. [Read(), Ok(x)] and [Write(x), Ok()].
Event log a sequence of log entries
Log entry a timestamped event: t0 : [op(args); term(results)]. E.g., an Enq event in a queue might be: t0 : [Enq(x); Ok()].
▶ Entries in a log are ordered by their timestamps.

Herlihy's Replicated Queue
Deq implementation — the client:
1. reads the logs from a Deq initial quorum and creates a view.
A view is a log obtained by:
1.1 merging, in timestamp order, the entries of a set of logs;
1.2 discarding duplicates.
2. reconstructs the queue state from the view, and finds the item to return.
3. if the queue is non-empty, records the Deq event by:
3.1 appending a new entry to the view;
3.2 sending the modified view to a Deq final quorum of replicas ▶ the replicas merge this view with their local logs.
4. returns the response (the dequeued item or an exception) to Deq's caller.
Note This is just a conceptual implementation; there are many possible optimizations to improve performance.
▶ How can we avoid sending the whole view to all of the final quorum?
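A sketch of steps 1-2 of this conceptual Deq, merging replica logs into a view and replaying it to find the item to dequeue. The LogEntry record and the string encoding of events are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

public class QueueView {
    record LogEntry(long timestamp, String op, String item) {}

    // Merge the logs in timestamp order, discarding duplicates: the
    // entries for the same event carry the same timestamp in every
    // replica, so a map keyed by timestamp deduplicates them.
    static List<LogEntry> makeView(List<List<LogEntry>> logs) {
        TreeMap<Long, LogEntry> view = new TreeMap<>();
        for (List<LogEntry> log : logs)
            for (LogEntry e : log)
                view.putIfAbsent(e.timestamp(), e);
        return List.copyOf(view.values());
    }

    // Replay the view to reconstruct the queue; the head of the result is
    // the item a new Deq should return (null means an abnormal Deq, i.e.
    // the exception case).
    static String replay(List<LogEntry> view) {
        Deque<String> queue = new ArrayDeque<>();
        for (LogEntry e : view) {
            if (e.op().equals("Enq")) queue.addLast(e.item());
            else queue.pollFirst();   // Deq
        }
        return queue.peekFirst();
    }
}
```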
Herlihy’s Replicated Queue: Constraints
1. Every initial Deq quorum must intersect every final Enq quorum
   ▶ So that the reconstructed queue reflects all previous Enq events
2. Every initial Deq quorum must intersect every final Deq quorum
   ▶ So that the reconstructed queue reflects all previous Deq events
Quorum intersection graph: Enq → Normal Deq, Enq → Abnormal Deq, Normal Deq → Normal Deq, Normal Deq → Abnormal Deq
Quorum choices for 3 replicas (treated as equals; choices pair column-wise):

  Operation      Quorum Choices
  Enq            (0,1) (0,2) (0,3)
  Normal Deq     (3,1) (2,2) (1,3)
  Abnormal Deq   (3,0) (2,0) (1,0)

Note 1 The views for Enq operations need not include any prior events, because Enq returns no information about the queue’s state
  ▶ An initial Enq quorum may be empty
Note 2 As before, an abnormal Deq has an empty final quorum
These intersection requirements can be checked arithmetically (see the sketch after the trace below).
13/21
Herlihy’s Replicated Queue: Example Execution Trace
3 replicas, with quorum choices Enq(0,2), NDeq(2,2), ADeq(2,0). Each Deq writes its merged view to its final quorum, so entries propagate to replicas that missed them. Replica logs at the end of the trace (a missing entry is shown as blank):

  Op.             Rep. 1             Rep. 2             Rep. 3
  Enq(x): R1,R2   t1:[Enq(x);Ok()]   t1:[Enq(x);Ok()]   t1:[Enq(x);Ok()]
  Deq():  R2,R3   t2:[Deq();Ok(x)]   t2:[Deq();Ok(x)]   t2:[Deq();Ok(x)]
  Enq(y): R1,R2   t3:[Enq(y);Ok()]   t3:[Enq(y);Ok()]   t3:[Enq(y);Ok()]
  Enq(z): R1,R3   t4:[Enq(z);Ok()]                      t4:[Enq(z);Ok()]
  Deq():  R1,R3   t5:[Deq();Ok(y)]                      t5:[Deq();Ok(y)]

▶ Before the final Deq wrote its merged view to R1 and R3, no single replica contained all the entries that define the queue’s state
Minimal quorum choices for 5 replicas (treated as equals):

  Replicated ADT (Herlihy)                 Read/write quorums
  Operation      Quorums                   Quorum
  Enq            (0,1) (0,2) (0,3)         (3,3)
  Normal Deq     (5,1) (4,2) (3,3)         (3,3)
  Abnormal Deq   (5,0) (4,0) (3,0)         (3,0)

▶ With read/write quorums there is only one quorum choice
▶ Using ADTs, Enq can be made more available, at the expense of Deq’s availability
14/21
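With replicas treated as equals, two quorums of sizes a and b over n replicas intersect iff a + b > n, so the two Deq constraints reduce to four inequalities. A small sketch (class and parameter names are mine):

final class QuorumCheck {
    // Each argument is an {initialSize, finalSize} pair of quorum sizes.
    static boolean valid(int n, int[] enq, int[] nDeq, int[] aDeq) {
        return nDeq[0] + enq[1]  > n   // every initial Deq meets every final Enq
            && aDeq[0] + enq[1]  > n
            && nDeq[0] + nDeq[1] > n   // every initial Deq meets every final Deq
            && aDeq[0] + nDeq[1] > n;
    }

    public static void main(String[] args) {
        // The choice used in the trace: Enq(0,2), NDeq(2,2), ADeq(2,0) on 3 replicas
        System.out.println(valid(3, new int[]{0, 2}, new int[]{2, 2}, new int[]{2, 0})); // true
        // One of the 5-replica columns: Enq(0,3), NDeq(3,3), ADeq(3,0)
        System.out.println(valid(5, new int[]{0, 3}, new int[]{3, 3}, new int[]{3, 0})); // true
    }
}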
Herlihy’s Replicated Queue: Optimizations (1/2)
Disadvantage logs and messages grow indefinitely
Fixes
Garbage-collect logs – take advantage of the following observation:
  ▶ If an item A has been dequeued, all items enqueued before A must have been dequeued
  ▶ However, we cannot just remove all the entries with earlier timestamps
    ▶ Otherwise, some of these might be added again upon merging logs
  ▶ Instead, we keep the horizon timestamp, i.e. the timestamp of the Enq entry of the most recently dequeued item
  ▶ Furthermore, the log keeps only Enq entries whose timestamps are later than the horizon timestamp
  ▶ In a certain way, we just keep the state of the queue
Cache logs at clients
15/21
Herlihy’s Replicated Queue: Optimizations (2/2)
Deq implementation (sketched in code after the trace below):
1. The client reads from an initial Deq quorum:
   1.1 the horizon timestamp
   1.2 the local logs (which include only Enq log entries)
2. The client:
   2.1 creates a view as before
   2.2 discards all entries up to (and including) the latest observed horizon timestamp, since the horizon entry’s item was already dequeued
   The oldest remaining Enq entry indicates:
   ▶ the item to dequeue
   ▶ the new horizon timestamp
3. The client writes the new horizon timestamp to a final Deq quorum
   ▶ Replicas in the quorum discard entries up to the new horizon
Example trace, with the same quorum choices as before: Enq(x): R1,R2; Deq(): R2,R3; Enq(y): R1,R2; Enq(z): R1,R3; Deq(): R1,R3. Final state:

  Rep. 1             Rep. 2             Rep. 3
  horizon: t2        horizon: t1        horizon: t2
  t3:[Enq(z);Ok()]   t2:[Enq(y);Ok()]   t3:[Enq(z);Ok()]

▶ The first Deq dequeues x, so R2 and R3 set their horizon to t1 and discard the t1 entry; the second Deq reads R1 and R3, dequeues y (the oldest Enq entry after horizon t1), and writes the new horizon t2 to R1 and R3, which then discard their entries up to t2.
16/21
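A sketch of the optimized Deq, again with hypothetical types (OptReplica and its three methods are assumptions introduced here); timestamp generation and quorum selection are left abstract.

import java.util.*;

// Hypothetical replica interface for the optimized protocol: each replica
// stores a horizon timestamp plus the Enq entries newer than it.
interface OptReplica {
    long readHorizon();
    SortedMap<Long, String> readEnqLog();   // timestamp -> enqueued item
    void writeHorizon(long horizon);        // replica discards entries <= horizon
}

class OptimizedDeqClient {
    private final List<OptReplica> replicas;
    private final int initialQ, finalQ;     // e.g. (2, 2) for 3 replicas

    OptimizedDeqClient(List<OptReplica> rs, int iq, int fq) {
        replicas = rs; initialQ = iq; finalQ = fq;
    }

    String deq() {
        // 1. Read horizon + Enq log from an initial Deq quorum
        long horizon = 0;
        SortedMap<Long, String> view = new TreeMap<>();
        for (OptReplica r : replicas.subList(0, initialQ)) {
            horizon = Math.max(horizon, r.readHorizon());
            view.putAll(r.readEnqLog());    // merge; duplicate timestamps collapse
        }
        // 2. Discard entries up to the latest observed horizon
        view.headMap(horizon + 1).clear();
        if (view.isEmpty())
            throw new NoSuchElementException(); // abnormal Deq, empty final quorum
        // The oldest remaining Enq entry is the item to dequeue,
        // and its timestamp becomes the new horizon
        long newHorizon = view.firstKey();
        String item = view.get(newHorizon);
        // 3. Write the new horizon to a final Deq quorum
        for (OptReplica r : replicas.subList(0, finalQ))
            r.writeHorizon(newHorizon);
        return item;
    }
}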
Roadmap
Initial and Final Quorums
(Herlihy’s) Replicated ADTs
Replicated Queue: an example of a replicated ADT
Critical Evaluation
17/21
Issues with Replicated ADTs
Timestamps must be generated by clients and be consistent with linearizability
  ▶ Herlihy’s method relies on transactions and hierarchical timestamps
    ▶ So the problem reduces to that of ordering transactions
    ▶ However, if we use locking, the serial order ensured by transactions is usually determined only at commit time
  ▶ If replicated ADTs do not use transactions, this is challenging
    ▶ How do clients generate a timestamp consistent with linearizability if the initial quorum is empty (e.g. in the case of Enq)?
Logs must be garbage-collected to bound the size of messages
  ▶ Garbage-collecting log entries is ADT-dependent
    ▶ Herlihy’s solution for replicated queues may be very effective
    ▶ But it may be harder or less effective for other replicated ADTs
18/21
(Herlihy’s) Replicated ADTs vs CvRDTs
▶ There are clear similarities between replicated ADTs and CvRDTs, i.e. state-based CRDTs:
  ▶ They both support replicated data types
  ▶ Herlihy’s logs appear to form a monotonic semi-lattice
    ▶ Merging log entries is similar to CvRDTs’ merge operation (see the sketch below)
  ▶ They both require design ingenuity
    ▶ Unless you can solve your problem with a cataloged solution
    ▶ Actually, it is not even clear whether some data types can be implemented at all. Is there a conflict-free replicated queue?
▶ However, there are also important differences:
  ▶ CRDTs do not ensure strong consistency, but strong eventual consistency
  ▶ On the other hand, CRDTs will likely be more available: an operation can be executed as long as there is one accessible replica
    ▶ Replicated ADTs require a quorum to perform an operation
▶ Can we easily convert an implementation of a replicated ADT into an implementation of a CvRDT?
  ▶ E.g. just by dropping the quorum intersection constraints?
19/21
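To make the analogy concrete, log merging can be written as a semilattice join: the merged log is the least upper bound of the inputs under set inclusion of timestamped entries. This reuses the illustrative LogEntry record from the earlier sketch; it is my framing of the similarity, not a construction taken from Herlihy's paper or the CRDT literature.

import java.util.*;

final class LogJoin {
    // Join of two logs: the union of their timestamped entries.
    // Like a CvRDT merge, it is idempotent, commutative and associative,
    // and a replica's log only grows under it.
    static SortedMap<Long, LogEntry> join(SortedMap<Long, LogEntry> a,
                                          SortedMap<Long, LogEntry> b) {
        SortedMap<Long, LogEntry> lub = new TreeMap<>(a);
        for (Map.Entry<Long, LogEntry> e : b.entrySet())
            lub.putIfAbsent(e.getKey(), e.getValue());
        return lub;
    }
}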
Quorum Consensus: Final Thoughts
▶ Quorum-based systems are usually restricted to "simple" data storage systems
  ▶ Herlihy generalized this approach to other ADTs, including dictionaries, i.e. mappings from keys to values
▶ SMR with Paxos uses majority voting, which is a special kind of quorum
  ▶ What is the real difference between these two approaches?
▶ The concept of quorum appears to be fundamental
▶ Quorums need not be restricted to assigning/counting votes
  ▶ Consider replicas laid out in a square of ℓ by ℓ replicas
  ▶ Let a read quorum be the set of ℓ replicas in one column
  ▶ Try to find write and read quorums that are both minorities (one classic construction is sketched below)
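One classic construction, offered as an illustration rather than as the intended answer to the exercise: keep reads as a single column, and let a write's final quorum be one full row plus one full column. A row meets every column, so writes intersect every read and every other write; for ℓ = 4 that is 7 of 16 replicas for writes and 4 of 16 for reads, both minorities. A quick check in Java (all names are mine):

import java.util.*;

final class GridQuorums {
    // Replica at (row r, column c) of an side x side grid, encoded as r * side + c.
    static Set<Integer> column(int side, int c) {
        Set<Integer> s = new HashSet<>();
        for (int r = 0; r < side; r++) s.add(r * side + c);
        return s;
    }

    static Set<Integer> rowPlusColumn(int side, int r, int c) {
        Set<Integer> s = column(side, c);
        for (int cc = 0; cc < side; cc++) s.add(r * side + cc);
        return s;
    }

    static boolean intersects(Set<Integer> a, Set<Integer> b) {
        return a.stream().anyMatch(b::contains);
    }

    public static void main(String[] args) {
        int side = 4, n = side * side;                  // 16 replicas, majority = 9
        Set<Integer> read = column(side, 2);            // size 4: a minority
        Set<Integer> w1 = rowPlusColumn(side, 0, 1);    // size 7: a minority
        Set<Integer> w2 = rowPlusColumn(side, 3, 0);
        System.out.println(intersects(w1, read));       // true: row 0 meets column 2
        System.out.println(intersects(w1, w2));         // true: row 0 meets column 0
        System.out.println(read.size() < 9 && w1.size() < 9); // true: both minorities
    }
}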