因子计算系统

版本: v1.0.0 作者: @yutiansut @quantaxis 最后更新: 2025-11-24


概述

因子计算系统是 QAExchange-RS 的核心量化分析模块,提供流批一体化的因子计算能力。系统同时支持:

  • 流式计算 (Stream): O(1) 增量更新,适用于实时行情
  • 批量计算 (Batch): Polars 向量化,适用于历史回测
┌─────────────────────────────────────────────────────────────┐
│                    因子计算系统架构                           │
│                                                             │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │   DSL 层    │    │  DAG 管理   │    │  状态存储   │     │
│  │ (语法解析)  │───▶│ (依赖拓扑)  │───▶│ (物化视图)  │     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
│         │                  │                  │             │
│         ▼                  ▼                  ▼             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              流批一体执行引擎                         │   │
│  │  ┌──────────────┐        ┌──────────────┐          │   │
│  │  │  Stream Engine│        │  Batch Engine │          │   │
│  │  │  (增量算子)   │        │  (Polars SQL) │          │   │
│  │  └──────────────┘        └──────────────┘          │   │
│  └─────────────────────────────────────────────────────┘   │
│         │                                    │             │
│         ▼                                    ▼             │
│  ┌─────────────┐                    ┌─────────────┐       │
│  │ 增量算子库  │                    │   Polars    │       │
│  │ Ring Buffer │                    │  LazyFrame  │       │
│  │   Welford   │                    │             │       │
│  └─────────────┘                    └─────────────┘       │
└─────────────────────────────────────────────────────────────┘

核心模块

1. 增量算子库 (operators/)

高性能 O(1) 增量计算算子,适用于实时流处理:

算子功能时间复杂度空间复杂度
RollingMean滚动均值O(1)O(n)
RollingStd滚动标准差 (Welford)O(1)O(n)
RollingCorr滚动相关系数O(1)O(n)
EMA指数移动平均O(1)O(1)
DEMA双指数移动平均O(1)O(1)
RSI相对强弱指数O(1)O(1)
MACD指数平滑异同线O(1)O(1)
BollingerBands布林带O(1)O(n)
ATR平均真实范围O(1)O(1)

示例:RollingMean

#![allow(unused)]
fn main() {
use qaexchange::factor::operators::rolling::RollingMean;

let mut ma = RollingMean::new(5); // 5日均线

// 流式更新
for price in [10.0, 11.0, 12.0, 13.0, 14.0, 15.0] {
    ma.update(price);
    println!("MA5: {}", ma.value());
}
// 输出: MA5: 13.0 (最后5个值的平均)
}

示例:RSI

#![allow(unused)]
fn main() {
use qaexchange::factor::operators::rolling::RSI;

let mut rsi = RSI::new(14); // 14日RSI

// 模拟上涨趋势
for i in 0..20 {
    rsi.update(100.0 + i as f64);
}

if let Some(value) = rsi.value() {
    println!("RSI: {:.2}", value); // RSI > 50 (上涨趋势)
}
}

2. 环形缓冲区 (ring_buffer.rs)

滑动窗口的高效实现,支持 O(1) 的增量统计:

#![allow(unused)]
fn main() {
use qaexchange::factor::operators::ring_buffer::NumericRingBuffer;

let mut buffer = NumericRingBuffer::new(100);

// 推入数据
buffer.push(1.0);
buffer.push(2.0);
buffer.push(3.0);

// O(1) 统计
println!("Sum: {}", buffer.sum());      // 增量累加
println!("Mean: {}", buffer.mean());    // sum / count
println!("Min: {}", buffer.min());      // 滑动最小值
println!("Max: {}", buffer.max());      // 滑动最大值
}

特性:

  • 预分配内存,无运行时分配
  • 支持 PairedRingBuffer 用于协方差/相关系数计算
  • 线程安全版本 SyncRingBuffer 可选

3. Welford 算法 (welford.rs)

数值稳定的在线统计算法,避免浮点溢出:

#![allow(unused)]
fn main() {
use qaexchange::factor::operators::welford::WelfordState;

let mut state = WelfordState::new();

// 在线更新
for value in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0] {
    state.update(value);
}

println!("Mean: {}", state.mean());         // 5.0
println!("Variance: {}", state.variance()); // 4.0
println!("Std: {}", state.std());           // 2.0
println!("Skewness: {}", state.skewness()); // 偏度
println!("Kurtosis: {}", state.kurtosis()); // 峰度
}

优势:

  • 数值稳定:避免大数相减导致的精度损失
  • 单遍扫描:只需遍历数据一次
  • 支持窗口化:WindowedWelfordState 支持滑动窗口

4. 因子 DAG 管理器 (dag.rs)

管理因子之间的依赖关系,支持并行计算:

