DIFF 协议快速开始指南

概述

DIFF (Differential Information Flow for Finance) 协议是一个基于 JSON Merge Patch 的金融数据差分推送协议,实现了零拷贝、低延迟的实时数据推送。

版本: 1.0 最后更新: 2025-10-05 状态: ✅ 后端完成,前端待实现


快速开始

1. 后端集成(已完成)

启动 WebSocket DIFF 服务

use qaexchange::service::websocket::{WebSocketServer, ws_route, ws_diff_route};
use qaexchange::protocol::diff::snapshot::SnapshotManager;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // 创建业务组件
    let account_mgr = Arc::new(AccountManager::new());
    let mut trade_gateway = TradeGateway::new(account_mgr.clone());

    // ✨ 集成 DIFF 快照管理器
    let snapshot_mgr = Arc::new(SnapshotManager::new());
    trade_gateway.set_snapshot_manager(snapshot_mgr);

    let trade_gateway = Arc::new(trade_gateway);

    // 创建 WebSocket 服务器
    let ws_server = Arc::new(WebSocketServer::new(
        order_router,
        account_mgr,
        trade_gateway,
        market_broadcaster,
    ));

    // 启动 HTTP 服务器
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(ws_server.clone()))
            .route("/ws", web::get().to(ws_route))              // 原有协议
            .route("/ws/diff", web::get().to(ws_diff_route))    // ✨ DIFF 协议
    })
    .bind("0.0.0.0:8080")?
    .run()
    .await
}

2. 前端集成(示例)

连接 DIFF WebSocket

// 连接 DIFF WebSocket
const ws = new WebSocket('ws://localhost:8080/ws/diff?user_id=user123');

// 本地快照
let snapshot = {};

// 连接成功后发送 peek_message
ws.onopen = () => {
  console.log('DIFF WebSocket connected');
  ws.send(JSON.stringify({ aid: "peek_message" }));
};

// 接收 rtn_data 并应用 merge patch
ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);

  if (msg.aid === "rtn_data") {
    // 应用所有 patch 到本地快照
    msg.data.forEach(patch => {
      mergePatch(snapshot, patch);
    });

    // 更新 UI(例如 Vuex)
    store.commit('UPDATE_SNAPSHOT', snapshot);

    // 继续下一轮 peek
    ws.send(JSON.stringify({ aid: "peek_message" }));
  }
};

// JSON Merge Patch 实现
function mergePatch(target, patch) {
  if (typeof patch !== 'object' || patch === null || Array.isArray(patch)) {
    return patch;
  }

  if (typeof target !== 'object' || target === null || Array.isArray(target)) {
    target = {};
  }

  for (const [key, value] of Object.entries(patch)) {
    if (value === null) {
      delete target[key];
    } else if (typeof value === 'object' && !Array.isArray(value)) {
      target[key] = mergePatch(target[key], value);
    } else {
      target[key] = value;
    }
  }

  return target;
}

数据流示例

订单成交推送流程

1. 用户提交订单
   ↓
2. OrderRouter → MatchingEngine 成交
   ↓
3. TradeGateway.handle_filled()
   ├─ 更新账户(QA_Account)
   └─ ✨ 推送 3 个 DIFF patch:
      ├─ trade_patch: 成交明细
      ├─ order_patch: 订单状态
      └─ account_patch: 账户变动
         ↓
4. SnapshotManager.push_patch()
   ├─ 存入 patch_queue
   └─ 唤醒 peek() 请求(Tokio Notify)
      ↓
5. DiffWebsocketSession 发送 rtn_data
   ↓
6. 客户端接收并应用 merge_patch
   └─ 本地快照实时更新

DIFF Patch 示例

场景: 用户下单买入 10手 SHFE.cu2512 @ 75230,全部成交

推送的 3 个 patch:

// Patch 1: 成交记录
{
  "trades": {
    "trade_20251005_001": {
      "trade_id": "trade_20251005_001",
      "user_id": "user123",
      "order_id": "order456",
      "instrument_id": "SHFE.cu2512",
      "direction": "BUY",
      "offset": "OPEN",
      "price": 75230.0,
      "volume": 10.0,
      "commission": 5.0,
      "timestamp": 1728134567000000000
    }
  }
}

// Patch 2: 订单状态
{
  "orders": {
    "order456": {
      "status": "FILLED",
      "filled_volume": 10.0,
      "remaining_volume": 0.0,
      "update_time": 1728134567000000000
    }
  }
}

// Patch 3: 账户变动
{
  "accounts": {
    "user123": {
      "balance": 99995.0,
      "available": 49995.0,
      "margin": 50000.0,
      "position_profit": 0.0,
      "risk_ratio": 0.5
    }
  }
}

客户端应用后的快照:

