主从复制系统 (Master-Slave Replication)

📖 概述

QAExchange-RS 的主从复制系统实现了高可用架构,提供数据冗余、故障自动转移和强一致性保证。系统基于 Raft 协议的核心思想,实现了 Master-Slave 拓扑的分布式复制。

🎯 设计目标

  • 高可用性: Master 故障后自动选举新 Master (< 500ms)
  • 数据一致性: 基于 WAL 的日志复制,保证多数派确认
  • 低延迟复制: 批量复制延迟 P99 < 10ms
  • 自动故障转移: 心跳检测 + Raft 选举机制
  • 网络分区容错: Split-brain 保护,多数派共识

🏗️ 架构设计

系统拓扑

┌──────────────────────────────────────────────────────────────┐
│                   QAExchange 复制集群                          │
│                                                                │
│  ┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│  │   Master    │────────▶│   Slave 1   │         │   Slave 2   │
│  │  (Node A)   │         │  (Node B)   │         │  (Node C)   │
│  │             │         │             │         │             │
│  │ - 接受写入   │         │ - 只读复制   │         │ - 只读复制   │
│  │ - 日志复制   │         │ - 心跳监听   │         │ - 心跳监听   │
│  │ - 心跳发送   │         │ - 故障检测   │         │ - 故障检测   │
│  └─────────────┘         └─────────────┘         └─────────────┘
│         │                       ▲                       ▲
│         │  Log Entry + Commit   │                       │
│         └───────────────────────┴───────────────────────┘
│                    (Batch Replication)
│
│  故障场景: Master 宕机
│  ┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│  │   OFFLINE   │         │  Candidate  │────────▶│  Candidate  │
│  │             │         │  (Vote)     │◀────────│  (Vote)     │
│  └─────────────┘         └─────────────┘         └─────────────┘
│                                 │
│                          Election Winner
│                                 ▼
│                          ┌─────────────┐
│                          │ New Master  │
│                          │  (Node B)   │
│                          └─────────────┘
└──────────────────────────────────────────────────────────────┘

核心组件

src/replication/
├── mod.rs              # 模块入口
├── role.rs             # 节点角色管理 (Master/Slave/Candidate)
├── replicator.rs       # 日志复制器 (批量复制 + commit)
├── heartbeat.rs        # 心跳管理 (检测 + 超时)
├── failover.rs         # 故障转移协调器 (选举 + 投票)
└── protocol.rs         # 网络协议定义 (消息格式)

📋 1. 角色管理 (RoleManager)

1.1 角色定义

节点可以处于三种角色之一:

#![allow(unused)]
fn main() {
// src/replication/role.rs
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole {
    /// Master节点(接受写入)
    Master,

    /// Slave节点(只读,复制数据)
    Slave,

    /// Candidate节点(选举中)
    Candidate,
}
}

角色职责:

角色职责写入权限心跳行为
Master处理客户端写入,复制日志到 Slave✅ 可写发送心跳
Slave接受日志复制,提供只读查询❌ 只读接收心跳
Candidate参与选举,争取成为 Master❌ 禁止停止心跳

1.2 RoleManager 实现

#![allow(unused)]
fn main() {
/// 角色管理器
pub struct RoleManager {
    /// 当前角色
    role: Arc<RwLock<NodeRole>>,

    /// 节点ID
    node_id: String,

    /// 当前term(选举轮次)
    current_term: Arc<RwLock<u64>>,

    /// 投票给谁(在当前term中)
    voted_for: Arc<RwLock<Option<String>>>,

    /// Master ID(如果是Slave)
    master_id: Arc<RwLock<Option<String>>>,
}
}

关键方法:

#![allow(unused)]
fn main() {
impl RoleManager {
    /// 获取当前角色
    pub fn get_role(&self) -> NodeRole {
        *self.role.read()
    }

    /// 是否是Master
    pub fn is_master(&self) -> bool {
        *self.role.read() == NodeRole::Master
    }

    /// 转换为Master
    pub fn become_master(&self) {
        self.set_role(NodeRole::Master);
        self.set_master(Some(self.node_id.clone()));
        log::info!("[{}] Became Master", self.node_id);
    }

    /// 转换为Slave
    pub fn become_slave(&self, master_id: String) {
        self.set_role(NodeRole::Slave);
        self.set_master(Some(master_id));
    }

    /// 转换为Candidate
    pub fn become_candidate(&self) {
        self.set_role(NodeRole::Candidate);
        self.increment_term();
        self.vote_for(&self.node_id); // 投票给自己
    }
}
}

1.3 Term 管理

Term (选举轮次) 是 Raft 协议的核心概念:

#![allow(unused)]
fn main() {
impl RoleManager {
    /// 获取当前term
    pub fn get_term(&self) -> u64 {
        *self.current_term.read()
    }

    /// 设置term
    pub fn set_term(&self, term: u64) {
        let mut t = self.current_term.write();
        if term > *t {
            *t = term;
            // 新的term,清除投票记录
            *self.voted_for.write() = None;
            log::info!("[{}] Term updated to {}", self.node_id, term);
        }
    }

    /// 增加term(用于开始选举)
    pub fn increment_term(&self) -> u64 {
        let mut t = self.current_term.write();
        *t += 1;
        let new_term = *t;

        // 新term,清除投票
        *self.voted_for.write() = None;

        log::info!("[{}] Term incremented to {}", self.node_id, new_term);
        new_term
    }
}
}

Term 规则:

  • 每次选举开始时,Candidate 增加 term
  • 如果节点收到更高 term 的消息,立即更新本地 term 并降级为 Slave
  • 同一 term 内,节点只能投票一次

1.4 投票机制

#![allow(unused)]
fn main() {
impl RoleManager {
    /// 投票
    pub fn vote_for(&self, candidate_id: &str) -> bool {
        let mut voted = self.voted_for.write();
        if voted.is_none() {
            *voted = Some(candidate_id.to_string());
            log::info!(
                "[{}] Voted for {} in term {}",
                self.node_id,
                candidate_id,
                self.get_term()
            );
            true
        } else {
            false // 已经投过票
        }
    }

    /// 获取已投票的候选人
    pub fn get_voted_for(&self) -> Option<String> {
        self.voted_for.read().clone()
    }
}
}

📡 2. 日志复制 (LogReplicator)

2.1 复制配置

#![allow(unused)]
fn main() {
// src/replication/replicator.rs
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// 复制超时(毫秒)
    pub replication_timeout_ms: u64,

    /// 批量大小
    pub batch_size: usize,

    /// 最大重试次数
    pub max_retries: usize,

    /// 心跳间隔(毫秒)
    pub heartbeat_interval_ms: u64,
}

impl Default for ReplicationConfig {
    fn default() -> Self {
        Self {
            replication_timeout_ms: 1000,
            batch_size: 100,
            max_retries: 3,
            heartbeat_interval_ms: 100,
        }
    }
}
}

2.2 LogReplicator 结构

#![allow(unused)]
fn main() {
pub struct LogReplicator {
    /// 角色管理器
    role_manager: Arc<RoleManager>,

    /// 配置
    config: ReplicationConfig,

    /// 日志缓冲区(待复制的日志)
    pending_logs: Arc<RwLock<Vec<LogEntry>>>,

    /// Slave的匹配序列号
    slave_match_index: Arc<RwLock<HashMap<String, u64>>>,

    /// Slave的下一个序列号
    slave_next_index: Arc<RwLock<HashMap<String, u64>>>,

    /// commit序列号
    commit_index: Arc<RwLock<u64>>,

    /// 复制响应通道
    response_tx: mpsc::UnboundedSender<(String, ReplicationResponse)>,
    response_rx: Arc<Mutex<mpsc::UnboundedReceiver<(String, ReplicationResponse)>>>,
}
}

关键概念:

  • match_index: Slave 已经复制的最高日志序列号
  • next_index: 下次发送给 Slave 的日志序列号
  • commit_index: 已经被多数派确认的日志序列号

2.3 Master 端: 添加日志

#![allow(unused)]
fn main() {
impl LogReplicator {
    /// 添加日志到复制队列(Master调用)
    pub fn append_log(&self, sequence: u64, record: WalRecord) -> Result<(), String> {
        if !self.role_manager.is_master() {
            return Err("Only master can append logs".to_string());
        }

        let entry = LogEntry {
            sequence,
            term: self.role_manager.get_term(),
            record,
            timestamp: chrono::Utc::now().timestamp_millis(),
        };

        self.pending_logs.write().push(entry);

        log::debug!(
            "[{}] Log appended: sequence {}",
            self.role_manager.node_id(),
            sequence
        );

        Ok(())
    }
}
}

2.4 Master 端: 创建复制请求

#![allow(unused)]
fn main() {
impl LogReplicator {
    /// 创建复制请求(Master调用)
    pub fn create_replication_request(&self, slave_id: &str) -> Option<ReplicationRequest> {
        if !self.role_manager.is_master() {
            return None;
        }

        let next_index = self.slave_next_index.read().get(slave_id).cloned().unwrap_or(1);
        let pending = self.pending_logs.read();

        // 查找从next_index开始的日志
        let entries: Vec<LogEntry> = pending
            .iter()
            .filter(|e| e.sequence >= next_index)
            .take(self.config.batch_size)
            .cloned()
            .collect();

        if entries.is_empty() {
            return None; // 没有新日志
        }

        // 前一个日志的信息
        let (prev_log_sequence, prev_log_term) = if next_index > 1 {
            pending
                .iter()
                .find(|e| e.sequence == next_index - 1)
                .map(|e| (e.sequence, e.term))
                .unwrap_or((0, 0))
        } else {
            (0, 0)
        };

        Some(ReplicationRequest {
            term: self.role_manager.get_term(),
            leader_id: self.role_manager.node_id().to_string(),
            prev_log_sequence,
            prev_log_term,
            entries,
            leader_commit: *self.commit_index.read(),
        })
    }
}
}

