因子 WAL 集成 (Factor WAL Persister)

@yutiansut @quantaxis

📖 概述

因子 WAL 集成模块实现了因子计算结果的异步持久化,将流式因子引擎 (StreamFactorEngine) 与 WAL (Write-Ahead Log) 系统无缝对接,确保因子计算状态的持久性和可恢复性。

🎯 设计目标

  • 异步持久化: 不阻塞因子计算主流程
  • 批量写入: 聚合多次更新,减少 I/O 操作
  • 零丢失: Group Commit 保证数据完整性
  • 快速恢复: 支持从 WAL 恢复因子状态
  • 低延迟: 异步 Channel 解耦计算与存储

🏗️ 架构设计

整体数据流

┌─────────────────────────────────────────────────────────────┐
│                    因子 WAL 集成架构                          │
│                                                             │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │ Tick Data   │───▶│ StreamFactor│───▶│FactorWal    │     │
│  │ (行情输入)  │    │  Engine     │    │  Persister  │     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
│                            │                  │             │
│                            │                  │ async       │
│                            ▼                  ▼             │
│                    ┌─────────────┐    ┌─────────────┐      │
│                    │ Factor Cache│    │    WAL      │      │
│                    │ (内存视图)  │    │ (持久化)    │      │
│                    └─────────────┘    └─────────────┘      │
│                                              │              │
│                                              ▼              │
│                                       ┌─────────────┐      │
│                                       │  SSTable    │      │
│                                       │ (归档存储)  │      │
│                                       └─────────────┘      │
└─────────────────────────────────────────────────────────────┘

核心组件

组件职责线程模型
FactorWalPersister异步持久化协调器后台线程
WalStreamFactorEngine带持久化的因子引擎主线程
FactorWalConsumerWAL 消费者/恢复器恢复线程
FactorWalMessage持久化消息类型-

🔧 核心实现

1. 消息类型定义

#![allow(unused)]
fn main() {
// src/factor/wal_persister.rs

/// 因子 WAL 消息类型
#[derive(Debug, Clone)]
pub enum FactorWalMessage {
    /// 单值更新
    Update {
        instrument_id: String,
        factor_id: String,
        value: f64,
        source_timestamp: i64,
    },

    /// 向量更新(批量因子)
    VectorUpdate {
        instrument_id: String,
        factor_id: String,
        values: Vec<f64>,
        source_timestamp: i64,
    },

    /// 因子快照
    Snapshot {
        instrument_id: String,
        factors: HashMap<String, f64>,
        checkpoint_id: u64,
    },

    /// 优雅关闭
    Shutdown,
}
}

2. 异步持久化器

