跳转至

第4章:清洗与去噪


本章摘要

从互联网获取的原始数据就像未经加工的矿石,其中真正有价值的"精矿"可能只占很小的比例。本章将深入探讨预训练数据清洗的三大核心技术:启发式过滤规则用于剔除低质量文档,大规模去重技术用于消除重复内容,隐私数据清洗用于保护用户信息。掌握这些技术后,读者将能够构建工业级的数据清洗流水线,将原始网页数据转化为高质量的预训练语料。


场景引入

经过上一章的努力,你的团队成功从 Common Crawl 提取了 50TB 的中文网页文本。然而,当你随机抽样查看数据时,发现了各种令人头疼的问题:大量页面只有几个字的导航文本,完全没有实质内容;有些页面全是 JavaScript 代码或 CSS 样式的残留;某些网站的内容被重复爬取了数百次;还有大量带有用户邮箱、手机号码的敏感信息。

更糟糕的是,训练工程师告诉你,上次用未经清洗的数据训练的模型有严重的"复读机"问题——模型会反复输出相同的句子,甚至能够背诵出某些网站的完整内容。这显然是数据重复导致的。

如何系统性地解决这些问题?本章将给出完整的答案。


4.1 启发式过滤规则

启发式过滤(Heuristic Filtering)是数据清洗的第一道防线。它基于一系列可量化的规则,快速筛选出明显低质量的文档。虽然这些规则看起来简单,但在实践中能够过滤掉大部分噪声数据,是性价比极高的清洗手段。

图4-1:数据清洗流水线

图4-1:数据清洗流水线架构 —— 从原始数据到清洁语料的八阶段处理流程

4.1.1 语言识别

语言识别是多语言数据处理的基础步骤。对于训练中文模型而言,首先需要从 Common Crawl 的海量数据中筛选出中文内容,这就需要准确的语言识别能力。

FastText 语言识别器是目前最常用的工具。它由 Facebook AI Research 开发,预训练模型支持 176 种语言的识别,速度极快,准确率也相当高。FastText 提供两个预训练模型:lid.176.bin 是完整版本,准确率更高但体积较大(约 126MB);lid.176.ftz 是压缩版本,体积小(约 917KB)但准确率略低。对于大规模数据处理,建议使用完整版本。

import fasttext

# 加载语言识别模型
lang_model = fasttext.load_model('lid.176.bin')

def detect_language(text: str, min_confidence: float = 0_8) -> tuple:
    """
    识别文本语言

    Args:
        text: 待识别的文本
        min_confidence: 最低置信度阈值

    Returns:
        (语言代码, 置信度) 或 (None, 0) 如果置信度不足
    """
    # 预处理:移除换行符,截取前 1000 字符
    text = text.replace('\n', ' ')[:1000]

    # 预测
    predictions = lang_model.predict(text, k=1)
    lang = predictions[0][0].replace('__label__', '')
    confidence = predictions[1][0]

    if confidence >= min_confidence:
        return lang, confidence
    return None, confidence

def filter_by_language(documents: list, target_lang: str = 'zh') -> list:
    """过滤指定语言的文档"""
    results = []
    for doc in documents:
        lang, conf = detect_language(doc['text'])
        if lang == target_lang:
            doc['detected_lang'] = lang
            doc['lang_confidence'] = conf
            results.append(doc)
    return results

语言识别在实践中会遇到一些边界情况。混合语言的文档(如中英文混杂的技术博客)可能被错误分类。短文本的识别准确率较低,建议对长度不足 50 字符的文本跳过语言过滤。代码片段可能被识别为各种语言,需要结合内容类型进行判断。

4.1.2 文本质量评分

语言识别只能确保文档是目标语言,但无法判断内容质量。一段语法正确的垃圾广告和一篇优质的技术文章,在语言识别上可能得到相同的分数。这就需要更精细的质量评估机制。

困惑度(Perplexity)过滤是一种基于语言模型的质量评估方法。困惑度衡量的是语言模型对文本的"惊讶程度"——如果一段文本与模型训练数据的分布相似,困惑度就低;如果文本包含大量噪声、乱码或不自然的表达,困惑度就高。

KenLM 是计算困惑度最常用的工具。它基于 n-gram 语言模型,速度极快,适合大规模数据处理。

