快照生成器 (Snapshot Generator)

概述

快照生成器(MarketSnapshotGenerator)是 QAExchange 的市场数据服务核心组件,负责每秒级别生成完整的市场行情快照,并通过零拷贝的发布-订阅模式分发给多个消费者。

核心功能

  • 定时生成:独立线程,可配置间隔(默认 1 秒)
  • 完整快照:35+ 字段,包含 OHLC、买卖五档、成交量额、涨跌幅等
  • 实时统计:自动跟踪日内 OHLC、累计成交量/额
  • 多订阅者:支持无限制的并发消费者(基于 crossbeam channel)
  • 零拷贝:订阅者间共享快照数据,无需重复序列化

数据结构

MarketSnapshot

#![allow(unused)]
fn main() {
pub struct MarketSnapshot {
    // 基础信息
    pub instrument_id: String,      // 合约代码
    pub timestamp: i64,              // 快照时间戳(纳秒)
    pub trading_day: String,         // 交易日期(YYYYMMDD)

    // 价格信息
    pub last_price: f64,             // 最新价
    pub change_percent: f64,         // 涨跌幅(%)
    pub change_amount: f64,          // 涨跌额
    pub pre_close: f64,              // 昨收盘价

    // 买卖五档
    pub bid_price1: f64,  pub bid_volume1: i64,
    pub bid_price2: f64,  pub bid_volume2: i64,
    pub bid_price3: f64,  pub bid_volume3: i64,
    pub bid_price4: f64,  pub bid_volume4: i64,
    pub bid_price5: f64,  pub bid_volume5: i64,

    pub ask_price1: f64,  pub ask_volume1: i64,
    pub ask_price2: f64,  pub ask_volume2: i64,
    pub ask_price3: f64,  pub ask_volume3: i64,
    pub ask_price4: f64,  pub ask_volume4: i64,
    pub ask_price5: f64,  pub ask_volume5: i64,

    // OHLC
    pub open: f64,                   // 今日开盘价
    pub high: f64,                   // 今日最高价
    pub low: f64,                    // 今日最低价

    // 成交统计
    pub volume: i64,                 // 成交量(手)
    pub turnover: f64,               // 成交额(元)
    pub open_interest: i64,          // 持仓量

    // 涨跌停
    pub upper_limit: f64,            // 涨停价
    pub lower_limit: f64,            // 跌停价
}
}

SnapshotGeneratorConfig

#![allow(unused)]
fn main() {
pub struct SnapshotGeneratorConfig {
    pub interval_ms: u64,            // 快照生成间隔(毫秒)
    pub enable_persistence: bool,    // 是否启用持久化(暂未实现)
    pub instruments: Vec<String>,    // 订阅的合约列表
}
}

快速开始

1. 基础用法

#![allow(unused)]
fn main() {
use qaexchange::market::snapshot_generator::{
    MarketSnapshotGenerator,
    SnapshotGeneratorConfig
};
use std::sync::Arc;

// 1. 创建配置
let config = SnapshotGeneratorConfig {
    interval_ms: 1000,  // 每秒生成一次
    enable_persistence: false,
    instruments: vec!["IF2501".to_string(), "IC2501".to_string()],
};

// 2. 创建生成器
let generator = Arc::new(MarketSnapshotGenerator::new(
    matching_engine.clone(),
    config,
));

// 3. 设置昨收盘价(用于涨跌幅计算)
generator.set_pre_close("IF2501", 3800.0);
generator.set_pre_close("IC2501", 5600.0);

// 4. 启动后台线程
let handle = generator.clone().start();

// 5. 订阅快照
let snapshot_rx = generator.subscribe();

// 6. 消费快照
tokio::spawn(async move {
    while let Ok(snapshot) = snapshot_rx.recv() {
        println!("快照: {} @ {:.2} (涨跌: {:.2}%)",
            snapshot.instrument_id,
            snapshot.last_price,
            snapshot.change_percent,
        );
    }
});
}

2. 通过 MarketDataService 使用

#![allow(unused)]
fn main() {
use qaexchange::market::MarketDataService;

// 1. 创建服务并配置快照生成器
let market_data_service = MarketDataService::new(matching_engine.clone())
    .with_snapshot_generator(
        vec!["IF2501".to_string()],  // 订阅合约
        1000,                         // 1秒间隔
    );

// 2. 启动生成器
market_data_service.start_snapshot_generator();

// 3. 订阅快照
if let Some(snapshot_rx) = market_data_service.subscribe_snapshots() {
    // 消费快照...
}

// 4. 成交时更新统计(由 TradeGateway 自动调用)
market_data_service.update_trade_stats("IF2501", 100, 380000.0);
}

