In this webpage we develop algorithms for systems in which agents may halt or be arbitrarily slow; the same message may be delivered multiple times; messages may be delivered out of order; and messages may be lost. A transaction consists of two steps: a client (1) reads and then (2) writes shared variables. This page shows how epochs -- a proxy for time -- can be used to develop algorithms in which computations can be serialized: each agent starts and completes a transaction before executing steps of another transaction.
In this webpage we develop algorithms for systems in which agents may halt or be arbitrarily slow; the same message may be delivered multiple times; messages may be delivered out of order; and messages may be lost. The model that we used in the previous chapters no longer applies.
We develop an algorithm for a system that consists of a set of agents called clients and a set of agents called servers. There is a channel from each server to each client, and from each client to each server. Clients execute transactions in which they send requests to servers to read and write variables managed by servers.
Servers
Each server q has a variable q.v. Clients send requests to q to read or write q.v. A server q replies to a read request from a client p by sending p a copy of q.v. When a server q receives a request to write v, the server assigns v to q.v. The only actions of a server are to respond to requests from clients.
Clients
In a transaction, a single client reads and then writes server variables.
A client p sends read requests to servers and receives replies from them. Requests and replies may get lost. Let R be the set of servers from which p receives replies. If p receives at least M replies, where M is a parameter of the algorithm, then p proceeds to the write step of the transaction. If p receives fewer than M replies, then p terminates the transaction without executing a write step.
Associated with each client p is a function f_p which takes a list of server values as its argument. In the write step p broadcasts requests to servers to write f_p(r.v for r in R), where r.v for r in R is the list of replies that client p receives to its read requests. Write requests may get lost.
Let W be the set of servers that receive and execute the write requests. A transaction executed by a client p has the following effect.
Transaction executed by client p
    if len(R) >= M:
        for q in W:
            q.v = f_p(r.v for r in R)
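The abstract transaction above can be made concrete with a small Python sketch. It is only an illustration under stated assumptions: server variables are modeled as a dictionary from server name to value, and the names `run_transaction`, `values`, `read_set`, and `write_set` are hypothetical, not part of the original algorithm. The sets R and W are passed in explicitly because, in the real system, they are determined by which messages happen to be delivered.

```python
from typing import Callable, Dict, Iterable, List


def run_transaction(
    values: Dict[str, int],          # hypothetical model: server name -> q.v
    read_set: Iterable[str],         # R: servers whose replies the client received
    write_set: Iterable[str],        # W: servers that receive and execute the write
    f_p: Callable[[List[int]], int], # the client's function f_p
    M: int,                          # minimum number of replies needed to write
) -> bool:
    """Apply one transaction to `values`; return True if the write step ran."""
    # Synchronous read of the variables of the servers in R.
    replies = [values[r] for r in read_set]
    if len(replies) < M:
        # Fewer than M replies: terminate without a write step.
        return False
    new_value = f_p(replies)
    # Synchronous write to the variables of the servers in W.
    for q in write_set:
        values[q] = new_value
    return True


values = {"q0": 1, "q1": 2, "q2": 3}
done = run_transaction(values, ["q0", "q1"], ["q1", "q2"], max, M=2)
# done is True and values is now {"q0": 1, "q1": 2, "q2": 2}
```

With only one reply and M = 2, the same function returns False and leaves `values` unchanged, which mirrors a transaction that terminates before its write step.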
A transaction consists of a synchronous read of the values of a subset R of server variables followed by a synchronous write to a subset W of server variables. Next, we develop a distributed algorithm in which values of variables in any computation are the same as in a computation in which exactly one transaction is executed at a time. We use epochs for this purpose.
Next we describe an algorithm in which each transaction has a unique epoch and all steps of a transaction have the epoch of the transaction. Therefore in \(C'\) all steps of a transaction with epoch \(t\) are executed after all steps of transactions with epoch less than \(t\) and before all steps of transactions with epoch greater than \(t\).
An epoch is an ordered pair (t, p_id) where p_id is the id of client p and t is a number. A client p increases the t-component of an epoch at each successive transaction that p executes. Epochs of different transactions executed by the same client are different because the t-components are different. Epochs of transactions executed by different clients are different because their ids are different.
In the description of the algorithm we use the short form t for an epoch rather than (t, p_id). We implicitly assume that an epoch has a p_id-component without writing it explicitly. We assume that client ids are totally ordered and so epochs are also totally ordered.
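One way to picture the total order on epochs is with Python tuples, which compare lexicographically: first on the t-component and then on the client id. The sketch below is an assumption made for illustration; the source does not say how ties on t are broken, only that ids are totally ordered. The names `Epoch` and `next_epoch` are hypothetical.

```python
from typing import NamedTuple


class Epoch(NamedTuple):
    """Hypothetical representation of an epoch (t, p_id)."""
    t: int
    p_id: int


def next_epoch(current: Epoch, increment: int = 1) -> Epoch:
    """Epoch of the client's next transaction: t grows, p_id stays the same."""
    if increment <= 0:
        raise ValueError("the t-component must strictly increase")
    return Epoch(current.t + increment, current.p_id)


a = Epoch(3, 1)      # transaction of client 1
b = Epoch(3, 2)      # transaction of client 2 with the same t-component
c = next_epoch(a)    # next transaction of client 1: Epoch(4, 1)

# Same t, different clients: the ids break the tie, so the epochs differ.
# Same client, successive transactions: the t-components differ.
ordered = sorted([c, b, a])   # [Epoch(3, 1), Epoch(3, 2), Epoch(4, 1)]
```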
Each agent x has a field x.t which is the epoch of the transaction that x is processing. The epoch of a step taken by x is x.t. Each message m has a field m.t called the epoch of the message. When an agent x sends a message m it sets m.t to x.t.
When an agent y receives a message m the agent processes the message only if m.t = y.t. If m.t \(\neq\) y.t then m is discarded without being processed, and m has the same effect as a lost message. So all steps and messages in a transaction have the epoch of the transaction.
A server waits to get requests from clients. Let's look at three cases for steps taken by a server q when it receives a request r.
r.t \(<\) q.t: If a server q receives a request r from a transaction with an epoch r.t that is smaller than the epoch q.t of the transaction that q is processing then q discards the request without processing it. A discarded request has the same effect as a request that is lost.
r.t \(=\) q.t: If a server receives a request from the transaction that the server is processing then the server responds to the request.
r.t \(>\) q.t: If a server q receives a request from a transaction with an epoch r.t that is greater than the epoch q.t of the transaction that the server is processing then the server stops processing the transaction with epoch q.t; it increases q.t to r.t; and it starts processing the transaction with the increased value of q.t by responding to the request.
In the last two cases, the epoch q.t of a step in which a server q responds to a request r is the same as r.t -- the epoch of the request.
A reply sent by a server has two fields, v and t. When server q sends a reply to a read request it sets field v to q.v and field t to q.t. A write request has two fields, v and t, where v is the value to be written and t is the epoch of the request. A server q responds to a request to write a value v by assigning v to q.v.
    # initialization
    q.v, q.t = init, 0
    start()

    def receive(request, client):
        # Requests with request.t < q.t are discarded.
        if request.t >= q.t:
            q.t = request.t
            if isinstance(request, ReadRequest):
                # send reply to client
                send(Reply(q.v, q.t), client)
            else:
                # message is a WriteRequest
                q.v = request.v
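The server pseudocode can be exercised with a self-contained Python sketch. This is a minimal illustration, not the author's implementation: epochs are plain integers (the short form t), `send` is replaced by a return value, and the class name `Server` and the dataclass layouts are assumptions.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ReadRequest:
    t: int          # epoch of the requesting transaction


@dataclass
class WriteRequest:
    v: Any          # value to be written
    t: int          # epoch of the requesting transaction


@dataclass
class Reply:
    v: Any          # copy of q.v
    t: int          # q.t when the reply was sent


class Server:
    """Hypothetical in-memory server; returns a reply instead of sending it."""

    def __init__(self, init: Any) -> None:
        self.v = init
        self.t = 0

    def receive(self, request) -> Optional[Reply]:
        if request.t < self.t:
            return None                    # stale epoch: same effect as a lost message
        self.t = request.t                 # equal epoch: no change; larger epoch: move forward
        if isinstance(request, ReadRequest):
            return Reply(self.v, self.t)
        self.v = request.v                 # WriteRequest in the current epoch
        return None


q = Server(init=10)
r1 = q.receive(ReadRequest(t=2))      # larger epoch: q.t becomes 2, reply carries (10, 2)
q.receive(WriteRequest(v=99, t=1))    # smaller epoch: discarded, q.v stays 10
q.receive(WriteRequest(v=42, t=2))    # current epoch: q.v becomes 42
r2 = q.receive(ReadRequest(t=5))      # larger epoch: q.t becomes 5, reply carries (42, 5)
```

The four calls cover the three cases described earlier: a request from an older epoch is dropped, a request from the current epoch is served, and a request from a newer epoch first advances q.t.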
Each client p has a variable p.copy into which p copies replies to its read requests. p.copy is a dictionary and p copies the reply that it gets from a server q into p.copy[q].
Each client gets a sequence of messages called clock ticks. A clock tick message has no fields. A client initiates a new transaction when the client gets a clock tick message. This is the only way in which a transaction is created. A clock tick has no function other than to make a client stop the transaction that it is processing and start a new transaction.
When a client p gets a clock tick message it executes:
    p.t = p.t + pos()
where pos returns an arbitrary positive value, and then p initiates a new transaction with epoch p.t. Client p continues executing the transaction with epoch p.t until p gets its next clock tick message, at which point p stops executing the transaction with epoch p.t, increments p.t by pos(), and starts a new transaction with the increased value of p.t as the epoch of the new transaction.
A client p starts a new transaction by setting p.copy to empty and sending read requests to all servers. A read request has a single field which is the epoch of the transaction that p is executing.
A client waits to receive replies from servers and waits simultaneously for the next clock tick. If p receives a reply, message, sent in the transaction that p is processing, i.e., if message.t \(=\) p.t, then p copies the value in the reply into p.copy.
If the client receives at least M replies before it receives its next clock tick then the client proceeds to the write step. If the client receives its next clock tick before it receives M replies then the client does not execute a write step and the transaction terminates.
In the write step, p broadcasts write requests to servers. The list of replies, r.v for r in R, that p received in this transaction is, in Pythonic notation, list(p.copy.values()). A write request has two fields: the value v to be written and the epoch of the transaction in which the request is made.
    # Initialization
    p.t = 0
    start()

    def receive(message, sender):
        if isinstance(message, ClockTick):
            # Received clock tick
            p.t = p.t + pos()
            # Start new transaction with epoch p.t
            p.copy = {}
            # Send read request to q
            for q in Q:
                send(ReadRequest(p.t), q)
        else:
            # received a reply to a read request
            if message.t == p.t:
                p.copy[sender] = message.v
                if len(p.copy) >= M:
                    # v = f_p(r.v for r in R)
                    v = f_p(list(p.copy.values()))
                    # Send request to q to write v.
                    for q in Q:
                        send(WriteRequest(v, p.t), q)
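To see the client and server algorithms work together, here is a small runnable sketch in which messages are passed by hand rather than over a network, so that a lost message is simply one that is never delivered. All class and method names (`Server`, `Client`, `clock_tick`, `receive_reply`) are hypothetical, epochs are plain integers, and a fixed increment stands in for pos().

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple


@dataclass
class ReadRequest:
    t: int


@dataclass
class WriteRequest:
    v: Any
    t: int


@dataclass
class Reply:
    v: Any
    t: int


class Server:
    def __init__(self, init: Any) -> None:
        self.v, self.t = init, 0

    def receive(self, request: Any) -> List[Reply]:
        """Return the replies to send back (empty if none)."""
        if request.t < self.t:
            return []                          # stale request: discarded
        self.t = request.t
        if isinstance(request, ReadRequest):
            return [Reply(self.v, self.t)]
        self.v = request.v
        return []


class Client:
    def __init__(self, servers: List[str], f_p: Callable[[List[Any]], Any], M: int) -> None:
        self.servers, self.f_p, self.M = servers, f_p, M
        self.t = 0
        self.copy: Dict[str, Any] = {}

    def clock_tick(self, increment: int = 1) -> List[Tuple[str, ReadRequest]]:
        """Start a new transaction; return (destination, read request) pairs."""
        self.t += increment                    # stands in for p.t = p.t + pos()
        self.copy = {}
        return [(q, ReadRequest(self.t)) for q in self.servers]

    def receive_reply(self, sender: str, reply: Reply) -> List[Tuple[str, WriteRequest]]:
        """Record a reply; return write requests once M replies have arrived."""
        if reply.t != self.t:
            return []                          # reply from another epoch: discarded
        self.copy[sender] = reply.v
        if len(self.copy) < self.M:
            return []
        v = self.f_p(list(self.copy.values()))
        return [(q, WriteRequest(v, self.t)) for q in self.servers]


servers = {"q0": Server(1), "q1": Server(2), "q2": Server(3)}
p = Client(list(servers), f_p=sum, M=2)

writes: List[Tuple[str, WriteRequest]] = []
for name, request in p.clock_tick():           # transaction with epoch 1
    if name == "q2":
        continue                               # simulate a lost read request
    for reply in servers[name].receive(request):
        writes += p.receive_reply(name, reply)
for name, request in writes:
    servers[name].receive(request)

final = {name: s.v for name, s in servers.items()}
# final == {"q0": 3, "q1": 3, "q2": 3}: p read 1 and 2, then wrote their sum everywhere
```

As in the pseudocode, a reply that arrives after the client has moved to a later epoch is ignored, so a slow reply from an old transaction cannot trigger a write in a new one.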
Steps may occur out of order in \(C\): A step \(s\) with epoch \(t\) at an agent \(X\) may occur before a step \(s'\) with epoch \(t'\) at an agent \(X'\), where \(t > t'\), if \(X'\) is different from \(X\). By contrast, in \(C'\) steps occur in order: Step \(s'\) occurs before step \(s\).
The intuition of the proof about epochs is as follows. If \(C\) has adjacent steps where a step with epoch \(t\) at an agent \(X\) occurs before a step with epoch \(t'\) at an agent \(X'\), and where \(t > t'\), then the steps are independent. So, switching the order of the two steps results in a sequence that is also a computation. Repeating such switches yields the computation \(C'\) in which steps occur in epoch order.
The sequence of steps at agent \(X\) remains the same in \(C\) and \(C'\). Likewise, the sequence of steps at agent \(X'\) remains the same in \(C\) and \(C'\). The sequence of steps at each agent is the same in \(C\) and \(C'\). The ordering of steps between different agents is altered to go from \(C\) to \(C'\).
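The reordering argument can be illustrated with a short Python sketch that repeatedly swaps adjacent out-of-order steps until the sequence is sorted by epoch. It only demonstrates the mechanics of the reordering; it does not check that swapped steps are independent, which is the claim made in the text. The step representation (agent, epoch, label) and the function names are assumptions.

```python
from typing import List, Tuple

Step = Tuple[str, int, str]   # hypothetical step record: (agent, epoch, label)


def reorder_by_epoch(C: List[Step]) -> List[Step]:
    """Swap adjacent steps whose epochs are out of order until none remain."""
    steps = list(C)
    swapped = True
    while swapped:
        swapped = False
        for i in range(len(steps) - 1):
            (x, t, _), (x2, t2, _) = steps[i], steps[i + 1]
            if t > t2:
                # Epochs never decrease at a single agent, so an out-of-order
                # pair must involve two different agents.
                if x == x2:
                    raise ValueError("epochs decreased at agent " + x)
                steps[i], steps[i + 1] = steps[i + 1], steps[i]
                swapped = True
    return steps


def projection(steps: List[Step], agent: str) -> List[Step]:
    """The subsequence of steps taken by one agent."""
    return [s for s in steps if s[0] == agent]


C = [("p", 2, "a"), ("q", 1, "b"), ("p", 2, "c"),
     ("q", 2, "d"), ("r", 1, "e"), ("r", 3, "f")]
C_prime = reorder_by_epoch(C)
# C_prime == [("q", 1, "b"), ("r", 1, "e"), ("p", 2, "a"),
#             ("p", 2, "c"), ("q", 2, "d"), ("r", 3, "f")]
```

Because only adjacent steps with strictly decreasing epochs are swapped, steps at the same agent are never swapped with each other, so `projection(C, x)` and `projection(C_prime, x)` are equal for every agent x.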
Let t[i] be the epoch of the transaction in \(C\) with the i-th smallest epoch, for i > 0. Let p[i] be the client that executes the transaction, let R[i] be the set of servers from which p[i] receives replies to its read requests, and let W[i] be the set of servers that receive write requests in the transaction. So, R[i] is the set of servers from which p[i] receives a reply r where r.t = t[i], and W[i] is the set of servers that receive a write request w where w.t = t[i].
The sequence of values q.v for each server q in a computation \(C\) is the same as in the following execution of transactions in ascending epoch order, where f_p[i] is the function of client p[i].
    for i in positive_integers:
        if len(R[i]) >= M:
            for q in W[i]:
                q.v = f_p[i](r.v for r in R[i])
Note: for i in positive_integers is the pseudo-code notation that generates the infinite sequence of positive integers 1, 2, 3, ...
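In runnable Python, one possible stand-in for positive_integers is `itertools.count(1)`, which yields 1, 2, 3, ... without end. The sketch below takes only the first few values with `islice` so that it terminates; the variable names are illustrative.

```python
from itertools import count, islice

# count(1) is an unbounded iterator over 1, 2, 3, ...
positive_integers = count(1)

# islice stops after five items; a bare for-loop over count(1) would not stop.
first_five = list(islice(positive_integers, 5))   # [1, 2, 3, 4, 5]
```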
Next we describe an algorithm that records the epoch in which a server variable is assigned a value. A server variable q.v is a tuple (q.v.s, q.v.t) where q.v.t is the latest epoch in which q assigned a value to q.v.
Client and server algorithms are as given earlier except for one change to the client algorithm: the statement
    v = f_p(...)
is replaced by
    v.s, v.t = f_p(...), p.t
The values assigned to server variables in a computation of the distributed algorithm are the same as in the following sequential for-loop in which t[i] is the i-th smallest epoch in the computation.
    for i in positive_integers:
        if len(R[i]) >= M:
            for q in W[i]:
                q.v.s, q.v.t = f_p[i](r.v for r in R[i]), t[i]
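The sequential loop with epoch-tagged values can be run on a finite list of transactions. The sketch below is an illustration under assumptions: the record types `Tagged` and `Transaction`, the function `replay`, and the example function `inc_latest` (which reads the most recently written value and adds one) are all hypothetical and are not taken from the source. Epochs are plain integers.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class Tagged:
    s: Any   # the stored value, q.v.s
    t: int   # the epoch in which it was written, q.v.t


@dataclass
class Transaction:
    t: int                                   # epoch t[i]
    R: List[str]                             # servers that replied to the reads
    W: List[str]                             # servers that executed the write
    f: Callable[[List[Tagged]], Any]         # the client's function


def replay(values: Dict[str, Tagged], transactions: List[Transaction], M: int) -> None:
    """Execute the transactions one at a time in ascending epoch order."""
    for tx in sorted(transactions, key=lambda tx: tx.t):
        if len(tx.R) >= M:
            new_s = tx.f([values[r] for r in tx.R])
            for q in tx.W:
                values[q] = Tagged(new_s, tx.t)


def inc_latest(replies: List[Tagged]) -> int:
    """Hypothetical f: take the value with the largest epoch tag and add one."""
    return max(replies, key=lambda v: v.t).s + 1


values = {name: Tagged(0, 0) for name in ("q0", "q1", "q2")}
transactions = [
    Transaction(t=2, R=["q1", "q2"], W=["q1", "q2"], f=inc_latest),
    Transaction(t=1, R=["q0", "q1"], W=["q0", "q1"], f=inc_latest),
    Transaction(t=3, R=["q2"], W=["q2"], f=inc_latest),   # fewer than M replies
]
replay(values, transactions, M=2)
# values == {"q0": Tagged(1, 1), "q1": Tagged(2, 2), "q2": Tagged(2, 2)}
```

The transaction with epoch 2 reads a stale value from q2 and a fresh one from q1; the epoch tag lets this particular f pick the fresh one. The transaction with epoch 3 has a single reply, so with M = 2 it performs no write.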
K. Mani Chandy, Emeritus Simon Ramo Professor, California Institute of Technology