RAGflow流程

RAGflow架构图

查询数据流程

1、问题输入(Questions import):当用户输入一个查询时,问题 通过 Web Nginx 发送到 RAGflow Server 进行处理。
2、查询分析(Query Analyze):RAGflow Server在接收到用户问题后,对问题进行分析,通过关键词提取和嵌入式模型生成查询向量。
3、多路召回(Multi-way Recall):基于查询向量,系统从Elasticsearch中多次召回并检索出多个与问题相关的文档片段(Chunks)。
4、重排序(Re-rank):多次召回的文档片段会按照相关性进行重新排序,选择最符合问题的片段(Chunks)。
5、生成答案(Answer):系统通过调用大语言模型(LLMs)生成答案,并将答案拆分为句子。然后,对每个句子与chunk片段进行相似度比较,最后取相似度最高的chunk生成答案。
6、返回答案:最终,系统将答案返回给用户,完成问题解答流程。
问题输入 -> 查询分析 -> 多路召回 -> 重排序 -> 生成答案 -> 返回答案

文档分析流程

1、文档上传:文档通过 Web Nginx 进入系统,并被传递到 RAGflow Server
2、任务调度(Task Dispatch):RAGflow Server 将上传的文档任务传递给处理模块,由任务执行器(task_executor.py)负责分配和执行具体的处理任务。
3、文档处理:

- Document Parser(文档解析器):解析文档的内容,将其转换为可处理的文本或结构化数据。
- OCR(光学字符识别):如果文档是图片或扫描件,OCR 会识别其中的文字内容。
- Document Layout Analyze(文档布局分析):分析文档的布局,包括段落、标题、表格等结构。
- Table Structure Recognition(表格结构识别):如果文档中包含表格,该模块会识别表格的结构并提取数据。

4、数据分块(Chunking):文档经过处理后,会被划分为多个“Chunk”(分块),General分块方法可自定义Chunk数,分块将存储在Elasticsearch中供后续查询时使用。
5、文档存储:处理后的文档分块会被存入Elasticsearch数据库,并且通过关键词和嵌入信息进行索引,以便能够在用户提出问题时快速检索相关内容。

多路召回

召回的目的:RAGFlow 在聊天中使用了全文搜索和向量搜索的多重调用。多次召回,可以从多个角度、多个数据源获取信息,从而提高检索的全面性、准确性和多样性

  • 向量搜索:即计算查询向量与所有存储向量的相似性得分,返回得分高的记录。常见的相似性计算方法包括:余弦相似性、欧氏距离、曼哈顿距离等。
  • 全文搜索:全文检索是一种比较经典的检索方式,在数据存入时,通过关键词构建倒排索引;在检索时,通过关键词进行全文检索,找到对应的记录。

DeepDoc(任务执行器)

PATH:/ragflow/deepdoc
DeepDoc 是 RAGFlow 的核心组件,它利用视觉信息和解析技术,对文档进行深度理解,提取文本、表格和图像等信息。DeepDoc 的功能模块包括:

  • OCR 技术:支持多种语言和字体,并能够处理复杂的文档布局和图像质量。
  • 布局识别(布局分析识别)技术:RAGFlow 使用 Yolov8 进行 OCR/布局识别/TSR(表格结构识别),识别文档的布局结构,例如标题、段落、表格、图像等。
  • 表格结构识别 (TSR):识别表格的结构,例如行列、表头、单元格合并等,并将其转换为自然语言句子。
  • 文档解析:支持解析 PDF、DOCX、EXCEL 和 PPT 等多种文档格式,并提取文本块、表格和图像等信息。/ragflow/deepdoc/parser/
  • 简历解析:将简历中的非结构化文本解析为结构化数据,例如姓名、联系方式、工作经历、教育背景等。

解析方法

模板描述文件格式
General根据预设的分段标识符对文件进行连续分块。DOCX, EXCEL, PPT, PDF, TXT, JPEG, JPG, PNG, TIF, GIF
Q&A针对问题和回答的文档进行特定的分块。EXCEL, CSV/TXT
Manual处理手册类文档,分块基于章节或段落结构。PDF
Table表格类文件,依据行或列分块。EXCEL, CSV/TXT
Paper科研或学术文章类型的文档,按照章节或段落分块。PDF
Book处理书籍类文档,按章节或页数分块。DOCX, PDF, TXT
Laws法律文档,基于条款或章节进行分块。DOCX, PDF, TXT
Presentation针对演示文档(如幻灯片)进行分块处理。PDF, PPTX
Picture针对图片文件进行分块,可以处理各种图片格式。JPEG, JPG, PNG, TIF, GIF
One整个文档被分为一个块,不进行额外的分割。DOCX, EXCEL, PDF, TXT