2.5 Master 端: 处理复制响应

#![allow(unused)]
fn main() {
impl LogReplicator {
    /// 处理复制响应(Master调用)
    pub fn handle_replication_response(
        &self,
        slave_id: String,
        response: ReplicationResponse,
    ) -> Result<(), String> {
        if !self.role_manager.is_master() {
            return Ok(());
        }

        if response.term > self.role_manager.get_term() {
            // Slave的term更高,降级为Slave
            self.role_manager.set_term(response.term);
            self.role_manager.set_role(NodeRole::Slave);
            log::warn!(
                "[{}] Stepped down due to higher term from {}",
                self.role_manager.node_id(),
                slave_id
            );
            return Ok(());
        }

        if response.success {
            // 更新匹配序列号
            self.slave_match_index
                .write()
                .insert(slave_id.clone(), response.match_sequence);

            // 更新下一个序列号
            self.slave_next_index
                .write()
                .insert(slave_id.clone(), response.match_sequence + 1);

            log::debug!(
                "[{}] Slave {} replicated up to sequence {}",
                self.role_manager.node_id(),
                slave_id,
                response.match_sequence
            );

            // 更新commit索引
            self.update_commit_index();
        } else {
            // 复制失败,减小next_index重试
            let mut next_index = self.slave_next_index.write();
            let current = next_index.get(&slave_id).cloned().unwrap_or(1);
            if current > 1 {
                next_index.insert(slave_id.clone(), current - 1);
            }

            log::warn!(
                "[{}] Replication to {} failed: {:?}, retrying from {}",
                self.role_manager.node_id(),
                slave_id,
                response.error,
                current - 1
            );
        }

        Ok(())
    }
}
}

2.6 Slave 端: 应用日志

#![allow(unused)]
fn main() {
impl LogReplicator {
    /// 应用日志(Slave调用)
    pub fn apply_logs(&self, request: ReplicationRequest) -> ReplicationResponse {
        let current_term = self.role_manager.get_term();

        // 检查term
        if request.term < current_term {
            return ReplicationResponse {
                term: current_term,
                success: false,
                match_sequence: 0,
                error: Some("Stale term".to_string()),
            };
        }

        // 更新term和leader
        if request.term > current_term {
            self.role_manager.set_term(request.term);
        }

        self.role_manager.become_slave(request.leader_id.clone());

        // 应用日志到pending buffer(实际应该写入WAL)
        let mut pending = self.pending_logs.write();
        for entry in &request.entries {
            pending.push(entry.clone());
        }

        let last_sequence = request.entries.last().map(|e| e.sequence).unwrap_or(0);

        // 更新commit
        if request.leader_commit > *self.commit_index.read() {
            let new_commit = request.leader_commit.min(last_sequence);
            *self.commit_index.write() = new_commit;
        }

        ReplicationResponse {
            term: current_term,
            success: true,
            match_sequence: last_sequence,
            error: None,
        }
    }
}
}

2.7 Commit 索引更新(多数派共识)

#![allow(unused)]
fn main() {
impl LogReplicator {
    /// 更新commit索引(基于多数派)
    fn update_commit_index(&self) {
        let match_indices = self.slave_match_index.read();
        let mut indices: Vec<u64> = match_indices.values().cloned().collect();
        indices.sort();

        // 计算中位数(多数派已复制)
        if !indices.is_empty() {
            let majority_index = indices[indices.len() / 2];
            let mut commit = self.commit_index.write();
            if majority_index > *commit {
                *commit = majority_index;
                log::info!(
                    "[{}] Commit index updated to {}",
                    self.role_manager.node_id(),
                    majority_index
                );
            }
        }
    }
}
}

多数派共识原理:

  • 假设 3 节点集群: Master + 2 Slaves
  • Master 发送日志序列号 100 到 Slave1 和 Slave2
  • Slave1 确认复制到 100,Slave2 确认复制到 98
  • 排序后: [98, 100],中位数 = 98
  • Commit index 更新为 98(至少 2/3 节点已复制)

💓 3. 心跳管理 (HeartbeatManager)

3.1 HeartbeatManager 结构

#![allow(unused)]
fn main() {
// src/replication/heartbeat.rs
pub struct HeartbeatManager {
    /// 角色管理器
    role_manager: Arc<RoleManager>,

    /// 心跳间隔
    heartbeat_interval: Duration,

    /// 心跳超时
    heartbeat_timeout: Duration,

    /// Slave最后心跳时间
    slave_last_heartbeat: Arc<RwLock<HashMap<String, Instant>>>,

    /// Master最后心跳时间
    master_last_heartbeat: Arc<RwLock<Option<Instant>>>,
}
}

