Replication means keeping a copy of the same data on multiple machines that are connected via a network. Reasons to replicate data:
- To keep data geographically close to your users (and thus reduce latency)
- To allow the system to continue working even if some of its parts have failed (and thus increase availability)
- To scale out the number of machines that can serve read queries (and thus increase read throughput)
All of the difficulty in replication lies in handling changes to replicated data.
Three popular algorithms for replicating changes between nodes: single-leader, multi-leader, and leaderless replication.
Leaders and Followers
Leader-based replication is also known as active/passive or master–slave replication.
Each node that stores a copy of the database is called a replica. Every write to the database needs to be processed by every replica; otherwise, the replicas would no longer contain the same data.
- One of the replicas is designated the leader (also known as master or primary). When clients want to write to the database, they must send their requests to the leader, which first writes the new data to its local storage.
- The other replicas are known as followers (read replicas, slaves, secondaries, or hot standbys). Whenever the leader writes new data to its local storage, it also sends the data change to all of its followers as part of a replication log or change stream. Each follower takes the log from the leader and updates its local copy of the database accordingly, by applying all writes in the same order as they were processed on the leader.
- When a client wants to read from the database, it can query either the leader or any of the followers. However, writes are only accepted on the leader (the followers are read-only from the client’s point of view).
This is the mode of replication for a lot of SQL and NoSQL databases, as well as distributed message brokers.
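The write path described above can be sketched in a few lines. This is a minimal in-memory illustration, not how any particular database implements it; the class names `Replica` and `Leader` and the tuple-shaped log entries are assumptions made for the example.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    """A node holding a copy of the data and a count of applied log entries."""
    name: str
    data: dict = field(default_factory=dict)
    applied: int = 0

    def apply(self, entry):
        key, value = entry
        self.data[key] = value
        self.applied += 1


class Leader(Replica):
    def __init__(self, name, followers):
        super().__init__(name)
        self.log = []              # replication log, in write order
        self.followers = followers

    def write(self, key, value):
        entry = (key, value)
        self.log.append(entry)     # record the change in the log
        self.apply(entry)          # write to local storage first
        for follower in self.followers:
            follower.apply(entry)  # ship the change in the same order


followers = [Replica("f1"), Replica("f2")]
leader = Leader("leader", followers)
leader.write("x", 1)
leader.write("x", 2)
```

Because every follower applies the entries in the leader's order, all replicas end with the same value for `x`.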
Synchronous Versus Asynchronous Replication
- Synchronous: the leader waits until the follower has confirmed that it received the write before reporting success to the user.
- Asynchronous: the leader sends the message but doesn’t wait for a response from the follower.
- Semi-synchronous: one follower is synchronous and the others are asynchronous.
Normally, replication is quite fast: most database systems apply changes to followers in less than a second. However, there is no guarantee of how long it might take.
It is impractical for all followers to be synchronous: any one node outage would cause the whole system to grind to a halt. In practice, if you enable synchronous replication on a database, it usually means that one of the followers is synchronous, and the others are asynchronous. If the synchronous follower becomes unavailable or slow, one of the asynchronous followers is made synchronous. This guarantees that you have an up-to-date copy of the data on at least two nodes.
Often, leader-based replication is configured to be completely asynchronous. This means that a write is not guaranteed to be durable, even if it has been confirmed to the client. But asynchronous replication is nevertheless widely used, especially if there are many followers or if they are geographically distributed.
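The difference between the modes comes down to whose acknowledgement the leader waits for. The sketch below is a simplified model under stated assumptions: a real synchronous follower would block the write rather than return `False`, and the names `Follower` and `replicate` are hypothetical.

```python
class Follower:
    def __init__(self, reachable=True):
        self.reachable = reachable
        self.log = []

    def receive(self, entry):
        """Store the entry and return True as an acknowledgement, if reachable."""
        if not self.reachable:
            return False
        self.log.append(entry)
        return True


def replicate(entry, sync_followers, async_followers):
    """Report success only if every synchronous follower acknowledged."""
    acks = [f.receive(entry) for f in sync_followers]
    for f in async_followers:
        f.receive(entry)  # fire and forget: the result is ignored
    return all(acks)


sync_follower = Follower()
lagging_follower = Follower(reachable=False)
confirmed = replicate("w1", [sync_follower], [lagging_follower])
```

Here `confirmed` is true even though the asynchronous follower never stored the write, which is exactly why a fully asynchronous setup cannot promise durability.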
Chain replication is a variant of synchronous replication that has been successfully implemented in a few systems such as Microsoft Azure Storage.
Setting Up New Followers
Simply copying data files from one node to another is typically not sufficient: clients are constantly writing to the database, and the data is always in flux, so a standard file copy would see different parts of the database at different points in time. The result might not make any sense.
- Take a consistent snapshot of the leader’s database at some point in time—if possible, without taking a lock on the entire database.
- Copy the snapshot to the new follower node.
- The follower connects to the leader and requests all the data changes that have happened since the snapshot was taken.
- When the follower has processed the backlog of data changes since the snapshot, we say it has caught up.
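The steps above can be sketched as follows. The key assumption, not spelled out in the list, is that the snapshot is associated with an exact position in the leader's replication log, so the follower knows where to start requesting changes. All names here (`Leader`, `snapshot`, `changes_since`, `bootstrap_follower`) are illustrative.

```python
import copy


class Leader:
    def __init__(self):
        self.data = {}
        self.log = []  # every change, in order

    def write(self, key, value):
        self.log.append((key, value))
        self.data[key] = value

    def snapshot(self):
        """Return a consistent copy of the data and the log position it reflects."""
        return copy.deepcopy(self.data), len(self.log)

    def changes_since(self, position):
        return self.log[position:]


def bootstrap_follower(snapshot, leader):
    """Start from a snapshot, then apply the backlog of later changes."""
    data, position = snapshot
    data = dict(data)
    for key, value in leader.changes_since(position):
        data[key] = value
        position += 1
    return data, position


leader = Leader()
leader.write("a", 1)
snap = leader.snapshot()   # taken while writes continue
leader.write("b", 2)
leader.write("a", 3)
follower_data, follower_position = bootstrap_follower(snap, leader)
```

After processing the backlog, `follower_data` matches the leader's data and the follower has caught up.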
Handling Node Outages
- Follower failure: Catch-up recovery
- On its local disk, each follower keeps a log of the data changes it has received from the leader.
- If a follower crashes and is restarted, or if the network between the leader and the follower is temporarily interrupted, the follower can recover quite easily:
- from its log, it knows the last transaction that was processed before the fault occurred.
- Thus, the follower can connect to the leader and request all the data changes that occurred during the time when the follower was disconnected.
- When it has applied these changes, it has caught up to the leader and can continue receiving a stream of data changes as before.
- Leader failure: Failover
- One of the followers needs to be promoted to be the new leader, clients need to be reconfigured to send their writes to the new leader, and the other followers need to start consuming data changes from the new leader. This process is called failover.
- Determining that the leader has failed.
- Most systems simply use a timeout: nodes frequently bounce messages back and forth between each other, and if a node doesn’t respond for some period of time (say, 30 seconds), it is assumed to be dead.
- Choosing a new leader. The best candidate for leadership is usually the replica with the most up-to-date data changes from the old leader.
- Reconfiguring the system to use the new leader.
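The first two failover steps can be illustrated with two small functions. This is a sketch only: real systems usually pick the new leader through an election or a controller node, and the function names and the mapping of follower name to log position are assumptions for the example.

```python
def is_dead(last_heartbeat, now, timeout=30.0):
    """Treat a node as failed if it has been silent for longer than `timeout` seconds."""
    return now - last_heartbeat > timeout


def choose_new_leader(follower_positions):
    """Pick the follower with the most up-to-date log position.

    `follower_positions` maps follower name -> last applied log position.
    """
    return max(follower_positions, key=follower_positions.get)


positions = {"f1": 17, "f2": 21, "f3": 19}
new_leader = choose_new_leader(positions)
```

Choosing the replica with the highest log position minimizes, but does not eliminate, the writes lost when replication is asynchronous.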
Problems with failover:
If asynchronous replication is used, the new leader may not have received all the writes from the old leader before it failed. If the former leader rejoins the cluster after a new leader has been chosen, what should happen to those writes? The new leader may have received conflicting writes in the meantime. The most common solution is for the old leader’s unreplicated writes to simply be discarded, which may violate clients’ durability expectations.
Discarding writes is especially dangerous if other storage systems outside of the database need to be coordinated with the database contents.
It could happen that two nodes both believe that they are the leader. This situation is called split brain, and it is dangerous: if both leaders accept writes, and there is no process for resolving conflicts, data is likely to be lost or corrupted.
If the system is already struggling with high load or network problems, an unnecessary failover is likely to make the situation worse, not better.
Implementation of Replication Logs
Statement-based replication
In the simplest case, the leader logs every write request (statement) that it executes and sends that statement log to its followers.
For a relational database, this means that every INSERT, UPDATE, or DELETE statement is forwarded to followers.
- Any statement that calls a nondeterministic function, such as NOW() to get the current date and time or RAND() to get a random number, is likely to generate a different value on each replica.
- If statements use an autoincrementing column, or if they depend on the existing data in the database (e.g., UPDATE … WHERE …), they must be executed in exactly the same order on each replica, or else they may have a different effect.
- Statements that have side effects (e.g., triggers, stored procedures, user-defined functions) may result in different side effects occurring on each replica, unless the side effects are absolutely deterministic.
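The nondeterminism problem is easy to reproduce in miniature. In the sketch below, a "statement" is just a `(key, value)` pair and each replica's clock is passed in explicitly; both are simplifications invented for the example. One known workaround, shown at the end, is for the leader to replace the nondeterministic call with the value it actually produced before logging the statement.

```python
def execute(statement, table, now):
    """Apply a tiny 'statement': set key to a literal, or to the local clock for NOW()."""
    key, value = statement
    table[key] = now if value == "NOW()" else value


leader, follower = {}, {}
stmt = ("created_at", "NOW()")

execute(stmt, leader, now=100)    # the leader's clock reads 100
execute(stmt, follower, now=103)  # the follower replays the statement later
diverged = leader != follower     # the replicas now disagree

# Workaround: log the concrete value the leader produced instead of NOW().
fixed_stmt = ("created_at", leader["created_at"])
execute(fixed_stmt, follower, now=103)
```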
Write-ahead log (WAL) shipping
Besides writing the write-ahead log to disk, the leader also sends it across the network to its followers.
When the follower processes this log, it builds a copy of the exact same data structures as found on the leader.
This method of replication is used in PostgreSQL and Oracle.
The main disadvantage is that a WAL describes the data at a very low level (which bytes were changed in which disk blocks), so replication is closely coupled to the storage engine. If the database changes its storage format from one version to another, it is typically not possible to run different versions of the database software on the leader and the followers. That may seem like a minor implementation detail, but it can have a big operational impact.
Logical (row-based) log replication
An alternative is to use different log formats for replication and for the storage engine, which allows the replication log to be decoupled from the storage engine internals.
This kind of replication log is called a logical log, to distinguish it from the storage engine’s (physical) data representation.
A logical log for a relational database is usually a sequence of records describing writes to database tables at the granularity of a row:
- For an inserted row, the log contains the new values of all columns.
- For a deleted row, the log contains enough information to uniquely identify the row that was deleted. Typically this would be the primary key, but if there is no primary key on the table, the old values of all columns need to be logged.
- For an updated row, the log contains enough information to uniquely identify the updated row, and the new values of all columns (or at least the new values of all columns that changed).
MySQL’s binlog (when configured to use row-based replication) uses this approach.
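To make the row-level records concrete, here is a small sketch of a follower applying a logical log to a table modeled as a dictionary keyed by primary key. The record layout (`op`, `key`, `row`, `changed`) is invented for the example and does not correspond to MySQL's binlog format.

```python
def apply_record(table, record):
    """Apply one row-level change to `table`, a dict keyed by primary key."""
    op = record["op"]
    if op == "insert":
        table[record["key"]] = dict(record["row"])       # new values of all columns
    elif op == "update":
        table[record["key"]].update(record["changed"])   # only the changed columns
    elif op == "delete":
        del table[record["key"]]                         # primary key identifies the row
    else:
        raise ValueError(f"unknown operation: {op}")


log = [
    {"op": "insert", "key": 1, "row": {"name": "Ada", "city": "London"}},
    {"op": "insert", "key": 2, "row": {"name": "Bob", "city": "Paris"}},
    {"op": "update", "key": 1, "changed": {"city": "Turin"}},
    {"op": "delete", "key": 2},
]

follower_table = {}
for record in log:
    apply_record(follower_table, record)
```

Nothing in these records depends on how the storage engine lays out bytes on disk, which is what allows leader and follower to run different software versions.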
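Trigger-based replication
When more flexibility is needed, an alternative is to move replication up to the application layer using features that are available in many relational databases: triggers and stored procedures.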
Problems with Replication Lag
With asynchronous followers, a client that reads from a follower may see outdated data. This inconsistency is just a temporary state: if you stop writing to the database and wait a while, the followers will eventually catch up and become consistent with the leader. For that reason, this effect is known as eventual consistency.
When the replication lag grows to several seconds or even minutes, the inconsistencies it introduces are not just a theoretical issue but a real problem for applications.
Reading Your Own Writes
If a user submits data and then views it shortly afterward from a follower that has not yet received the write, the data appears to be lost. In this situation, we need read-after-write consistency, also known as read-your-writes consistency. This is a guarantee that if the user reloads the page, they will always see any updates they submitted themselves. It makes no promises about other users: other users’ updates may not be visible until some later time.
- When reading something that the user may have modified, read it from the leader; otherwise, read it from a follower. This requires that you have some way of knowing whether something might have been modified, without actually querying it.
- You could track the time of the last update and, for one minute after the last update, make all reads from the leader.
- You could also monitor the replication lag on followers and prevent queries on any follower that is more than one minute behind the leader.
- The client can remember the timestamp of its most recent write—then the system can ensure that the replica serving any reads for that user reflects updates at least until that timestamp.
- The timestamp could be a logical timestamp (something that indicates ordering of writes, such as the log sequence number) or the actual system clock (in which case clock synchronization becomes critical).
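The logical-timestamp technique can be sketched as a routing decision. The example assumes that each replica reports the log position it has applied, that the leader always reflects every write, and that the leader is registered under the name `"leader"`; the function name `pick_replica` is hypothetical.

```python
def pick_replica(replica_positions, last_write_position):
    """Return a replica that has applied the client's most recent write.

    `replica_positions` maps replica name -> applied log position.
    Followers are preferred; the leader is the fallback.
    """
    for name, position in replica_positions.items():
        if name != "leader" and position >= last_write_position:
            return name
    return "leader"


positions = {"leader": 12, "f1": 9, "f2": 12}
chosen = pick_replica(positions, last_write_position=11)
```

A follower that is behind the client's last write (`f1` here) is skipped, so the user never reads a state older than their own update.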
Another complication arises when the same user is accessing your service from multiple devices.
- Approaches that require remembering the timestamp of the user’s last update become more difficult, because the code running on one device doesn’t know what updates have happened on the other device. This metadata will need to be centralized.
- If your replicas are distributed across different datacenters, there is no guarantee that connections from different devices will be routed to the same datacenter.
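Monotonic Reads
Another anomaly that can occur when reading from asynchronous followers is that a user can see things moving backward in time, for example when a second read is served by a follower with greater lag than the one that served the first read.
Monotonic reads means that if one user makes several reads in sequence, they will not see time go backward.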
One way of achieving monotonic reads is to make sure that each user always makes their reads from the same replica. However, if that replica fails, the user’s queries will need to be rerouted to another replica.
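One common way to pin a user to a replica is to choose it from a hash of the user ID instead of at random. The sketch below uses `hashlib` because Python's built-in `hash()` for strings varies between processes; the function name and the list of replica names are assumptions for the example.

```python
import hashlib


def replica_for_user(user_id, replicas):
    """Map a user ID to one replica deterministically."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(replicas)
    return replicas[index]


replicas = ["f1", "f2", "f3"]
first = replica_for_user("alice", replicas)
second = replica_for_user("alice", replicas)
```

If the chosen replica fails, calling the function with only the healthy replicas reroutes the user, at the cost of possibly breaking the guarantee for that one transition.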
Consistent Prefix Reads
The consistent prefix reads guarantee says that if a sequence of writes happens in a certain order, then anyone reading those writes will see them appear in the same order.
One solution is to make sure that any writes that are causally related to each other are written to the same partition—but in some applications that cannot be done efficiently.
Multi-Leader Replication
A natural extension of the leader-based replication model is to allow more than one node to accept writes. Replication still happens in the same way: each node that processes a write must forward that data change to all the other nodes. We call this a multi-leader configuration (also known as master–master or active/active replication). In this setup, each leader simultaneously acts as a follower to the other leaders.
Use Cases for Multi-Leader Replication
It rarely makes sense to use a multi-leader setup within a single datacenter, because the benefits rarely outweigh the added complexity.
Multi-datacenter operation: In a multi-leader configuration, you can have a leader in each datacenter.
Some databases support multi-leader configurations by default, but it is also often implemented with external tools, such as Tungsten Replicator for MySQL, BDR for PostgreSQL, and GoldenGate for Oracle.
As multi-leader replication is a somewhat retrofitted feature in many databases, there are often subtle configuration pitfalls and surprising interactions with other database features. For example, autoincrementing keys, triggers, and integrity constraints can be problematic. For this reason, multi-leader replication is often considered dangerous territory that should be avoided if possible.
Clients with offline operation: Another situation in which multi-leader replication is appropriate is if you have an application that needs to continue to work while it is disconnected from the internet.
For example, consider the calendar apps on your phone and laptop. In this case, every device has a local database that acts as a leader (it accepts write requests), and there is an asynchronous multi-leader replication process (sync) between the replicas of your calendar on all of your devices.
There are tools that aim to make this kind of multi-leader configuration easier. For example, CouchDB is designed for this mode of operation.
Collaborative editing: When one user edits a document, the changes are instantly applied to their local replica (the state of the document in their web browser or client application) and asynchronously replicated to the server and any other users who are editing the same document.
If you want to guarantee that there will be no editing conflicts, the application must obtain a lock on the document before a user can edit it. If another user wants to edit the same document, they first have to wait until the first user has committed their changes and released the lock. This collaboration model is equivalent to single-leader replication with transactions on the leader.
However, for faster collaboration, you may want to make the unit of change very small (e.g., a single keystroke) and avoid locking. This approach allows multiple users to edit simultaneously, but it also brings all the challenges of multi-leader replication, including requiring conflict resolution.
Handling Write Conflicts
Conflict avoidance: The simplest strategy for dealing with conflicts is to avoid them: if the application can ensure that all writes for a particular record go through the same leader, then conflicts cannot occur. Since many implementations of multi-leader replication handle conflicts quite poorly, avoiding conflicts is a frequently recommended approach.
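Convergent conflict resolution: the database must resolve conflicts so that all replicas arrive at the same final value once all changes have been replicated. Ways of achieving this: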
- Give each write a unique ID (e.g., a timestamp, a long random number, a UUID, or a hash of the key and value), pick the write with the highest ID as the winner, and throw away the other writes. If a timestamp is used, this technique is known as last write wins (LWW).
- Give each replica a unique ID, and let writes that originated at a higher-numbered replica always take precedence.
- Somehow merge the values together.
- Record the conflict in an explicit data structure that preserves all information, and write application code that resolves the conflict.
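Two of these strategies fit in a few lines each. The sketch assumes every write is a `(timestamp, replica_id, value)` tuple and uses the replica ID only to break timestamp ties; the merge rule (sort alphabetically and join) is just one possible choice. Both function names are hypothetical.

```python
def resolve_lww(writes):
    """Last write wins: keep the value with the highest (timestamp, replica_id)."""
    return max(writes, key=lambda w: (w[0], w[1]))[2]


def merge_values(values):
    """Merge conflicting values by sorting them and concatenating."""
    return "/".join(sorted(set(values)))


conflicting = [(5, "a", "B"), (7, "b", "C"), (7, "a", "D")]
winner = resolve_lww(conflicting)
merged = merge_values(["C", "B"])
```

Both functions give the same answer regardless of the order in which a replica received the writes, which is what makes them convergent. Note that `resolve_lww` silently discards the losing writes, so it is prone to data loss.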
Custom conflict resolution logic
- On write: as soon as the database system detects a conflict in the log of replicated changes, it calls the conflict handler. Bucardo works this way.
- On read: when a conflict is detected, all the conflicting writes are stored. The next time the data is read, these multiple versions of the data are returned to the application. CouchDB works this way.
Automatic Conflict Resolution
- Conflict-free replicated datatypes
- Mergeable persistent data structures
- Operational transformation
Multi-Leader Replication Topologies
In circular and star topologies, a write may need to pass through several nodes before it reaches all replicas. Therefore, nodes need to forward data changes they receive from other nodes. To prevent infinite replication loops, each node is given a unique identifier, and in the replication log, each write is tagged with the identifiers of all the nodes it has passed through.
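The loop-prevention rule can be illustrated with a ring of nodes. In this sketch the ring is a list of node names, each node forwards to the next one in the list, and a write carries the identifiers of every node it has visited; the function name and data layout are assumptions for the example.

```python
def replicate_ring(ring, origin, value):
    """Propagate a write around a ring and return the nodes that applied it, in order."""
    write = {"value": value, "seen": [origin]}  # tagged with the originating node
    node = origin
    while True:
        node = ring[(ring.index(node) + 1) % len(ring)]
        if node in write["seen"]:
            break                     # this node already processed the write: ignore it
        write["seen"].append(node)    # apply locally, tag, and forward
    return write["seen"]


path = replicate_ring(["a", "b", "c"], origin="b", value=42)
```

The write stops when it arrives back at a node whose identifier is already in its tag list, so it is applied exactly once per node.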
A problem with circular and star topologies is that if just one node fails, it can interrupt the flow of replication messages between other nodes, causing them to be unable to communicate until the node is fixed.
The fault tolerance of a more densely connected topology (such as all-to-all) is better because it allows messages to travel along different paths, avoiding a single point of failure.
On the other hand, all-to-all topologies can have issues too. In particular, some network links may be faster than others (e.g., due to network congestion), with the result that some replication messages may “overtake” others.
Leaderless Replication
Some data storage systems abandon the concept of a leader entirely, allowing any replica to directly accept writes from clients. Some of the earliest replicated data systems were leaderless.
Leaderless replication once again became a fashionable architecture for databases after Amazon used it for its in-house Dynamo system. Riak, Cassandra, and Voldemort are open source datastores with leaderless replication models inspired by Dynamo, so this kind of database is also known as Dynamo-style.
In some leaderless implementations, the client directly sends its writes to several replicas, while in others, a coordinator node does this on behalf of the client. However, unlike a leader database, that coordinator does not enforce a particular ordering of writes.
- Read repair - When a client makes a read from several nodes in parallel, it can detect any stale responses.
- Anti-entropy process - a background process that constantly looks for differences in the data between replicas and copies any missing data from one replica to another.
Without an anti-entropy process, values that are rarely read may be missing from some replicas and thus have reduced durability.
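Read repair can be sketched with version numbers deciding which response is newest. The example models each replica as a dictionary mapping a key to a `(version, value)` pair, with versions starting at 1 and version 0 standing for "missing"; these conventions and the function name are assumptions, not a description of any specific datastore.

```python
def read_with_repair(replicas, key):
    """Read `key` from all replicas, return the newest value, and fix stale copies."""
    responses = [replica.get(key, (0, None)) for replica in replicas]
    newest = max(responses, key=lambda response: response[0])
    for replica, response in zip(replicas, responses):
        if response[0] < newest[0]:
            replica[key] = newest  # write the newer value back to the stale replica
    return newest[1]


replicas = [{"k": (2, "new")}, {"k": (1, "old")}, {}]
value = read_with_repair(replicas, "k")
```

Only keys that are actually read get repaired this way, which is why a background anti-entropy process is still needed for rarely read values.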
Quorum reads and writes: if there are n replicas, every write must be confirmed by w nodes to be considered successful, and we must query at least r nodes for each read. (For example, n = 3, w = 2, r = 2.) As long as w + r > n, we expect to get an up-to-date value when reading, because the set of nodes written to and the set of nodes read from must overlap, so at least one of the r nodes we’re reading from must be up to date.
A common choice is to make n an odd number (typically 3 or 5) and to set w = r = (n + 1) / 2 (rounded up).
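The overlap argument can be checked by brute force for small n. The sketch below enumerates every possible set of w written nodes and r read nodes and confirms that they always share a node exactly when w + r > n. The function names are illustrative, and `tolerated_failures` assumes that both reads and writes must keep working.

```python
import math
from itertools import combinations


def always_overlap(n, w, r):
    """True if every write set of size w shares a node with every read set of size r."""
    nodes = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(nodes, w)
        for read_set in combinations(nodes, r)
    )


def majority(n):
    """The common choice w = r = (n + 1) / 2, rounded up."""
    return math.ceil((n + 1) / 2)


def tolerated_failures(n, w, r):
    """Number of unavailable nodes with which both reads and writes still succeed."""
    return n - max(w, r)
```

For n = 3 and w = r = 2 the sets always overlap and one node may be down; with w = 1 and r = 2 a read can miss the only node that has the latest write.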
The parameters w and r allow you to adjust the probability of stale values being read, but it’s wise to not take them as absolute guarantees.
For leader-based replication, the database typically exposes metrics for the replication lag.
With leaderless replication, there is no fixed order in which writes are applied, which makes monitoring more difficult.
There has been some research on predicting the expected percentage of stale reads depending on the parameters n, w, and r.
Eventual consistency is a deliberately vague guarantee, but for operability it’s important to be able to quantify “eventual.”
- Sloppy quorum: writes and reads still require w and r successful responses, but those may include nodes that are not among the designated n “home” nodes for a value.
- Hinted handoff: once the network interruption is fixed, any writes that one node temporarily accepted on behalf of another node are sent to the appropriate “home” nodes.
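The two mechanisms work together, as the following sketch shows. It is a deliberately simplified model: reachability is a set of node names, a fallback node stores writes it accepts for others in a `hints` list, and the names `Node`, `sloppy_write`, and `hand_off` are hypothetical.

```python
class Node:
    def __init__(self, name):
        self.name = name
        self.data = {}
        self.hints = []  # (home node name, key, value) held on behalf of others


def sloppy_write(key, value, home, fallbacks, up, w):
    """Write to reachable home nodes; use reachable fallbacks for unreachable ones."""
    acks = 0
    spare = [node for node in fallbacks if node.name in up]
    for node in home:
        if node.name in up:
            node.data[key] = value
            acks += 1
        elif spare:
            spare.pop(0).hints.append((node.name, key, value))
            acks += 1
    return acks >= w


def hand_off(holder, nodes_by_name, up):
    """Deliver held writes to home nodes that are reachable again."""
    remaining = []
    for home_name, key, value in holder.hints:
        if home_name in up:
            nodes_by_name[home_name].data[key] = value
        else:
            remaining.append((home_name, key, value))
    holder.hints = remaining


a, b, c, d = Node("a"), Node("b"), Node("c"), Node("d")
ok = sloppy_write("k", "v", home=[a, b, c], fallbacks=[d], up={"a", "d"}, w=2)
hand_off(d, {"a": a, "b": b, "c": c}, up={"a", "b", "c", "d"})
```

The write succeeds with w = 2 even though only one home node is reachable, and node `b` receives the value once the interruption ends. Node `c` had no fallback in this run, so it would have to be brought up to date by read repair or anti-entropy.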
Leaderless replication is also suitable for multi-datacenter operation, since it is designed to tolerate conflicting concurrent writes, network interruptions, and latency spikes.
Detecting Concurrent Writes
- Last write wins.
- We can attach a timestamp to each write, pick the biggest timestamp as the most “recent,” and discard any writes with an earlier timestamp.