#![allow(unused)]
fn main() {
use qaexchange::factor::dag::{FactorDag, FactorNode, FactorType};

let dag = FactorDag::new();

// 添加因子节点
dag.add_node(FactorNode {
    id: "close".to_string(),
    factor_type: FactorType::Source,
    dependencies: vec![],
    depth: 0,
});

dag.add_node(FactorNode {
    id: "ma5".to_string(),
    factor_type: FactorType::Rolling { window: 5, func: "mean".to_string() },
    dependencies: vec!["close".to_string()],
    depth: 1,
});

dag.add_node(FactorNode {
    id: "ma_diff".to_string(),
    factor_type: FactorType::BinaryOp { op: "sub".to_string() },
    dependencies: vec!["ma5".to_string(), "ma10".to_string()],
    depth: 2,
});

// 拓扑排序
let order = dag.topological_sort();
// ["close", "ma5", "ma10", "ma_diff"]

// 获取并行层级
let levels = dag.get_parallel_levels();
// [[close], [ma5, ma10], [ma_diff]]
}

5. 流批一体引擎 (engine.rs)

统一的因子计算接口,自动选择最优执行路径:

#![allow(unused)]
fn main() {
use qaexchange::factor::engine::{FactorEngine, FactorDef, RollingFunc};

let mut engine = FactorEngine::new();

// 注册因子
engine.register(
    "volatility",
    "20日波动率",
    FactorDef::Rolling {
        source: "close".to_string(),
        window: 20,
        func: RollingFunc::Std,
    },
    "20日收盘价标准差",
);

// 流式计算 (实时)
engine.init_stream_factor("volatility").unwrap();
for price in prices {
    let vol = engine.stream_update("volatility", price)?;
    println!("实时波动率: {}", vol);
}

// 批量计算 (历史)
let df = polars::prelude::df!{
    "close" => &[10.0, 11.0, 12.0, 13.0, 14.0]
}?.lazy();

let result = engine.batch_compute(df, &["volatility"])?;
}

6. 物化视图状态管理 (state.rs & view.rs)

因子计算状态的持久化和快照管理:

#![allow(unused)]
fn main() {
use qaexchange::factor::state::{GlobalStateSnapshot, CheckpointStore, CheckpointConfig};
use qaexchange::factor::view::{MaterializedView, ViewConfig};

// 创建物化视图
let view = MaterializedView::new(ViewConfig {
    ttl: Duration::from_secs(300), // 5分钟过期
    auto_init: true,
});

// 更新 Tick 数据
view.update_tick("SHFE.cu2501", TickData {
    last_price: 85000.0,
    volume: 1000,
    timestamp: 1732456789000,
});

// 获取因子状态
if let Some(state) = view.get("SHFE.cu2501") {
    println!("MA5: {}", state.ma_5.value());
    println!("RSI: {:?}", state.rsi_14.value());
}

// 检查点存储
let store = CheckpointStore::new(CheckpointConfig {
    base_path: PathBuf::from("/data/checkpoints"),
    max_checkpoints: 10,
    compress: true,
});

// 保存检查点
let snapshot = view.create_snapshot();
let checkpoint_id = store.save_checkpoint(&snapshot)?;

// 恢复检查点
let restored = store.load_checkpoint(checkpoint_id)?;
view.restore_from_snapshot(&restored);
}

DSL 语法 (可选)

因子定义的 DSL 语法,便于配置化管理:

// grammar.pest
ma5 = rolling_mean(close, 5)
ma20 = rolling_mean(close, 20)
ma_diff = ma5 - ma20
volatility = rolling_std(close, 20)
rsi = RSI(close, 14)
macd = MACD(close, 12, 26, 9)
signal = if rsi > 70 then -1 else if rsi < 30 then 1 else 0
#![allow(unused)]
fn main() {
use qaexchange::dsl::parser::AstBuilder;

let program = AstBuilder::parse(r#"
    ma5 = rolling_mean(close, 5)
    signal = if ma5 > ma20 then 1 else -1
"#)?;

for stmt in program.statements {
    println!("{:?}", stmt);
}
}

性能指标

操作延迟吞吐量
RollingMean.update()~15 ns66M ops/s
RollingStd.update() (Welford)~25 ns40M ops/s
RSI.update()~20 ns50M ops/s
MACD.update()~30 ns33M ops/s
环形缓冲区 push~5 ns200M ops/s
Polars 批量计算 (1M rows)~50 ms20M rows/s

相关文档


文件结构

src/factor/
├── mod.rs              # 模块导出
├── engine.rs           # 流批一体引擎
├── dag.rs              # 因子 DAG 管理
├── state.rs            # 状态存储与检查点
├── view.rs             # 物化视图管理
├── wal_persister.rs    # WAL 异步持久化 ✨ 新增
└── operators/
    ├── mod.rs          # 算子模块导出
    ├── basic.rs        # 基础算子
    ├── ring_buffer.rs  # 环形缓冲区
    ├── rolling.rs      # 滚动窗口算子
    └── welford.rs      # Welford 统计算法