默认配置:

  • 心跳间隔: 100ms (Master 向 Slave 发送)
  • 心跳超时: 300ms (Slave 检测 Master 故障)

3.2 Master 端: 发送心跳

#![allow(unused)]
fn main() {
impl HeartbeatManager {
    /// 启动心跳发送(Master调用)
    pub fn start_heartbeat_sender(&self, commit_index: Arc<RwLock<u64>>) {
        let role_manager = self.role_manager.clone();
        let heartbeat_interval = self.heartbeat_interval;
        let commit_index = commit_index.clone();

        tokio::spawn(async move {
            let mut ticker = interval(heartbeat_interval);

            loop {
                ticker.tick().await;

                if !role_manager.is_master() {
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                }

                // 创建心跳请求
                let request = HeartbeatRequest {
                    term: role_manager.get_term(),
                    leader_id: role_manager.node_id().to_string(),
                    leader_commit: *commit_index.read(),
                    timestamp: chrono::Utc::now().timestamp_millis(),
                };

                // 实际应该发送到所有Slave
                log::trace!(
                    "[{}] Sending heartbeat, term: {}, commit: {}",
                    role_manager.node_id(),
                    request.term,
                    request.leader_commit
                );
            }
        });
    }
}
}

3.3 Slave 端: 处理心跳请求

#![allow(unused)]
fn main() {
impl HeartbeatManager {
    /// 处理心跳请求(Slave调用)
    pub fn handle_heartbeat_request(
        &self,
        request: HeartbeatRequest,
        last_log_sequence: u64,
    ) -> HeartbeatResponse {
        let current_term = self.role_manager.get_term();

        // 更新term
        if request.term > current_term {
            self.role_manager.set_term(request.term);
        }

        // 如果term >= current_term,确认这是有效的Master
        if request.term >= current_term {
            self.role_manager.become_slave(request.leader_id.clone());

            // 更新Master心跳时间
            *self.master_last_heartbeat.write() = Some(Instant::now());

            log::trace!(
                "[{}] Received heartbeat from master {}",
                self.role_manager.node_id(),
                request.leader_id
            );
        }

        HeartbeatResponse {
            term: self.role_manager.get_term(),
            node_id: self.role_manager.node_id().to_string(),
            last_log_sequence,
            healthy: true,
        }
    }
}
}

3.4 Slave 端: 检测 Master 超时

#![allow(unused)]
fn main() {
impl HeartbeatManager {
    /// 检查Master是否超时(Slave调用)
    pub fn is_master_timeout(&self) -> bool {
        if self.role_manager.is_master() {
            return false;
        }

        let last_heartbeat = self.master_last_heartbeat.read();
        match *last_heartbeat {
            Some(last) => last.elapsed() > self.heartbeat_timeout,
            None => true, // 从未收到心跳
        }
    }

    /// 启动心跳超时检查(Slave调用)
    pub fn start_timeout_checker(&self) {
        let role_manager = self.role_manager.clone();
        let master_last_heartbeat = self.master_last_heartbeat.clone();
        let timeout = self.heartbeat_timeout;

        tokio::spawn(async move {
            let mut ticker = interval(Duration::from_millis(100));

            loop {
                ticker.tick().await;

                if role_manager.is_master() || role_manager.get_role() == NodeRole::Candidate {
                    continue;
                }

                // 检查Master心跳超时
                let last = master_last_heartbeat.read();
                let is_timeout = match *last {
                    Some(t) => t.elapsed() > timeout,
                    None => false, // 刚启动,给一些时间
                };

                if is_timeout {
                    log::warn!(
                        "[{}] Master heartbeat timeout, starting election",
                        role_manager.node_id()
                    );

                    // 开始选举
                    role_manager.become_candidate();
                }
            }
        });
    }
}
}

3.5 Master 端: 检测 Slave 超时

#![allow(unused)]
fn main() {
impl HeartbeatManager {
    /// 检查Slave是否超时(Master调用)
    pub fn get_timeout_slaves(&self) -> Vec<String> {
        if !self.role_manager.is_master() {
            return Vec::new();
        }

        let heartbeats = self.slave_last_heartbeat.read();
        let now = Instant::now();

        heartbeats
            .iter()
            .filter(|(_, last)| now.duration_since(**last) > self.heartbeat_timeout)
            .map(|(id, _)| id.clone())
            .collect()
    }
}
}

🔁 4. 故障转移 (FailoverCoordinator)

4.1 故障转移配置