Rerank模型

Rerank模型是对初步检索结果的再排序过程
语义检索:语义检索通过理解查询意图和文档内容的语义关系,提供更相关的检索结果。
排序:将检索结果按照相关性从高到低进行排列,确保用户最有可能关注的结果排在前面。

网易有道

https://gitcode.com/gh_mirrors/bc/BCEmbedding/overview

模型名称模型类型支持语种参数量开源权重
bce-reranker-base_v1RerankerModel中英日韩279MHuggingface(推荐), 国内通道, ModelScope, WiseModel

Huggingface(推荐):https://huggingface.co/maidalun1020/bce-reranker-base_v1

跨语种优势

RerankerModel支持中文,英文,日文和韩文。

主要特点

高效且精确的语义检索:RerankerModel采用交叉编码器,可以在第二阶段实现更高精度的语义顺序精排。
有意义的重排序分数:RerankerModel可以提供有意义的语义相关性分数(不仅仅是排序),可以用于过滤无意义文本片段,提高大模型生成效果。
RerankerModel支持 长passage(超过512 tokens,不超过32k tokens)rerank;
RerankerModel可以给出有意义 相关性分数 ,帮助 过滤低质量召回;

BAAI 模型

https://huggingface.co/BAAI/bge-reranker-v2-m3

模型名称基本模型支持语言分层优势
BAI/bge-reranker-basexlm-roberta-base中英文-轻量级的重新登录模型,易于部署,推理速度快。
BAI/bge-reranker-largexlm-roberta-large中英文-轻量级的重新登录模型,易于部署,推理速度快。
BAI/bge-reranker-v2-m3bge-m3多语言-轻量级的重新登录模型,具有强大的多语言能力,易于部署,推理速度快。
BAI/bge-reranker-v2-gemmagemma-2b多语言-适用于多语言环境,在英语水平和多语言能力方面表现良好。
BAI/bge-reranker-v2-minicpm-layerwiseMiniCPM-dpo-bf16多语言8-40适用于多语言环境,在英语和中文熟练程度方面表现良好,允许自由选择输出层,促进加速推理。

可根据场景和资源选择模型。

  • 对于多语言,请使用BAAI/bge-reranker-v2-m3和BAAI/bge-reranker-v2-gemma
  • 对于中文或英文,请使用BAAI/bge-reranker-v2-m3和BAAI/bge-reranker-v2-minicpm-layerwise。
  • 为了提高效率,请使用BAAI/bge-reranker-v2-m3和BAAI/bge-reranker-v2-minicpm-layerwise的低层。
  • 为了获得更好的性能,推荐使用BAAI/bge-reranker-v2-minicpm-layerwise和BAAI/bge-reranker-v2-gemma

嵌入模型(Embedd)

将文本转化为数值向量:嵌入模型将文本(.txt, .pdf, .docx等格式存储)中的单词、短语或整个句子映射到一个高维的数值向量空间(Elasticsearch)
捕捉语义:可以帮助RAGflow理解问题中的上下文信息,例如,同一个词在不同的上下文中可能具有不同的含义。
语义搜索:当用户提出一个问题时,RAGflow会将问题转化为一个向量,然后,系统会计算问题向量与知识库中所有文档向量之间的相似度,最相似的那几个文档就会被认为是与问题最相关的,并被呈现给用户。

BGE Large 中文模型

BGE Large 中文模型是由北京语言大学和百度人工智能研究院联合开发的多模态AI语言模型。它基于Transformer架构,使用海量的中文文本语料进行训练,包括新闻、书籍、网络文本等。

特点

  • 大规模:模型参数超过1000亿,在中文预训练语言模型中规模最大。
  • 多模态:模型具备文本生成、语言理解、翻译、问答等多种任务的能力。
  • 高质量:模型在中文自然语言处理任务上表现出色,在多个基准数据集上取得了state-of-the-art的结果。

