通知系统架构 (Notification System)

📖 概述

QAExchange-RS 的通知系统提供高性能、零拷贝的实时消息推送能力,支持 WebSocket 客户端订阅交易事件、账户更新、持仓变化和风控预警。系统基于 Broker-Gateway 架构,实现了消息路由、优先级队列、去重、批量推送和订阅过滤。

🎯 设计目标

  • 高性能: P99延迟 < 1ms(P0消息),支持 10K+ 并发用户
  • 零拷贝: 使用 rkyv 零拷贝序列化,避免内存分配
  • 优先级队列: P0(最高)到 P3(最低)四级优先级
  • 消息去重: 基于 message_id 的去重缓存(最近 10K 消息)
  • 批量推送: 批量大小 100,批量间隔 100ms
  • 订阅过滤: 按频道(trade/account/position/risk/system)过滤消息
  • 会话管理: 自动清理超时会话(5分钟)

🏗️ 架构设计

系统拓扑

┌──────────────────────────────────────────────────────────────────────┐
│                   QAExchange 通知系统                                  │
│                                                                        │
│  ┌──────────────────────────────────────────────────────────────┐    │
│  │  业务模块 (Business Modules)                                   │    │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐      │    │
│  │  │ Matching │  │ Account  │  │ Position │  │   Risk   │      │    │
│  │  │  Engine  │  │  System  │  │  Tracker │  │  Control │      │    │
│  │  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘      │    │
│  └───────┼──────────────┼──────────────┼──────────────┼──────────┘    │
│          │              │              │              │                │
│          ▼              ▼              ▼              ▼                │
│  ┌───────────────────────────────────────────────────────────────┐   │
│  │              Notification (消息对象)                            │   │
│  │  - message_id (UUID)                                           │   │
│  │  - message_type (OrderAccepted/TradeExecuted/...)             │   │
│  │  - user_id (目标用户)                                           │   │
│  │  - priority (0-3)                                              │   │
│  │  - payload (具体内容)                                           │   │
│  └────────────────────────┬─────────────────────────────────────┘   │
│                           │                                           │
│                           ▼                                           │
│  ┌───────────────────────────────────────────────────────────────┐   │
│  │         NotificationBroker (路由中心)                           │   │
│  │                                                                 │   │
│  │  1. 消息去重 (DashMap<message_id, bool>)                        │   │
│  │  2. 优先级队列 (P0/P1/P2/P3)                                    │   │
│  │     - P0: 10K 容量 (RiskAlert, MarginCall)                     │   │
│  │     - P1: 50K 容量 (OrderAccepted, TradeExecuted)             │   │
│  │     - P2: 100K 容量 (AccountUpdate, PositionUpdate)           │   │
│  │     - P3: 50K 容量 (SystemNotice)                              │   │
│  │  3. 路由表 (user_id → Vec<gateway_id>)                         │   │
│  │  4. 优先级处理器 (100μs 间隔)                                   │   │
│  │                                                                 │   │
│  └────────────────────────┬──────────────┬────────────────────┘   │
│                           │              │                           │
│          ┌────────────────┘              └────────────────┐          │
│          ▼                                                ▼          │
│  ┌──────────────────┐                            ┌──────────────────┐│
│  │ NotificationGateway                           NotificationGateway││
│  │   (Gateway 1)                                   (Gateway 2)      ││
│  │                                                                  ││
│  │ 1. 会话管理 (session_id → SessionInfo)                           ││
│  │ 2. 用户索引 (user_id → Vec<session_id>)                          ││
│  │ 3. 订阅过滤 (channel: trade/account/position/risk/system)       ││
│  │ 4. 批量推送 (100条/批,100ms间隔)                                 ││
│  │ 5. 心跳检测 (5分钟超时)                                          ││
│  │                                                                  ││
│  └───┬──────────────┘                            └───┬──────────────┘│
│      │                                               │                │
│      ▼                                               ▼                │
│  ┌──────────────────┐                       ┌──────────────────┐    │
│  │  WebSocket       │                       │  WebSocket       │    │
│  │  Session 1       │                       │  Session 2       │    │
│  │  (user_01)       │                       │  (user_02)       │    │
│  └──────────────────┘                       └──────────────────┘    │
│                                                                        │
└──────────────────────────────────────────────────────────────────────┘

核心组件