import kenlm

class PerplexityFilter:
    def __init__(self, model_path: str, max_perplexity: float = 500):
        """
        初始化困惑度过滤器

        Args:
            model_path: KenLM 模型路径 (.arpa 或 .bin)
            max_perplexity: 困惑度阈值,超过此值的文档将被过滤
        """
        self.model = kenlm.Model(model_path)
        self.max_perplexity = max_perplexity

    def compute_perplexity(self, text: str) -> float:
        """计算文本的困惑度"""
        # KenLM 返回的是 log10 概率
        log_prob = self.model.score(text, bos=True, eos=True)
        # 转换为困惑度
        num_words = len(text.split()) + 1  # +1 for EOS
        perplexity = 10 ** (-log_prob / num_words)
        return perplexity

    def filter(self, documents: list) -> list:
        """过滤高困惑度文档"""
        results = []
        for doc in documents:
            ppl = self.compute_perplexity(doc['text'])
            if ppl <= self.max_perplexity:
                doc['perplexity'] = ppl
                results.append(doc)
        return results

困惑度阈值的设定需要根据具体数据进行调优。一般而言,高质量的新闻和百科文本困惑度在 100-200 之间,普通网页内容在 200-500 之间,低质量内容(如乱码、机器翻译)通常超过 500。建议先在小规模样本上分析困惑度分布,再确定合适的阈值。

4.1.3 启发式规则集

除了语言识别和困惑度过滤,还有一系列简单但有效的启发式规则,可以快速剔除明显的低质量内容。这些规则的设计来源于对大量数据的观察和经验总结。

长度过滤是最基本的规则。过短的文档(如只有几个词的导航文本)没有训练价值,应该直接移除。过长的文档可能需要截断或分段处理。典型的阈值设定是:最小长度 200 字符或 50 词,最大长度 100,000 字符。

特殊字符比例可以识别出大量噪声内容。如果一个文档中非字母数字字符的比例过高,很可能是代码残留、乱码或格式错误。类似地,数字比例过高可能表示是日志文件或数据表格。

重复行比例可以检测出模板化的低质量页面。如果一个文档中有大量完全相同的行(如导航栏在页面多处重复),说明内容质量较低。

词汇多样性衡量文档的信息丰富程度。一个只使用 10 个不同词汇的文档显然不如使用 500 个不同词汇的文档有价值。常用的指标是 Type-Token Ratio(TTR),即唯一词数与总词数的比值。

以下是一个综合的启发式过滤器实现:

import re
from collections import Counter

class HeuristicFilter:
    def __init__(self, config: dict = None):
        """
        初始化启发式过滤器

        默认配置适用于中文预训练数据
        """
        self.config = config or {
            'min_length': 200,           # 最小字符数
            'max_length': 100000,        # 最大字符数
            'min_words': 50,             # 最小词数
            'max_special_ratio': 0_3,    # 最大特殊字符比例
            'max_digit_ratio': 0_3,      # 最大数字比例
            'max_duplicate_line_ratio': 0_3,  # 最大重复行比例
            'min_avg_word_length': 2,    # 最小平均词长
            'max_avg_word_length': 20,   # 最大平均词长
            'min_unique_word_ratio': 0_1 # 最小词汇多样性
        }

    def check_length(self, text: str) -> bool:
        """检查文档长度"""
        length = len(text)
        return self.config['min_length'] <= length <= self.config['max_length']

    def check_special_chars(self, text: str) -> bool:
        """检查特殊字符比例"""
        if len(text) == 0:
            return False
        special = len(re.findall(r'[^\w\s]', text, re.UNICODE))
        ratio = special / len(text)
        return ratio <= self.config['max_special_ratio']

    def check_digit_ratio(self, text: str) -> bool:
        """检查数字比例"""
        if len(text) == 0:
            return False
        digits = len(re.findall(r'\d', text))
        ratio = digits / len(text)
        return ratio <= self.config['max_digit_ratio']

    def check_duplicate_lines(self, text: str) -> bool:
        """检查重复行比例"""
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        if len(lines) == 0:
            return False
        unique_lines = len(set(lines))
        duplicate_ratio = 1 - (unique_lines / len(lines))
        return duplicate_ratio <= self.config['max_duplicate_line_ratio']

    def check_vocabulary_diversity(self, text: str) -> bool:
        """检查词汇多样性"""
        words = text.split()
        if len(words) < self.config['min_words']:
            return False
        unique_ratio = len(set(words)) / len(words)
        return unique_ratio >= self.config['min_unique_word_ratio']

    def filter(self, text: str) -> tuple:
        """
        应用所有过滤规则

        Returns:
            (是否通过, 失败原因或 None)
        """
        checks = [
            (self.check_length, 'length'),
            (self.check_special_chars, 'special_chars'),
            (self.check_digit_ratio, 'digit_ratio'),
            (self.check_duplicate_lines, 'duplicate_lines'),
            (self.check_vocabulary_diversity, 'vocabulary_diversity')
        ]

        for check_func, name in checks:
            if not check_func(text):
                return False, name

        return True, None