核心方法

生成器方法

方法描述示例
new()创建生成器MarketSnapshotGenerator::new(engine, config)
start()启动后台线程generator.clone().start()
subscribe()订阅快照let rx = generator.subscribe()
set_pre_close()设置昨收盘价generator.set_pre_close("IF2501", 3800.0)
update_trade_stats()更新成交统计generator.update_trade_stats("IF2501", 100, 380000.0)
reset_daily_stats()重置日内统计generator.reset_daily_stats()
get_snapshot_count()获取已生成快照数let count = generator.get_snapshot_count()

MarketDataService 方法

方法描述
with_snapshot_generator()配置快照生成器
start_snapshot_generator()启动生成器
subscribe_snapshots()订阅快照
update_trade_stats()更新成交统计
set_pre_close()设置昨收盘价

性能特性

生成性能

指标数值说明
生成延迟< 1ms从订单簿读取到快照生成
订阅者开销~10μs每个订阅者的转发延迟
内存占用~500 bytes/snapshot单个快照内存大小
并发订阅者无限制基于 crossbeam 无锁 channel

生成流程

┌─────────────┐
│ 定时触发器   │ (每 interval_ms)
└──────┬──────┘
       │
       ▼
┌─────────────────────────────────┐
│ 1. 读取订单簿 (Orderbook.read)  │ ~100μs
├─────────────────────────────────┤
│ 2. 提取买卖5档                   │ ~50μs
├─────────────────────────────────┤
│ 3. 读取日内统计 (DailyStats)    │ ~10μs
├─────────────────────────────────┤
│ 4. 计算涨跌幅/OHLC               │ ~10μs
├─────────────────────────────────┤
│ 5. 构建快照对象                  │ ~50μs
├─────────────────────────────────┤
│ 6. 广播到所有订阅者              │ ~10μs/订阅者
└─────────────────────────────────┘
         │
         ▼
   ┌──────────┐
   │ 订阅者 1  │
   ├──────────┤
   │ 订阅者 2  │
   ├──────────┤
   │ 订阅者 N  │
   └──────────┘

统计更新机制

自动更新

成交事件发生时,TradeGateway 会自动调用 update_trade_stats() 更新统计:

#![allow(unused)]
fn main() {
// src/exchange/trade_gateway.rs:727-733
if let Some(mds) = &self.market_data_service {
    let turnover = price * volume;
    mds.update_trade_stats(instrument_id, volume as i64, turnover);
}
}

日内统计结构

#![allow(unused)]
fn main() {
struct DailyStats {
    open: f64,       // 开盘价(首笔成交价)
    high: f64,       // 最高价(自动更新)
    low: f64,        // 最低价(自动更新)
    pre_close: f64,  // 昨收盘价(手动设置)
    volume: i64,     // 累计成交量(自动累加)
    turnover: f64,   // 累计成交额(自动累加)
}
}

重置时机

#![allow(unused)]
fn main() {
// 每日开盘前调用
generator.reset_daily_stats();
}

订阅模式

转发机制

快照生成器使用主通道 + 转发线程模式实现多订阅者:

#![allow(unused)]
fn main() {
pub fn subscribe(&self) -> Receiver<MarketSnapshot> {
    let (tx, rx) = unbounded();  // 为订阅者创建专用通道

    // 启动转发线程
    let snapshot_rx = self.snapshot_rx.clone();
    std::thread::spawn(move || {
        loop {
            let rx_guard = snapshot_rx.read();
            if let Ok(snapshot) = rx_guard.try_recv() {
                drop(rx_guard);  // 尽早释放锁
                if tx.send(snapshot).is_err() {
                    break;  // 订阅者断开连接
                }
            } else {
                drop(rx_guard);
                std::thread::sleep(Duration::from_millis(10));
            }
        }
    });

    rx  // 返回订阅者专用接收器
}
}

订阅者断开检测

  • 当订阅者的 Receiver 被 drop 时,转发线程会自动退出
  • 无需手动管理订阅者生命周期

集成示例

WebSocket 实时推送