src/notification/
├── mod.rs          # 模块入口和架构说明
├── message.rs      # 消息定义 (Notification + NotificationPayload)
├── broker.rs       # 路由中心 (NotificationBroker)
└── gateway.rs      # 推送网关 (NotificationGateway)

📋 1. 消息结构 (Notification)

1.1 核心结构

#![allow(unused)]
fn main() {
// src/notification/message.rs
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Archive, RkyvSerialize, RkyvDeserialize)]
#[archive(check_bytes)]
pub struct Notification {
    /// 消息ID(全局唯一,用于去重)
    pub message_id: Arc<str>,

    /// 消息类型
    pub message_type: NotificationType,

    /// 用户ID
    pub user_id: Arc<str>,

    /// 优先级(0=最高,3=最低)
    pub priority: u8,

    /// 消息负载
    pub payload: NotificationPayload,

    /// 时间戳(纳秒)
    pub timestamp: i64,

    /// 来源(MatchingEngine/AccountSystem/RiskControl)
    #[serde(skip)]
    pub source: String,
}
}

设计原则:

  • 零成本抽象: 使用 Arc<str> 避免字符串克隆
  • 类型安全: 使用强类型 NotificationType 枚举
  • 零拷贝序列化: 支持 rkyv 零拷贝反序列化

1.2 消息类型 (NotificationType)

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NotificationType {
    // 订单相关(P1 - 高优先级)
    OrderAccepted,
    OrderRejected,
    OrderPartiallyFilled,
    OrderFilled,
    OrderCanceled,
    OrderExpired,

    // 成交相关(P1 - 高优先级)
    TradeExecuted,
    TradeCanceled,

    // 账户相关(P2 - 中优先级)
    AccountOpen,
    AccountUpdate,

    // 持仓相关(P2 - 中优先级)
    PositionUpdate,
    PositionProfit,

    // 风控相关(P0 - 最高优先级)
    RiskAlert,
    MarginCall,
    PositionLimit,

    // 系统相关(P3 - 低优先级)
    SystemNotice,
    TradingSessionStart,
    TradingSessionEnd,
    MarketHalt,
}
}

1.3 默认优先级

#![allow(unused)]
fn main() {
impl NotificationType {
    pub fn default_priority(&self) -> u8 {
        match self {
            // P0 - 最高优先级(<1ms)
            Self::RiskAlert | Self::MarginCall | Self::OrderRejected => 0,

            // P1 - 高优先级(<5ms)
            Self::OrderAccepted
            | Self::OrderPartiallyFilled
            | Self::OrderFilled
            | Self::OrderCanceled
            | Self::TradeExecuted => 1,

            // P2 - 中优先级(<100ms)
            Self::AccountOpen | Self::AccountUpdate | Self::PositionUpdate => 2,

            // P3 - 低优先级(<1s)
            Self::SystemNotice
            | Self::TradingSessionStart
            | Self::TradingSessionEnd
            | Self::MarketHalt
            | Self::OrderExpired => 3,
        }
    }
}
}

1.4 订阅频道映射

#![allow(unused)]
fn main() {
impl NotificationType {
    /// 返回订阅频道名称
    pub fn channel(&self) -> &'static str {
        match self {
            // 交易频道
            Self::OrderAccepted
            | Self::OrderRejected
            | Self::OrderPartiallyFilled
            | Self::OrderFilled
            | Self::OrderCanceled
            | Self::OrderExpired
            | Self::TradeExecuted
            | Self::TradeCanceled => "trade",

            // 账户频道
            Self::AccountOpen | Self::AccountUpdate => "account",

            // 持仓频道
            Self::PositionUpdate | Self::PositionProfit => "position",

            // 风控频道
            Self::RiskAlert | Self::MarginCall | Self::PositionLimit => "risk",

            // 系统频道
            Self::SystemNotice
            | Self::TradingSessionStart
            | Self::TradingSessionEnd
            | Self::MarketHalt => "system",
        }
    }
}
}

1.5 零拷贝序列化