优势

  • 全面性:模型覆盖了广泛的中文语言现象,包括词汇、语法、语义和语用。
  • 鲁棒性:模型对中文文本的噪声和错误具有较强的鲁棒性。

    • 鲁棒性(Robustness)是指系统、模型或技术在面对各种异常情况和干扰时,仍能保持稳定性能的能力。
  • 高效性:模型经过优化,可在各种硬件平台上高效运行。

FastEmbed

官方地址:https://qdrant.github.io/fastembed/examples/Supported_Models/

模型维度描述大小(单位:GB)
BAAI/bge-base-en-v1.5768基础英语模型,v1.50.21
BAAI/bge-large-en-v1.51024大型英语模型,v1.51.2
BAAI/bge-small-en-v1.5384快速和默认英语模型0.067
BAAI/bge-small-zh-v1.5512快速推荐的中国模型0.09
jinaai/jina-embeddings-v2-base-en768支持8192序列的英文嵌入模型0.52
jinaai/jina-embeddings-v2-small-en512支持8192序列的英文嵌入模型0.12
nomic-ai/nomic-embed-text-v1.57688192上下文长度英语模型0.52
sentence-transformers/all-MiniLM-L6-v2384句子转换器模型,MiniLM-L6-v20.09

网易文本检索大模型

官方地址:https://gitcode.com/gh_mirrors/bc/BCEmbedding/overview
BCEmbedding是由网易有道开发的中英双语和跨语种语义表征算法模型库,其中包含EmbeddingModel 和
BCEmbedding以其出色的双语和跨语种能力而著称,在语义检索中消除中英语言之间的差异,从而实现:

  • 强大的双语和跨语种语义表征能力【基于MTEB的语义表征评测指标】。
  • 基于LlamaIndex的RAG评测,表现SOTA【基于LlamaIndex的RAG评测指标】。

网易AI评测指标

优势

现有的单个语义表征模型在双语和跨语种场景中常常表现不佳,特别是在中文、英文及其跨语种任务中。BCEmbedding充分利用有道翻译引擎的优势,实现只需一个模型就可以在单语、双语和跨语种场景中表现出卓越的性能。
EmbeddingModel支持中文和英文(之后会支持更多语种);RerankerModel支持中文,英文,日文和韩文。

主要特点

  • 双语和跨语种能力:基于有道翻译引擎的强大能力,BCEmbedding实现强大的中英双语和跨语种语义表征能力。
  • RAG适配:面向RAG做针对性优化,可适配大多数相关任务,比如翻译,摘要,问答等。此外,针对 问题理解(query understanding) 也做了针对优化。详见 上方基于LlamaIndex的RAG评测指标。
  • 高效且精确的语义检索:EmbeddingModel采用双编码器,可以在第一阶段实现高效的语义检索。RerankerModel采用交叉编码器,可以在第二阶段实现更高精度的语义顺序精排。
  • 更好的领域泛化性:为了在更多场景实现更好的效果,我们收集了多种多样的领域数据。
  • 用户友好:语义检索时不需要特殊指令前缀。也就是,你不需要为各种任务绞尽脑汁设计指令前缀。
  • 有意义的重排序分数:RerankerModel可以提供有意义的语义相关性分数(不仅仅是排序),可以用于过滤无意义文本片段,提高大模型生成效果。
  • 产品化检验:BCEmbedding已经被有道众多产品检验。
模型名称模型类型支持语言参数量开源权重
bce-reranker-base_v1RerankerModel中英日韩279MHuggingface(推荐), 国内通道, ModelScope, WiseModel

通义千问

模型中文名模型英文名数据类型向量维度最大处理token长度支持语种
通用文本向量text-embedding-v1float(32)15362048中文、英语、西班牙语、法语、葡萄牙语、印尼语
通用文本向量text-embedding-v2float(32)15368192中文、英语、西班牙语、法语、葡萄牙语、印尼语、日语、韩语、德语、俄语等
通用文本向量text-embedding-v3float(32)1024/768/5128192中文、英语、西班牙语、法语、葡萄牙语、印尼语、日语、韩语、德语、俄语等50+语言种

源码流程解析

源码结构