#![allow(unused)]
fn main() {
use actix_web_actors::ws;

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for SnapshotSession {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Text(text)) => {
                if text == "subscribe_snapshot" {
                    // 订阅快照
                    let snapshot_rx = self.market_data_service.subscribe_snapshots().unwrap();

                    // 启动推送任务
                    ctx.spawn(wrap_future(async move {
                        while let Ok(snapshot) = snapshot_rx.recv() {
                            let json = serde_json::to_string(&snapshot).unwrap();
                            ctx.text(json);
                        }
                    }));
                }
            }
            _ => {}
        }
    }
}
}

日志记录

#![allow(unused)]
fn main() {
use log::{info, debug};

let snapshot_rx = generator.subscribe();

tokio::spawn(async move {
    while let Ok(snapshot) = snapshot_rx.recv() {
        info!("快照: {} @ {:.2} | 买一: {:.2} x {} | 卖一: {:.2} x {}",
            snapshot.instrument_id,
            snapshot.last_price,
            snapshot.bid_price1, snapshot.bid_volume1,
            snapshot.ask_price1, snapshot.ask_volume1,
        );

        debug!("成交统计: volume={}, turnover={:.2}",
            snapshot.volume, snapshot.turnover);
    }
});
}

测试

运行集成测试

# 编译测试示例
cargo build --example test_snapshot_generator

# 运行测试(带日志)
RUST_LOG=info cargo run --example test_snapshot_generator

预期输出

=== 快照生成器测试 ===

1️⃣  初始化撮合引擎...
   ✅ 注册合约: IF2501 @ 3800

2️⃣  创建快照生成器...
   ✅ 快照生成器已创建 (间隔: 1s)

3️⃣  创建订阅者...
   ✅ 创建了 3 个订阅者

4️⃣  启动快照生成器...
   ✅ 后台线程已启动

5️⃣  提交测试订单...
   ✅ 已提交 10 个订单(买5/卖5)

6️⃣  模拟成交事件...
   ✅ 第1笔成交: volume=100, turnover=380,000
   ✅ 第2笔成交: volume=50, turnover=190,000

7️⃣  订阅者开始消费快照...
   (等待 5 秒,每秒接收一次快照)

   [订阅者1] 收到快照 #1: IF2501 @ 3800.00 (涨跌: 0.00%, 成交量: 150)
   [订阅者2] 买一: 3800.00 x 10, 卖一: 3800.20 x 10
   [订阅者3] OHLC: O=3800.00 H=3800.00 L=3800.00 (成交额: 570000.00)
   ...

8️⃣  测试统计:
   总快照数: 5
   运行时长: 5.01s
   快照频率: ~1.0/s

✅ 测试完成!

单元测试

# 运行快照生成器单元测试
cargo test --lib snapshot_generator

常见问题

1. 快照中的最新价为 0?

原因: 未设置昨收盘价或订单簿无成交。

解决方案:

#![allow(unused)]
fn main() {
// 启动时设置昨收盘价
generator.set_pre_close("IF2501", 3800.0);
}

2. 成交量/额始终为 0?

原因: 未调用 update_trade_stats() 更新统计。

解决方案:

#![allow(unused)]
fn main() {
// TradeGateway 集成后会自动调用
// 或手动调用
market_data_service.update_trade_stats("IF2501", volume, turnover);
}

3. 订阅者收不到快照?

原因: 生成器未启动或订阅时机过早。

解决方案:

#![allow(unused)]
fn main() {
// 确保先启动生成器
generator.clone().start();

// 再订阅
let snapshot_rx = generator.subscribe();
}

4. 如何修改快照频率?

#![allow(unused)]
fn main() {
let config = SnapshotGeneratorConfig {
    interval_ms: 500,  // 改为 500ms(0.5秒)
    // ...
};
}

路线图

  • Phase 1: 基础快照生成器(OHLC、买卖5档)
  • Phase 2: 集成到 MarketDataService
  • Phase 3: TradeGateway 自动统计更新
  • Phase 4: WebSocket 订阅端点
  • Phase 5: WAL 持久化快照
  • Phase 6: iceoryx2 零拷贝 IPC 分发
  • Phase 7: K线数据生成(1分钟、5分钟、日K等)
  • Phase 8: 实时指标计算(MACD、RSI等)

参考资料


@yutiansut @quantaxis - 2025-01-07