Compression Strategy

@yutiansut @quantaxis

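📖 Overview

The QAExchange-RS storage system lets you configure compression per data type. Each kind of data gets the algorithm that best fits its characteristics, trading off storage space, compression speed and decompression speed.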

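🎯 Design Goals

  • Per-type compression: different data types use different algorithms
  • Speed first for hot data: fast codecs (LZ4/Snappy)
  • Space first for cold data: high-ratio codecs (ZSTD level 9)
  • Zero configuration: sensible defaults out of the box
  • Extensible: compression levels can be customized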

🏗️ Architecture

Compression Algorithm Hierarchy

┌─────────────────────────────────────────────────────────────┐
│                    CompressionStrategy                       │
│  ┌─────────────┬─────────────┬─────────────┬──────────────┐ │
│  │ Account     │ MarketData  │ KLine       │ Factor       │ │
│  │ ZSTD(6)     │ LZ4         │ ZSTD(3)     │ ZSTD(3)      │ │
│  └─────────────┴─────────────┴─────────────┴──────────────┘ │
│  ┌─────────────┬─────────────┬─────────────┬──────────────┐ │
│  │ Order       │ Trade       │ Position    │ System       │ │
│  │ ZSTD(3)     │ ZSTD(3)     │ ZSTD(6)     │ Snappy       │ │
│  └─────────────┴─────────────┴─────────────┴──────────────┘ │
└─────────────────────────────────────────────────────────────┘
                              ↓
                    CompressionAlgorithm
                              ↓
              ┌───────────────────────────────┐
              │       Parquet Options         │
              │  CompressionOptions::Zstd()   │
              │  CompressionOptions::Lz4Raw   │
              │  CompressionOptions::Snappy   │
              └───────────────────────────────┘

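Data Type Classification

| Category | Data types | Recommended algorithm | Rationale |
|---|---|---|---|
| Account data | Account, Position | ZSTD(6) | Infrequent updates; favor compression ratio |
| Market data | Tick, OrderBook | LZ4 | High-frequency writes; favor speed |
| K-line data | KLine | ZSTD(3) | Moderate frequency; balanced |
| Trading data | Order, Trade | ZSTD(3) | Must be recovered quickly; balanced |
| Factor data | Factor | ZSTD(3) | Numeric data that compresses well |
| System data | System, Log | Snappy | Low ratio but extremely fast |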

🔧 Core Implementation

Compression Algorithm Enum

// src/storage/sstable/compression.rs

use parquet2::compression::{CompressionOptions, ZstdLevel};

/// Compression algorithm
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    /// No compression: fastest writes, largest footprint
    Uncompressed,

    /// Snappy: very fast compression/decompression, low ratio (2-3x)
    Snappy,

    /// LZ4: very fast compression/decompression, suited to hot data
    Lz4,

    /// ZSTD level 1: fast compression, moderate ratio
    Zstd1,

    /// ZSTD level 3: balanced, recommended for most workloads
    Zstd3,

    /// ZSTD level 6: higher ratio, suited to cold data
    Zstd6,

    /// ZSTD level 9: highest ratio offered here, for archived data
    Zstd9,
}

impl CompressionAlgorithm {
    /// Convert to Parquet compression options.
    /// Levels 1, 3, 6 and 9 are within the range `ZstdLevel::try_new` accepts,
    /// so the `unwrap` calls below are not expected to panic.
    pub fn to_parquet_options(self) -> CompressionOptions {
        match self {
            Self::Uncompressed => CompressionOptions::Uncompressed,
            Self::Snappy => CompressionOptions::Snappy,
            Self::Lz4 => CompressionOptions::Lz4Raw,
            Self::Zstd1 => CompressionOptions::Zstd(
                Some(ZstdLevel::try_new(1).unwrap())
            ),
            Self::Zstd3 => CompressionOptions::Zstd(
                Some(ZstdLevel::try_new(3).unwrap())
            ),
            Self::Zstd6 => CompressionOptions::Zstd(
                Some(ZstdLevel::try_new(6).unwrap())
            ),
            Self::Zstd9 => CompressionOptions::Zstd(
                Some(ZstdLevel::try_new(9).unwrap())
            ),
        }
    }

    /// Algorithm name (the same strings are used in the TOML configuration)
    pub fn name(&self) -> &'static str {
        match self {
            Self::Uncompressed => "uncompressed",
            Self::Snappy => "snappy",
            Self::Lz4 => "lz4",
            Self::Zstd1 => "zstd-1",
            Self::Zstd3 => "zstd-3",
            Self::Zstd6 => "zstd-6",
            Self::Zstd9 => "zstd-9",
        }
    }

    /// Estimated compression ratio (original size / compressed size)
    pub fn estimated_ratio(&self) -> f64 {
        match self {
            Self::Uncompressed => 1.0,
            Self::Snappy => 2.5,
            Self::Lz4 => 3.0,
            Self::Zstd1 => 4.0,
            Self::Zstd3 => 5.0,
            Self::Zstd6 => 6.5,
            Self::Zstd9 => 8.0,
        }
    }
}
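The TOML configuration later on this page uses the same strings that `name()` returns (`"zstd-6"`, `"lz4"` and so on). A minimal sketch of the inverse mapping, assuming it is done through a `FromStr` impl; that impl and the `ALL` constant are hypothetical and not shown in the source:

```rust
use std::str::FromStr;

/// Trimmed copy of `CompressionAlgorithm`, so this sketch compiles on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    Uncompressed,
    Snappy,
    Lz4,
    Zstd1,
    Zstd3,
    Zstd6,
    Zstd9,
}

impl CompressionAlgorithm {
    /// Hypothetical list of every variant, used to invert `name()`.
    pub const ALL: [CompressionAlgorithm; 7] = [
        Self::Uncompressed,
        Self::Snappy,
        Self::Lz4,
        Self::Zstd1,
        Self::Zstd3,
        Self::Zstd6,
        Self::Zstd9,
    ];

    /// Same strings as `name()` above.
    pub fn name(&self) -> &'static str {
        match self {
            Self::Uncompressed => "uncompressed",
            Self::Snappy => "snappy",
            Self::Lz4 => "lz4",
            Self::Zstd1 => "zstd-1",
            Self::Zstd3 => "zstd-3",
            Self::Zstd6 => "zstd-6",
            Self::Zstd9 => "zstd-9",
        }
    }
}

impl FromStr for CompressionAlgorithm {
    type Err = String;

    /// Case-insensitive inverse of `name()`; surrounding whitespace is ignored.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let wanted = s.trim().to_ascii_lowercase();
        Self::ALL
            .iter()
            .copied()
            .find(|alg| alg.name() == wanted)
            .ok_or_else(|| format!("unknown compression algorithm: {s:?}"))
    }
}
```

Searching `ALL` instead of writing a second `match` keeps `name()` as the single source of truth for the strings, so the two directions cannot drift apart.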

Compression Strategy Configuration

/// Compression strategy configured per record category
#[derive(Debug, Clone)]
pub struct CompressionStrategy {
    /// Algorithm for account data
    pub account: CompressionAlgorithm,

    /// Algorithm for market data (Tick, OrderBook)
    pub market_data: CompressionAlgorithm,

    /// Algorithm for K-line data
    pub kline: CompressionAlgorithm,

    /// Algorithm for trading data (Order, Trade)
    pub trading: CompressionAlgorithm,

    /// Algorithm for factor data
    pub factor: CompressionAlgorithm,

    /// Algorithm for system/log data
    pub system: CompressionAlgorithm,

    /// Fallback algorithm (uncategorized data)
    pub default: CompressionAlgorithm,
}

impl CompressionStrategy {
    /// Create a strategy with the default (balanced) settings
    pub fn new() -> Self {
        Self::default()
    }

    /// High-performance preset: prioritizes compression/decompression speed
    pub fn high_performance() -> Self {
        Self {
            account: CompressionAlgorithm::Zstd3,
            market_data: CompressionAlgorithm::Lz4,
            kline: CompressionAlgorithm::Lz4,
            trading: CompressionAlgorithm::Lz4,
            factor: CompressionAlgorithm::Lz4,
            system: CompressionAlgorithm::Snappy,
            default: CompressionAlgorithm::Lz4,
        }
    }

    /// High-compression preset: prioritizes storage space
    pub fn high_compression() -> Self {
        Self {
            account: CompressionAlgorithm::Zstd9,
            market_data: CompressionAlgorithm::Zstd6,
            kline: CompressionAlgorithm::Zstd6,
            trading: CompressionAlgorithm::Zstd6,
            factor: CompressionAlgorithm::Zstd6,
            system: CompressionAlgorithm::Zstd3,
            default: CompressionAlgorithm::Zstd6,
        }
    }

    /// Look up the algorithm for a record category
    pub fn get_for_category(&self, category: RecordCategory) -> CompressionAlgorithm {
        match category {
            RecordCategory::Account => self.account,
            RecordCategory::MarketData => self.market_data,
            RecordCategory::KLine => self.kline,
            RecordCategory::Trading => self.trading,
            RecordCategory::Factor => self.factor,
            RecordCategory::System => self.system,
        }
    }
}

impl Default for CompressionStrategy {
    fn default() -> Self {
        Self {
            account: CompressionAlgorithm::Zstd6,      // infrequent updates: high ratio
            market_data: CompressionAlgorithm::Lz4,    // high-frequency writes: fast codec
            kline: CompressionAlgorithm::Zstd3,        // balanced
            trading: CompressionAlgorithm::Zstd3,      // balanced
            factor: CompressionAlgorithm::Zstd3,       // numeric data
            system: CompressionAlgorithm::Snappy,      // extremely fast
            default: CompressionAlgorithm::Zstd3,      // balanced default
        }
    }
}
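`estimated_ratio()` can be combined with a strategy's per-category choices to get a rough storage estimate. The sketch below is a hypothetical helper, not part of the source: it divides each category's raw bytes by the ratio of the algorithm chosen for it, so it is only as accurate as the ratios in the table.

```rust
/// Trimmed copy of `CompressionAlgorithm` with the `estimated_ratio()` values from above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    Uncompressed,
    Snappy,
    Lz4,
    Zstd1,
    Zstd3,
    Zstd6,
    Zstd9,
}

impl CompressionAlgorithm {
    /// Original size / compressed size, as estimated in the enum above.
    pub fn estimated_ratio(&self) -> f64 {
        match self {
            Self::Uncompressed => 1.0,
            Self::Snappy => 2.5,
            Self::Lz4 => 3.0,
            Self::Zstd1 => 4.0,
            Self::Zstd3 => 5.0,
            Self::Zstd6 => 6.5,
            Self::Zstd9 => 8.0,
        }
    }
}

/// Hypothetical helper: rough on-disk size of a mixed workload.
/// Each entry is (raw bytes for one category, algorithm the strategy picks for it).
pub fn estimate_compressed_bytes(parts: &[(u64, CompressionAlgorithm)]) -> f64 {
    parts
        .iter()
        .map(|&(raw, alg)| raw as f64 / alg.estimated_ratio())
        .sum()
}
```

For example, under the default strategy, 60 MB of market data (LZ4), 30 MB of trading data (ZSTD(3)) and 13 MB of account data (ZSTD(6)) come out at an estimated 20 + 6 + 2 = 28 MB. These input volumes are illustrative, not measurements.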

Record Category Definition

/// Record category (used to select a compression algorithm)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RecordCategory {
    /// Account-related (Account, Position)
    Account,

    /// Market data (Tick, OrderBook)
    MarketData,

    /// K-line (candlestick) data
    KLine,

    /// Trading (Order, Trade)
    Trading,

    /// Factor data
    Factor,

    /// System data
    System,
}

impl RecordCategory {
    /// Map a RecordType to its category; anything not listed falls back to System
    pub fn from_record_type(record_type: RecordType) -> Self {
        match record_type {
            RecordType::AccountOpen | RecordType::AccountUpdate |
            RecordType::PositionUpdate => Self::Account,

            RecordType::TickData | RecordType::OrderBookSnapshot |
            RecordType::OrderBookDelta => Self::MarketData,

            RecordType::KLineFinished => Self::KLine,

            RecordType::OrderInsert | RecordType::OrderUpdate |
            RecordType::TradeExecuted => Self::Trading,

            RecordType::FactorUpdate | RecordType::FactorSnapshot => Self::Factor,

            _ => Self::System,
        }
    }
}
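Together, `from_record_type` and the default strategy decide the codec for every record. The sketch below traces that path end to end. The `RecordType` subset (including a `Checkpoint` variant standing in for "everything else") and the `default_algorithm_name` helper are hypothetical stand-ins, since the real `RecordType` enum is not shown here.

```rust
/// Hypothetical subset of the project's `RecordType`; `Checkpoint` stands in
/// for any record type not listed explicitly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordType {
    AccountOpen,
    PositionUpdate,
    TickData,
    OrderBookDelta,
    KLineFinished,
    TradeExecuted,
    FactorUpdate,
    Checkpoint,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RecordCategory {
    Account,
    MarketData,
    KLine,
    Trading,
    Factor,
    System,
}

impl RecordCategory {
    /// Same grouping as `from_record_type` above, restricted to this subset.
    pub fn from_record_type(record_type: RecordType) -> Self {
        match record_type {
            RecordType::AccountOpen | RecordType::PositionUpdate => Self::Account,
            RecordType::TickData | RecordType::OrderBookDelta => Self::MarketData,
            RecordType::KLineFinished => Self::KLine,
            RecordType::TradeExecuted => Self::Trading,
            RecordType::FactorUpdate => Self::Factor,
            _ => Self::System,
        }
    }
}

/// Algorithm name each category gets under `CompressionStrategy::default()`.
pub fn default_algorithm_name(category: RecordCategory) -> &'static str {
    match category {
        RecordCategory::Account => "zstd-6",
        RecordCategory::MarketData => "lz4",
        RecordCategory::KLine | RecordCategory::Trading | RecordCategory::Factor => "zstd-3",
        RecordCategory::System => "snappy",
    }
}
```

Note that position updates land in the Account category, which is why Position shows ZSTD(6) in the architecture diagram.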

📊 Parquet Integration

Per-Category Compression on Write

// src/storage/sstable/olap_parquet.rs

use std::path::Path;
use std::sync::Arc;
// `Schema`, `WriteOptions` and `Version` come from the project's Parquet/Arrow imports (not shown).

impl ParquetSSTableWriter {
    /// Create a writer whose compression is chosen from the strategy
    pub fn create_with_compression<P: AsRef<Path>>(
        file_path: P,
        schema: Arc<Schema>,
        strategy: CompressionStrategy,
        category: Option<RecordCategory>,
    ) -> Result<Self, String> {
        // Pick the algorithm for the record category; fall back to `default`
        let compression_alg = match category {
            Some(cat) => strategy.get_for_category(cat),
            None => strategy.default,
        };

        let compression = compression_alg.to_parquet_options();

        let options = WriteOptions {
            write_statistics: true,
            compression,
            version: Version::V2,
            data_page_size_limit: Some(1024 * 1024), // 1 MB
        };

        // Create the writer...
        Self::create_with_options(file_path, schema, options)
    }
}
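The `match` on `category` above is a simple fallback: a known category uses its own algorithm, and `None` uses `default`. A minimal sketch of the same rule written with `Option::map_or`, on a trimmed, hypothetical strategy with only three categories:

```rust
/// Trimmed algorithm enum (only the variants this sketch uses).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    Snappy,
    Lz4,
    Zstd3,
    Zstd6,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordCategory {
    Account,
    MarketData,
    System,
}

/// Trimmed strategy: three categories plus the fallback.
pub struct CompressionStrategy {
    pub account: CompressionAlgorithm,
    pub market_data: CompressionAlgorithm,
    pub system: CompressionAlgorithm,
    pub default: CompressionAlgorithm,
}

impl CompressionStrategy {
    pub fn get_for_category(&self, category: RecordCategory) -> CompressionAlgorithm {
        match category {
            RecordCategory::Account => self.account,
            RecordCategory::MarketData => self.market_data,
            RecordCategory::System => self.system,
        }
    }

    /// Same behavior as the `match` in `create_with_compression`:
    /// uncategorized files fall back to `default`.
    pub fn resolve(&self, category: Option<RecordCategory>) -> CompressionAlgorithm {
        category.map_or(self.default, |c| self.get_for_category(c))
    }
}
```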

Usage Example

use crate::storage::sstable::compression::{CompressionStrategy, RecordCategory};

// Inside a function returning Result<_, String>, with `schema: Arc<Schema>` in scope.

// Default (balanced) strategy
let strategy = CompressionStrategy::default();
let writer = ParquetSSTableWriter::create_with_compression(
    "market_data.parquet",
    schema.clone(), // cheap Arc clone: `schema` is used again below
    strategy,
    Some(RecordCategory::MarketData), // resolves to LZ4
)?;

// High-compression strategy (archiving)
let archive_strategy = CompressionStrategy::high_compression();
let archive_writer = ParquetSSTableWriter::create_with_compression(
    "archive_2024.parquet",
    schema,
    archive_strategy,
    Some(RecordCategory::Trading), // resolves to ZSTD(6)
)?;

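📈 Performance Benchmarks

Compression Speed Comparison

| Algorithm | Compression speed | Decompression speed | Ratio | Typical use |
|---|---|---|---|---|
| Uncompressed | - | - | 1.0x | Debugging, temporary data |
| Snappy | 500 MB/s | 1.5 GB/s | 2.5x | Logs, system data |
| LZ4 | 800 MB/s | 4.0 GB/s | 3.0x | Hot data, high-frequency writes |
| ZSTD(1) | 400 MB/s | 1.2 GB/s | 4.0x | Warm data |
| ZSTD(3) | 250 MB/s | 1.0 GB/s | 5.0x | Balanced default |
| ZSTD(6) | 100 MB/s | 900 MB/s | 6.5x | Cold data |
| ZSTD(9) | 40 MB/s | 850 MB/s | 8.0x | Archived data |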
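The compression-speed column gives a quick way to estimate how long a flush spends compressing. The sketch below is a back-of-the-envelope helper with hypothetical function names, assuming time scales linearly with data size at the table's throughput:

```rust
/// Compression throughput (MB/s) from the table above; indicative figures only.
/// `uncompressed` has no figure in the table, so it returns `None`.
pub fn compress_mb_per_s(algorithm: &str) -> Option<f64> {
    match algorithm {
        "snappy" => Some(500.0),
        "lz4" => Some(800.0),
        "zstd-1" => Some(400.0),
        "zstd-3" => Some(250.0),
        "zstd-6" => Some(100.0),
        "zstd-9" => Some(40.0),
        _ => None,
    }
}

/// Estimated seconds spent compressing `raw_mb` megabytes with the named algorithm.
pub fn estimated_compress_secs(algorithm: &str, raw_mb: f64) -> Option<f64> {
    compress_mb_per_s(algorithm).map(|mb_per_s| raw_mb / mb_per_s)
}
```

For a 64 MB flush this gives roughly 64 / 800 = 0.08 s with LZ4 versus 64 / 40 = 1.6 s with ZSTD(9), which is why the heavier levels are kept off the high-frequency write path.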

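Measured Results

Test data: 1 million TickData records (about 150 MB raw)

| Strategy | Compressed size | Compression time | Read time |
|---|---|---|---|
| high_performance | 50 MB | 0.2 s | 0.04 s |
| default | 30 MB | 0.6 s | 0.15 s |
| high_compression | 19 MB | 3.8 s | 0.18 s |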
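Two derived metrics follow from this table by simple division: ratio = raw size / compressed size, and effective throughput = raw size / compression time. A small sketch that recomputes them from the figures above; the constants are copied from the table and nothing new is measured:

```rust
/// Raw size of the test data set described above, in MB.
pub const RAW_MB: f64 = 150.0;

/// (strategy, compressed MB, compression seconds), copied from the table above.
pub const RESULTS: [(&str, f64, f64); 3] = [
    ("high_performance", 50.0, 0.2),
    ("default", 30.0, 0.6),
    ("high_compression", 19.0, 3.8),
];

/// Compression ratio = raw size / compressed size.
pub fn ratio(raw_mb: f64, compressed_mb: f64) -> f64 {
    raw_mb / compressed_mb
}

/// Effective compression throughput in MB/s = raw size / compression time.
pub fn throughput_mb_per_s(raw_mb: f64, secs: f64) -> f64 {
    raw_mb / secs
}
```

This gives ratios of 3.0x, 5.0x and about 7.9x, and effective throughputs of about 750, 250 and 39 MB/s for high_performance, default and high_compression respectively.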

🛠️ Configuration Examples

TOML Configuration

# config/storage.toml

[compression]
# Strategy mode: "balanced", "performance", "compression"
mode = "balanced"

[compression.custom]
# Per-type algorithm overrides
account = "zstd-6"
market_data = "lz4"
kline = "zstd-3"
trading = "zstd-3"
factor = "zstd-3"
system = "snappy"
default = "zstd-3"
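The page does not say how `mode` and `[compression.custom]` interact. The sketch below assumes the preset selected by `mode` is applied first and each key under `[compression.custom]` then overrides one category. It works on already-parsed strings (no TOML crate), the names `resolve`, `preset`, `CATEGORIES` and `KNOWN` are hypothetical, and it assumes `balanced`, `performance` and `compression` correspond to `default()`, `high_performance()` and `high_compression()`, whose settings are copied into the preset tables.

```rust
use std::collections::BTreeMap;

/// Keys accepted under `[compression.custom]`, in a fixed order.
const CATEGORIES: [&str; 7] = ["account", "market_data", "kline", "trading", "factor", "system", "default"];

/// Accepted values: the same strings as `CompressionAlgorithm::name()`.
const KNOWN: [&str; 7] = ["uncompressed", "snappy", "lz4", "zstd-1", "zstd-3", "zstd-6", "zstd-9"];

/// Preset per `mode`, in `CATEGORIES` order (assumed mode-to-preset mapping).
fn preset(mode: &str) -> Result<[&'static str; 7], String> {
    match mode {
        "balanced" => Ok(["zstd-6", "lz4", "zstd-3", "zstd-3", "zstd-3", "snappy", "zstd-3"]),
        "performance" => Ok(["zstd-3", "lz4", "lz4", "lz4", "lz4", "snappy", "lz4"]),
        "compression" => Ok(["zstd-9", "zstd-6", "zstd-6", "zstd-6", "zstd-6", "zstd-3", "zstd-6"]),
        other => Err(format!("unknown compression mode: {other}")),
    }
}

/// Effective algorithm per category: the `mode` preset, overridden key by key
/// by `[compression.custom]`; unknown keys and algorithm names are rejected.
pub fn resolve(mode: &str, custom: &BTreeMap<&str, &str>) -> Result<BTreeMap<&'static str, String>, String> {
    if let Some(bad) = custom.keys().find(|k| !CATEGORIES.contains(*k)) {
        return Err(format!("unknown category key: {bad}"));
    }
    let mut effective = BTreeMap::new();
    for (category, preset_alg) in CATEGORIES.iter().copied().zip(preset(mode)?) {
        let alg = custom.get(category).copied().unwrap_or(preset_alg);
        if !KNOWN.contains(&alg) {
            return Err(format!("unknown algorithm {alg:?} for {category}"));
        }
        effective.insert(category, alg.to_string());
    }
    Ok(effective)
}
```

Rejecting unknown keys and algorithm names up front means a typo such as `market-data` or `zstd6` fails at startup instead of silently falling back to a preset.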

Configuring in Code

// Option 1: use a preset
let strategy = CompressionStrategy::default();

// Option 2: a preset plus overrides
let mut strategy = CompressionStrategy::high_performance();
strategy.account = CompressionAlgorithm::Zstd6; // higher ratio for account data

// Option 3: fully custom
let strategy = CompressionStrategy {
    account: CompressionAlgorithm::Zstd9,
    market_data: CompressionAlgorithm::Lz4,
    kline: CompressionAlgorithm::Zstd3,
    trading: CompressionAlgorithm::Zstd3,
    factor: CompressionAlgorithm::Zstd6,
    system: CompressionAlgorithm::Snappy,
    default: CompressionAlgorithm::Zstd3,
};

💡 Best Practices

1. Choosing by Scenario

// Real-time trading system: prioritize speed
let strategy = CompressionStrategy::high_performance();

// Historical archive: prioritize space
let strategy = CompressionStrategy::high_compression();

// General use: balanced default
let strategy = CompressionStrategy::default();

2. Hot/Cold Data Tiering

// Hot data (last day): fast compression
let hot_strategy = CompressionStrategy::high_performance();

// Warm data (last week): balanced compression
let warm_strategy = CompressionStrategy::default();

// Cold data (historical archive): high compression
let cold_strategy = CompressionStrategy::high_compression();
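The tiers above are keyed on data age. A minimal sketch of that rule using the one-day and one-week boundaries from the comments; treating the boundaries as inclusive and the `Preset`/`preset_for_age` names are assumptions, and actually moving a file between tiers (for example by rewriting it during compaction) is outside this sketch.

```rust
use std::time::Duration;

/// Hypothetical tag for the three presets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Preset {
    HighPerformance, // CompressionStrategy::high_performance()
    Balanced,        // CompressionStrategy::default()
    HighCompression, // CompressionStrategy::high_compression()
}

const DAY: Duration = Duration::from_secs(24 * 60 * 60);
const WEEK: Duration = Duration::from_secs(7 * 24 * 60 * 60);

/// Hot (up to 1 day) -> fast, warm (up to 1 week) -> balanced, older -> high compression.
pub fn preset_for_age(age: Duration) -> Preset {
    if age <= DAY {
        Preset::HighPerformance
    } else if age <= WEEK {
        Preset::Balanced
    } else {
        Preset::HighCompression
    }
}
```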

3. Avoiding Common Mistakes

// ❌ Wrong: heavy compression on high-frequency data
let strategy = CompressionStrategy {
    market_data: CompressionAlgorithm::Zstd9, // adds write latency
    ..Default::default()
};

// ✅ Right: fast compression for high-frequency data
let strategy = CompressionStrategy {
    market_data: CompressionAlgorithm::Lz4,
    ..Default::default()
};
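This mistake can also be caught when a strategy is built. A hypothetical lint, assuming anything heavier than ZSTD(3) on the market-data path deserves a warning; the threshold is an illustrative choice, not taken from the source.

```rust
/// Trimmed copy of `CompressionAlgorithm`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    Uncompressed,
    Snappy,
    Lz4,
    Zstd1,
    Zstd3,
    Zstd6,
    Zstd9,
}

/// Hypothetical check: returns a warning message if the market-data algorithm
/// is one of the heavy ZSTD levels, `None` otherwise.
pub fn market_data_warning(alg: CompressionAlgorithm) -> Option<String> {
    match alg {
        CompressionAlgorithm::Zstd6 | CompressionAlgorithm::Zstd9 => Some(format!(
            "market_data uses {alg:?}; high-frequency writes may see extra latency, consider Lz4"
        )),
        _ => None,
    }
}
```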

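🔍 Troubleshooting

Problem 1: High write latency

Symptom: Parquet write P99 > 100 ms

Diagnosis:

  1. Check whether a high compression level such as ZSTD(9) is in use
  2. Check whether the volume of data being written is too large

Resolution:

// Lower the compression level (`strategy` is a mutable CompressionStrategy)
strategy.market_data = CompressionAlgorithm::Lz4;
strategy.trading = CompressionAlgorithm::Zstd1;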

Problem 2: Running out of disk space

Symptom: Disk usage keeps growing

Diagnosis:

  1. Check whether Uncompressed or Snappy is in use
  2. Check whether compaction is running normally

Resolution:

// Raise the compression ratio
let strategy = CompressionStrategy::high_compression();
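Both resolutions above move along the same speed-versus-ratio trade-off. A hypothetical sketch of stepping one level at a time instead of jumping between presets. The ladder order follows the speed/ratio table; Snappy and Uncompressed are left off because the table shows LZ4 both faster and denser than Snappy.

```rust
/// Trimmed copy of `CompressionAlgorithm`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    Uncompressed,
    Snappy,
    Lz4,
    Zstd1,
    Zstd3,
    Zstd6,
    Zstd9,
}

/// Hypothetical ladder from fastest to densest.
const LADDER: [CompressionAlgorithm; 5] = [
    CompressionAlgorithm::Lz4,
    CompressionAlgorithm::Zstd1,
    CompressionAlgorithm::Zstd3,
    CompressionAlgorithm::Zstd6,
    CompressionAlgorithm::Zstd9,
];

/// Problem 1 (write latency): move one rung toward LZ4.
pub fn one_step_faster(alg: CompressionAlgorithm) -> CompressionAlgorithm {
    match LADDER.iter().position(|&a| a == alg) {
        Some(i) if i > 0 => LADDER[i - 1],
        _ => alg, // already LZ4, or not on the ladder
    }
}

/// Problem 2 (disk space): move one rung toward ZSTD(9).
pub fn one_step_denser(alg: CompressionAlgorithm) -> CompressionAlgorithm {
    match LADDER.iter().position(|&a| a == alg) {
        Some(i) if i + 1 < LADDER.len() => LADDER[i + 1],
        Some(_) => alg,                    // already ZSTD(9)
        None => CompressionAlgorithm::Lz4, // Snappy/Uncompressed: enter the ladder at LZ4
    }
}
```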