4.1.4 质量分层策略

在实践中,将数据简单地二分为"保留"和"丢弃"往往过于粗暴。更精细的做法是对数据进行质量分层,为不同质量层级的数据赋予不同的采样权重。

一种常见的分层策略是:将数据分为高、中、低三个质量层级。高质量数据(如来自权威网站、通过所有启发式检查、困惑度低)赋予较高的采样权重;中等质量数据赋予正常权重;低质量但仍可接受的数据赋予较低权重。这种策略可以在保证数据多样性的同时,让高质量数据在训练中发挥更大作用。

RefinedWeb 的论文详细记录了他们的分层策略,将数据分为五个层级,每个层级使用不同的过滤阈值。这种精细化的质量管理是构建高质量预训练数据集的关键。

图4-2:质量过滤漏斗

图4-2:数据质量过滤漏斗 —— 从100%原始数据到最终4%清洁语料的逐层过滤过程


4.2 大规模去重

数据重复是预训练数据的大敌。Common Crawl 中,同一篇文章可能被多个网站转载,同一个网页可能在不同月份被反复抓取,导致大量重复内容。研究表明,未经去重的数据会导致模型过拟合于重复内容,产生"复读机"现象,严重影响模型质量。

去重可以分为两个层次:精确去重(Exact Deduplication)移除完全相同的文档;模糊去重(Fuzzy Deduplication)移除高度相似但不完全相同的文档(如转载时略有修改的文章)。在 TB 级数据上,这两种去重都需要高效的算法和分布式实现。

4.2.1 精确去重:哈希方法

精确去重的核心思想是为每个文档计算一个指纹(fingerprint),相同指纹的文档视为重复。最简单的方法是使用 MD5 或 SHA256 等哈希函数。

import hashlib

def compute_hash(text: str) -> str:
    """计算文本的 SHA256 哈希"""
    return hashlib.sha256(text.encode('utf-8')).hexdigest()

def exact_dedup(documents: list) -> list:
    """精确去重:保留每个哈希值的第一个文档"""
    seen_hashes = set()
    results = []

    for doc in documents:
        doc_hash = compute_hash(doc['text'])
        if doc_hash not in seen_hashes:
            seen_hashes.add(doc_hash)
            doc['hash'] = doc_hash
            results.append(doc)

    return results

对于分布式场景,可以使用 Spark 或 Ray 进行并行去重:

import ray

@ray.remote
def compute_hashes_batch(documents: list) -> list:
    """批量计算哈希"""
    return [(compute_hash(doc['text']), doc) for doc in documents]

def distributed_exact_dedup(documents_path: str, output_path: str):
    """分布式精确去重"""
    ds = ray.data.read_parquet(documents_path)

    # 计算哈希
    ds = ds.map(lambda doc: {**doc, 'hash': compute_hash(doc['text'])})

    # 按哈希分组,每组保留第一个
    ds = ds.groupby('hash').map_groups(lambda group: group.head(1))

    # 保存结果
    ds.write_parquet(output_path)

精确去重效率很高,但只能处理完全相同的文档。对于略有差异的重复内容(如同一篇新闻在不同网站的转载,可能有不同的页眉页脚),精确去重无能为力。

4.2.2 模糊去重:MinHash LSH

模糊去重的目标是识别"高度相似但不完全相同"的文档。这是一个计算复杂度很高的问题——朴素地比较任意两个文档需要 O(n²) 的时间复杂度,对于数十亿文档的数据集完全不可行。

