市场数据模块 (Market Data Module)

市场数据模块负责处理交易所的行情数据生成、分发和查询,是 QAExchange 的核心数据服务层。


模块组成

组件文件描述状态
快照生成器snapshot_generator.rs每秒级别市场快照生成✅ 完成
市场数据服务mod.rs业务逻辑层,统一数据访问接口✅ 完成
数据广播器broadcaster.rsLevel2 行情广播(订单簿、Tick)✅ 完成
快照广播服务snapshot_broadcaster.rsTokio 异步快照广播✅ 完成
数据缓存cache.rsL1 缓存(DashMap,100ms TTL)✅ 完成
数据恢复recovery.rs从 WAL 恢复市场数据✅ 完成

架构设计

┌─────────────────────────────────────────────────────────────┐
│                     MarketDataService                        │
│                   (业务逻辑层 - 统一入口)                       │
├─────────────────────────────────────────────────────────────┤
│  - 订单簿查询 (get_orderbook_snapshot)                       │
│  - Tick 数据查询 (get_tick_data)                             │
│  - 合约列表查询 (get_instruments)                            │
│  - 成交记录查询 (get_recent_trades)                          │
└────────────┬────────────────────────────────────────────────┘
             │
    ┌────────┴────────┐
    ▼                 ▼
┌──────────────┐  ┌──────────────────┐
│  L1 Cache    │  │ SnapshotGenerator│
│  (DashMap)   │  │  (每秒级别)       │
│  100ms TTL   │  │  - OHLC          │
│              │  │  - 买卖5档        │
└──────────────┘  │  - 成交统计       │
                  └──────────────────┘
         ▲                 │
         │                 ▼
    ┌────┴─────────────────────┐
    │   MarketDataBroadcaster   │ (实时行情广播)
    │   - OrderBookSnapshot     │
    │   - Tick                  │
    │   - LastPrice             │
    └───────────────────────────┘
              │
              ▼
     ┌────────────────┐
     │  Subscribers   │ (WebSocket/IPC)
     └────────────────┘

核心功能

1. 市场快照生成

快照生成器 (snapshot_generator.rs) 提供每秒级别的完整市场行情快照:

  • 35+ 字段: OHLC、买卖五档、成交量额、涨跌幅
  • 自动统计: 日内 OHLC、累计成交量/额
  • 零拷贝订阅: 基于 crossbeam channel 的发布-订阅

文档: 快照生成器详细文档

#![allow(unused)]
fn main() {
// 快速开始
let market_data_service = MarketDataService::new(matching_engine)
    .with_snapshot_generator(vec!["IF2501".to_string()], 1000);

market_data_service.start_snapshot_generator();

if let Some(snapshot_rx) = market_data_service.subscribe_snapshots() {
    while let Ok(snapshot) = snapshot_rx.recv() {
        println!("快照: {} @ {:.2}", snapshot.instrument_id, snapshot.last_price);
    }
}
}

2. 订单簿查询

提供三级缓存架构查询订单簿快照:

#![allow(unused)]
fn main() {
// L1: DashMap 缓存(<10μs)
// L2: WAL 存储恢复(<5ms)
// L3: 实时计算(<50μs)
let snapshot = market_data_service.get_orderbook_snapshot("IF2501", 5)?;
}

缓存策略:

  • TTL: 100ms(可配置)
  • 缓存命中率: >95%(生产环境)
  • 缓存未命中自动回源

3. 行情广播

MarketDataBroadcaster 支持实时推送订单簿变化和成交数据:

#![allow(unused)]
fn main() {
// 订阅市场数据
let receiver = broadcaster.subscribe(
    "session_id".to_string(),
    vec!["IF2501".to_string()],  // 订阅合约
    vec!["orderbook", "tick"],   // 订阅频道
);

// 接收事件
while let Ok(event) = receiver.recv() {
    match event {
        MarketDataEvent::OrderBookSnapshot { bids, asks, .. } => {
            println!("订单簿更新: {} bids, {} asks", bids.len(), asks.len());
        }
        MarketDataEvent::Tick { price, volume, .. } => {
            println!("成交: {} @ {}", volume, price);
        }
        _ => {}
    }
}
}

4. 数据恢复

从 WAL 恢复最近 N 分钟的市场数据:

#![allow(unused)]
fn main() {
// 恢复最近 10 分钟数据到缓存
market_data_service.recover_recent_market_data(10)?;
}

恢复统计:

✅ [Market Data Recovery] Recovered 1234 ticks, 567 orderbooks in 124ms

性能指标

指标目标值实际值备注
Tick 查询延迟 (L1)< 10μs~5μsDashMap 缓存
订单簿查询延迟 (L1)< 50μs~20μsDashMap 缓存
WAL 恢复速度< 5s~0.1s/分钟10分钟数据 < 1s
快照生成延迟< 1ms~200μs5档深度
缓存命中率> 90%95%+生产环境
并发订阅者> 1000无限制crossbeam