{
  accounts: {
    user123: {
      balance: 99995.0,
      available: 49995.0,
      margin: 50000.0,
      position_profit: 0.0,
      risk_ratio: 0.5
    }
  },
  orders: {
    order456: {
      status: "FILLED",
      filled_volume: 10.0,
      remaining_volume: 0.0,
      update_time: 1728134567000000000
    }
  },
  trades: {
    trade_20251005_001: {
      trade_id: "trade_20251005_001",
      user_id: "user123",
      order_id: "order456",
      instrument_id: "SHFE.cu2512",
      direction: "BUY",
      offset: "OPEN",
      price: 75230.0,
      volume: 10.0,
      commission: 5.0,
      timestamp: 1728134567000000000
    }
  }
}

DIFF 消息协议

客户端消息(aid-based)

// 1. peek_message - 阻塞等待数据更新
{ "aid": "peek_message" }

// 2. req_login - 登录请求
{
  "aid": "req_login",
  "user_name": "user123",
  "password": "password123"
}

// 3. insert_order - 下单请求
{
  "aid": "insert_order",
  "user_id": "user123",
  "order_id": "order456",
  "exchange_id": "SHFE",
  "instrument_id": "cu2512",
  "direction": "BUY",
  "offset": "OPEN",
  "volume": 10,
  "price_type": "LIMIT",
  "limit_price": 75230.0
}

// 4. cancel_order - 撤单请求
{
  "aid": "cancel_order",
  "user_id": "user123",
  "order_id": "order456"
}

服务端消息(aid-based)

// rtn_data - 数据推送(JSON Merge Patch 数组)
{
  "aid": "rtn_data",
  "data": [
    { "trades": { "trade_001": { ... } } },
    { "orders": { "order_456": { ... } } },
    { "accounts": { "user123": { ... } } }
  ]
}

性能指标

指标目标值实际值说明
延迟
peek() 唤醒延迟< 10μsP99 < 10μsTokio Notify 性能
JSON 序列化< 5μs~2-5μsserde_json
端到端延迟< 200μsP99 < 200μs成交 → 客户端
吞吐
Patch 推送> 100K/s> 100K/s异步架构
并发用户> 10K> 10KDashMap
内存
每用户内存< 200KB~100KB快照 + patch队列

故障排查

常见问题

问题原因解决方案
未收到 patch用户未初始化连接时调用 initialize_user()
patch 延迟高tokio runtime 繁忙增加 worker 线程数
WebSocket 断开心跳超时检查网络,减小心跳间隔
快照不一致patch 顺序错误检查 push_patch 调用顺序

调试日志

# 启用 DIFF 调试日志
RUST_LOG=qaexchange::protocol::diff=debug,qaexchange::exchange::trade_gateway=debug cargo run

# 关键日志
# - "SnapshotManager: User initialized" - 用户快照初始化
# - "SnapshotManager: Patch pushed" - Patch 推送
# - "SnapshotManager: peek() awakened" - peek 被唤醒
# - "TradeGateway: Order filled" - 订单成交

架构图

┌─────────────────────────────────────────────────────────┐
│                    WebSocketServer                       │
├─────────────────────────────────────────────────────────┤
│  sessions: Arc<RwLock<HashMap<session_id, Addr>>>       │
│  diff_handler: Arc<DiffHandler> ◄─── 零拷贝共享          │
│  trade_gateway: Arc<TradeGateway>                       │
└────────────┬─────────────────────┬──────────────────────┘
             │                     │
      /ws (原有协议)          /ws/diff (DIFF协议)
             │                     │
             ▼                     ▼
      ┌─────────────┐      ┌──────────────────┐
      │ WsSession   │      │DiffWebsocketSession│
      └─────────────┘      └────────┬──────────┘
                                    │
                                    ▼
                            ┌────────────────┐
                            │  DiffHandler   │
                            ├────────────────┤
                            │ snapshot_mgr   │◄─ Arc<SnapshotManager>
                            └────────┬───────┘
                                    │
                                    ▼
                            ┌────────────────────────────┐
                            │    SnapshotManager         │
                            ├────────────────────────────┤
                            │ users: DashMap<user_id,    │
                            │        UserSnapshot>       │
                            │ - snapshot: Value          │
                            │ - patch_queue: Vec<Value>  │
                            │ - notify: Arc<Notify>      │
                            └────────────────────────────┘
                                    ▲
                                    │
                            ┌───────┴────────┐
                            │  TradeGateway  │
                            │ (业务逻辑推送)  │
                            └────────────────┘

相关文档


下一步

待实现功能

  • 前端 WebSocket 客户端封装(Vue/React 组件)
  • Vuex Store 集成(业务快照管理)
  • OrderRouter 订单提交推送
  • 行情数据推送(MarketDataBroadcaster)
  • K线数据推送(SetChart 订阅)

测试计划

  • 单元测试(TradeGateway DIFF 推送)
  • 集成测试(端到端推送流程)
  • 性能测试(万级并发、高频成交)
  • 前后端联调测试

最后更新: 2025-10-05 维护者: QAExchange Team