MinHash LSH(Locality-Sensitive Hashing)是解决这一问题的核心算法。它的基本思想是:先将文档转换为 n-gram 集合,然后使用 MinHash 技术将集合压缩为固定长度的签名,最后使用 LSH 将相似的签名聚集到同一个桶中。只有落入同一个桶的文档对才需要进行精细比较,大大减少了计算量。

理解 MinHash LSH 需要分三步来看:

第一步是 n-gram 分解。 将文档视为 n-gram(连续 n 个字符或词)的集合。例如,"大模型数据" 的 3-gram 集合为 {"大模型", "模型数", "型数据"}。使用 n-gram 而非整个文档,可以更好地捕捉局部相似性。

第二步是 MinHash 签名。 MinHash 是一种将集合压缩为固定长度签名的技术。两个集合的 Jaccard 相似度可以通过它们 MinHash 签名的匹配程度来近似估计。签名长度越长,估计越准确,但存储和计算开销也越大。

第三步是 LSH 分桶。 将 MinHash 签名分成若干个 band,每个 band 包含若干个 hash 值。如果两个文档在任意一个 band 中的所有 hash 值都相同,则它们被放入同一个桶。调整 band 的数量和每个 band 的大小,可以控制相似度阈值和召回率。

以下是一个完整的 MinHash LSH 实现:

图4-3:MinHash LSH算法

图4-3:MinHash LSH 算法三步骤 —— N-gram分解、MinHash签名计算、LSH分桶,将复杂度从O(n²)降至O(n)

import hashlib
import struct
from typing import Set, List, Tuple
import numpy as np

class MinHashLSH:
    def __init__(self, 
                 num_hashes: int = 128,
                 num_bands: int = 16,
                 ngram_size: int = 5,
                 threshold: float = 0_8):
        """
        初始化 MinHash LSH

        Args:
            num_hashes: MinHash 签名长度
            num_bands: LSH band 数量
            ngram_size: n-gram 大小
            threshold: 相似度阈值(参考值,实际阈值由 band 参数决定)
        """
        self.num_hashes = num_hashes
        self.num_bands = num_bands
        self.rows_per_band = num_hashes // num_bands
        self.ngram_size = ngram_size

        # 生成哈希函数的随机参数
        self.hash_params = [
            (np.random.randint(1, 2**31), np.random.randint(0, 2**31))
            for _ in range(num_hashes)
        ]

        # LSH 桶
        self.buckets = [{} for _ in range(num_bands)]

    def get_ngrams(self, text: str) -> Set[str]:
        """提取 n-gram 集合"""
        text = text.lower().replace(' ', '')
        ngrams = set()
        for i in range(len(text) - self.ngram_size + 1):
            ngrams.add(text[i:i + self.ngram_size])
        return ngrams

    def compute_minhash(self, ngrams: Set[str]) -> np.ndarray:
        """计算 MinHash 签名"""
        signature = np.full(self.num_hashes, np.inf)

        for ngram in ngrams:
            # 计算 ngram 的基础哈希值
            h = int(hashlib.md5(ngram.encode()).hexdigest(), 16)

            # 使用多个哈希函数
            for i, (a, b) in enumerate(self.hash_params):
                hash_val = (a * h + b) % (2**31 - 1)
                if hash_val < signature[i]:
                    signature[i] = hash_val

        return signature.astype(np.uint32)

    def get_bands(self, signature: np.ndarray) -> List[str]:
        """将签名分割为 bands"""
        bands = []
        for i in range(self.num_bands):
            start = i * self.rows_per_band
            end = start + self.rows_per_band
            band = signature[start:end]
            band_hash = hashlib.md5(band.tobytes()).hexdigest()
            bands.append(band_hash)
        return bands

    def insert(self, doc_id: str, text: str):
        """插入文档到 LSH 索引"""
        ngrams = self.get_ngrams(text)
        if len(ngrams) == 0:
            return

        signature = self.compute_minhash(ngrams)
        bands = self.get_bands(signature)

        for band_idx, band_hash in enumerate(bands):
            if band_hash not in self.buckets[band_idx]:
                self.buckets[band_idx][band_hash] = []
            self.buckets[band_idx][band_hash].append(doc_id)

    def find_candidates(self, text: str) -> Set[str]:
        """查找候选相似文档"""
        ngrams = self.get_ngrams(text)
        if len(ngrams) == 0:
            return set()

        signature = self.compute_minhash(ngrams)
        bands = self.get_bands(signature)

        candidates = set()
        for band_idx, band_hash in enumerate(bands):
            if band_hash in self.buckets[band_idx]:
                candidates.update(self.buckets[band_idx][band_hash])

        return candidates