#![allow(unused)]
fn main() {
// src/replication/failover.rs
#[derive(Debug, Clone)]
pub struct FailoverConfig {
    /// 选举超时范围(毫秒)- 随机化避免split vote
    pub election_timeout_min_ms: u64,
    pub election_timeout_max_ms: u64,

    /// 最小选举票数(通常是节点数的一半+1)
    pub min_votes_required: usize,

    /// 故障检测间隔(毫秒)
    pub check_interval_ms: u64,
}

impl Default for FailoverConfig {
    fn default() -> Self {
        Self {
            election_timeout_min_ms: 150,
            election_timeout_max_ms: 300,
            min_votes_required: 2, // 假设3节点集群
            check_interval_ms: 100,
        }
    }
}
}

选举超时随机化:

  • 避免 "split vote" 问题(多个 Candidate 同时发起选举)
  • 随机范围: 150-300ms
  • 第一个超时的 Slave 有更高概率赢得选举

4.2 FailoverCoordinator 结构

#![allow(unused)]
fn main() {
pub struct FailoverCoordinator {
    /// 角色管理器
    role_manager: Arc<RoleManager>,

    /// 心跳管理器
    heartbeat_manager: Arc<HeartbeatManager>,

    /// 日志复制器
    log_replicator: Arc<LogReplicator>,

    /// 配置
    config: FailoverConfig,

    /// 选票记录(term -> voter_id)
    votes_received: Arc<RwLock<HashMap<u64, Vec<String>>>>,

    /// 集群节点列表
    cluster_nodes: Arc<RwLock<Vec<String>>>,
}
}

4.3 开始选举

#![allow(unused)]
fn main() {
impl FailoverCoordinator {
    /// 开始选举
    pub fn start_election(&self) {
        if !matches!(self.role_manager.get_role(), NodeRole::Candidate) {
            return;
        }

        let current_term = self.role_manager.get_term();

        log::info!(
            "[{}] Starting election for term {}",
            self.role_manager.node_id(),
            current_term
        );

        // 清除之前的投票记录
        self.votes_received.write().clear();

        // 投票给自己
        let mut votes = self.votes_received.write();
        votes.insert(current_term, vec![self.role_manager.node_id().to_string()]);
        drop(votes);

        // 向其他节点请求投票(实际实现需要网络通信)
        log::info!(
            "[{}] Requesting votes from cluster nodes",
            self.role_manager.node_id()
        );

        // 检查是否赢得选举
        self.check_election_result(current_term);
    }
}
}

4.4 处理投票请求

#![allow(unused)]
fn main() {
impl FailoverCoordinator {
    /// 处理投票请求
    pub fn handle_vote_request(
        &self,
        candidate_id: &str,
        candidate_term: u64,
        last_log_sequence: u64,
    ) -> (bool, u64) {
        let current_term = self.role_manager.get_term();

        // 如果候选人term更高,更新自己的term
        if candidate_term > current_term {
            self.role_manager.set_term(candidate_term);
            self.role_manager.set_role(NodeRole::Slave);
        }

        let current_term = self.role_manager.get_term();

        // 检查是否可以投票
        let can_vote = candidate_term >= current_term
            && self.role_manager.get_voted_for().is_none()
            && last_log_sequence >= self.log_replicator.get_commit_index();

        if can_vote {
            self.role_manager.vote_for(candidate_id);
            log::info!(
                "[{}] Voted for {} in term {}",
                self.role_manager.node_id(),
                candidate_id,
                current_term
            );
            (true, current_term)
        } else {
            log::info!(
                "[{}] Rejected vote for {} in term {}",
                self.role_manager.node_id(),
                candidate_id,
                current_term
            );
            (false, current_term)
        }
    }
}
}

投票规则:

  1. 候选人 term >= 当前 term
  2. 当前 term 尚未投票
  3. 候选人日志至少和自己一样新(last_log_sequence >= commit_index)

4.5 处理投票响应

#![allow(unused)]
fn main() {
impl FailoverCoordinator {
    /// 处理投票响应
    pub fn handle_vote_response(
        &self,
        voter_id: String,
        granted: bool,
        term: u64,
    ) {
        if !matches!(self.role_manager.get_role(), NodeRole::Candidate) {
            return;
        }

        let current_term = self.role_manager.get_term();

        // 如果响应的term更高,降级为Slave
        if term > current_term {
            self.role_manager.set_term(term);
            self.role_manager.set_role(NodeRole::Slave);
            log::warn!(
                "[{}] Stepped down due to higher term {}",
                self.role_manager.node_id(),
                term
            );
            return;
        }

        // 记录投票
        if granted && term == current_term {
            let mut votes = self.votes_received.write();
            votes
                .entry(current_term)
                .or_insert_with(Vec::new)
                .push(voter_id.clone());

            log::info!(
                "[{}] Received vote from {} for term {}",
                self.role_manager.node_id(),
                voter_id,
                current_term
            );

            drop(votes);

            // 检查选举结果
            self.check_election_result(current_term);
        }
    }
}
}

4.6 检查选举结果