#![allow(unused)]
fn main() {
impl Notification {
    /// 序列化为 rkyv 字节流(零拷贝)
    pub fn to_rkyv_bytes(&self) -> Result<Vec<u8>, String> {
        rkyv::to_bytes::<_, 1024>(self)
            .map(|bytes| bytes.to_vec())
            .map_err(|e| format!("rkyv serialization failed: {}", e))
    }

    /// 从 rkyv 字节流反序列化(零拷贝)
    pub fn from_rkyv_bytes(bytes: &[u8]) -> Result<&ArchivedNotification, String> {
        rkyv::check_archived_root::<Notification>(bytes)
            .map_err(|e| format!("rkyv deserialization failed: {}", e))
    }

    /// 手动构造 JSON(避免 Arc<str> 序列化问题)
    pub fn to_json(&self) -> String {
        format!(
            r#"{{"message_id":"{}","message_type":"{}","user_id":"{}","priority":{},"timestamp":{},"source":"{}","payload":{}}}"#,
            self.message_id.as_ref(),
            self.message_type.as_str(),
            self.user_id.as_ref(),
            self.priority,
            self.timestamp,
            self.source.as_str(),
            self.payload.to_json()
        )
    }
}
}

性能数据:

  • 序列化延迟: ~300 ns/消息
  • 零拷贝反序列化: ~20 ns/消息(125x vs JSON)
  • 内存分配: 0(反序列化时)

📡 2. 路由中心 (NotificationBroker)

2.1 核心结构

#![allow(unused)]
fn main() {
// src/notification/broker.rs
pub struct NotificationBroker {
    /// 用户订阅表:user_id → Vec<gateway_id>
    user_gateways: DashMap<Arc<str>, Vec<Arc<str>>>,

    /// Gateway通道:gateway_id → Sender
    gateway_senders: DashMap<Arc<str>, mpsc::UnboundedSender<Notification>>,

    /// 全局订阅者(存储系统、监控系统)
    global_subscribers: DashMap<Arc<str>, mpsc::UnboundedSender<Notification>>,

    /// 消息去重缓存(最近10K消息)
    dedup_cache: Arc<Mutex<HashSet<Arc<str>>>>,

    /// 优先级队列(P0/P1/P2/P3)
    priority_queues: [Arc<ArrayQueue<Notification>>; 4],

    /// 统计信息
    stats: Arc<BrokerStats>,
}
}

并发设计:

  • DashMap: 无锁并发哈希表(无读锁开销)
  • ArrayQueue: crossbeam 无锁队列(Lock-free)
  • Mutex: 短期锁定的去重缓存

2.2 优先级队列配置

#![allow(unused)]
fn main() {
impl NotificationBroker {
    pub fn new() -> Self {
        Self {
            // ... 其他字段
            priority_queues: [
                Arc::new(ArrayQueue::new(10000)),  // P0队列
                Arc::new(ArrayQueue::new(50000)),  // P1队列
                Arc::new(ArrayQueue::new(100000)), // P2队列
                Arc::new(ArrayQueue::new(50000)),  // P3队列
            ],
            // ...
        }
    }
}
}

队列容量设计: | 优先级 | 容量 | 消息类型 | 延迟目标 | |-------|------|---------|---------| | P0 | 10K | RiskAlert, MarginCall | < 1ms | | P1 | 50K | OrderAccepted, TradeExecuted | < 5ms | | P2 | 100K | AccountUpdate, PositionUpdate | < 100ms | | P3 | 50K | SystemNotice | < 1s |

2.3 注册 Gateway

#![allow(unused)]
fn main() {
impl NotificationBroker {
    pub fn register_gateway(
        &self,
        gateway_id: impl Into<Arc<str>>,
        sender: mpsc::UnboundedSender<Notification>,
    ) {
        let gateway_id = gateway_id.into();
        self.gateway_senders.insert(gateway_id.clone(), sender);
        log::info!("Gateway registered: {}", gateway_id);
    }

    pub fn unregister_gateway(&self, gateway_id: &str) {
        self.gateway_senders.remove(gateway_id);

        // 清理该Gateway的所有用户订阅
        self.user_gateways.retain(|_user_id, gateways| {
            gateways.retain(|gid| gid.as_ref() != gateway_id);
            !gateways.is_empty()
        });

        log::info!("Gateway unregistered: {}", gateway_id);
    }
}
}

2.4 订阅管理