def jaccard_similarity(set1: Set, set2: Set) -> float:
    """计算 Jaccard 相似度"""
    intersection = len(set1 & set2)
    union = len(set1 | set2)
    return intersection / union if union > 0 else 0

4.2.3 分布式去重实践

在 TB 级数据上运行 MinHash LSH 需要精心设计的分布式策略。一个典型的流程包括:

阶段一:签名计算。 并行遍历所有文档,为每个文档计算 MinHash 签名。这一步是完全并行的,可以充分利用分布式计算资源。

阶段二:Band 分组。 将每个文档按 band 值进行分组。相同 band 值的文档被分配到同一个分区,便于后续比较。

阶段三:组内去重。 在每个分区内,对候选重复文档对进行精细的相似度计算,确定真正的重复关系。

阶段四:传递闭包。 如果文档 A 与 B 重复,B 与 C 重复,则 A、B、C 都应视为一组重复。需要计算重复关系的传递闭包。

阶段五:选择保留文档。 在每组重复文档中选择一个代表(通常选择质量最高或长度最长的)保留,其他删除。

import ray

def distributed_fuzzy_dedup(input_path: str, output_path: str, 
                            threshold: float = 0_8):
    """
    分布式模糊去重流水线
    """
    # 读取数据
    ds = ray.data.read_parquet(input_path)

    # 阶段一:计算 MinHash 签名
    def compute_signature(doc):
        lsh = MinHashLSH()
        ngrams = lsh.get_ngrams(doc['text'])
        signature = lsh.compute_minhash(ngrams)
        bands = lsh.get_bands(signature)
        return {**doc, 'signature': signature.tolist(), 'bands': bands}

    ds = ds.map(compute_signature)

    # 阶段二:按 band 值分组,找候选对
    # (这里简化处理,实际实现需要更复杂的分组逻辑)

    # 阶段三&四:组内精确比较,建立重复关系图
    # ...

    # 阶段五:选择保留文档
    # ...

    # 保存结果
    ds.write_parquet(output_path)

实际工程中,推荐使用现成的工具。text-dedup 是一个开源的文本去重库,实现了多种去重算法,包括 MinHash LSH、SimHash、Suffix Array 等,并提供了 Spark 和 Ray 的分布式实现。Dolma 的去重模块也是一个高质量的参考实现。

4.2.4 文档内去重

除了文档级别的去重,还需要处理文档内部的重复内容。常见的情况包括:网页中反复出现的导航栏、页眉页脚;由于 JavaScript 渲染问题导致的内容重复;某些 CMS 系统生成的模板化重复段落。

文档内去重的策略相对简单:将文档按段落或固定长度分块,计算每个块的哈希值,移除重复的块。

def remove_duplicate_paragraphs(text: str, min_length: int = 50) -> str:
    """移除文档内的重复段落"""
    paragraphs = text.split('\n\n')
    seen_hashes = set()
    unique_paragraphs = []

    for para in paragraphs:
        para = para.strip()
        if len(para) < min_length:
            unique_paragraphs.append(para)
            continue

        para_hash = hashlib.md5(para.encode()).hexdigest()
        if para_hash not in seen_hashes:
            seen_hashes.add(para_hash)
            unique_paragraphs.append(para)

    return '\n\n'.join(unique_paragraphs)

