This page describes a simple model and notation for distributed algorithms. Other models are introduced later.
Distributed systems are complex. Banking systems support tens of thousands of ATMs, have thousands of branch offices, and serve millions of customers. An earthquake monitoring system has thousands of sensors and agents carrying out complex calculations. Distributed systems provide many services continuously at multiple locations. This course describes algorithms that help in building robust and efficient distributed systems.
Models
A model of a system is an abstraction of the system. A model ignores some features of the system and retains features that are necessary to solve the problem at hand. The choice of a model is an engineering decision: We choose a model that is adequate to solve our problem. Different models are used to design algorithms in different settings; for example, a model of a system in which messages may be corrupted is different from one in which messages are incorruptible.
Notation
The model and its mathematical properties are critical for describing and proving the correctness of algorithms; a programming notation is not important. We need a notation, however, to provide examples. Examples of some algorithms are given in Python using a simulator of distributed systems and also by using software libraries such as a Python implementation of the Advanced Message Queuing Protocol (AMQP). Algorithms are described in plain text or pseudo code in a manner that allows for straightforward implementations using message queuing libraries.
We begin with a description of a simple actor model and notation. More powerful models are introduced later.
In the simple model, a distributed system consists of a set of agents and a set of channels. A channel is directed from exactly one agent to exactly one agent. The network of agents and channels that constitute a distributed system can be represented by a directed graph in which the vertices represent agents and the directed edges represent channels.
A distributed system is initiated with sets of agents and channels that remain unchanged. Agents and channels are not created or deleted.
In the simple model, a channel is directed from exactly one agent
to exactly one agent.
A channel from an agent P to an agent Q
is represented by the ordered pair (P, Q).
In the simple model, there exists at most one channel from an
agent P to an agent Q.
There are models that allow more than one channel from one agent
to another; however, for the time being we restrict ourselves to
the simple model.
A channel (P, Q) is called an output of agent
P and an input of agent Q.
The sender and receiver of a channel (P,
Q) are P and Q, respectively.
An agent can send messages only on its output channels and receive
messages only on its input channels.
The state of a channel is a queue of messages -- these are the messages that have been sent on the channel and that have not been received.
An agent sends a message on a channel by appending the message to the tail of the queue of messages in transit along the channel. A message from a nonempty channel is delivered to an agent by removing the message from the head of the queue of messages in the channel, and passing the message to the agent.
In the simple model, channels have unlimited capacity. The queue that represents the state of a channel has unbounded size. A sender is never blocked from sending messages on a channel.
Messages sent on the same channel are delivered in the order sent.
If an agent P sends a message M1 on a
channel (P, Q), and later sends a message M2
on the same channel, then agent Q receives
M1 on channel (P, Q) before Q
receives M2 on the channel.
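The queue behavior of a channel can be sketched in Python. This is only an illustration under assumptions made here, not the course's simulator: the class name Channel and its methods send, deliver, and is_empty are hypothetical, and agents are named by strings. It shows that sending appends to the tail, delivery removes from the head, and so messages on one channel arrive in the order sent.

```python
from collections import deque


class Channel:
    """A directed channel (sender, receiver) whose state is a FIFO queue.

    Hypothetical helper for illustration only.
    """

    def __init__(self, sender, receiver):
        self.sender = sender
        self.receiver = receiver
        self.queue = deque()  # unbounded, so a sender is never blocked

    def send(self, message):
        # Sending appends the message to the tail of the queue.
        self.queue.append(message)

    def is_empty(self):
        return not self.queue

    def deliver(self):
        # Delivery removes the message at the head of the queue.
        return self.queue.popleft()


channel = Channel("P", "Q")
channel.send("M1")
channel.send("M2")
first = channel.deliver()
second = channel.deliver()
print(first, second)  # M1 M2
```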
An agent is an object that can send messages to, and receive messages from, agents. An agent is either waiting to receive a message or is processing a message that it has received. An agent that is waiting starts processing a message when the agent receives a message. When an agent finishes processing a message, the agent waits for the next message to be delivered to it.
Each agent has a set of variables. An agent's variables are local to the agent -- they cannot be accessed by other agents. A message sent by an agent cannot be a pointer into the agent's memory.
An agent is specified by (1) statements that initialize the agent's variables and (2) a function
receive(message, sender), called a callback function in message queuing libraries.
If the system has a waiting agent u with a nonempty
input channel (v, u) then the system may
remove the message m at the head of
(v, u), and call
the receive(message, sender) function of u
where message is m and sender
is v.
In the simple model, an agent does not choose the channel from which the agent receives its next message. The system chooses any nonempty channel from which it delivers a message to the receiver of that channel. A waiting agent cannot refuse to accept a message.
A receive function must not be recursive: an agent cannot
receive a new message while it is processing a message.
Every execution of receive must terminate.
An agent is not interrupted while it is executing a
receive.
An agent may have many nonempty input channels but the agent processes
only one message at a time. Messages that arrive while an
agent is executing a receive remain in channels.
After an agent completes execution of a receive, the agent
returns to the waiting state.
An agent may send messages in its receive function.
An agent sends a message by executing
send(message, receiver). The first parameter of
send is the message that is sent,
and the second parameter is the agent to which the message is sent.
Execution of this statement places the message in the output channel
directed from the sender to the receiver.
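The delivery and send rules can be sketched as a tiny scheduler. The names queues, callbacks, make_send, and step are hypothetical and chosen only for this illustration. Because send(message, receiver) does not name the sender, the sketch binds the sender in a closure, which is one possible design and not necessarily how the course's simulator works.

```python
import random
from collections import deque

queues = {}     # (sender, receiver) -> deque of messages in transit
callbacks = {}  # agent name -> receive(message, sender) function


def make_send(sender):
    """Return a send(message, receiver) function bound to one sender."""
    def send(message, receiver):
        # Place the message at the tail of channel (sender, receiver).
        queues.setdefault((sender, receiver), deque()).append(message)
    return send


def step(rng):
    """Deliver one message from some nonempty channel; False if none."""
    nonempty = sorted(c for c, q in queues.items() if q)
    if not nonempty:
        return False
    sender, receiver = rng.choice(nonempty)  # the system chooses, not the agent
    message = queues[(sender, receiver)].popleft()
    callbacks[receiver](message, sender)     # receive runs to completion
    return True


# Two hypothetical agents: "ping" forwards whatever it receives to "pong".
received = []
ping_send = make_send("ping")
callbacks["ping"] = lambda message, sender: ping_send(message, "pong")
callbacks["pong"] = lambda message, sender: received.append((message, sender))

ping_send("hello", "ping")  # initial message in channel (ping, ping)
rng = random.Random(0)
while step(rng):
    pass
print(received)  # [('hello', 'ping')]
```

Each call to step delivers exactly one message and returns only after the receive function has finished, so an agent is never interrupted and never processes two messages at once.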
Agents implemented in most programming
languages are deterministic.
The statements that initialize an agent are deterministic
and so an agent has a single initial state.
Multiple executions of a receive function with the same
input produce the same output.
The theory of distributed systems does not require agents to be
deterministic.
For the time being we consider only deterministic agents.
See examples and code.
We discuss nondeterministic agents later.
The agents in the example system are
pos, neg,
total, and X.
The channels in the system are
(pos, pos),
(neg, neg),
(pos, total),
(neg, total), and
(total, X).
See "Figure Agents and Channels" which shows the network of
agents and channels represented as a directed graph.
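The directed graph of this example can be written down directly as a set of ordered pairs. The sketch below assumes agents are named by strings, and the helper names inputs and outputs are hypothetical. Using a set of pairs also matches the rule that there is at most one channel from one agent to another.

```python
# Channels of the example system, each an ordered pair (sender, receiver).
channels = {
    ("pos", "pos"),
    ("neg", "neg"),
    ("pos", "total"),
    ("neg", "total"),
    ("total", "X"),
}


def outputs(agent):
    """Channels on which the agent may send messages."""
    return sorted(c for c in channels if c[0] == agent)


def inputs(agent):
    """Channels on which the agent may receive messages."""
    return sorted(c for c in channels if c[1] == agent)


print(inputs("total"))   # [('neg', 'total'), ('pos', 'total')]
print(outputs("total"))  # [('total', 'X')]
print(outputs("X"))      # []
```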
This is an example of an agent, total,
that receives messages from agents
pos and neg,
and sends messages to agent X.
See an example
of an implementation in Python using an Agent class.
The example below does not use classes so that the algorithm is easier
to understand by readers who are not familiar with Python.
An agent is specified by a block of statements that initialize the
variables of the agent and the initial messages (if any) in
the agent's output channels, and the receive function.
The block of statements before the receive function
initializes the agent.
In this example, the agent total has a single
variable sum which has value 0 initially.
Initially, the output channel of total is empty.
# Algorithm for total
# ------- Initialization --------------
sum = 0
# ------- receive function ----------
def receive(message, sender):
    if sender == pos:
        sum = sum + message
    else:
        sum = sum - message
        if sum < 0: sum = 0
    send(sum, X)
If total receives a message from pos then
total
increments sum by the contents of the message.
If total receives a message from an agent other than
pos then total decrements sum by the
contents of the message, and if the resulting value of
sum is negative then sum is set to 0.
total sends the
resulting value of sum to agent X.
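A short trace may help. The sketch below is a runnable variant of the algorithm for total, written under assumptions made here: agents are named by strings, send merely records what is sent, and the agent's variable lives in a closure (named total_sum to avoid shadowing Python's built-in sum) because Python needs a nonlocal declaration before a function can assign to an enclosing variable.

```python
sent = []  # records (message, receiver) pairs passed to send


def send(message, receiver):
    # Stand-in for the system's send: just record the message.
    sent.append((message, receiver))


def make_total():
    total_sum = 0  # the agent's single variable, "sum" in the text

    def receive(message, sender):
        nonlocal total_sum
        if sender == "pos":
            total_sum = total_sum + message
        else:
            total_sum = total_sum - message
            if total_sum < 0:
                total_sum = 0
        send(total_sum, "X")

    return receive


receive = make_total()
receive(3, "pos")  # sum becomes 3
receive(5, "neg")  # 3 - 5 = -2, which is set to 0
receive(4, "pos")  # sum becomes 4
print(sent)  # [(3, 'X'), (0, 'X'), (4, 'X')]
```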
Example of pos
This is an example of agent pos.
# Algorithm for pos
# -------- Initialization --------------
my_data = [3, 5]
n = 0
send(message="wakeup", receiver=pos)
# ------- receive function ----------
def receive(message, sender):
    send(message=my_data[n], receiver=total)
    n = n + 1
    if n < len(my_data):
        send(message="wakeup", receiver=pos)
pos has variables my_data and n
which are initially [3, 5] and 0,
respectively.
Initially there is a message "wakeup" in the channel from
pos to itself.
When pos receives a message it sends
my_data[n] to agent total and then
increments n.
If n is less than the number of elements in
my_data then pos sends itself a "wakeup" message.
The system is initialized with a "wakeup" message in the channel from
pos to itself.
When pos receives the first message it sends
my_data[0] to agent total,
increments n to 1,
and sends another "wakeup" message to itself.
When pos receives the next message it sends
my_data[1] to agent total,
increments n to 2, and does not send itself a message.
So, in this example, pos waits forever after this point
because it receives no further messages.
In this example, agent neg is identical to
pos except that it
may have different values of my_data.
Agent X has no variables. It merely prints
messages that it receives.
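The four agents can be put together in one small simulation. This is a sketch under stated assumptions, not the course's simulator: the names queues, agents, printed, send_from, and make_source are hypothetical, agents are named by strings, the data [2, 4] for neg is invented for illustration, and the delivery order is chosen by a seeded random number generator to stand in for the system's free choice of a nonempty channel.

```python
import random
from collections import deque

queues = {}   # (sender, receiver) -> FIFO queue of messages in transit
agents = {}   # agent name -> receive(message, sender)
printed = []  # the messages that agent X has printed


def send_from(sender):
    def send(message, receiver):
        queues.setdefault((sender, receiver), deque()).append(message)
    return send


def make_source(name, my_data):
    """pos and neg: send one element of my_data per "wakeup" message."""
    send, n = send_from(name), 0
    send("wakeup", name)  # initial message in channel (name, name)

    def receive(message, sender):
        nonlocal n
        send(my_data[n], "total")
        n = n + 1
        if n < len(my_data):
            send("wakeup", name)
    return receive


def make_total():
    send, total = send_from("total"), 0

    def receive(message, sender):
        nonlocal total
        if sender == "pos":
            total = total + message
        else:
            total = max(total - message, 0)  # never negative after subtracting
        send(total, "X")
    return receive


def x_receive(message, sender):
    print(message)
    printed.append(message)


agents["pos"] = make_source("pos", [3, 5])
agents["neg"] = make_source("neg", [2, 4])  # hypothetical data for neg
agents["total"] = make_total()
agents["X"] = x_receive

rng = random.Random(1)  # other seeds may give other interleavings
while True:
    nonempty = sorted(c for c, q in queues.items() if q)
    if not nonempty:
        break  # every agent is waiting and every channel is empty
    sender, receiver = rng.choice(nonempty)
    agents[receiver](queues[(sender, receiver)].popleft(), sender)
```

X always prints four values, one for each message that total receives, but the values themselves depend on the order in which the system delivers the messages from pos and neg to total.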
K. Mani Chandy, Emeritus Simon Ramo Professor, California Institute of Technology