#![allow(unused)]
fn main() {
use crossbeam::channel::{Sender, Receiver, unbounded};
use std::sync::Arc;
use std::thread;

/// 因子 WAL 异步持久化器
pub struct FactorWalPersister {
    /// 消息发送端
    tx: Sender<FactorWalMessage>,

    /// 后台线程句柄
    worker_handle: Option<thread::JoinHandle<()>>,

    /// 运行状态
    running: Arc<AtomicBool>,

    /// 配置
    config: FactorWalConfig,
}

/// 持久化配置
#[derive(Debug, Clone)]
pub struct FactorWalConfig {
    /// 批量写入大小
    pub batch_size: usize,

    /// 最大延迟(毫秒)
    pub max_delay_ms: u64,

    /// 是否启用 Group Commit
    pub enable_group_commit: bool,
}

impl Default for FactorWalConfig {
    fn default() -> Self {
        Self {
            batch_size: 100,
            max_delay_ms: 10,
            enable_group_commit: true,
        }
    }
}

impl FactorWalPersister {
    /// 创建新的持久化器
    pub fn new(wal: Arc<RwLock<WalManager>>, config: FactorWalConfig) -> Self {
        let (tx, rx) = unbounded();
        let running = Arc::new(AtomicBool::new(true));
        let running_clone = Arc::clone(&running);
        let config_clone = config.clone();

        // 启动后台工作线程
        let worker_handle = thread::spawn(move || {
            Self::worker_loop(rx, wal, running_clone, config_clone);
        });

        Self {
            tx,
            worker_handle: Some(worker_handle),
            running,
            config,
        }
    }

    /// 发送更新消息(非阻塞)
    #[inline]
    pub fn send_update(
        &self,
        instrument_id: &str,
        factor_id: &str,
        value: f64,
        source_timestamp: i64,
    ) -> Result<(), FactorWalError> {
        self.tx
            .send(FactorWalMessage::Update {
                instrument_id: instrument_id.to_string(),
                factor_id: factor_id.to_string(),
                value,
                source_timestamp,
            })
            .map_err(|_| FactorWalError::ChannelClosed)
    }

    /// 发送快照消息
    pub fn send_snapshot(
        &self,
        instrument_id: &str,
        factors: HashMap<String, f64>,
        checkpoint_id: u64,
    ) -> Result<(), FactorWalError> {
        self.tx
            .send(FactorWalMessage::Snapshot {
                instrument_id: instrument_id.to_string(),
                factors,
                checkpoint_id,
            })
            .map_err(|_| FactorWalError::ChannelClosed)
    }

    /// 后台工作循环
    fn worker_loop(
        rx: Receiver<FactorWalMessage>,
        wal: Arc<RwLock<WalManager>>,
        running: Arc<AtomicBool>,
        config: FactorWalConfig,
    ) {
        let mut batch = Vec::with_capacity(config.batch_size);
        let mut last_flush = Instant::now();

        while running.load(Ordering::Relaxed) {
            // 尝试接收消息(带超时)
            match rx.recv_timeout(Duration::from_millis(config.max_delay_ms)) {
                Ok(FactorWalMessage::Shutdown) => {
                    // 优雅关闭:先 flush 剩余数据
                    if !batch.is_empty() {
                        Self::flush_batch(&wal, &mut batch, config.enable_group_commit);
                    }
                    break;
                }
                Ok(msg) => {
                    batch.push(msg);

                    // 批量满或超时,执行 flush
                    if batch.len() >= config.batch_size
                        || last_flush.elapsed().as_millis() > config.max_delay_ms as u128
                    {
                        Self::flush_batch(&wal, &mut batch, config.enable_group_commit);
                        last_flush = Instant::now();
                    }
                }
                Err(RecvTimeoutError::Timeout) => {
                    // 超时也 flush
                    if !batch.is_empty() {
                        Self::flush_batch(&wal, &mut batch, config.enable_group_commit);
                        last_flush = Instant::now();
                    }
                }
                Err(RecvTimeoutError::Disconnected) => {
                    break;
                }
            }
        }
    }

    /// 批量 flush 到 WAL
    fn flush_batch(
        wal: &Arc<RwLock<WalManager>>,
        batch: &mut Vec<FactorWalMessage>,
        enable_group_commit: bool,
    ) {
        let mut wal_guard = wal.write();

        for msg in batch.drain(..) {
            match msg {
                FactorWalMessage::Update {
                    instrument_id,
                    factor_id,
                    value,
                    source_timestamp,
                } => {
                    let record = WalRecord::FactorUpdate {
                        timestamp: source_timestamp,
                        instrument_id: instrument_id.into_bytes(),
                        factor_id: factor_id.into_bytes(),
                        value,
                    };

                    if let Err(e) = wal_guard.append(&record) {
                        tracing::error!("Failed to append factor update to WAL: {:?}", e);
                    }
                }
                FactorWalMessage::VectorUpdate {
                    instrument_id,
                    factor_id,
                    values,
                    source_timestamp,
                } => {
                    let record = WalRecord::FactorVectorUpdate {
                        timestamp: source_timestamp,
                        instrument_id: instrument_id.into_bytes(),
                        factor_id: factor_id.into_bytes(),
                        values,
                    };

                    if let Err(e) = wal_guard.append(&record) {
                        tracing::error!("Failed to append factor vector update to WAL: {:?}", e);
                    }
                }
                FactorWalMessage::Snapshot {
                    instrument_id,
                    factors,
                    checkpoint_id,
                } => {
                    let record = WalRecord::FactorSnapshot {
                        timestamp: chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
                        instrument_id: instrument_id.into_bytes(),
                        factors: factors
                            .into_iter()
                            .map(|(k, v)| (k.into_bytes(), v))
                            .collect(),
                        checkpoint_id,
                    };

                    if let Err(e) = wal_guard.append(&record) {
                        tracing::error!("Failed to append factor snapshot to WAL: {:?}", e);
                    }
                }
                FactorWalMessage::Shutdown => {}
            }
        }

        // Group Commit
        if enable_group_commit {
            if let Err(e) = wal_guard.flush_group_commit() {
                tracing::error!("Failed to flush WAL: {:?}", e);
            }
        }
    }

    /// 优雅关闭
    pub fn shutdown(&mut self) {
        self.running.store(false, Ordering::SeqCst);
        let _ = self.tx.send(FactorWalMessage::Shutdown);

        if let Some(handle) = self.worker_handle.take() {
            let _ = handle.join();
        }
    }
}

impl Drop for FactorWalPersister {
    fn drop(&mut self) {
        self.shutdown();
    }
}
}