def remove_duplicate_ngrams(text: str, n: int = 10, threshold: int = 3) -> str:
    """移除文档内高频重复的 n-gram"""
    words = text.split()
    ngram_counts = Counter()

    # 计算 n-gram 频次
    for i in range(len(words) - n + 1):
        ngram = tuple(words[i:i + n])
        ngram_counts[ngram] += 1

    # 标记需要移除的位置
    remove_positions = set()
    for i in range(len(words) - n + 1):
        ngram = tuple(words[i:i + n])
        if ngram_counts[ngram] >= threshold:
            # 保留第一次出现,移除后续重复
            for j in range(i + n, len(words) - n + 1):
                if tuple(words[j:j + n]) == ngram:
                    for k in range(j, min(j + n, len(words))):
                        remove_positions.add(k)

    # 重建文本
    result_words = [w for i, w in enumerate(words) if i not in remove_positions]
    return ' '.join(result_words)

4.3 隐私数据清洗 (PII Removal)

预训练数据中不可避免地包含个人身份信息(Personally Identifiable Information, PII),如邮箱地址、电话号码、身份证号、银行卡号、家庭住址等。在数据合规要求日益严格的今天(如 GDPR、CCPA、《个人信息保护法》),清洗 PII 不仅是道德责任,也是法律义务。

4.3.1 PII 的类型与风险

PII 可以分为直接标识符和准标识符两类。直接标识符可以单独识别个人身份,如姓名、身份证号、社会保障号、电话号码、电子邮箱。准标识符单独难以识别个人,但组合使用可能导致识别,如出生日期、邮政编码、职业、工作单位。

在预训练数据中保留 PII 存在多重风险。首先是隐私泄露风险:模型可能"记住"训练数据中的敏感信息,在推理时被恶意提取。其次是合规风险:违反数据保护法规可能导致巨额罚款。最后是声誉风险:如果模型输出他人隐私信息,会严重损害企业形象。

图4-4:PII类型与风险

图4-4:PII类型与风险等级 —— 直接标识符(高风险)与准标识符(中风险)的分类

4.3.2 Microsoft Presidio

Presidio 是微软开源的 PII 识别和匿名化工具包,支持多种语言和多种 PII 类型。它采用模块化设计,包含两个核心组件:Analyzer 负责在文本中识别 PII 实体,Anonymizer 负责对识别出的 PII 进行处理(如替换、掩码、删除)。

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

# 初始化引擎
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def analyze_pii(text: str, language: str = 'en') -> list:
    """
    识别文本中的 PII

    Returns:
        PII 实体列表,包含类型、位置和置信度
    """
    results = analyzer.analyze(
        text=text,
        language=language,
        entities=[
            'EMAIL_ADDRESS', 'PHONE_NUMBER', 'CREDIT_CARD',
            'IP_ADDRESS', 'PERSON', 'LOCATION', 'DATE_TIME'
        ]
    )
    return results

def anonymize_pii(text: str, language: str = 'en') -> str:
    """
    匿名化文本中的 PII

    将识别出的 PII 替换为占位符
    """
    # 先识别
    analyzer_results = analyzer.analyze(text=text, language=language)

    # 定义匿名化策略
    operators = {
        'EMAIL_ADDRESS': OperatorConfig('replace', {'new_value': '<EMAIL>'}),
        'PHONE_NUMBER': OperatorConfig('replace', {'new_value': '<PHONE>'}),
        'CREDIT_CARD': OperatorConfig('replace', {'new_value': '<CREDIT_CARD>'}),
        'IP_ADDRESS': OperatorConfig('replace', {'new_value': '<IP>'}),
        'PERSON': OperatorConfig('replace', {'new_value': '<PERSON>'}),
        'LOCATION': OperatorConfig('replace', {'new_value': '<LOCATION>'}),
        'DATE_TIME': OperatorConfig('keep', {})  # 日期时间通常可以保留
    }

    # 匿名化
    anonymized = anonymizer.anonymize(
        text=text,
        analyzer_results=analyzer_results,
        operators=operators
    )

    return anonymized.text

4.3.3 中文 PII 处理

Presidio 对中文的支持相对有限。对于中文预训练数据,通常需要补充基于正则表达式的规则匹配。

import re