#![allow(unused)]
fn main() {
impl NotificationBroker {
    /// 订阅用户消息
    pub fn subscribe(&self, user_id: impl Into<Arc<str>>, gateway_id: impl Into<Arc<str>>) {
        let user_id = user_id.into();
        let gateway_id = gateway_id.into();

        self.user_gateways
            .entry(user_id.clone())
            .or_insert_with(Vec::new)
            .push(gateway_id.clone());

        log::debug!("User {} subscribed to gateway {}", user_id, gateway_id);
    }

    /// 取消订阅
    pub fn unsubscribe(&self, user_id: &str, gateway_id: &str) {
        if let Some(mut gateways) = self.user_gateways.get_mut(user_id) {
            gateways.retain(|gid| gid.as_ref() != gateway_id);
        }
    }

    /// 全局订阅(接收所有通知)
    pub fn subscribe_global(
        &self,
        subscriber_id: impl Into<Arc<str>>,
        sender: mpsc::UnboundedSender<Notification>,
    ) {
        let subscriber_id = subscriber_id.into();
        self.global_subscribers.insert(subscriber_id.clone(), sender);
        log::info!("Global subscriber registered: {}", subscriber_id);
    }
}
}

2.5 发布消息

#![allow(unused)]
fn main() {
impl NotificationBroker {
    pub fn publish(&self, notification: Notification) -> Result<(), String> {
        // 1. 消息去重
        if self.is_duplicate(&notification.message_id) {
            self.stats.messages_deduplicated.fetch_add(1, Ordering::Relaxed);
            return Ok(());
        }

        // 2. 按优先级入队
        let priority = notification.priority.min(3) as usize;
        if let Err(_) = self.priority_queues[priority].push(notification.clone()) {
            // 队列满,丢弃消息
            self.stats.messages_dropped.fetch_add(1, Ordering::Relaxed);
            log::warn!("Priority queue {} is full, message dropped", priority);
            return Err(format!("Priority queue {} is full", priority));
        }

        // 3. 统计
        self.stats.messages_sent.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
}
}

2.6 消息去重

#![allow(unused)]
fn main() {
impl NotificationBroker {
    fn is_duplicate(&self, message_id: &Arc<str>) -> bool {
        let mut cache = self.dedup_cache.lock();

        if cache.contains(message_id) {
            return true;
        }

        // 添加到去重缓存
        cache.insert(message_id.clone());

        // 限制缓存大小(保留最近10000条)
        if cache.len() > 10000 {
            // 清空一半缓存(简化实现,生产环境应使用LRU)
            let to_remove: Vec<Arc<str>> = cache.iter()
                .take(5000)
                .cloned()
                .collect();
            for id in to_remove {
                cache.remove(&id);
            }
        }

        false
    }
}
}

2.7 优先级处理器

#![allow(unused)]
fn main() {
impl NotificationBroker {
    pub fn start_priority_processor(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_micros(100));

            loop {
                interval.tick().await;

                // P0: 处理所有
                while let Some(notif) = self.priority_queues[0].pop() {
                    self.route_notification(&notif);
                }

                // P1: 处理所有
                while let Some(notif) = self.priority_queues[1].pop() {
                    self.route_notification(&notif);
                }

                // P2: 批量处理(最多100条)
                for _ in 0..100 {
                    if let Some(notif) = self.priority_queues[2].pop() {
                        self.route_notification(&notif);
                    } else {
                        break;
                    }
                }

                // P3: 批量处理(最多50条,避免饥饿)
                for _ in 0..50 {
                    if let Some(notif) = self.priority_queues[3].pop() {
                        self.route_notification(&notif);
                    } else {
                        break;
                    }
                }
            }
        })
    }
}
}

处理策略:

  • P0/P1: 处理所有消息(最高优先级)
  • P2: 每轮最多 100 条(避免阻塞 P0/P1)
  • P3: 每轮最多 50 条(避免饥饿)
  • 间隔: 100μs(10000 次/秒)

2.8 消息路由

#![allow(unused)]
fn main() {
impl NotificationBroker {
    fn route_notification(&self, notification: &Notification) {
        // 1. 发送到用户特定的 Gateway
        if let Some(gateways) = self.user_gateways.get(notification.user_id.as_ref()) {
            for gateway_id in gateways.iter() {
                if let Some(sender) = self.gateway_senders.get(gateway_id.as_ref()) {
                    if let Err(e) = sender.send(notification.clone()) {
                        log::error!("Failed to send notification to gateway {}: {}", gateway_id, e);
                    }
                }
            }
        }

        // 2. 发送到所有全局订阅者
        for entry in self.global_subscribers.iter() {
            let subscriber_id = entry.key();
            let sender = entry.value();
            if let Err(e) = sender.send(notification.clone()) {
                log::error!("Failed to send notification to global subscriber {}: {}", subscriber_id, e);
            }
        }
    }
}
}