数据流

行情数据生成流程

┌──────────────┐
│ TradeGateway │ (成交事件)
└──────┬───────┘
       │
       ▼
  update_trade_stats()
       │
       ▼
┌────────────────────┐
│ SnapshotGenerator  │ (统计更新)
│  - volume += v     │
│  - turnover += t   │
│  - high = max()    │
│  - low = min()     │
└────────┬───────────┘
         │
         ▼ (每秒触发)
   generate_snapshot()
         │
         ▼
┌────────────────────┐
│  MarketSnapshot    │ (完整快照)
│  - OHLC            │
│  - 买卖5档          │
│  - 成交统计         │
└────────┬───────────┘
         │
         ▼
   broadcast()
         │
    ┌────┴────┐
    ▼         ▼
[订阅者1] [订阅者N]

查询流程

Client Request
    │
    ▼
get_orderbook_snapshot()
    │
    ├─ L1 Cache Hit? ──Yes──> Return (5μs)
    │       │
    │      No
    │       ▼
    ├─ L2 WAL Hit? ──Yes──> Update Cache + Return (5ms)
    │       │
    │      No
    │       ▼
    └─ L3 Compute ────────> Update Cache + Return (50μs)

配置

MarketDataService 配置

#![allow(unused)]
fn main() {
let market_data_service = MarketDataService::new(matching_engine)
    // 设置存储(用于 L2 恢复)
    .with_storage(market_data_storage)
    // 设置 iceoryx2(零拷贝 IPC)
    .with_iceoryx(iceoryx_manager)
    // 配置快照生成器
    .with_snapshot_generator(
        vec!["IF2501".to_string()],  // 订阅合约
        1000,                         // 1秒间隔
    );
}

快照生成器配置

#![allow(unused)]
fn main() {
let config = SnapshotGeneratorConfig {
    interval_ms: 1000,            // 快照间隔(毫秒)
    enable_persistence: false,    // WAL 持久化(待实现)
    instruments: vec![
        "IF2501".to_string(),
        "IC2501".to_string(),
    ],
};
}

API 参考

MarketDataService

方法描述复杂度
get_orderbook_snapshot(id, depth)查询订单簿快照O(depth)
get_tick_data(id)查询 Tick 数据O(1)
get_instruments()查询合约列表O(n)
get_recent_trades(id, limit)查询成交记录O(limit)
subscribe_snapshots()订阅市场快照O(1)
update_trade_stats(id, vol, amt)更新成交统计O(1)
set_pre_close(id, price)设置昨收盘价O(1)
recover_recent_market_data(mins)WAL 恢复数据O(n)

MarketSnapshotGenerator

方法描述复杂度
new(engine, config)创建生成器O(1)
start()启动后台线程O(1)
subscribe()订阅快照O(1)
set_pre_close(id, price)设置昨收盘价O(1)
update_trade_stats(id, vol, amt)更新统计O(1)
reset_daily_stats()重置统计O(n)
get_snapshot_count()获取生成数O(1)

测试

运行测试

# 单元测试
cargo test --lib market::

# 集成测试
cargo run --example test_snapshot_generator

# 性能测试
cargo run --example test_snapshot_generator --release

测试覆盖

  • ✅ 快照生成正确性
  • ✅ 多订阅者并发消费
  • ✅ 统计累加正确性
  • ✅ 缓存命中/未命中
  • ✅ WAL 恢复功能
  • ⏳ WebSocket 推送测试
  • ⏳ 压力测试(1000+ 订阅者)

常见问题

1. 如何订阅实时行情?

#![allow(unused)]
fn main() {
// 方案1: 订阅快照(每秒级别)
let snapshot_rx = market_data_service.subscribe_snapshots()?;

// 方案2: 订阅广播事件(毫秒级别)
let event_rx = market_broadcaster.subscribe(
    "session_id".to_string(),
    vec!["IF2501".to_string()],
    vec!["orderbook", "tick"],
);
}

2. 如何提高查询性能?

  • 使用 L1 缓存(默认启用,100ms TTL)
  • 批量查询合约列表
  • 启用 WAL 恢复预热缓存

3. 如何持久化快照数据?

目前快照暂未持久化,计划在 Phase 5 实现:

#![allow(unused)]
fn main() {
// 未来支持
let config = SnapshotGeneratorConfig {
    enable_persistence: true,  // 启用 WAL 持久化
    // ...
};
}

路线图

  • Phase 1-3: 基础快照生成器 + MarketDataService 集成
  • Phase 4: WebSocket 订阅端点
  • Phase 5: WAL 持久化快照
  • Phase 6: iceoryx2 零拷贝 IPC
  • Phase 7: K线数据生成器
  • Phase 8: 实时技术指标计算

相关文档


@yutiansut @quantaxis - 2025-01-07