class ChinesePIIFilter:
    """中文 PII 过滤器"""

    patterns = {
        'phone': [
            r'1[3-9]\d{9}',  # 手机号
            r'0\d{2,3}-?\d{7,8}',  # 固定电话
        ],
        'id_card': [
            r'\d{17}[\dXx]',  # 18位身份证
            r'\d{15}',  # 15位身份证
        ],
        'email': [
            r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        ],
        'bank_card': [
            r'\d{16,19}',  # 银行卡号(需要结合上下文判断)
        ],
        'ip_address': [
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
        ],
        'qq': [
            r'[Qq][Qq][::]?\s*\d{5,11}',
            r'[Qq][::]?\s*\d{5,11}',
        ],
        'wechat': [
            r'[Vv][Xx][::]?\s*[a-zA-Z0-9_-]{6,20}',
            r'微信[::]?\s*[a-zA-Z0-9_-]{6,20}',
        ],
    }

    def __init__(self):
        self.compiled_patterns = {}
        for pii_type, patterns in self.patterns.items():
            self.compiled_patterns[pii_type] = [
                re.compile(p) for p in patterns
            ]

    def find_pii(self, text: str) -> list:
        """查找所有 PII"""
        findings = []
        for pii_type, patterns in self.compiled_patterns.items():
            for pattern in patterns:
                for match in pattern.finditer(text):
                    findings.append({
                        'type': pii_type,
                        'value': match.group(),
                        'start': match.start(),
                        'end': match.end()
                    })
        return findings

    def anonymize(self, text: str) -> str:
        """匿名化 PII"""
        findings = self.find_pii(text)

        # 按位置倒序处理,避免替换影响后续位置
        findings.sort(key=lambda x: x['start'], reverse=True)

        for finding in findings:
            placeholder = f"<{finding['type'].upper()}>"
            text = text[:finding['start']] + placeholder + text[finding['end']:]

        return text

4.3.4 PII 处理策略的权衡

PII 处理面临准确率与召回率的权衡。过于激进的过滤可能误伤正常内容(如将普通数字序列误判为电话号码),过于保守则可能遗漏真正的敏感信息。

在实践中,建议采用分层策略。对于高风险 PII(如身份证号、银行卡号),使用较严格的匹配规则,宁可误删也不遗漏。对于中风险 PII(如电话号码、邮箱),使用适中的阈值,平衡准确率和召回率。对于低风险信息(如日期、地点),可以根据具体场景决定是否处理。

另一个重要决策是替换策略。常见的选择包括:完全删除,简单但可能破坏语句流畅性;固定占位符替换(如 <EMAIL>),保留语义信息但可能引入不自然的模式;随机生成替换(如用随机邮箱替换真实邮箱),最接近原始分布但实现复杂。大多数预训练数据集采用占位符替换策略,作为准确率和复杂度的平衡。


4.4 完整清洗流水线

将前面介绍的各个组件串联起来,构建一个完整的数据清洗流水线。

4.4.1 流水线架构

一个工业级的清洗流水线通常包括以下阶段,按顺序执行:

阶段一:格式标准化。 将各种来源的数据转换为统一格式,处理编码问题,提取必要的元数据。

阶段二:语言过滤。 使用 FastText 识别语言,保留目标语言的文档。对于混合语言文档,根据主要语言进行分类。

阶段三:启发式过滤。 应用长度、特殊字符、重复行等启发式规则,快速过滤明显的低质量内容。

阶段四:文档内去重。 移除文档内部的重复段落和重复 n-gram。

阶段五:PII 清洗。 识别并匿名化敏感个人信息。

阶段六:质量评分。 计算困惑度等质量指标,为后续的质量分层提供依据。

阶段七:文档间去重。 使用 MinHash LSH 进行大规模模糊去重,移除高度相似的文档。

阶段八:质量分层与采样。 根据质量评分将数据分层,确定各层的采样权重。

import ray
from dataclasses import dataclass
from typing import Optional

@dataclass
class CleaningConfig:
    """清洗配置"""
    target_language: str = 'zh'
    min_length: int = 200
    max_length: int = 100000
    max_perplexity: float = 500
    dedup_threshold: float = 0_8
    anonymize_pii: bool = True