源码结构
对应模块的功能如下:

  • api 为后端的 API
  • web 对应的是前端页面
  • conf 为配置信息
  • deepdoc 对应的就是文件解析模块

其他相关的技术栈如下:

  • Web 服务是基于 Flask 实现
  • 业务数据库使用的是 MySQL
  • 向量数据库使用的是 ElasticSearch
  • 文件存储使用的是 MinIO

源码解析

文件加载的支持

常规的 RAG 服务都是在上传时进行文件的加载和解析,但是 RAGFlow 的上传仅仅包含上传至 MinIO,需要手工点击触发文件的解析。
解析图

文件解析接口

文件解析通过接口 /v1/document/run 进行触发,实际的处理是在 api/db/services/task_service.py 中的 queue_tasks() 中完成。此方法会根据文件创建一个或多个异步任务,通过 Redis 消息队列进行暂存之后就是离线异步处理。实现如下所示:

def queue_tasks(doc, bucket, name):
    def new_task():
        nonlocal doc
        return {
            "id": get_uuid(),
            "doc_id": doc["id"]
        }
    tsks = []
    # pdf 文件的解析,根据不同的类型设置单个任务最多处理的页数
    
    # 默认单个任务处理 12 页 pdf,pager 类型的 pdf 一个任务处理 22 页,其他 pdf 不分页
    if doc["type"] == FileType.PDF.value:
        file_bin = STORAGE_IMPL.get(bucket, name)
        do_layout = doc["parser_config"].get("layout_recognize", True)
        pages = PdfParser.total_page_number(doc["name"], file_bin)
        page_size = doc["parser_config"].get("task_page_size", 12)
        if doc["parser_id"] == "paper":
            page_size = doc["parser_config"].get("task_page_size", 22)
        if doc["parser_id"] == "one":
            page_size = 1000000000
        if doc["parser_id"] == "knowledge_graph":
            page_size = 1000000000
        if not do_layout:
            page_size = 1000000000
        page_ranges = doc["parser_config"].get("pages")
        if not page_ranges:
            page_ranges = [(1, 100000)]
        for s, e in page_ranges:
            s -= 1
            s = max(0, s)
            e = min(e - 1, pages)
            for p in range(s, e, page_size):
                task = new_task()
                task["from_page"] = p
                task["to_page"] = min(p + page_size, e)
                tsks.append(task)
                                
    # 表格数据单个任务处理 3000 行
    elif doc["parser_id"] == "table":
        file_bin = STORAGE_IMPL.get(bucket, name)
        rn = RAGFlowExcelParser.row_number(
            doc["name"], file_bin)
        for i in range(0, rn, 3000):
            task = new_task()
            task["from_page"] = i
            task["to_page"] = min(i + 3000, rn)
            tsks.append(task)
    else:
        tsks.append(new_task())

    bulk_insert_into_db(Task, tsks, True)
    DocumentService.begin2parse(doc["id"])

    # 任务插入 Redis 消息队列,方便异步处理
    for t in tsks:
        assert REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=t), "Can't access Redis. Please check the Redis' status."

消息队列的消费模块

查看对应的消息队列的消费模块,对应在 rag/svr/task_executor.py 中的 main() 方法中。实现如下所示:

def main():
    #获取任务
    rows = collect()
    if len(rows) == 0:
        return
    for _, r in rows.iterrows():
        callback = partial(set_progress, r["id"], r["from_page"], r["to_page"])
        try:
            embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING, llm_name=r["embd_id"], lang=r["language"])
        except Exception as e:
            callback(-1, msg=str(e))
            cron_logger.error(str(e))
            continue

        if r.get("task_type", "") == "raptor":
            try:
                chat_mdl = LLMBundle(r["tenant_id"], LLMType.CHAT, llm_name=r["llm_id"], lang=r["language"])
                cks, tk_count = run_raptor(r, chat_mdl, embd_mdl, callback)
            except Exception as e:
                callback(-1, msg=str(e))
                cron_logger.error(str(e))
                continue
        else:
            st = timer()
            # 执行文件解析
            cks = build(r)
            cron_logger.info("Build chunks({}): {}".format(r["name"], timer() - st))
            if cks is None:
                continue
            if not cks:
                callback(1., "No chunk! Done!")
                continue
            # TODO: exception handler
            ## set_progress(r["did"], -1, "ERROR: ")
            callback(
                msg="Finished slicing files(%d). Start to embedding the content." %
                    len(cks))
            st = timer()
            try:
                # 执行向量化
                tk_count = embedding(cks, embd_mdl, r["parser_config"], callback)
            except Exception as e:
                callback(-1, "Embedding error:{}".format(str(e)))
                cron_logger.error(str(e))
                tk_count = 0
            cron_logger.info("Embedding elapsed({}): {:.2f}".format(r["name"], timer() - st))
            callback(msg="Finished embedding({:.2f})! Start to build index!".format(timer() - st))

        init_kb(r)
        chunk_count = len(set([c["_id"] for c in cks]))
        st = timer()
        es_r = ""
        es_bulk_size = 4
        # 写入 ES
        for b in range(0, len(cks), es_bulk_size):
            es_r = ELASTICSEARCH.bulk(cks[b:b + es_bulk_size], search.index_name(r["tenant_id"]))
            if b % 128 == 0:
                callback(prog=0.8 + 0.1 * (b + 1) / len(cks), msg="")

        cron_logger.info("Indexing elapsed({}): {:.2f}".format(r["name"], timer() - st))
        if es_r:
            callback(-1, f"Insert chunk error, detail info please check ragflow-logs/api/cron_logger.log. Please also check ES status!")
            ELASTICSEARCH.deleteByQuery(
                Q("match", doc_id=r["doc_id"]), idxnm=search.index_name(r["tenant_id"]))
            cron_logger.error(str(es_r))
        else:
            if TaskService.do_cancel(r["id"]):
                ELASTICSEARCH.deleteByQuery(
                    Q("match", doc_id=r["doc_id"]), idxnm=search.index_name(r["tenant_id"]))
                continue
            callback(1., "Done!")
            DocumentService.increment_chunk_num(
                r["doc_id"], r["kb_id"], tk_count, chunk_count, 0)
            cron_logger.info(
                "Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(
                    r["id"], tk_count, len(cks), timer() - st))

完整的处理流程如下所示:
1、调用 collect() 方法从消息队列中获取任务
2、接下来每个任务会依次调用 build() 进行文件的解析
3、调用 embedding() 方法进行向量化
4、最后调用 ELASTICSEARCH.bulk() 写入 ElasticSearch,从这里就可以看到向量库的技术选型

build() 方法

build() 方法深入 RAGFlow 核心的文件解析,具体的实现如下所示:

def build(row):
    if row["size"] > DOC_MAXIMUM_SIZE:
        set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
                                             (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
        return []

    callback = partial(
        set_progress,
        row["id"],
        row["from_page"],
        row["to_page"])
                
    # 根据类型选择合适的解析器
    chunker = FACTORY[row["parser_id"].lower()]
    try:
        st = timer()
        bucket, name = File2DocumentService.get_minio_address(doc_id=row["doc_id"])
        binary = get_minio_binary(bucket, name)
        cron_logger.info(
            "From minio({}) {}/{}".format(timer() - st, row["location"], row["name"]))
    except TimeoutError as e:
        callback(-1, f"Internal server error: Fetch file from minio timeout. Could you try it again.")
        cron_logger.error(
            "Minio {}/{}: Fetch file from minio timeout.".format(row["location"], row["name"]))
        return
    except Exception as e:
        if re.search("(No such file|not found)", str(e)):
            callback(-1, "Can not find file <%s> from minio. Could you try it again?" % row["name"])
        else:
            callback(-1, f"Get file from minio: %s" %
                     str(e).replace("'", ""))
        traceback.print_exc()
        return

    try:
        # 执行文档的解析和切片
        cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
                            to_page=row["to_page"], lang=row["language"], callback=callback,
                            kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
        cron_logger.info(
            "Chunking({}) {}/{}".format(timer() - st, row["location"], row["name"]))
    except Exception as e:
        callback(-1, f"Internal server error while chunking: %s" %
                     str(e).replace("'", ""))
        cron_logger.error(
            "Chunking {}/{}: {}".format(row["location"], row["name"], str(e)))
        traceback.print_exc()
        return

    docs = []
    doc = {
        "doc_id": row["doc_id"],
        "kb_id": [str(row["kb_id"])]
    }
    el = 0
    for ck in cks:
        d = copy.deepcopy(doc)
        d.update(ck)
        md5 = hashlib.md5()
        md5.update((ck["content_with_weight"] +
                    str(d["doc_id"])).encode("utf-8"))
        d["_id"] = md5.hexdigest()
        d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19]
        d["create_timestamp_flt"] = datetime.datetime.now().timestamp()
        if not d.get("image"):
            docs.append(d)
            continue

        try:
            output_buffer = BytesIO()
            if isinstance(d["image"], bytes):
                output_buffer = BytesIO(d["image"])
            else:
                d["image"].save(output_buffer, format='JPEG')

            st = timer()
            STORAGE_IMPL.put(row["kb_id"], d["_id"], output_buffer.getvalue())
            el += timer() - st
        except Exception as e:
            cron_logger.error(str(e))
            traceback.print_exc()

        d["img_id"] = "{}-{}".format(row["kb_id"], d["_id"])
        del d["image"]
        docs.append(d)
    cron_logger.info("MINIO PUT({}):{}".format(row["name"], el))

    return docs

