跳转至

项目一:构建"Mini-C4"预训练集

1. 项目背景 (Project Brief)

  • 任务定义: 构建一个微缩版 C4 (Colossal Clean Crawled Corpus) 数据集流水线。我们的目标是将杂乱无章的原始网页数据(Common Crawl)转化为低噪、去重、高质量的纯文本数据,使其达到可以直接输入 LLM 进行预训练的标准。
  • 输入与输出:
    • Input: Common Crawl 的原始 WARC 压缩包(.warc.gz),包含 HTTP 响应头、HTML 源码、乱码及二进制数据。
    • Output: 分类分级后的 JSONL 文件(final_data.jsonl),包含纯净文本、语言标签及困惑度(Perplexity)评分。
  • 难点分析:
    • 信噪比极低: 原始网页中 90% 以上是导航栏、广告、SEO 关键词堆砌、JavaScript 代码和无意义的占位符。
    • 去重复杂度: 在海量文本中找出“语义重复”而非“完全一致”的文档(Fuzzy Deduplication),计算量呈指数级增长。
    • 质量量化: 如何不依赖昂贵的 API(如 GPT-4 打分),仅凭统计学方法自动判断一句话是“人类高质量语言”还是“机器生成的垃圾”?

2. 架构设计 (Architecture Design)

为了处理非结构化的 Web 数据,我们设计了如下的漏斗型(Funnel)处理架构,层层过滤噪声:

数据流水线图:

图1:构建"Mini-C4"预训练集数据流水线图

(图注:数据流向为 Raw WARC -> Text Extraction -> Heuristic Filtering -> MinHash Deduplication -> LangID & PPL Filtering -> Final Dataset)

技术栈清单:

组件 选型 决策理由
下载/解析 warcio, trafilatura warcio 是处理 WARC 标准的官方库;trafilatura 相比 BeautifulSoup,在提取正文(去除导航/页脚)方面有显著优势,且速度更快。
分布式计算 Ray Python 的 multiprocessing 在处理大规模数据共享时开销较大。Ray 提供了极其简单的 Actor 模型,能让我们用几行代码将 MinHash 计算并行化到多核 CPU 甚至集群上。
去重算法 MinHash LSH 传统的两两比对复杂度为 \(O(N^2)\),无法通过。利用 LSH(局部敏感哈希)将复杂度降为 \(O(N)\),且 datasketch 库实现了高效的内存索引。
质量评估 KenLM 轻量级 N-gram 语言模型库。在 GPT-3 和 CCNet 论文中,均使用 KenLM 的困惑度(Perplexity)作为衡量文本自然度的核心指标,计算速度极快(C++底层)。

3. Step-by-Step 实战 (Implementation)

阶段一:从 HTML 泥潭中提取正文 (Extraction & Cleaning)

原始 WARC 文件包含大量非文本噪声。我们首先使用 warcio 流式读取压缩包,并利用 trafilatura 提取核心内容。

关键代码解析:流式处理与解析2_process_warc.py 中,我们避免将整个 WARC 文件读入内存,而是利用 ArchiveIterator 进行流式处理:

from warcio.archiveiterator import ArchiveIterator
import trafilatura

# ... (省略文件打开逻辑)
for record in ArchiveIterator(stream):
    if record.rec_type == 'response':
        # 1. 过滤非 HTML 内容
        content_type = record.http_headers.get_header('Content-Type')
        if not content_type or 'text/html' not in content_type:
            continue

        # 2. 核心解析逻辑
        # include_comments=False: 去除网友评论,通常质量较低
        # no_fallback=False: 允许尝试多种解析策略以提高召回率
        text = trafilatura.extract(
            record.content_stream().read(), 
            include_comments=False, 
            include_tables=False, 
            no_fallback=False
        )

🔍 启发式清洗规则 (Heuristic Rules) 提取后的文本仍然包含大量“垃圾”。在 3_clean_data.py 中,我们实施了 Gopher 和 C4 论文中经典的启发式规则:

  1. 平均词长过滤: 正常的英语/中文文本,平均词长在固定范围内。如果平均词长 > 15 字符,通常是代码 minified 后的产物或 URL 列表。
  2. 符号密度 (Symbol Ratio): 统计 { } [ ] < > \ 等代码符号的占比。如果超过 10%,视为代码片段。
  3. 黑名单 (Blocklist): 包含 "lorem ipsum", "enable cookies" 等无意义内容的文本直接丢弃。
# 3_clean_data.py 核心片段
def is_high_quality(text):
    # 规则: 符号密度检查
    code_symbols = {'{', '}', '[', ']', '<', '>', '\\'}
    symbol_count = sum(1 for char in text if char in code_symbols)
    if len(text) > 0 and (symbol_count / len(text) > 0.1): 
        return False # 代码符号过多

    # 规则: 关键词黑名单
    bad_phrases = ["lorem ipsum", "enable cookies", "403 forbidden"]
    if any(p in text.lower() for p in bad_phrases):
        return False
    return True

阶段二:分布式 MinHash 去重 (Deduplication)

互联网上存在大量重复内容(转载、镜像)。我们使用 Ray 实现并行的 MinHash 计算。

关键代码解析:Ray Actor 模式4_deduplicate.py 中,我们定义了一个 Ray 任务来并行计算哈希签名(Signature),主进程只负责构建索引。

import ray
from datasketch import MinHash