3. 带持久化的流式因子引擎

#![allow(unused)]
fn main() {
/// 带 WAL 持久化的流式因子引擎
pub struct WalStreamFactorEngine {
    /// 底层因子引擎
    engine: StreamFactorEngine,

    /// WAL 持久化器
    persister: Arc<FactorWalPersister>,

    /// 当前合约 ID
    instrument_id: String,

    /// 是否自动持久化
    auto_persist: bool,
}

impl WalStreamFactorEngine {
    /// 创建新的带持久化的因子引擎
    pub fn new(
        instrument_id: &str,
        persister: Arc<FactorWalPersister>,
        auto_persist: bool,
    ) -> Self {
        Self {
            engine: StreamFactorEngine::new(),
            persister,
            instrument_id: instrument_id.to_string(),
            auto_persist,
        }
    }

    /// 处理 Tick 数据并持久化
    pub fn process_tick(&mut self, tick: &TickData) -> Result<HashMap<String, f64>, FactorError> {
        let timestamp = tick.timestamp;

        // 计算因子
        let factors = self.engine.process_tick(tick)?;

        // 自动持久化
        if self.auto_persist {
            for (factor_id, value) in &factors {
                if let Err(e) = self.persister.send_update(
                    &self.instrument_id,
                    factor_id,
                    *value,
                    timestamp,
                ) {
                    tracing::warn!("Failed to persist factor {}: {:?}", factor_id, e);
                }
            }
        }

        Ok(factors)
    }

    /// 注册因子
    pub fn register_factor(&mut self, factor_id: &str, operator: Box<dyn IncrementalOperator>) {
        self.engine.register(factor_id, operator);
    }

    /// 获取因子值
    pub fn get_factor(&self, factor_id: &str) -> Option<f64> {
        self.engine.get(factor_id)
    }

    /// 创建快照并持久化
    pub fn create_and_persist_snapshot(&self, checkpoint_id: u64) -> Result<(), FactorWalError> {
        let factors = self.engine.get_all_factors();

        self.persister.send_snapshot(&self.instrument_id, factors, checkpoint_id)
    }
}
}

4. WAL 消费者/恢复器