实际是根据 parser_id 去选择合适的解析器组,注意这个应该是从业务层得到的一个类型,每个解析器组中都包含了 pdf, word 等支持格式的文件解析,可以理解为一个使用场景的属性。
以默认的 naive 类型为例深入对应的 chunk() 实现,其对应的实现在 rag/app/naive.py 中。此方法中包含了目前主持的 docx, pdf, xlsx, md 等格式的解析,以 pdf 为例深入查看对应的实现。
从rag/app/naive.py可以看到解析器是继承自 deepdoc/parser/pdf_parser.py 中的 RAGFlowPdfParser 实现

pdf 文件的打开是基于 PyPDF2 实现,并基于 pdfplumber 实现表格数据的提取,这个库相对 PyMuPDF 速度更慢,但是可以处理得更精细

  • PyMuPDF是一个高性能Python库,用于PDF(及其他)文档的数据提取、分析、转换和操作。
  • PyPDF2 是一个免费的开源纯 Python PDF 库,能够分割、 合并、 裁剪和转换 PDF 文件的页面。它还可以 向 PDF 文件 添加自定义数据、查看选项和密码。PyPDF2 还可以从 PDF 中检索文本 和 元数据。

使用的 OCR 模型为 /InfiniFlow/deepdoc,在解析中额外加载了一个 XGB 模型 InfiniFlow/text_concat_xgb_v1.0 用于内容提取。

  • OCR(光学字符识别)模型的主要作用是将图片中的文本内容识别并转换为机器可读的文本数据。OCR 模型 /InfiniFlow/deepdoc 负责从文档或图像中提取文本,这个过程可以用于处理扫描文件、图片、PDF 等类型的文档。
  • InfiniFlow/text_concat_xgb_v1.0:是一个梯度提升决策树模型(XGBoost),用于对从 OCR 模型提取的文本进行进一步的处理和内容提取。

两者的结合可以实现不仅从图像中获取文本,还能从文本中提炼出有价值的关键信息。
RAGFlow 可以将解析后的文本块与原始文档中的原始位置关联起来,目前看起来只有 RagFlow 实现了类似的效果。

文件的预处理策略

在 RAGFlow 中的文件中包含了不少了数据的清理操作,比如在 deepdoc/vision/layout_recognizer.py 中的就包含着文档中无用内容的判断,示例如下:

        def __is_garbage(b):
            patt = [r"^•+$", r"(版权归©|免责条款|地址[::])", r"\.{3,}", "^[0-9]{1,2} / ?[0-9]{1,2}$",
                    r"^[0-9]{1,2} of [0-9]{1,2}$", "^http://[^ ]{12,}",
                    "(资料|数据)来源[::]", "[0-9a-z._-]+@[a-z0-9-]+\\.[a-z]{2,3}",
                    "\\(cid *: *[0-9]+ *\\)"
                    ]
            return any([re.search(p, b["text"]) for p in patt])

文档中页码、链接、版权信息等会被清理。但是这样处理比较分散,而且不同的流程中也充斥着大量的特殊处理,导致从源码很难拆分出明确的预处理逻辑。

文件检索的支持 (包含混合检索)