@ray.remote
def process_batch(lines, batch_id):
    """Ray Worker: 并行计算一批数据的 MinHash 指纹"""
    results = []
    for line in lines:
        item = json.loads(line)
        m = MinHash(num_perm=128) # C4 标准参数
        # Shingling: 按单词更新哈希
        for w in item['text'].split():
            m.update(w.encode('utf8'))
        results.append((item['url'], m, item['text']))
    return results

# 主流程:Map-Reduce 风格
# Map: 将数据切分 batch 分发给所有 CPU 核心
futures = [process_batch.remote(batch, i) for i, batch in enumerate(batches)]
# Reduce: 收集结果
results = ray.get(futures)

🛠️ 调试与踩坑记录 (Debugging)

  • 序列化开销: 传递大量小对象给 Ray 任务会产生巨大的序列化开销。
  • Fix: 将数据打包成 batch (如 1000 条一组) 再发送给 Worker,显著提升了吞吐量。

阶段三:语言识别与困惑度过滤 (Quality Filtering)

清洗后的数据混合了多种语言且质量参差不齐。我们先用 FastText 分流语言,再用 KenLM 计算困惑度(Perplexity)。

关键代码解析:KenLM 归一化得分 困惑度越低,代表句子越通顺。在 6_quality_filter.py 中,我们计算 Log Score 并按长度归一化。

import kenlm
import fasttext

# 1. 语言分流 (来自 5_split_lang.py)
lid_model = fasttext.load_model('lid.176.ftz')
def predict_lang(text):
    # k=1 取概率最高的语言
    predictions = lid_model.predict(text, k=1)
    return predictions[0][0].replace('__label__', '')

# 2. 困惑度过滤 (来自 6_quality_filter.py)
kenlm_model = kenlm.Model('en.arpa.bin')
PERPLEXITY_THRESHOLD = -6.0  # 经验阈值:低于此值通常为低质量文本

def filter_by_perplexity(text):
    words = text.split()
    # 计算归一化得分 (Log Score / Length)
    # 注意:KenLM 返回的是 log10 概率,数值通常为负
    log_score = model.score(text)
    normalized_score = log_score / len(words)

    # 比如 -5.0 (高质量) > -6.0 (阈值) -> 保留
    if normalized_score > PERPLEXITY_THRESHOLD:
        return True, normalized_score
    return False, normalized_score

📉 参数调优经验 我们通过采样观察确定了 -6.0 这个阈值:

  • Score > -5.0: 极其通顺的新闻、维基百科。
  • Score -5.0 ~ -6.0: 普通的博客、论坛讨论。
  • Score < -6.5: 主要是关键词列表、破碎的句子、严重的语法错误(如机器翻译失败的结果)。

4. 效果展示 (Showcase)

经过这一套 Pipeline 处理,数据的面貌发生了根本性变化。以下是基于单次 Crawl (约 1GB WARC) 的实际处理统计:

数据漏斗统计 (Data Funnel):

处理阶段 输入记录数 输出记录数 留存率 主要损耗原因
原始 WARC ~35,000 ~10,000 28% 非 HTML 响应、空内容、Trafilatura 解析失败
启发式清洗 10,000 ~6,500 65% 长度过短、符号密度过高(代码)、黑名单
去重 (LSH) 6,500 ~4,800 73% 转载文章、镜像站点、模板文字
语言/质量过滤 4,800 ~3,900 81% 非英文内容、高困惑度(乱码/不通顺)
Total 35,000 3,900 ~11% 最终产出率为 11%

Case Study: 过滤效果对比

Case 1: 导航栏噪声 (已去除) Raw: "Home | About Us | Contact | Enable Cookies | Copyright 2023..." Result: [已丢弃] (触发短文本和关键词黑名单规则)

Case 2: 代码片段 (已去除) Raw: "function(x) { return x > 0 ? true : false; } var a = [1,2,3];" Result: [已丢弃] (触发符号密度 > 10% 规则)

Case 3: 高质量正文 (保留并评分) Raw: "The James Webb Space Telescope has captured a new image of the Pillars of Creation..." Result: [保留] KenLM Score: -4.82 (优于阈值 -6.0)

数据统计: 在单次 Crawl 的采样测试中: * 原始记录: 10,000 条 * 提取有效文本: 约 4,500 条 (HTML 解析损耗) * 清洗后剩余: 约 2,800 条 (启发式过滤损耗) * 去重后剩余: 约 2,100 条 (重复率约 25%) * 最终高质量集: 约 1,800 条 (KenLM 过滤)

5. 成本与优化 (Cost & Optimization)

资源消耗

  • 计算: 本项目代码在单机 16核 CPU、64G 内存环境下,处理 1GB WARC 数据耗时约 5-8 分钟。
  • 瓶颈: MinHashLSH 的索引构建目前是单线程的(在 4_deduplicate.py 中),且完全依赖内存。

扩展性思考(Scaling to TBs)

如果数据量扩大到 PB 级别(如真实的 C4 数据集),当前架构需要进行以下升级:

  1. LSH 存储分离: 内存无法装下亿级别的 Hash 索引。需将 MinHashLSH 改为使用 RedisCassandra 持久化存储哈希桶。

  2. 并行策略升级: 将 Ray 任务从“单机多核”扩展到“多机集群”。Worker 节点只负责计算 Hash,Master 节点或数据库负责冲突检测。

  3. IO 优化: 数据读取需从本地文件系统迁移至 S3/MinIO,并使用 PyArrow 进行流式列存处理,减少 Python 对象开销。

  4. CCNet 方案: 对于超大规模去重,可以参考 CCNet 的做法,先将文件按 Hash 值分桶(Sharding),在桶内进行局部去重,再进行全局合并。