项目一:构建"Mini-C4"预训练集¶
1. 项目背景 (Project Brief)¶
- 任务定义: 构建一个微缩版 C4 (Colossal Clean Crawled Corpus) 数据集流水线。我们的目标是将杂乱无章的原始网页数据(Common Crawl)转化为低噪、去重、高质量的纯文本数据,使其达到可以直接输入 LLM 进行预训练的标准。
- 输入与输出:
- Input: Common Crawl 的原始 WARC 压缩包(
.warc.gz),包含 HTTP 响应头、HTML 源码、乱码及二进制数据。 - Output: 分类分级后的 JSONL 文件(
final_data.jsonl),包含纯净文本、语言标签及困惑度(Perplexity)评分。
- Input: Common Crawl 的原始 WARC 压缩包(
- 难点分析:
- 信噪比极低: 原始网页中 90% 以上是导航栏、广告、SEO 关键词堆砌、JavaScript 代码和无意义的占位符。
- 去重复杂度: 在海量文本中找出“语义重复”而非“完全一致”的文档(Fuzzy Deduplication),计算量呈指数级增长。
- 质量量化: 如何不依赖昂贵的 API(如 GPT-4 打分),仅凭统计学方法自动判断一句话是“人类高质量语言”还是“机器生成的垃圾”?
2. 架构设计 (Architecture Design)¶
为了处理非结构化的 Web 数据,我们设计了如下的漏斗型(Funnel)处理架构,层层过滤噪声:
数据流水线图:
(图注:数据流向为 Raw WARC -> Text Extraction -> Heuristic Filtering -> MinHash Deduplication -> LangID & PPL Filtering -> Final Dataset)
技术栈清单:
| 组件 | 选型 | 决策理由 |
|---|---|---|
| 下载/解析 | warcio, trafilatura |
warcio 是处理 WARC 标准的官方库;trafilatura 相比 BeautifulSoup,在提取正文(去除导航/页脚)方面有显著优势,且速度更快。 |
| 分布式计算 | Ray |
Python 的 multiprocessing 在处理大规模数据共享时开销较大。Ray 提供了极其简单的 Actor 模型,能让我们用几行代码将 MinHash 计算并行化到多核 CPU 甚至集群上。 |
| 去重算法 | MinHash LSH |
传统的两两比对复杂度为 \(O(N^2)\),无法通过。利用 LSH(局部敏感哈希)将复杂度降为 \(O(N)\),且 datasketch 库实现了高效的内存索引。 |
| 质量评估 | KenLM |
轻量级 N-gram 语言模型库。在 GPT-3 和 CCNet 论文中,均使用 KenLM 的困惑度(Perplexity)作为衡量文本自然度的核心指标,计算速度极快(C++底层)。 |
3. Step-by-Step 实战 (Implementation)¶
阶段一:从 HTML 泥潭中提取正文 (Extraction & Cleaning)¶
原始 WARC 文件包含大量非文本噪声。我们首先使用 warcio 流式读取压缩包,并利用 trafilatura 提取核心内容。
关键代码解析:流式处理与解析
在 2_process_warc.py 中,我们避免将整个 WARC 文件读入内存,而是利用 ArchiveIterator 进行流式处理:
from warcio.archiveiterator import ArchiveIterator
import trafilatura
# ... (省略文件打开逻辑)
for record in ArchiveIterator(stream):
if record.rec_type == 'response':
# 1. 过滤非 HTML 内容
content_type = record.http_headers.get_header('Content-Type')
if not content_type or 'text/html' not in content_type:
continue
# 2. 核心解析逻辑
# include_comments=False: 去除网友评论,通常质量较低
# no_fallback=False: 允许尝试多种解析策略以提高召回率
text = trafilatura.extract(
record.content_stream().read(),
include_comments=False,
include_tables=False,
no_fallback=False
)
🔍 启发式清洗规则 (Heuristic Rules)
提取后的文本仍然包含大量“垃圾”。在 3_clean_data.py 中,我们实施了 Gopher 和 C4 论文中经典的启发式规则:
- 平均词长过滤: 正常的英语/中文文本,平均词长在固定范围内。如果平均词长 > 15 字符,通常是代码 minified 后的产物或 URL 列表。
- 符号密度 (Symbol Ratio): 统计
{ } [ ] < > \等代码符号的占比。如果超过 10%,视为代码片段。 - 黑名单 (Blocklist): 包含 "lorem ipsum", "enable cookies" 等无意义内容的文本直接丢弃。
# 3_clean_data.py 核心片段
def is_high_quality(text):
# 规则: 符号密度检查
code_symbols = {'{', '}', '[', ']', '<', '>', '\\'}
symbol_count = sum(1 for char in text if char in code_symbols)
if len(text) > 0 and (symbol_count / len(text) > 0.1):
return False # 代码符号过多
# 规则: 关键词黑名单
bad_phrases = ["lorem ipsum", "enable cookies", "403 forbidden"]
if any(p in text.lower() for p in bad_phrases):
return False
return True
阶段二:分布式 MinHash 去重 (Deduplication)¶
互联网上存在大量重复内容(转载、镜像)。我们使用 Ray 实现并行的 MinHash 计算。
关键代码解析:Ray Actor 模式
在 4_deduplicate.py 中,我们定义了一个 Ray 任务来并行计算哈希签名(Signature),主进程只负责构建索引。
import ray
from datasketch import MinHash
@ray.remote
def process_batch(lines, batch_id):
"""Ray Worker: 并行计算一批数据的 MinHash 指纹"""
results = []
for line in lines:
item = json.loads(line)
m = MinHash(num_perm=128) # C4 标准参数
# Shingling: 按单词更新哈希
for w in item['text'].split():
m.update(w.encode('utf8'))
results.append((item['url'], m, item['text']))
return results
# 主流程:Map-Reduce 风格
# Map: 将数据切分 batch 分发给所有 CPU 核心
futures = [process_batch.remote(batch, i) for i, batch in enumerate(batches)]
# Reduce: 收集结果
results = ray.get(futures)
🛠️ 调试与踩坑记录 (Debugging)
- 序列化开销: 传递大量小对象给 Ray 任务会产生巨大的序列化开销。
- Fix: 将数据打包成
batch(如 1000 条一组) 再发送给 Worker,显著提升了吞吐量。
阶段三:语言识别与困惑度过滤 (Quality Filtering)¶
清洗后的数据混合了多种语言且质量参差不齐。我们先用 FastText 分流语言,再用 KenLM 计算困惑度(Perplexity)。
关键代码解析:KenLM 归一化得分
困惑度越低,代表句子越通顺。在 6_quality_filter.py 中,我们计算 Log Score 并按长度归一化。
import kenlm
import fasttext
# 1. 语言分流 (来自 5_split_lang.py)
lid_model = fasttext.load_model('lid.176.ftz')
def predict_lang(text):
# k=1 取概率最高的语言
predictions = lid_model.predict(text, k=1)
return predictions[0][0].replace('__label__', '')
# 2. 困惑度过滤 (来自 6_quality_filter.py)
kenlm_model = kenlm.Model('en.arpa.bin')
PERPLEXITY_THRESHOLD = -6.0 # 经验阈值:低于此值通常为低质量文本
def filter_by_perplexity(text):
words = text.split()
# 计算归一化得分 (Log Score / Length)
# 注意:KenLM 返回的是 log10 概率,数值通常为负
log_score = model.score(text)
normalized_score = log_score / len(words)
# 比如 -5.0 (高质量) > -6.0 (阈值) -> 保留
if normalized_score > PERPLEXITY_THRESHOLD:
return True, normalized_score
return False, normalized_score
📉 参数调优经验
我们通过采样观察确定了 -6.0 这个阈值:
- Score > -5.0: 极其通顺的新闻、维基百科。
- Score -5.0 ~ -6.0: 普通的博客、论坛讨论。
- Score < -6.5: 主要是关键词列表、破碎的句子、严重的语法错误(如机器翻译失败的结果)。
4. 效果展示 (Showcase)¶
经过这一套 Pipeline 处理,数据的面貌发生了根本性变化。以下是基于单次 Crawl (约 1GB WARC) 的实际处理统计:
数据漏斗统计 (Data Funnel):
| 处理阶段 | 输入记录数 | 输出记录数 | 留存率 | 主要损耗原因 |
|---|---|---|---|---|
| 原始 WARC | ~35,000 | ~10,000 | 28% | 非 HTML 响应、空内容、Trafilatura 解析失败 |
| 启发式清洗 | 10,000 | ~6,500 | 65% | 长度过短、符号密度过高(代码)、黑名单 |
| 去重 (LSH) | 6,500 | ~4,800 | 73% | 转载文章、镜像站点、模板文字 |
| 语言/质量过滤 | 4,800 | ~3,900 | 81% | 非英文内容、高困惑度(乱码/不通顺) |
| Total | 35,000 | 3,900 | ~11% | 最终产出率为 11% |
Case Study: 过滤效果对比
Case 1: 导航栏噪声 (已去除) Raw: "Home | About Us | Contact | Enable Cookies | Copyright 2023..." Result: [已丢弃] (触发短文本和关键词黑名单规则)
Case 2: 代码片段 (已去除) Raw: "function(x) { return x > 0 ? true : false; } var a = [1,2,3];" Result: [已丢弃] (触发符号密度 > 10% 规则)
Case 3: 高质量正文 (保留并评分) Raw: "The James Webb Space Telescope has captured a new image of the Pillars of Creation..." Result: [保留] KenLM Score: -4.82 (优于阈值 -6.0)
数据统计: 在单次 Crawl 的采样测试中: * 原始记录: 10,000 条 * 提取有效文本: 约 4,500 条 (HTML 解析损耗) * 清洗后剩余: 约 2,800 条 (启发式过滤损耗) * 去重后剩余: 约 2,100 条 (重复率约 25%) * 最终高质量集: 约 1,800 条 (KenLM 过滤)
5. 成本与优化 (Cost & Optimization)¶
资源消耗¶
- 计算: 本项目代码在单机 16核 CPU、64G 内存环境下,处理 1GB WARC 数据耗时约 5-8 分钟。
- 瓶颈:
MinHashLSH的索引构建目前是单线程的(在4_deduplicate.py中),且完全依赖内存。
扩展性思考(Scaling to TBs)¶
如果数据量扩大到 PB 级别(如真实的 C4 数据集),当前架构需要进行以下升级:
-
LSH 存储分离: 内存无法装下亿级别的 Hash 索引。需将
MinHashLSH改为使用 Redis 或 Cassandra 持久化存储哈希桶。 -
并行策略升级: 将 Ray 任务从“单机多核”扩展到“多机集群”。Worker 节点只负责计算 Hash,Master 节点或数据库负责冲突检测。
-
IO 优化: 数据读取需从本地文件系统迁移至 S3/MinIO,并使用 PyArrow 进行流式列存处理,减少 Python 对象开销。
-
CCNet 方案: 对于超大规模去重,可以参考 CCNet 的做法,先将文件按 Hash 值分桶(Sharding),在桶内进行局部去重,再进行全局合并。