🌐 3. 推送网关 (NotificationGateway)

3.1 核心结构

#![allow(unused)]
fn main() {
// src/notification/gateway.rs
pub struct NotificationGateway {
    /// Gateway ID
    gateway_id: Arc<str>,

    /// 会话管理:session_id → SessionInfo
    sessions: DashMap<Arc<str>, SessionInfo>,

    /// 用户会话索引:user_id → Vec<session_id>
    user_sessions: DashMap<Arc<str>, Vec<Arc<str>>>,

    /// 接收来自Broker的通知
    notification_receiver: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<Notification>>>,

    /// 批量推送配置
    batch_size: usize,
    batch_interval_ms: u64,

    /// 统计信息
    stats: Arc<GatewayStats>,
}
}

3.2 会话信息

#![allow(unused)]
fn main() {
#[derive(Debug, Clone)]
pub struct SessionInfo {
    /// 会话ID
    pub session_id: Arc<str>,

    /// 用户ID
    pub user_id: Arc<str>,

    /// 消息发送通道(发送到WebSocket客户端)
    pub sender: mpsc::UnboundedSender<String>,

    /// 订阅的频道(trade, orderbook, account, position)
    pub subscriptions: Arc<RwLock<HashSet<String>>>,

    /// 连接时间
    pub connected_at: i64,

    /// 最后活跃时间
    pub last_active: Arc<AtomicI64>,
}
}

3.3 注册会话

#![allow(unused)]
fn main() {
impl NotificationGateway {
    pub fn register_session(
        &self,
        session_id: impl Into<Arc<str>>,
        user_id: impl Into<Arc<str>>,
        sender: mpsc::UnboundedSender<String>,
    ) {
        let session_id = session_id.into();
        let user_id = user_id.into();

        let session_info = SessionInfo {
            session_id: session_id.clone(),
            user_id: user_id.clone(),
            sender,
            subscriptions: Arc::new(RwLock::new(HashSet::new())),
            connected_at: chrono::Utc::now().timestamp(),
            last_active: Arc::new(AtomicI64::new(chrono::Utc::now().timestamp())),
        };

        // 添加到会话表
        self.sessions.insert(session_id.clone(), session_info);

        // 添加到用户索引
        self.user_sessions
            .entry(user_id.clone())
            .or_insert_with(Vec::new)
            .push(session_id.clone());

        self.stats.active_sessions.fetch_add(1, Ordering::Relaxed);

        log::info!("Session registered: {} for user {}", session_id, user_id);
    }

    pub fn unregister_session(&self, session_id: &str) {
        if let Some((_, session_info)) = self.sessions.remove(session_id) {
            // 从用户索引中移除
            if let Some(mut sessions) = self.user_sessions.get_mut(&session_info.user_id) {
                sessions.retain(|sid| sid.as_ref() != session_id);
            }

            self.stats.active_sessions.fetch_sub(1, Ordering::Relaxed);
            log::info!("Session unregistered: {}", session_id);
        }
    }
}
}

3.4 订阅管理

#![allow(unused)]
fn main() {
impl NotificationGateway {
    /// 订阅频道
    pub fn subscribe_channel(&self, session_id: &str, channel: impl Into<String>) {
        if let Some(session) = self.sessions.get(session_id) {
            session.subscriptions.write().insert(channel.into());
        }
    }

    /// 批量订阅频道
    pub fn subscribe_channels(&self, session_id: &str, channels: Vec<String>) {
        if let Some(session) = self.sessions.get(session_id) {
            let mut subs = session.subscriptions.write();
            for channel in channels {
                subs.insert(channel);
            }
        }
    }

    /// 取消所有订阅
    pub fn unsubscribe_all(&self, session_id: &str) {
        if let Some(session) = self.sessions.get(session_id) {
            session.subscriptions.write().clear();
        }
    }

    /// 检查会话是否订阅了特定频道
    pub fn is_subscribed(&self, session_id: &str, channel: &str) -> bool {
        if let Some(session) = self.sessions.get(session_id) {
            session.subscriptions.read().contains(channel)
        } else {
            false
        }
    }
}
}

