水塘采样算法解决随机抽样需求

水塘采样算法解决随机抽样需求

需求场景:

如果我们现在有200万用户数据,需要从200万用户数据中以较好的随机性,随机抽取200个用户做案例分析或灰度发布;
尝试了多种算法,发现水塘采样算法是性能较好、随机性较好、算法空间复杂度平衡、比较适合泛用性需求场景;
class ReservoirSampler
{
    private $pdo;
    private $batchSize = 10000;

    public function __construct(PDO $pdo)
    {
        $this->pdo = $pdo;
    }

    public function sample($tableName, $k)
    {
        $reservoir = [];  // 水塘,存储 k 个随机样本
        $offset = 0;      // 分页偏移量
        $count = 0;       // 已处理的元素总数

        while (true) {
            // 分批读取数据,避免一次性加载所有数据
            $sql = "SELECT * FROM {$tableName} LIMIT {$this->batchSize} OFFSET {$offset}";
            $stmt = $this->pdo->query($sql);
            $batch = $stmt->fetchAll(PDO::FETCH_ASSOC);

            // 如果没有更多数据,退出循环
            if (empty($batch)) {
                break;
            }

            // 处理当前批次中的每条记录
            foreach ($batch as $record) {
                $count++;

                // 阶段1:前 k 个元素直接放入水塘
                if ($count <= $k) {
                    $reservoir[] = $record;
                } else {
                    // 阶段2:后续元素以 k/count 的概率替换水塘中的元素
                    $probability = $k / $count;

                    // 使用 mt_rand(1, $count) <= $k 来模拟概率 k/count
                    if (mt_rand(1, $count) <= $k) {
                        // 随机选择水塘中的一个位置进行替换
                        $replaceIndex = mt_rand(0, $k - 1);
                        $reservoir[$replaceIndex] = $record;
                    }
                }
            }

            // 移动到下一批数据
            $offset += $this->batchSize;
        }

        return $reservoir;
    }
}

一、算法原理

水塘采样算法是一种经典的在线随机采样算法,它的核心优势在于:

  1. 不需要预先知道数据总量
  2. 只需要一次遍历
  3. 每个元素被选中的概率完全相等
  4. 空间复杂度恒定为 O(k)

算法步骤详解

步骤 1:初始化水塘(前 k 个元素)

if ($count <= $k) {
    $reservoir[] = $record;
}
  • 对于前 k 个元素,直接放入水塘
  • 此时水塘中有 k 个元素

步骤 2:处理后续元素(第 k+1 个开始)

$probability = $k / $count;
if (mt_rand(1, $count) <= $k) {
    $replaceIndex = mt_rand(0, $k - 1);
    $reservoir[$replaceIndex] = $record;
}
  • 对于第 i 个元素(i > k),以 k/i 的概率决定是否替换水塘中的元素
  • 如果决定替换,则随机选择水塘中的一个位置进行替换

数学证明:为什么每个元素被选中的概率相等?

让我们用数学归纳法证明:

基础情况:前 k 个元素

  • 第 1 个元素被选中的概率:1(直接放入水塘)
  • 第 2 个元素被选中的概率:1(直接放入水塘)
  • 第 k 个元素被选中的概率:1(直接放入水塘)

归纳假设:假设前 i-1 个元素被选中的概率都是 k/i-1

归纳步骤:证明第 i 个元素被选中的概率是 k/i

第 i 个元素被选中的概率:

P(第i个元素被选中) = k/i

第 j 个元素(j < i)被选中的概率:

P(第j个元素最终被选中) 
= P(第j个元素被选中) × P(第j个元素不被替换)
= (k/(j)) × (1 - k/(j+1)) × (1 - k/(j+2)) × ... × (1 - k/(i))

让我们简化这个乘积:

= (k/j) × ((j+1-k)/(j+1)) × ((j+2-k)/(j+2)) × ... × ((i-k)/(i))

注意到分子分母会相互抵消:

= (k/j) × ((j+1-k)/(j+1)) × ((j+2-k)/(j+2)) × ... × ((i-k)/(i))
= k × (j+1-k) × (j+2-k) × ... × (i-k) / (j × (j+1) × (j+2) × ... × i)
= k × (j-k+1) × (j-k+2) × ... × (i-k) / (j × (j+1) × (j+2) × ... × i)

这看起来复杂,但我们用具体数字来验证:

例子:k=2,i=5,验证第 1 个元素被选中的概率

P(第1个元素被选中) 
= 2/1 × (1-2/2) × (1-2/3) × (1-2/4) × (1-2/5)
= 2 × 0 × 1/3 × 1/2 × 3/5
= 0

等等,这个例子有问题。让我重新理解算法。

实际上,对于第 j 个元素(j < i),它在第 i 步不被替换的概率是:

P(不被替换) = 1 - k/i

所以第 j 个元素最终被选中的概率是:

P(最终被选中) = P(被选中) × P(不被替换1) × P(不被替换2) × ... × P(不被替换n)
= (k/j) × (1 - k/(j+1)) × (1 - k/(j+2)) × ... × (1 - k/(i))

让我用具体例子验证:

例子:k=3,i=5,验证第 1 个元素被选中的概率

P(第1个元素被选中) 
= 1 × (1-3/4) × (1-3/5)
= 1 × 1/4 × 2/5
= 2/20 = 1/10

但是 k/i = 3/5 = 6/10,这不等于 1/10。

让我重新理解算法。实际上,前 k 个元素被选中的概率是 1,但它们后续可能会被替换。

让我用更简单的方式理解:

对于第 i 个元素(i > k):

  • 被选中的概率:k/i

对于第 j 个元素(j ≤ k):

  • 被选中的概率需要考虑它不被后续元素替换的概率

让我用具体例子计算:

例子:k=2,总共 4 个元素

  • 第 1 个元素:直接放入水塘
  • 第 2 个元素:直接放入水塘
  • 第 3 个元素:以 2/3 的概率替换水塘中的随机一个
  • 第 4 个元素:以 2/4 = 1/2 的概率替换水塘中的随机一个

计算第 1 个元素最终被选中的概率:

P(第1个元素最终被选中)
= P(不被第3个元素替换) × P(不被第4个元素替换)
= (1 - 2/3 × 1/2) × (1 - 1/2 × 1/2)
= (1 - 1/3) × (1 - 1/4)
= 2/3 × 3/4
= 6/12 = 1/2

计算第 3 个元素被选中的概率:

P(第3个元素被选中)
= P(被选中) × P(不被第4个元素替换)
= 2/3 × (1 - 1/2 × 1/2)
= 2/3 × 3/4
= 6/12 = 1/2

计算第 4 个元素被选中的概率:

P(第4个元素被选中) = 2/4 = 1/2

完美!所有元素被选中的概率都是 1/2 = k/n。

关键点说明

1. 为什么使用 mt_rand(1, $count) <= $k

这等价于以 k/count 的概率执行替换:

// 方法1:直接使用概率
if (mt_rand() / mt_getrandmax() < $k / $count) {
    // 替换
}

// 方法2:使用整数比较(更高效)
if (mt_rand(1, $count) <= $k) {
    // 替换
}

2. 为什么需要分批读取?

  • 200 万条数据如果一次性读取,内存占用会非常大
  • 使用 LIMIT/OFFSET 分批读取,每次只处理 10000 条
  • 内存占用恒定,只存储 k 个样本

3. 时间复杂度分析

时间复杂度 = O(n)
- 需要遍历所有 n 个元素
- 每个元素的处理时间是 O(1)

空间复杂度 = O(k)
- 只存储 k 个样本
- 与数据总量 n 无关

六、实际运行示例

假设我们从 10 条数据中随机抽取 3 条:

// 数据:[A, B, C, D, E, F, G, H, I, J]
// k = 3

// 处理 A (count=1): 直接放入水塘
// 水塘:[A]

// 处理 B (count=2): 直接放入水塘
// 水塘:[A, B]

// 处理 C (count=3): 直接放入水塘
// 水塘:[A, B, C]

// 处理 D (count=4): 以 3/4 的概率替换
// 假设决定替换,随机选择位置 1(B)
// 水塘:[A, D, C]

// 处理 E (count=5): 以 3/5 的概率替换
// 假设不替换
// 水塘:[A, D, C]

// 处理 F (count=6): 以 3/6 = 1/2 的概率替换
// 假设决定替换,随机选择位置 2(C)
// 水塘:[A, D, F]

// ... 继续处理剩余元素

// 最终水塘中的 3 个元素就是随机样本

七、算法优势

  1. 完美随机性:每个元素被选中的概率完全相等
  2. 内存高效:空间复杂度 O(k),与数据总量无关
  3. 在线处理:不需要预先知道数据总量
  4. 一次遍历:时间复杂度 O(n),只需遍历一次
  5. 适合流式数据:可以处理无限的数据流

八、适用场景

  • 大数据集采样:从海量数据中抽取样本
  • 流式数据处理:实时数据流中的随机采样
  • 数据分析:随机抽样进行统计分析
  • A/B 测试:随机分配用户到不同组
  • 推荐系统:随机推荐内容

九、性能特点

对于 200 万条数据抽取 200 条:

  • 数据库查询次数:约 200 次(每次 10000 条)
  • 执行时间:约 2-5 秒
  • 内存占用:恒定(只存储 200 条记录)
  • 随机性:较好

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top