文件检索的支持可以查看实际的对话处理流程,对话的 API 为 /v1/conversation/completion,实际对话的处理是在 api/db/services/dialog_service.py 中的 chat() 方法中完成
跟踪对话处理流程,可以看到文件的检索是在 rag/nlp/search.py 中的 search() 方法中完成。

  • search 方法负责在知识库中检索信息
  • chat 在生成对话时使用了类似的检索机制

RAGFlow 的检索目前实现的是混合检索,实现的是文本检索 + 向量检索,混合检索完全依赖 ElasticSearch 实现,具体的实现如下所示:

    def search(self, req, idxnm, emb_mdl=None, highlight=False):
        qst = req.get("question", "")
        bqry, keywords = self.qryr.question(qst, min_match="30%")
        bqry = self._add_filters(bqry, req)
        bqry.boost = 0.05
                
        # 构造 ElasticSearch 文本查询的请求
        s = Search()
        pg = int(req.get("page", 1)) - 1
        topk = int(req.get("topk", 1024))
        ps = int(req.get("size", topk))
        src = req.get("fields", ["docnm_kwd", "content_ltks", "kb_id", "img_id", "title_tks", "important_kwd",
                                 "image_id", "doc_id", "q_512_vec", "q_768_vec", "position_int", "knowledge_graph_kwd",
                                 "q_1024_vec", "q_1536_vec", "available_int", "content_with_weight"])

        s = s.query(bqry)[pg * ps:(pg + 1) * ps]
        s = s.highlight("content_ltks")
        s = s.highlight("title_ltks")
        if not qst:
            if not req.get("sort"):
                s = s.sort(
                    #{"create_time": {"order": "desc", "unmapped_type": "date"}},
                    {"create_timestamp_flt": {
                        "order": "desc", "unmapped_type": "float"}}
                )
            else:
                s = s.sort(
                    {"page_num_int": {"order": "asc", "unmapped_type": "float",
                                      "mode": "avg", "numeric_type": "double"}},
                    {"top_int": {"order": "asc", "unmapped_type": "float",
                                 "mode": "avg", "numeric_type": "double"}},
                    #{"create_time": {"order": "desc", "unmapped_type": "date"}},
                    {"create_timestamp_flt": {
                        "order": "desc", "unmapped_type": "float"}}
                )

        if qst:
            s = s.highlight_options(
                fragment_size=120,
                number_of_fragments=5,
                boundary_scanner_locale="zh-CN",
                boundary_scanner="SENTENCE",
                boundary_chars=",./;:\\!(),。?:!……()——、"
            )
        s = s.to_dict()
                
        # 补充向量查询的信息
        q_vec = []
        if req.get("vector"):
            assert emb_mdl, "No embedding model selected"
            s["knn"] = self._vector(
                qst, emb_mdl, req.get(
                    "similarity", 0.1), topk)
            s["knn"]["filter"] = bqry.to_dict()
            if not highlight and "highlight" in s:
                del s["highlight"]
            q_vec = s["knn"]["query_vector"]
        es_logger.info("【Q】: {}".format(json.dumps(s)))
        res = self.es.search(deepcopy(s), idxnm=idxnm, timeout="600s", src=src)
        es_logger.info("TOTAL: {}".format(self.es.getTotal(res)))
        if self.es.getTotal(res) == 0 and "knn" in s:
            bqry, _ = self.qryr.question(qst, min_match="10%")
            if req.get("doc_ids"):
                bqry = Q("bool", must=[])
            bqry = self._add_filters(bqry, req)
            s["query"] = bqry.to_dict()
            s["knn"]["filter"] = bqry.to_dict()
            s["knn"]["similarity"] = 0.17
                        
            # 将构造的完整查询提交给 ElasticSearch 进行查询
            res = self.es.search(s, idxnm=idxnm, timeout="600s", src=src)
            es_logger.info("【Q】: {}".format(json.dumps(s)))

        kwds = set([])
        for k in keywords:
            kwds.add(k)
            for kk in rag_tokenizer.fine_grained_tokenize(k).split(" "):
                if len(kk) < 2:
                    continue
                if kk in kwds:
                    continue
                kwds.add(kk)

        aggs = self.getAggregation(res, "docnm_kwd")

        return self.SearchResult(
            total=self.es.getTotal(res),
            ids=self.es.getDocIds(res),
            query_vector=q_vec,
            aggregation=aggs,
            highlight=self.getHighlight(res, keywords, "content_with_weight"),
            field=self.getFields(res, src),
            keywords=list(kwds)
        )