class DataCleaningPipeline:
    def __init__(self, config: CleaningConfig):
        self.config = config
        self.lang_filter = LanguageFilter(config.target_language)
        self.heuristic_filter = HeuristicFilter()
        self.perplexity_filter = PerplexityFilter(max_ppl=config.max_perplexity)
        self.pii_filter = ChinesePIIFilter() if config.target_language == 'zh' else None
        self.deduplicator = MinHashLSH(threshold=config.dedup_threshold)

    def process_document(self, doc: dict) -> Optional[dict]:
        """处理单个文档"""
        text = doc.get('text', '')

        # 阶段二:语言过滤
        lang, conf = self.lang_filter.detect(text)
        if lang != self.config.target_language:
            return None

        # 阶段三:启发式过滤
        passed, reason = self.heuristic_filter.filter(text)
        if not passed:
            return None

        # 阶段四:文档内去重
        text = remove_duplicate_paragraphs(text)

        # 阶段五:PII 清洗
        if self.config.anonymize_pii and self.pii_filter:
            text = self.pii_filter.anonymize(text)

        # 阶段六:质量评分
        perplexity = self.perplexity_filter.compute_perplexity(text)
        if perplexity > self.config.max_perplexity:
            return None

        return {
            **doc,
            'text': text,
            'language': lang,
            'lang_confidence': conf,
            'perplexity': perplexity
        }

    def run(self, input_path: str, output_path: str):
        """运行完整流水线"""
        # 读取数据
        ds = ray.data.read_parquet(input_path)

        # 阶段一到六:单文档处理
        ds = ds.map(self.process_document)
        ds = ds.filter(lambda x: x is not None)

        # 阶段七:文档间去重
        ds = self.deduplicator.deduplicate(ds)

        # 保存结果
        ds.write_parquet(output_path)

4.4.2 质量监控与迭代

清洗流水线不是一次性任务,而是需要持续监控和迭代优化的过程。建议建立以下监控机制:

过滤率监控:统计每个阶段的过滤率。如果某个阶段突然过滤掉大量数据,可能是阈值设置不当或数据分布发生变化。

样本抽检:定期人工抽检清洗结果,评估过滤规则的准确性。误删的好样本和漏删的坏样本都需要关注。

下游反馈:模型训练后的评测结果是最终的质量验证。如果模型表现不佳,需要回溯分析数据是否存在问题。


4.5 本章小结

本章系统介绍了预训练数据清洗的核心技术。

在启发式过滤方面,语言识别使用 FastText 快速筛选目标语言文档,困惑度过滤使用 KenLM 评估文本质量,启发式规则集涵盖长度、特殊字符、重复行、词汇多样性等多个维度。质量分层策略将数据划分为不同等级,为后续采样提供依据。

在大规模去重方面,精确去重使用哈希方法快速移除完全相同的文档,模糊去重使用 MinHash LSH 算法识别高度相似的内容。分布式实现是处理 TB 级数据的必要手段。文档内去重处理段落和 n-gram 级别的重复。

在隐私清洗方面,PII 识别可以使用 Presidio 或自定义正则规则,匿名化策略需要在准确率和信息保留之间权衡。中文 PII 处理需要特别设计的规则集。

完整的清洗流水线将各个组件串联,按照格式标准化、语言过滤、启发式过滤、文档内去重、PII 清洗、质量评分、文档间去重、质量分层的顺序执行。持续的质量监控和迭代优化是保证数据质量的关键。

图4-5:本章知识结构

图4-5:第4章知识结构 —— 启发式过滤、大规模去重、PII清洗三大核心主题


延伸阅读

关于数据清洗的深入内容,以下资源值得参考:

RefinedWeb 论文详细记录了从 Common Crawl 构建高质量预训练集的完整清洗流程。Dolma 数据集的技术报告介绍了 Allen AI 的清洗策略和工具。text-dedup 开源库(github.com/ChenghaoMou/text-dedup)提供了多种去重算法的实现。Microsoft Presidio 文档(microsoft.github.io/presidio)是 PII 处理的权威参考。CCNet 论文介绍了 Facebook 处理 Common Crawl 数据的方法,特别是困惑度过滤的细节。


下一章预告

在下一章《分词与序列化》中,我们将探讨预训练数据准备的最后一个关键步骤:如何将清洗后的文本转换为模型可以理解的 Token 序列。你将学习 BPE、WordPiece、Unigram 等分词算法的原理与选择,如何为特定领域扩充词表,以及数据混合与课程学习的采样策略。

带着这个问题进入下一章:如果你要训练一个专门处理代码的模型,标准的 GPT-2 分词器会遇到什么问题?