#![allow(unused)]
fn main() {
impl FailoverCoordinator {
    /// 检查选举结果
    fn check_election_result(&self, term: u64) {
        let votes = self.votes_received.read();
        let vote_count = votes.get(&term).map(|v| v.len()).unwrap_or(0);

        log::debug!(
            "[{}] Election status: {} votes (need {})",
            self.role_manager.node_id(),
            vote_count,
            self.config.min_votes_required
        );

        // 检查是否获得多数票
        if vote_count >= self.config.min_votes_required {
            drop(votes);

            log::info!(
                "[{}] Won election for term {} with {} votes",
                self.role_manager.node_id(),
                term,
                vote_count
            );

            // 成为Master
            self.role_manager.become_master();

            // 重新初始化所有Slave的next_index
            let cluster_nodes = self.cluster_nodes.read().clone();
            for node in cluster_nodes {
                if node != self.role_manager.node_id() {
                    self.log_replicator.register_slave(node);
                }
            }
        }
    }
}
}

4.7 选举超时检查

#![allow(unused)]
fn main() {
impl FailoverCoordinator {
    /// 启动选举超时检查
    pub fn start_election_timeout(&self) {
        let role_manager = self.role_manager.clone();
        let coordinator = Arc::new(self.clone_for_timeout());
        let min_timeout = self.config.election_timeout_min_ms;
        let max_timeout = self.config.election_timeout_max_ms;

        tokio::spawn(async move {
            loop {
                // 随机选举超时(避免split vote)
                let timeout = rand::random::<u64>() % (max_timeout - min_timeout) + min_timeout;
                tokio::time::sleep(Duration::from_millis(timeout)).await;

                // 只有Candidate需要超时重试
                if matches!(role_manager.get_role(), NodeRole::Candidate) {
                    log::warn!(
                        "[{}] Election timeout, retrying",
                        role_manager.node_id()
                    );
                    coordinator.start_election();
                }
            }
        });
    }
}
}

🌐 5. 网络协议 (Protocol)

5.1 协议消息类型

#![allow(unused)]
fn main() {
// src/replication/protocol.rs
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ReplicationMessage {
    /// 日志复制请求
    LogReplication(SerializableReplicationRequest),

    /// 日志复制响应
    LogReplicationResponse(ReplicationResponse),

    /// 心跳请求
    Heartbeat(HeartbeatRequest),

    /// 心跳响应
    HeartbeatResponse(HeartbeatResponse),

    /// 快照传输
    Snapshot(SnapshotRequest),

    /// 快照响应
    SnapshotResponse(SnapshotResponse),
}
}

5.2 日志条目序列化

内存版本 (性能优化):

#![allow(unused)]
fn main() {
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// 日志序列号
    pub sequence: u64,

    /// 日志term(选举轮次)
    pub term: u64,

    /// WAL记录
    pub record: WalRecord,

    /// 时间戳
    pub timestamp: i64,
}
}

网络版本 (可序列化):

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableLogEntry {
    pub sequence: u64,
    pub term: u64,
    pub record_bytes: Vec<u8>,  // rkyv序列化后的字节
    pub timestamp: i64,
}
}

转换实现:

#![allow(unused)]
fn main() {
impl LogEntry {
    /// 转换为可序列化格式
    pub fn to_serializable(&self) -> Result<SerializableLogEntry, String> {
        let record_bytes = rkyv::to_bytes::<_, 2048>(&self.record)
            .map_err(|e| format!("Serialize record failed: {}", e))?
            .to_vec();

        Ok(SerializableLogEntry {
            sequence: self.sequence,
            term: self.term,
            record_bytes,
            timestamp: self.timestamp,
        })
    }

    /// 从可序列化格式创建
    pub fn from_serializable(se: SerializableLogEntry) -> Result<Self, String> {
        let archived = rkyv::check_archived_root::<WalRecord>(&se.record_bytes)
            .map_err(|e| format!("Deserialize record failed: {}", e))?;

        let record: WalRecord = RkyvDeserialize::deserialize(archived, &mut rkyv::Infallible)
            .map_err(|e| format!("Deserialize record failed: {:?}", e))?;

        Ok(LogEntry {
            sequence: se.sequence,
            term: se.term,
            record,
            timestamp: se.timestamp,
        })
    }
}
}

序列化选择:

  • 内存内传递: 直接使用 LogEntry,零拷贝
  • 网络传输: 转换为 SerializableLogEntry,使用 rkyv 序列化 WAL 记录

5.3 复制请求/响应

请求:

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableReplicationRequest {
    pub term: u64,
    pub leader_id: String,
    pub prev_log_sequence: u64,
    pub prev_log_term: u64,
    pub entries: Vec<SerializableLogEntry>,
    pub leader_commit: u64,
}
}