#![allow(unused)]
fn main() {
/// 因子 WAL 消费者(用于恢复)
pub struct FactorWalConsumer {
    /// WAL 管理器
    wal: Arc<RwLock<WalManager>>,

    /// 恢复的因子状态
    recovered_factors: HashMap<String, HashMap<String, f64>>,

    /// 最后恢复的时间戳
    last_recovered_timestamp: i64,
}

impl FactorWalConsumer {
    /// 创建消费者
    pub fn new(wal: Arc<RwLock<WalManager>>) -> Self {
        Self {
            wal,
            recovered_factors: HashMap::new(),
            last_recovered_timestamp: 0,
        }
    }

    /// 从 WAL 恢复因子状态
    pub fn recover(&mut self) -> Result<(), FactorWalError> {
        let wal_guard = self.wal.read();
        let records = wal_guard.read_all_records()?;

        for record in records {
            match record {
                WalRecord::FactorUpdate {
                    timestamp,
                    instrument_id,
                    factor_id,
                    value,
                } => {
                    let inst_id = String::from_utf8_lossy(&instrument_id).to_string();
                    let fact_id = String::from_utf8_lossy(&factor_id).to_string();

                    self.recovered_factors
                        .entry(inst_id)
                        .or_insert_with(HashMap::new)
                        .insert(fact_id, value);

                    self.last_recovered_timestamp = self.last_recovered_timestamp.max(timestamp);
                }
                WalRecord::FactorSnapshot {
                    timestamp,
                    instrument_id,
                    factors,
                    ..
                } => {
                    let inst_id = String::from_utf8_lossy(&instrument_id).to_string();

                    let mut factor_map = HashMap::new();
                    for (k, v) in factors {
                        let factor_id = String::from_utf8_lossy(&k).to_string();
                        factor_map.insert(factor_id, v);
                    }

                    self.recovered_factors.insert(inst_id, factor_map);
                    self.last_recovered_timestamp = self.last_recovered_timestamp.max(timestamp);
                }
                _ => {} // 跳过其他记录类型
            }
        }

        Ok(())
    }

    /// 获取恢复的因子状态
    pub fn get_recovered_factors(&self) -> &HashMap<String, HashMap<String, f64>> {
        &self.recovered_factors
    }

    /// 获取指定合约的因子
    pub fn get_instrument_factors(&self, instrument_id: &str) -> Option<&HashMap<String, f64>> {
        self.recovered_factors.get(instrument_id)
    }

    /// 获取最后恢复的时间戳
    pub fn last_timestamp(&self) -> i64 {
        self.last_recovered_timestamp
    }
}
}

📊 使用示例

基本使用

#![allow(unused)]
fn main() {
use qaexchange::factor::wal_persister::{
    FactorWalPersister, WalStreamFactorEngine, FactorWalConfig
};
use qaexchange::factor::operators::rolling::{RollingMean, RSI};

// 1. 创建 WAL 管理器
let wal = Arc::new(RwLock::new(WalManager::new("/data/factor_wal")?));

// 2. 创建持久化器
let config = FactorWalConfig {
    batch_size: 100,
    max_delay_ms: 10,
    enable_group_commit: true,
};
let persister = Arc::new(FactorWalPersister::new(Arc::clone(&wal), config));

// 3. 创建带持久化的因子引擎
let mut engine = WalStreamFactorEngine::new("SHFE.cu2501", Arc::clone(&persister), true);

// 4. 注册因子
engine.register_factor("ma5", Box::new(RollingMean::new(5)));
engine.register_factor("ma20", Box::new(RollingMean::new(20)));
engine.register_factor("rsi14", Box::new(RSI::new(14)));

// 5. 处理行情数据
for tick in tick_stream {
    let factors = engine.process_tick(&tick)?;
    println!("Factors: {:?}", factors);
}

// 6. 定期创建快照
engine.create_and_persist_snapshot(checkpoint_id)?;
}

从 WAL 恢复