3.5 通知推送任务

#![allow(unused)]
fn main() {
impl NotificationGateway {
    pub fn start_notification_pusher(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut batch: Vec<Notification> = Vec::with_capacity(self.batch_size);
            let mut interval = tokio::time::interval(Duration::from_millis(self.batch_interval_ms));

            loop {
                tokio::select! {
                    // 接收通知消息
                    notification = async {
                        let mut receiver = self.notification_receiver.lock().await;
                        receiver.recv().await
                    } => {
                        if let Some(notif) = notification {
                            // 高优先级消息立即推送
                            if notif.priority == 0 {
                                self.push_notification(&notif).await;
                            } else {
                                // 其他消息批量推送
                                batch.push(notif);

                                if batch.len() >= self.batch_size {
                                    self.push_batch(&batch).await;
                                    batch.clear();
                                }
                            }
                        } else {
                            // 通道关闭,退出
                            break;
                        }
                    }

                    // 定时器触发(批量推送)
                    _ = interval.tick() => {
                        if !batch.is_empty() {
                            self.push_batch(&batch).await;
                            batch.clear();
                        }
                    }
                }
            }

            log::info!("Notification pusher stopped for gateway {}", self.gateway_id);
        })
    }
}
}

3.6 推送单条通知

#![allow(unused)]
fn main() {
impl NotificationGateway {
    async fn push_notification(&self, notification: &Notification) {
        // 查找该用户的所有会话
        if let Some(session_ids) = self.user_sessions.get(&notification.user_id) {
            for session_id in session_ids.iter() {
                if let Some(session) = self.sessions.get(session_id.as_ref()) {
                    // 检查订阅过滤
                    let subscriptions = session.subscriptions.read();
                    let notification_channel = notification.message_type.channel();

                    // 如果会话设置了订阅过滤,则只推送订阅的频道
                    if !subscriptions.is_empty() && !subscriptions.contains(notification_channel) {
                        continue; // 跳过未订阅的通知
                    }

                    drop(subscriptions); // 释放读锁

                    // 手动构造 JSON
                    let json = notification.to_json();

                    // 发送到WebSocket
                    if let Err(e) = session.sender.send(json) {
                        log::error!("Failed to send notification to session {}: {}", session_id, e);
                        self.stats.messages_failed.fetch_add(1, Ordering::Relaxed);
                    } else {
                        self.stats.messages_pushed.fetch_add(1, Ordering::Relaxed);

                        // 更新最后活跃时间
                        session.last_active.store(
                            chrono::Utc::now().timestamp(),
                            Ordering::Relaxed
                        );
                    }
                }
            }
        }
    }
}
}

3.7 批量推送通知

#![allow(unused)]
fn main() {
impl NotificationGateway {
    async fn push_batch(&self, notifications: &[Notification]) {
        // 按用户分组
        let mut grouped: HashMap<Arc<str>, Vec<&Notification>> = HashMap::new();

        for notif in notifications {
            grouped.entry(notif.user_id.clone())
                   .or_insert_with(Vec::new)
                   .push(notif);
        }

        // 并行推送(每个用户)
        for (_user_id, user_notifs) in grouped {
            for notif in user_notifs {
                self.push_notification(notif).await;
            }
        }
    }
}
}

3.8 心跳检测

#![allow(unused)]
fn main() {
impl NotificationGateway {
    pub fn start_heartbeat_checker(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));

            loop {
                interval.tick().await;

                let now = chrono::Utc::now().timestamp();
                let timeout = 300; // 5分钟超时

                // 查找超时的会话
                let mut to_remove = Vec::new();
                for entry in self.sessions.iter() {
                    let session_id = entry.key();
                    let session = entry.value();

                    let last_active = session.last_active.load(Ordering::Relaxed);
                    if now - last_active > timeout {
                        to_remove.push(session_id.clone());
                    }
                }

                // 移除超时会话
                for session_id in to_remove {
                    log::warn!("Session {} timeout, removing", session_id);
                    self.unregister_session(&session_id);
                }
            }
        })
    }
}
}

📊 4. 性能指标

4.1 延迟