响应:

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationResponse {
    /// Slave term
    pub term: u64,

    /// 是否成功
    pub success: bool,

    /// 当前匹配的序列号
    pub match_sequence: u64,

    /// 错误信息(失败时)
    pub error: Option<String>,
}
}

5.4 心跳请求/响应

请求:

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatRequest {
    pub term: u64,
    pub leader_id: String,
    pub leader_commit: u64,
    pub timestamp: i64,
}
}

响应:

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatResponse {
    pub term: u64,
    pub node_id: String,
    pub last_log_sequence: u64,
    pub healthy: bool,
}
}

📊 6. 性能指标

6.1 复制延迟

场景目标实测优化方向
批量复制延迟 (P99)< 10ms~5ms ✅rkyv 零拷贝序列化
单条日志复制延迟< 5ms~3ms ✅批量大小 = 100
心跳间隔100ms100ms ✅可配置
故障切换时间< 500ms~300ms ✅随机化选举超时

6.2 吞吐量

指标条件
日志复制吞吐量> 10K entries/sec批量大小 100
心跳处理吞吐量> 100 heartbeats/sec3 节点集群
网络带宽消耗~1 MB/s10K entries/sec, 平均 100 bytes/entry

6.3 可用性

指标
Master 故障检测时间< 300ms (心跳超时)
选举完成时间< 200ms (随机超时 150-300ms)
总故障切换时间< 500ms ✅
数据零丢失保证多数派确认

🛠️ 7. 配置示例

7.1 完整配置文件

# config/replication.toml

[cluster]
# 节点ID(唯一标识)
node_id = "node_a"

# 集群节点列表(包括自己)
nodes = ["node_a", "node_b", "node_c"]

# 初始角色
initial_role = "slave"  # master/slave/candidate

[replication]
# 复制超时(毫秒)
replication_timeout_ms = 1000

# 批量大小
batch_size = 100

# 最大重试次数
max_retries = 3

[heartbeat]
# 心跳间隔(毫秒)
heartbeat_interval_ms = 100

# 心跳超时(毫秒)
heartbeat_timeout_ms = 300

[failover]
# 选举超时范围(毫秒)
election_timeout_min_ms = 150
election_timeout_max_ms = 300

# 最小选举票数(3节点集群需要2票)
min_votes_required = 2

# 故障检测间隔(毫秒)
check_interval_ms = 100

7.2 代码初始化示例

use qaexchange::replication::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 创建角色管理器
    let role_manager = Arc::new(RoleManager::new(
        "node_a".to_string(),
        NodeRole::Slave,
    ));

    // 2. 创建日志复制器
    let replication_config = ReplicationConfig {
        replication_timeout_ms: 1000,
        batch_size: 100,
        max_retries: 3,
        heartbeat_interval_ms: 100,
    };
    let log_replicator = Arc::new(LogReplicator::new(
        role_manager.clone(),
        replication_config,
    ));

    // 3. 创建心跳管理器
    let heartbeat_manager = Arc::new(HeartbeatManager::new(
        role_manager.clone(),
        100,  // heartbeat_interval_ms
        300,  // heartbeat_timeout_ms
    ));

    // 4. 创建故障转移协调器
    let failover_config = FailoverConfig {
        election_timeout_min_ms: 150,
        election_timeout_max_ms: 300,
        min_votes_required: 2,
        check_interval_ms: 100,
    };
    let failover_coordinator = Arc::new(FailoverCoordinator::new(
        role_manager.clone(),
        heartbeat_manager.clone(),
        log_replicator.clone(),
        failover_config,
    ));

    // 5. 设置集群节点
    failover_coordinator.set_cluster_nodes(vec![
        "node_a".to_string(),
        "node_b".to_string(),
        "node_c".to_string(),
    ]);

    // 6. 注册 Slave(如果是 Master)
    if role_manager.is_master() {
        log_replicator.register_slave("node_b".to_string());
        log_replicator.register_slave("node_c".to_string());
    }

    // 7. 启动后台任务
    heartbeat_manager.start_heartbeat_sender(log_replicator.commit_index.clone());
    heartbeat_manager.start_timeout_checker();
    failover_coordinator.start_failover_detector();
    failover_coordinator.start_election_timeout();

    log::info!("Replication system started on {}", role_manager.node_id());

    Ok(())
}

💡 8. 使用场景

8.1 Master 写入日志

