Distributed Algorithms

Serializable Computations in Faulty Systems

In this webpage we develop algorithms for systems in which: Agents may halt or be arbitrarily slow; the same message may be delivered multiple times; messages may be delivered out of order; and messages may be lost. A transaction consists of two steps: a client (1) reads and (2) then writes shared variables. This page shows how epochs -- a proxy for time -- can be used to develop algorithms in which computations can be serialized: Each agent starts and completes a transaction before executing steps of another transaction.

Transactions

The fault model is as stated above: agents may halt or be arbitrarily slow, and messages may be duplicated, delivered out of order, or lost. The model that we used in the previous chapters therefore no longer applies.

We develop an algorithm for a system that consists of a set of agents called clients and a set of agents called servers. There is a channel from each server to each client, and from each client to each server. Clients execute transactions in which they send requests to servers to read and write variables managed by servers.

Servers

Each server q has a variable q.v. Clients send requests to q to read or write q.v. A server q replies to a read request from a client p by sending p a copy of q.v. When a server q receives a request to write v the server assigns v to q.v. The only actions of a server are to respond to requests from clients.

Clients

In a transaction, a single client reads and then writes server variables. A client p sends read requests to servers and receives replies from them. Requests and replies may get lost. Let R be the set of servers from which p receives replies. If p receives at least M replies, where M is a parameter of the algorithm, then p proceeds to the write step of the transaction. If p receives fewer than M replies, then p terminates the transaction without executing a write step.

Associated with each client p is a function f_p which takes a list of server values as its argument. In the write step p broadcasts requests to servers to write:

 f_p(r.v for r in R) 
where r.v for r in R is the list of replies that client p receives to its read requests.

Write requests may get lost. Let W be the set of servers that receive and execute the write requests. A transaction executed by a client p is as follows.


Transaction executed by client p

if len(R) >= M:
   for q in W:  q.v = f_p(r.v for r in R)

A transaction consists of a synchronous read of the values of a subset R of server variables followed by a synchronous write to a subset W of server variables. Next, we develop a distributed algorithm in which values of variables in any computation are the same as in a computation in which exactly one transaction is executed at a time. We use epochs for this purpose.
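The synchronous view of a transaction can be sketched in a few lines of Python. This is only an illustration under stated assumptions: servers are modeled as a dictionary from hypothetical server names to values, the sets R and W are passed in explicitly rather than being determined by message loss, and run_transaction is a name chosen here, not one used elsewhere on this page.

```python
from typing import Callable, Dict, Iterable, List

def run_transaction(
    values: Dict[str, int],
    read_set: Iterable[str],
    write_set: Iterable[str],
    f_p: Callable[[List[int]], int],
    M: int,
) -> bool:
    """Execute one read-then-write transaction; return True if the write step ran."""
    replies = [values[r] for r in read_set]   # r.v for r in R
    if len(replies) < M:
        return False                          # too few replies: no write step
    new_value = f_p(replies)
    for q in write_set:                       # only servers in W execute the write
        values[q] = new_value
    return True

servers = {"q0": 1, "q1": 2, "q2": 3}
# R = {q0, q1}, W = {q1, q2}, f_p = max: the write step runs.
committed = run_transaction(servers, ["q0", "q1"], ["q1", "q2"], max, M=2)
# R = {q0} has fewer than M members: the transaction ends without writing.
aborted = run_transaction(servers, ["q0"], ["q0"], max, M=2)
```

In this run the first transaction writes max(1, 2) to q1 and q2, and the second leaves every server unchanged.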

The Distributed Algorithm

We showed that given any computation \(C\), there exists a computation \(C'\) where:
  1. For each agent \(v\) the sequence of steps at \(v\) is the same in \(C\) and \(C'\), and
  2. steps in \(C'\) are executed in ascending order of epochs.

Next we describe an algorithm in which each transaction has a unique epoch and all steps of a transaction have the epoch of the transaction. Therefore in \(C'\) all steps of a transaction with epoch \(t\) are executed after all steps of transactions with epoch less than \(t\) and before all steps of transactions with epoch greater than \(t\).

Ensuring that each Epoch is Unique

An epoch is an ordered pair (t, p_id) where p_id is the id of client p and t is a number. A client p increases the t-component of an epoch at each successive transaction that p executes. Epochs of different transactions executed by the same client are different because the t-components are different. Epochs of transactions executed by different clients are different because their ids are different.

In the description of the algorithm we use the short form t for an epoch rather than (t, p_id). We implicitly assume that an epoch has a p_id-component without writing it explicitly. We assume that client ids are totally ordered and so epochs are also totally ordered.
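One possible way to represent epochs, assuming integer counters and integer client ids, is a Python named tuple. Tuples compare lexicographically, which gives the total order on (t, p_id) pairs for free. The names Epoch and next_epoch are illustrative and not part of the algorithm.

```python
from typing import NamedTuple

class Epoch(NamedTuple):
    t: int        # counter that the client increases at each transaction
    p_id: int     # id of the client; breaks ties between equal counters

def next_epoch(current: Epoch, step: int = 1) -> Epoch:
    """Return the epoch of the client's next transaction (step must be positive)."""
    if step <= 0:
        raise ValueError("step must be positive")
    return Epoch(current.t + step, current.p_id)

a = next_epoch(Epoch(0, p_id=1))      # first transaction of client 1
b = next_epoch(Epoch(0, p_id=2))      # first transaction of client 2
c = next_epoch(a, step=3)             # a later transaction of client 1
ordered = sorted([c, b, a])           # ascending epoch order
```

Epochs a and b have the same t-component but differ in p_id, so they are distinct and comparable.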

All Steps of a Transaction have the Epoch of the Transaction

Each agent x has a field x.t which is the epoch of the transaction that x is processing. The epoch of a step taken by x is x.t. Each message m has a field m.t called the epoch of the message. When an agent x sends a message m it sets m.t to x.t. An agent y processes a message m only in a step in which m.t = y.t. A client discards every message with m.t \(\neq\) y.t. A server discards a message with m.t \(<\) y.t, and on receiving a message with m.t \(>\) y.t it first raises y.t to m.t and then processes the message. A discarded message has the same effect as a lost message. So all steps and messages in a transaction have the epoch of the transaction.

Algorithm for a Server

A server waits to get requests from clients. Let's look at three cases for steps taken by a server q when it receives a request r.

  1. r.t \(<\) q.t:

    If a server q receives a request r from a transaction with an epoch r.t that is smaller than the epoch q.t of the transaction that q is processing then q discards the request without processing it. A discarded request has the same effect as a request that is lost.

  2. r.t \(=\) q.t:

    If a server receives a request from the transaction that the server is processing then the server responds to the request.

  3. r.t \(>\) q.t:

    If a server q receives a request from a transaction with an epoch r.t that is greater than the epoch q.t of the transaction that the server is processing then the server stops processing the transaction with epoch q.t; it increases q.t to r.t; and it starts processing the transaction with the increased value of q.t by responding to the request.

Therefore the epoch q.t of a step in which a server q responds to a request r is the same as r.t -- the epoch of the request.

A reply sent by a server has two fields, v and t. When server q sends a reply to a read request it sets field v to q.v and field t to q.t.

A write request has two fields, v and t where v is the value to be written and t is the epoch of the request. A server q responds to a request to write a value v by assigning v to q.v.


Server Algorithm
# initialization
q.v, q.t = init, 0

start()
def receive(request, client):
   if request.t >= q.t:
      q.t = request.t
      if isinstance(request, ReadRequest):
         # send reply to client
         send(Reply(q.v, q.t), client)
      else:
         # message is a WriteRequest
         q.v = request.v
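A runnable version of the server algorithm might look like the sketch below. It makes several assumptions that are not in the text: epochs are plain integers, messages are dataclasses, and send() is replaced by appending to a local outbox list so that the behavior can be inspected without a network.

```python
from dataclasses import dataclass
from typing import Any, List, Tuple

@dataclass
class ReadRequest:
    t: int

@dataclass
class WriteRequest:
    v: Any
    t: int

@dataclass
class Reply:
    v: Any
    t: int

class Server:
    def __init__(self, init: Any = None) -> None:
        self.v, self.t = init, 0
        self.outbox: List[Tuple[str, Reply]] = []   # stand-in for send()

    def receive(self, request: Any, client: str) -> None:
        if request.t < self.t:
            return                      # case 1: stale request, discard
        self.t = request.t              # cases 2 and 3: adopt the request's epoch
        if isinstance(request, ReadRequest):
            self.outbox.append((client, Reply(self.v, self.t)))
        else:
            self.v = request.v          # request is a WriteRequest

q = Server(init="a")
q.receive(ReadRequest(t=2), "p")        # case 3: q.t rises from 0 to 2
q.receive(WriteRequest("b", t=2), "p")  # case 2: same epoch, write executes
q.receive(WriteRequest("c", t=1), "p")  # case 1: stale, discarded
```

The stale write with epoch 1 has no effect, exactly as if it had been lost in transit.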

Algorithm for a Client

Each client p has a variable p.copy into which p copies replies to its read requests. p.copy is a dictionary and p copies the reply that it gets from a server q into p.copy[q].

Each client gets a sequence of messages called clock ticks. A clock tick message has no fields. A client initiates a new transaction when the client gets a clock tick message. This is the only way in which a transaction is created. A clock tick has no function other than to make a client stop the transaction that it is processing and start a new transaction.

When a client p gets a clock tick message it executes:

p.t = p.t + pos()

where pos returns an arbitrary positive value, and then p initiates a new transaction with epoch p.t. Client p continues executing the transaction with epoch p.t until p gets its next clock tick message. At that point p stops executing the transaction with epoch p.t, increments p.t by pos(), and starts a new transaction with the increased value of p.t as its epoch.

A client p starts a new transaction by setting p.copy to empty and sending read requests to all servers. A read request has a single field which is the epoch of the transaction that p is executing. A client waits to receive replies from servers and waits simultaneously for the next clock tick.

If p receives a reply that was sent in the transaction that p is processing, i.e., a message with message.t \(=\) p.t, then p copies the value in the reply into p.copy.

If the client receives at least M replies before it receives its next clock tick then the client proceeds to the write step. If the client receives its next clock tick before it receives M replies then the client does not execute a write step and the transaction terminates.

In the write step, p broadcasts write requests to servers. In Pythonic notation, the list of replies that p received in this transaction, r.v for r in R, is list(p.copy.values()).

A write request has two fields, the value v to be written and the epoch of the transaction in which the request is made.


Client Algorithm
# Initialization
p.t = 0

start()
def receive(message, sender):
   if isinstance(message, ClockTick):
      # Received clock tick
      p.t = p.t + pos()
      # Start new transaction with epoch p.t
      p.copy = {}
      # Send read request to q
      for q in Q: send(ReadRequest(p.t), q)

   else:
      # received a reply to a read request
      if message.t == p.t:
         p.copy[sender] = message.v
         if len(p.copy) >= M:
            # v = f_p(r.v for r in R) 
            v = f_p(list(p.copy.values()))
            # Send request to q to write v.
            for q in Q: send(WriteRequest(v, p.t), q)
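The client algorithm can be exercised without a network using a sketch like the following. The assumptions are the same as for any small simulation: epochs are integers, pos() always returns 1, messages are dataclasses, and send() is replaced by appending to a list named sent. None of these choices is prescribed by the algorithm.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple

@dataclass
class ClockTick:
    pass

@dataclass
class ReadRequest:
    t: int

@dataclass
class WriteRequest:
    v: Any
    t: int

@dataclass
class Reply:
    v: Any
    t: int

class Client:
    def __init__(self, servers: List[str], M: int,
                 f_p: Callable[[List[Any]], Any]) -> None:
        self.Q, self.M, self.f_p = servers, M, f_p
        self.t = 0
        self.copy: Dict[str, Any] = {}
        self.sent: List[Tuple[str, Any]] = []     # stand-in for send()

    def pos(self) -> int:
        return 1                                  # any positive value works

    def receive(self, message: Any, sender: str) -> None:
        if isinstance(message, ClockTick):
            self.t += self.pos()                  # abandon old transaction, start a new one
            self.copy = {}
            for q in self.Q:
                self.sent.append((q, ReadRequest(self.t)))
        elif message.t == self.t:                 # replies from other epochs are dropped
            self.copy[sender] = message.v
            if len(self.copy) >= self.M:
                v = self.f_p(list(self.copy.values()))
                for q in self.Q:
                    self.sent.append((q, WriteRequest(v, self.t)))

p = Client(servers=["q0", "q1", "q2"], M=2, f_p=sum)
p.receive(ClockTick(), "clock")       # epoch 1: read requests go out
p.receive(Reply(10, t=1), "q0")
p.receive(ClockTick(), "clock")       # epoch 2 starts before M replies arrived
p.receive(Reply(20, t=1), "q1")       # late reply from epoch 1: ignored
p.receive(Reply(5, t=2), "q0")
p.receive(Reply(7, t=2), "q2")        # second reply in epoch 2: write step
writes = [(q, m) for q, m in p.sent if isinstance(m, WriteRequest)]
```

The transaction with epoch 1 ends without a write step because the clock tick arrives first, and the transaction with epoch 2 broadcasts a write of 5 + 7.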

Serializability

We have shown that given a computation \(C\) there exists a computation \(C'\) with the same steps as \(C\) in which transactions are executed in increasing order of epochs.

Steps may occur out of order in \(C\): A step \(s\) with epoch \(t\) at an agent \(X\) may occur before a step \(s'\) with epoch \(t'\) at an agent \(X'\), where \(t > t'\), if \(X'\) is different from \(X\). By contrast, in \(C'\) steps occur in order: Step \(s'\) occurs before step \(s\).

The intuition of the proof about epochs is as follows. If \(C\) has adjacent steps where a step with epoch \(t\) at an agent \(X\) occurs before a step with epoch \(t'\) at an agent \(X'\), and where \(t > t'\), then the steps are independent. So, switching the order of the two steps results in a sequence \(C'\) that is also a computation.

The sequence of steps at agent \(X\) remains the same in \(C\) and \(C'\). Likewise, the sequence of steps at agent \(X'\) remains the same in \(C\) and \(C'\). The sequence of steps at each agent is the same in \(C\) and \(C'\). The ordering of steps between different agents is altered to go from \(C\) to \(C'\).
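The reordering argument can be illustrated with a small sketch that swaps adjacent out-of-order steps until epochs ascend. A step is modeled here as a hypothetical triple (agent, epoch, label). The sketch relies on the property that epochs never decrease at a single agent, so every swap involves two different agents. It only demonstrates the reordering; that swapped steps are independent is what the proof establishes, not this code.

```python
from typing import Dict, List, Tuple

Step = Tuple[str, int, str]   # (agent, epoch, label)

def reorder_by_epoch(computation: List[Step]) -> List[Step]:
    """Repeatedly swap adjacent out-of-order steps until epochs ascend."""
    steps = list(computation)
    swapped = True
    while swapped:
        swapped = False
        for i in range(len(steps) - 1):
            (x, t, _), (y, u, _) = steps[i], steps[i + 1]
            if t > u:
                # Epochs never decrease at one agent, so the agents differ.
                assert x != y
                steps[i], steps[i + 1] = steps[i + 1], steps[i]
                swapped = True
    return steps

def per_agent(steps: List[Step]) -> Dict[str, List[Step]]:
    """Project a computation onto the sequence of steps at each agent."""
    out: Dict[str, List[Step]] = {}
    for s in steps:
        out.setdefault(s[0], []).append(s)
    return out

C = [("q0", 2, "read"), ("q1", 1, "read"), ("q0", 2, "write"),
     ("q1", 2, "read"), ("q2", 1, "write")]
C_prime = reorder_by_epoch(C)
```

Comparing per_agent(C) with per_agent(C_prime) confirms that each agent sees the same sequence of steps in both orderings.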

Let t[i] be the i-th smallest epoch of a transaction in \(C\), for i > 0. Let p[i] be the client that executes the transaction with epoch t[i], let R[i] be the set of servers from which p[i] receives replies to its read requests in that transaction, and let W[i] be the set of servers that receive and execute write requests in that transaction. So, R[i] is the set of servers from which p[i] receives a reply r where r.t = t[i], and W[i] is the set of servers that execute a write request w where w.t = t[i].

The sequence of values q.v for each server q in a computation \(C\) is the same as in the following execution of transactions in ascending epoch order.


for i in positive_integers:
   if len(R[i]) >= M:
      for q in W[i]:  q.v = f_p[i](r.v for r in R[i])

Note: for i in positive_integers is pseudocode notation for iterating over the infinite sequence of positive integers 1, 2, 3, ...
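For a finite history, the sequential loop can be written as a runnable sketch. The record type Transaction and the function replay are names introduced here for illustration, and the sets R and W of each transaction are assumed to be known after the fact.

```python
from typing import Callable, Dict, List, NamedTuple, Sequence

class Transaction(NamedTuple):
    epoch: int
    R: Sequence[str]                       # servers whose replies the client received
    W: Sequence[str]                       # servers that executed the write request
    f: Callable[[List[int]], int]          # the client's function f_p

def replay(init: Dict[str, int], transactions: List[Transaction],
           M: int) -> Dict[str, int]:
    """Apply transactions one at a time in ascending epoch order."""
    values = dict(init)
    for tx in sorted(transactions, key=lambda tx: tx.epoch):
        if len(tx.R) >= M:
            new_value = tx.f([values[r] for r in tx.R])
            for q in tx.W:
                values[q] = new_value
    return values

history = [
    Transaction(epoch=3, R=["q0", "q1"], W=["q0"], f=sum),
    Transaction(epoch=1, R=["q0", "q1", "q2"], W=["q1", "q2"], f=max),
    Transaction(epoch=2, R=["q2"], W=["q0", "q1", "q2"], f=min),  # fewer than M replies
]
final = replay({"q0": 1, "q1": 5, "q2": 9}, history, M=2)
```

The transaction with epoch 2 received fewer than M replies, so it contributes nothing, and the final values are determined by epochs 1 and 3 applied in that order.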

Tracking Epochs in which Values are Assigned to Server Variables

Next we describe an algorithm that records the epoch in which a server variable is assigned a value. A server variable q.v is a tuple (q.v.s, q.v.t) where q.v.s is the stored value and q.v.t is the latest epoch in which q assigned a value to q.v. Client and server algorithms are as given earlier except for one change to the client algorithm: the statement
v = f_p(...)
is replaced by
v.s, v.t = f_p(...), p.t

The values assigned to server variables in a computation of the distributed algorithm are the same as in the following sequential for-loop in which t[i] is the i-th smallest epoch in the computation.


for i in positive_integers:
   if len(R[i]) >= M:
      for q in W[i]:
         q.v.s, q.v.t  =  f_p[i](r.v for r in R[i]), t[i]
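One step of this loop can be sketched with a named tuple for the pair (s, t). The function f_latest below is a hypothetical choice of f_p, used only to show why recording the epoch is useful: a client can pick the most recently assigned value among the replies. The server names, the sets R and W, and the epoch value are likewise made up for the example.

```python
from typing import Dict, List, NamedTuple

class Versioned(NamedTuple):
    s: int     # the stored value (q.v.s)
    t: int     # epoch in which the value was assigned (q.v.t)

def f_latest(replies: List[Versioned]) -> int:
    """Hypothetical f_p: return the value with the largest assignment epoch."""
    return max(replies, key=lambda r: r.t).s

servers: Dict[str, Versioned] = {
    "q0": Versioned(s=7, t=4),
    "q1": Versioned(s=3, t=2),
    "q2": Versioned(s=3, t=2),
}
epoch = 5                                         # t[i], the epoch of this transaction
replies = [servers[q] for q in ("q1", "q0")]      # R[i] = {q1, q0}
new_v = Versioned(f_latest(replies), epoch)       # v.s, v.t = f_p(...), p.t
for q in ("q1", "q2"):                            # W[i] = {q1, q2}
    servers[q] = new_v
```

After the step, q1 and q2 hold the value that q0 was assigned in epoch 4, now tagged with epoch 5, while q0 itself is unchanged.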
  

Next

Next we develop Paxos, a distributed consensus algorithm. We prove that Paxos satisfies the specifications for consensus by showing that an equivalent nondeterministic sequential program satisfies the specifications.

K. Mani Chandy, Emeritus Simon Ramo Professor, California Institute of Technology