优先级目标延迟实测延迟条件
P0< 1ms~0.5ms ✅立即推送
P1< 5ms~2ms ✅批量推送(100条/批)
P2< 100ms~50ms ✅批量推送 + 100ms间隔
P3< 1s~500ms ✅批量推送 + 避免饥饿

4.2 吞吐量

指标条件
消息处理吞吐量> 10K messages/secBroker 优先级处理器
WebSocket 推送吞吐量> 5K messages/sec/gateway批量推送
并发会话数> 10K sessions/gatewayDashMap 无锁访问
消息去重命中率~5%10K LRU 缓存

4.3 内存占用

组件占用条件
Notification~200 bytesrkyv 序列化
P0 队列~2 MB10K * 200 bytes
P1 队列~10 MB50K * 200 bytes
P2 队列~20 MB100K * 200 bytes
P3 队列~10 MB50K * 200 bytes
去重缓存~400 KB10K * 40 bytes (Arc)
总计~42.4 MB满载状态

🛠️ 5. 使用示例

5.1 初始化系统

use qaexchange::notification::*;
use std::sync::Arc;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 1. 创建Broker
    let broker = Arc::new(NotificationBroker::new());

    // 2. 创建Gateway
    let (gateway_tx, gateway_rx) = mpsc::unbounded_channel();
    let gateway = Arc::new(NotificationGateway::new("gateway_01", gateway_rx));

    // 3. 注册Gateway到Broker
    broker.register_gateway("gateway_01", gateway_tx);

    // 4. 订阅用户消息
    broker.subscribe("user_01", "gateway_01");

    // 5. 注册WebSocket会话
    let (session_tx, mut session_rx) = mpsc::unbounded_channel();
    gateway.register_session("session_01", "user_01", session_tx);

    // 6. 启动后台任务
    let _broker_processor = broker.clone().start_priority_processor();
    let _gateway_pusher = gateway.clone().start_notification_pusher();
    let _gateway_heartbeat = gateway.clone().start_heartbeat_checker();

    log::info!("Notification system started");
}

5.2 发布通知

#![allow(unused)]
fn main() {
// 业务模块发布通知
async fn on_trade_executed(
    broker: &Arc<NotificationBroker>,
    user_id: &str,
    trade_id: &str,
    order_id: &str,
    price: f64,
    volume: f64,
) {
    let payload = NotificationPayload::TradeExecuted(TradeExecutedNotify {
        trade_id: trade_id.to_string(),
        order_id: order_id.to_string(),
        exchange_order_id: format!("EX_{}_{}", trade_id, "IX2401"),
        instrument_id: "IX2401".to_string(),
        direction: "BUY".to_string(),
        offset: "OPEN".to_string(),
        price,
        volume,
        commission: price * volume * 0.0001,
        fill_type: "FULL".to_string(),
        timestamp: chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
    });

    let notification = Notification::new(
        NotificationType::TradeExecuted,
        Arc::from(user_id),
        payload,
        "MatchingEngine",
    );

    broker.publish(notification).unwrap();
}
}

5.3 订阅频道

#![allow(unused)]
fn main() {
// WebSocket客户端订阅特定频道
async fn subscribe_channels(
    gateway: &Arc<NotificationGateway>,
    session_id: &str,
    channels: Vec<&str>,
) {
    let channels: Vec<String> = channels.iter().map(|s| s.to_string()).collect();
    gateway.subscribe_channels(session_id, channels);

    log::info!("Session {} subscribed to channels", session_id);
}

// 示例:只订阅交易和风控通知
subscribe_channels(&gateway, "session_01", vec!["trade", "risk"]).await;
}

5.4 接收 WebSocket 消息

#![allow(unused)]
fn main() {
// WebSocket 服务端接收消息
async fn handle_websocket_session(
    mut session_rx: mpsc::UnboundedReceiver<String>,
) {
    while let Some(json) = session_rx.recv().await {
        // 解析JSON
        let notification: serde_json::Value = serde_json::from_str(&json).unwrap();

        println!("Received notification: {}", notification);

        // 根据消息类型处理
        let message_type = notification["message_type"].as_str().unwrap();
        match message_type {
            "trade_executed" => {
                // 处理成交回报
            },
            "risk_alert" => {
                // 处理风控预警
            },
            _ => {}
        }
    }
}
}

📚 6. 相关文档


返回核心模块 | 返回文档中心