#![allow(unused)]
fn main() {
// Master 处理客户端写入
async fn handle_write(
    log_replicator: &Arc<LogReplicator>,
    sequence: u64,
    record: WalRecord,
) -> Result<(), String> {
    // 1. 添加到复制队列
    log_replicator.append_log(sequence, record)?;

    // 2. 创建复制请求(针对每个 Slave)
    let slaves = vec!["node_b", "node_c"];
    for slave_id in &slaves {
        if let Some(request) = log_replicator.create_replication_request(slave_id) {
            // 3. 发送请求到 Slave(网络层)
            send_replication_request(slave_id, request).await?;
        }
    }

    // 4. 等待多数派确认
    wait_for_quorum(log_replicator, sequence).await?;

    Ok(())
}

async fn wait_for_quorum(
    log_replicator: &Arc<LogReplicator>,
    sequence: u64,
) -> Result<(), String> {
    // 轮询 commit_index
    for _ in 0..100 {
        if log_replicator.get_commit_index() >= sequence {
            return Ok(());
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    Err("Replication timeout".to_string())
}
}

8.2 Slave 接收日志

#![allow(unused)]
fn main() {
// Slave 处理复制请求
async fn handle_replication_request(
    log_replicator: &Arc<LogReplicator>,
    request: ReplicationRequest,
) -> ReplicationResponse {
    // 1. 应用日志
    let response = log_replicator.apply_logs(request);

    // 2. 如果成功,写入WAL(持久化)
    if response.success {
        // for entry in &request.entries {
        //     wal_manager.write(&entry.record)?;
        // }
    }

    // 3. 返回响应
    response
}
}

8.3 故障检测与切换

#![allow(unused)]
fn main() {
// Slave 检测 Master 故障
async fn monitor_master(
    heartbeat_manager: &Arc<HeartbeatManager>,
    failover_coordinator: &Arc<FailoverCoordinator>,
) {
    loop {
        tokio::time::sleep(Duration::from_millis(100)).await;

        if heartbeat_manager.is_master_timeout() {
            log::warn!("Master timeout detected, starting election");

            // 开始选举
            failover_coordinator.start_election();
        }
    }
}
}

🔧 9. 故障排查

9.1 复制延迟过高

症状: Slave 的 match_sequence 远低于 Master 的最新序列号

排查步骤:

  1. 检查网络延迟: ping 测试 Slave 节点
  2. 检查批量大小: batch_size 是否过小(建议 100-1000)
  3. 检查 WAL 写入性能: Slave WAL 落盘是否成为瓶颈
  4. 查看日志: log_replicator 的 debug 日志

解决方案:

[replication]
batch_size = 500  # 增加批量大小
replication_timeout_ms = 2000  # 增加超时时间

9.2 选举失败(Split Vote)

症状: 多个 Candidate 同时发起选举,都无法获得多数票

排查步骤:

  1. 检查选举超时配置: 随机范围是否足够大
  2. 检查时钟同步: 节点间时钟偏差是否过大
  3. 查看投票日志: 确认投票分布情况

解决方案:

[failover]
election_timeout_min_ms = 150
election_timeout_max_ms = 500  # 增大随机范围

9.3 Master 频繁切换

症状: 日志显示 Master 角色频繁变化

排查步骤:

  1. 检查网络稳定性: 是否存在间歇性网络故障
  2. 检查心跳超时配置: 是否过于敏感
  3. 检查节点负载: CPU/内存是否过高导致心跳延迟

解决方案:

[heartbeat]
heartbeat_timeout_ms = 500  # 增加超时时间
heartbeat_interval_ms = 100  # 保持不变

9.4 数据不一致

症状: Slave 的数据和 Master 不一致

排查步骤:

  1. 检查 commit_index: Master 和 Slave 的 commit_index 是否一致
  2. 检查日志序列号: 是否存在日志缺失
  3. 检查 WAL 完整性: 使用 CRC 校验

解决方案:

  • 如果是网络分区导致,等待分区恢复后自动同步
  • 如果是 WAL 损坏,从快照恢复 Slave
  • 严重情况下,清空 Slave 数据并重新同步

📚 10. 相关文档


🎓 11. 进阶主题

11.1 网络层集成(TODO)

当前实现缺少网络层,实际部署需要集成 gRPC 或 WebSocket:

#![allow(unused)]
fn main() {
// 示例: gRPC 服务定义
service ReplicationService {
    rpc ReplicateLog(ReplicationRequest) returns (ReplicationResponse);
    rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
    rpc RequestVote(VoteRequest) returns (VoteResponse);
}
}

11.2 快照传输

当 Slave 落后太多时,发送完整快照而非增量日志:

#![allow(unused)]
fn main() {
pub struct SnapshotRequest {
    pub term: u64,
    pub last_included_sequence: u64,
    pub last_included_term: u64,
    pub data: Vec<u8>,  // 分片传输
    pub is_last_chunk: bool,
}
}

11.3 读扩展(Read Scalability)

允许 Slave 提供只读查询:

  • Slave 处理 SELECT 查询
  • Master 处理 INSERT/UPDATE/DELETE
  • 需要处理读一致性问题(可能读到旧数据)

11.4 多数据中心部署

跨地域复制的优化:

  • 异步复制: 不等待远程数据中心确认
  • 分层复制: 本地集群 + 远程集群
  • 冲突解决: Last-Write-Wins (LWW)

返回核心模块 | 返回文档中心