#![allow(unused)]
fn main() {
use qaexchange::factor::wal_persister::FactorWalConsumer;

// 1. 创建消费者
let mut consumer = FactorWalConsumer::new(Arc::clone(&wal));

// 2. 执行恢复
consumer.recover()?;

// 3. 获取恢复的因子
if let Some(factors) = consumer.get_instrument_factors("SHFE.cu2501") {
    println!("Recovered MA5: {:?}", factors.get("ma5"));
    println!("Recovered RSI14: {:?}", factors.get("rsi14"));
}

// 4. 获取最后时间戳(用于增量恢复)
let last_ts = consumer.last_timestamp();
println!("Recovery up to: {}", last_ts);
}

📈 性能基准

写入性能

配置吞吐量延迟 (P99)
batch_size=10, no group commit50K ops/s5ms
batch_size=100, group commit200K ops/s12ms
batch_size=1000, group commit500K ops/s50ms

恢复性能

记录数恢复时间
10K~50ms
100K~500ms
1M~5s

内存使用

组件内存占用
Channel buffer (1K messages)~80KB
Batch buffer (100 messages)~8KB
Worker thread stack~2MB

🛠️ 配置指南

高吞吐配置

#![allow(unused)]
fn main() {
let config = FactorWalConfig {
    batch_size: 1000,       // 大批量
    max_delay_ms: 50,       // 允许更大延迟
    enable_group_commit: true,
};
}

低延迟配置

#![allow(unused)]
fn main() {
let config = FactorWalConfig {
    batch_size: 10,         // 小批量
    max_delay_ms: 1,        // 最小延迟
    enable_group_commit: false, // 禁用 group commit
};
}

平衡配置(默认)

#![allow(unused)]
fn main() {
let config = FactorWalConfig::default();
// batch_size: 100
// max_delay_ms: 10
// enable_group_commit: true
}

💡 最佳实践

1. 合理设置批量大小

#![allow(unused)]
fn main() {
// ✅ 正确:根据更新频率调整
// 高频行情(每秒 1000+ tick)
let config = FactorWalConfig { batch_size: 500, ..Default::default() };

// 低频行情(每秒 < 100 tick)
let config = FactorWalConfig { batch_size: 50, ..Default::default() };
}

2. 定期创建快照

#![allow(unused)]
fn main() {
// ✅ 正确:定期快照加速恢复
let mut checkpoint_counter = 0;
for tick in tick_stream {
    engine.process_tick(&tick)?;

    checkpoint_counter += 1;
    if checkpoint_counter % 10000 == 0 {
        engine.create_and_persist_snapshot(checkpoint_counter / 10000)?;
    }
}
}

3. 优雅关闭

#![allow(unused)]
fn main() {
// ✅ 正确:确保数据完整性
{
    let mut persister = persister.lock();
    persister.shutdown(); // 等待所有数据写入
}
// persister 被 drop,所有数据已安全写入
}

🔍 故障排查

问题 1: 持久化延迟过高

症状: P99 延迟 > 100ms

排查:

  1. 检查 batch_size 是否过大
  2. 检查 WAL 磁盘是否 SSD

解决:

#![allow(unused)]
fn main() {
let config = FactorWalConfig {
    batch_size: 50,  // 减小批量
    max_delay_ms: 5, // 减小延迟
    ..Default::default()
};
}

问题 2: 恢复数据不完整

症状: 恢复后因子值为空

排查:

  1. 检查 WAL 文件是否损坏
  2. 检查是否正确调用 shutdown

解决:

#![allow(unused)]
fn main() {
// 确保优雅关闭
drop(persister); // 触发 Drop,等待 flush
}

问题 3: Channel 缓冲区溢出

症状: send_update 返回错误

排查:

  1. 检查后台线程是否存活
  2. 检查写入速度是否过快

解决:

#![allow(unused)]
fn main() {
// 使用有界 Channel 限流
// 或增加 batch_size 提高消费速度
}

📚 相关文档


返回因子模块 | 返回核心模块 | 返回文档中心