可以看到 RAGFlow 将混合检索需求转换为复杂的查询条件,利用 elasticsearch-dsl 进行复杂查询的构造,之后直接提交给 ElasticSearch 即可。

检索结果的重排

文件的重排是在 rag/nlp/search.py 中的 rerank() 中完成的,重排是基于文本匹配得分 + 向量匹配得分混合进行排序,默认文本匹配的权重为 0.3, 向量匹配的权重为 0.7,对应的实现如下所示:

    # tkweight 为文本匹配权重,vtweight 为向量匹配权重
    def rerank(self, sres, query, tkweight=0.3,
               vtweight=0.7, cfield="content_ltks"):
        # 获取文本关键词
        _, keywords = self.qryr.question(query)
        # 获取文本向量
        ins_embd = [
            Dealer.trans2floats(
                sres.field[i].get("q_%d_vec" % len(sres.query_vector), "\t".join(["0"] * len(sres.query_vector)))) for i in sres.ids]
        if not ins_embd:
            return [], [], []

        for i in sres.ids:
            if isinstance(sres.field[i].get("important_kwd", []), str):
                sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]
        ins_tw = []
        for i in sres.ids:
            content_ltks = sres.field[i][cfield].split(" ")
            title_tks = [t for t in sres.field[i].get("title_tks", "").split(" ") if t]
            important_kwd = sres.field[i].get("important_kwd", [])
            tks = content_ltks + title_tks + important_kwd
            ins_tw.append(tks)

        # 获取整体相似分,文本相似分,向量相似分
        sim, tksim, vtsim = self.qryr.hybrid_similarity(sres.query_vector,
                                                        ins_embd,
                                                        keywords,
                                                        ins_tw, tkweight, vtweight)
        return sim, tksim, vtsim

获取混合相似分之后,基于混合的相似分进行过滤和重排,默认混合得分低于 0.2 的会被过滤掉

大模型的处理

在进行检索和重排阶段中,只进行了必要的过滤,没有限制匹配文档的数量。
实际内容可能会超过大模型的输入 token 数量,因此在调用大模型前会调用 api/db/services/dialog_service.py 文件中 message_fit_in() 根据大模型可用的 token 数量进行过滤。

def message_fit_in(msg, max_length=4000):
    def count():
        nonlocal msg
        tks_cnts = []
        for m in msg:
            tks_cnts.append(
                {"role": m["role"], "count": num_tokens_from_string(m["content"])})
        total = 0
        for m in tks_cnts:
            total += m["count"]
        return total

    c = count()
    if c < max_length:
        return c, msg

    msg_ = [m for m in msg[:-1] if m["role"] == "system"]
    msg_.append(msg[-1])
    msg = msg_
    c = count()
    if c < max_length:
        return c, msg

    ll = num_tokens_from_string(msg_[0]["content"])
    l = num_tokens_from_string(msg_[-1]["content"])
    if ll / (ll + l) > 0.8:
        m = msg_[0]["content"]
        m = encoder.decode(encoder.encode(m)[:max_length - l])
        msg[0]["content"] = m
        return max_length, msg

    m = msg_[1]["content"]
    m = encoder.decode(encoder.encode(m)[:max_length - l])
    msg[1]["content"] = m
    return max_length, msg

根据提供的最大长度限制(max_length)调整消息列表的内容,使其总长度不超过指定的限制。具体来说,代码会根据消息的 role 和 content 来计算消息的总字符数或 token 数,并通过裁剪(truncate)部分消息的内容来确保消息的总长度符合限制。
将检索的内容,历史聊天记录以及问题构造为 prompt,即可作为大模型的输入了,默认的中文 prompt 如下所示:

你是一个智能助手,请总结知识库的内容来回答问题,请列举知识库中的数据详细回答。当所有知识库内容都与问题无关时,你的回答必须包括“知识库中未找到您要的答案!”这句话。回答需要考虑聊天历史。
        以下是知识库:
        {knowledge}
        以上是知识库。
最后修改:2024 年 10 月 12 日
如果觉得我的文章对你有用,请随意赞赏