Following up on my earlier post on the evolution of retrieval (https://blog.csdn.net/2402_87515571/article/details/154364069?fromshare=blogdetail&sharetype=blogdetail&sharerId=154364069&sharerefer=PC&sharesource=2402_87515571&sharefrom=from_link), this article takes the ReAct (Reasoning-Acting) pattern as its basis and walks through, in detail, how user queries are handled via a "Thought - Action - Observation" loop.
Overall System Architecture
The Agentic RAG system is composed of the following core components:
- Agent core logic (agent.py): implements the main ReAct loop, including conversation management, tool invocation, and the decision flow
- Toolset (tools.py): provides the tools for interacting with the knowledge base, including search and document retrieval
- Configuration module (config.py): manages the system's configuration parameters
- Evaluation module (evaluate.py): used to evaluate system performance
- Main entry point (main.py): provides the interactive interface and run modes
I. Dataset Construction
Taking the legal domain as an example: high-quality annotated data is the foundation of model training and evaluation. Following a layered construction principle, the dataset is split into simple cases and complex cases.
- Simple cases focus on the direct application of a single legal provision (e.g. "What is the sentence for intentional homicide?"), suited to evaluating the model's grasp of basic legal knowledge.
- Complex cases simulate real judicial scenarios (e.g. "How should drunk driving that causes a death, followed by fleeing the scene, be charged?"), require analysis across multiple provisions, and are used to evaluate the model's logical reasoning ability.
1. Dataset Construction Pattern
class LegalDatasetBuilder:
def __init__(self):
self.simple_cases = []
self.complex_cases = []
Layered data design
- Splitting test cases into simple and complex tiers is a classic way to evaluate a system
- It reflects progressive evaluation: testing basic capabilities first, then composite ones
- It corresponds to the notion of curriculum learning in machine learning (see the sketch below)
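A minimal sketch of assembling this layered dataset; merge_cases and save_dataset are hypothetical helpers added here for illustration, not part of the original code:
import json
from typing import Any, Dict, List

def merge_cases(simple_cases: List[Dict[str, Any]],
                complex_cases: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Tag each case with its tier and merge, easy cases first (curriculum order)
    for case in simple_cases:
        case.setdefault("difficulty", "easy")
    for case in complex_cases:
        case.setdefault("difficulty", "hard")
    return simple_cases + complex_cases

def save_dataset(cases: List[Dict[str, Any]], path: str = "legal_dataset.json") -> None:
    # Persist the merged dataset so evaluation runs are reproducible
    with open(path, "w", encoding="utf-8") as f:
        json.dump(cases, f, ensure_ascii=False, indent=2)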
2. Simple Cases Generation
Simple cases are mainly used to evaluate the system's basic retrieval capability.
def create_simple_cases(self) -> List[Dict[str, Any]]:
simple_cases = [
{
"id": "simple_1",
"question": "故意杀人罪判几年?",
"expected_keywords": ["死刑", "无期徒刑", "十年以上有期徒刑"],
"reference": "《中华人民共和国刑法》第二百三十二条",
"difficulty": "easy"
},
# ...
]
Structured evaluation design
- expected_keywords: reflects keyword-matching-based evaluation, a baseline metric for information retrieval systems
- reference: records the source of the ground truth, keeping the evaluation traceable
- difficulty: enables graded evaluation, analogous to Bloom's taxonomy in education
Deeper implications:
- These simple cases test the RAG system's factual retrieval ability
- They correspond to the exact-match problem in information retrieval
- They assess whether the system can accurately locate and extract a specific legal provision (a scoring sketch follows below)
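A minimal scoring sketch based on these fields; keyword_recall and evaluate_simple_case are hypothetical names, and the 0.6 threshold is an assumption, since the actual evaluate.py is not shown in this post:
from typing import Any, Dict, List

def keyword_recall(answer: str, expected_keywords: List[str]) -> float:
    # Fraction of expected keywords that appear verbatim in the answer
    if not expected_keywords:
        return 0.0
    hits = sum(1 for kw in expected_keywords if kw in answer)
    return hits / len(expected_keywords)

def evaluate_simple_case(case: Dict[str, Any], answer: str, threshold: float = 0.6) -> Dict[str, Any]:
    # Score one simple case by keyword recall against its expected_keywords
    recall = keyword_recall(answer, case["expected_keywords"])
    return {"id": case["id"], "keyword_recall": recall, "passed": recall >= threshold}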
3. Complex Cases Generation
def create_complex_cases(self) -> List[Dict[str, Any]]:
complex_cases = [
{
"id": "complex_1",
"question": """张某因与李某发生经济纠纷,持刀闯入李某家中,意图讨债。
在争执过程中,张某用刀刺伤李某,导致李某重伤。
同时,张某还顺手拿走了李某家中的现金5万元。
请问张某的行为应如何定性?可能面临什么样的刑事处罚?""",
"expected_analysis": ["入户抢劫", "故意伤害", "数罪并罚"],
"requires_multi_query": True
}
]
Key point: multi-step reasoning evaluation
- requires_multi_query: flags complex questions that need multiple retrievals
- expected_analysis: no longer simple keywords, but the key points of the analysis
- Reflects the evaluation of compositional reasoning
Deeper implications:
- Tests the system's ability to integrate multiple documents
- Assesses its ability to build a chain of legal reasoning
- Corresponds to case-based reasoning in cognitive science (a scoring sketch follows below)
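A minimal sketch of scoring a complex case; evaluate_complex_case is a hypothetical helper, and the assumption that a tool_calls list records the name of each executed tool is illustrative rather than taken from the original code:
from typing import Any, Dict, List

def evaluate_complex_case(case: Dict[str, Any], answer: str,
                          tool_calls: List[Dict[str, Any]]) -> Dict[str, Any]:
    # Coverage of the expected analysis points in the final answer
    covered = [point for point in case["expected_analysis"] if point in answer]
    # Did the agent actually issue more than one knowledge_base_search call?
    search_calls = [tc for tc in tool_calls if tc.get("name") == "knowledge_base_search"]
    return {
        "id": case["id"],
        "analysis_coverage": len(covered) / len(case["expected_analysis"]),
        "multi_query_satisfied": (not case.get("requires_multi_query")) or len(search_calls) >= 2,
    }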
4. Knowledge Base Construction
def create_legal_documents() -> List[Dict[str, str]]:
documents = [
{
"doc_id": "criminal_law_homicide",
"title": "刑法-故意杀人罪",
"content": """第二百三十二条 【故意杀人罪】故意杀人的,处死刑、无期徒刑或者十年以上有期徒刑;
情节较轻的,处三年以上十年以下有期徒刑。
故意杀人罪是指故意非法剥夺他人生命的行为。该罪侵犯的客体是他人的生命权。
法律依据是《中华人民共和国刑法》第二百三十二条。
量刑标准:
1. 情节严重的:死刑、无期徒刑或十年以上有期徒刑
2. 情节较轻的:三年以上十年以下有期徒刑"""
}
]
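A minimal sketch of loading these documents into the local store through KnowledgeBaseTools.add_document (defined in tools.py, shown later); it assumes Config.from_env() yields a valid knowledge_base configuration:
from config import Config
from tools import KnowledgeBaseTools

config = Config.from_env()
kb_tools = KnowledgeBaseTools(config.knowledge_base)

for doc in create_legal_documents():
    kb_tools.add_document(
        doc_id=doc["doc_id"],
        content=doc["content"],
        metadata={"title": doc["title"]},  # keep the title so citations can display it
    )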
II. Agent Core Logic
1. Imports and Logging Configuration
"""Agentic RAG System with ReAct Pattern"""
import json
import logging
from typing import List, Dict, Any, Optional, Generator
from dataclasses import dataclass, field
from datetime import datetime
from openai import OpenAI
from config import Config, LLMConfig, AgentConfig
from tools import KnowledgeBaseTools, get_tool_definitions
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
2. Message Data Structure Design
@dataclass
class Message:
"""Represents a message in the conversation"""
role: str # "user", "assistant", "tool"
content: str
tool_calls: Optional[List[Dict[str, Any]]] = None
tool_call_id: Optional[str] = None
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
Advantages of the @dataclass decorator
- Automatically generates __init__, __repr__, __eq__, and other methods
- Reduces boilerplate code and improves readability
- field(default_factory=...) avoids the mutable-default-argument pitfall
Conversation role design
Three roles are defined here, matching the OpenAI Function Calling convention:
- user: user input
- assistant: the AI's response (which may contain tool calls)
- tool: tool execution results
The role of the timestamp
- Tracing the conversation flow
- Pinpointing problems while debugging
- Analyzing system performance (e.g. response time, as sketched below)
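A small helper sketch for that last point; elapsed_seconds is a hypothetical function, not part of the original code:
from datetime import datetime

def elapsed_seconds(earlier: Message, later: Message) -> float:
    # Rough latency derived from the ISO-format timestamps of two Message objects
    t0 = datetime.fromisoformat(earlier.timestamp)
    t1 = datetime.fromisoformat(later.timestamp)
    return (t1 - t0).total_seconds()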
3. Initialization and Multi-Provider LLM Support
class AgenticRAG:
"""Agentic RAG system with ReAct pattern and multiple LLM provider support"""
def __init__(self, config: Optional[Config] = None):
"""Initialize the agent"""
self.config = config or Config.from_env()
# Initialize LLM client
self._init_llm_client()
# Initialize knowledge base tools
self.kb_tools = KnowledgeBaseTools(self.config.knowledge_base)
# Conversation history
self.conversation_history: List[Dict[str, Any]] = []
# Tool definitions
self.tools = get_tool_definitions()
logger.info(f"Initialized AgenticRAG with provider: {self.config.llm.provider}")
Dependency injection pattern
config: Optional[Config] = None
self.config = config or Config.from_env()
- Supports manual configuration or reading from environment variables
- Improves testability: a mock configuration can be injected (see the sketch below)
- Follows the dependency inversion principle from SOLID
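A minimal sketch of injecting a custom configuration, e.g. for tests; the fields touched here (llm.temperature, agent.verbose) appear elsewhere in this post, but the snippet as a whole is illustrative:
from config import Config

test_config = Config.from_env()
test_config.llm.temperature = 0.0   # deterministic output for reproducible tests
test_config.agent.verbose = True    # log the full ReAct trajectory

agent = AgenticRAG(config=test_config)   # injected configuration
default_agent = AgenticRAG()             # falls back to Config.from_env()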
Component-based design
The system is split into three core components:
- LLM Client: handles interaction with the large language model
- Knowledge Base Tools: handles knowledge base retrieval
- Conversation History: maintains the conversation context
4. LLM Client Initialization (Multi-Provider Support)
def _init_llm_client(self):
"""Initialize the LLM client based on provider"""
client_config, model = self.config.llm.get_client_config()
# Extract base_url if present
base_url = client_config.pop("base_url", None)
# Create OpenAI client
if base_url:
self.client = OpenAI(base_url=base_url, **client_config)
else:
self.client = OpenAI(**client_config)
self.model = model
logger.info(f"Using model: {self.model}")
Uniformity of the OpenAI-compatible interface
- Most LLM providers implement an OpenAI-compatible API
- By changing base_url, you can switch providers seamlessly (see the sketch below)
- Supported providers: Kimi, DeepSeek, SiliconFlow, Groq, etc.
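A hedged sketch of what that switch looks like with the plain OpenAI client; the DeepSeek endpoint and model name below are illustrative examples, not values taken from config.py:
from openai import OpenAI

client = OpenAI(
    api_key="sk-...",                      # provider-issued key
    base_url="https://api.deepseek.com",   # swap this URL to change providers
)
response = client.chat.completions.create(
    model="deepseek-chat",
    messages=[{"role": "user", "content": "你好"}],
)
print(response.choices[0].message.content)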
Configuration decoupling
client_config, model = self.config.llm.get_client_config()
- The configuration logic lives in the Config class and does not pollute the business code
- Makes it easy to add new LLM providers
- Follows the single responsibility principle
5. System Prompt
def _get_system_prompt(self) -> str:
"""Generate the system prompt"""
return """You are an intelligent assistant with a***ess to a knowledge base. Your primary role is to answer questions a***urately based on the information available in the knowledge base.
## Important Guidelines:
1. **Knowledge Base Only**: You MUST only answer questions based on information found in the knowledge base. If the information is not available, clearly state that you cannot answer based on the available knowledge.
2. **Use Tools Effectively**:
- Use `knowledge_base_search` to search for relevant information
- Use `get_document` to retrieve complete documents when you need more context
- You may need multiple searches with different queries to fully answer a question
3. **Citations Required**: Always include citations in your answers. Format citations as [Doc: document_id] or [Chunk: chunk_id] inline with your response.
4. **Reasoning Process**: Think step-by-step:
- First, understand what information is needed
- Search for relevant information
- If needed, retrieve full documents for context
- Synthesize the information to answer the question
- Include proper citations
5. **Handle Follow-ups**: For follow-up questions, consider the conversation context but always verify information from the knowledge base.
6. **Be Accurate**: Never make up information. If something is unclear or not found, say so explicitly.
Remember: Your credibility depends on providing accurate, well-cited information from the knowledge base only."""
Core principles of prompt engineering
1. Role Definition
You are an intelligent assistant with access to a knowledge base.
- Clearly states the AI's identity and the boundaries of its capabilities
- Helps prevent hallucination
2. Constraints
You MUST only answer questions based on information found in the knowledge base.
- Forces the AI to use only knowledge base information
- Prevents fabricated answers
- This is the core value proposition of a RAG system
3. Tool Usage Guidance
- Use `knowledge_base_search` to search for relevant information
- Use `get_document` to retrieve complete documents when you need more context
- You may need multiple searches with different queries to fully answer a question
- Teaches the AI how to use the tools
- Hints that multiple retrievals may be needed (the core of being agentic)
- Guides the AI toward strategic thinking
4. Citation Requirements
Always include citations in your answers. Format citations as [Doc: document_id]
5. Chain-of-Thought Guidance
Think step-by-step:
- First, understand what information is needed
- Search for relevant information
- If needed, retrieve full documents for context
- Synthesize the information to answer the question
- Include proper citations
- Guides the AI toward structured thinking
- This is where the ReAct pattern shows up
- Improves the ability to solve complex problems
6. Tool Execution Function
def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
"""Execute a tool and return the result"""
try:
if tool_name == "knowledge_base_search":
query = arguments.get("query", "")
results = self.kb_tools.knowledge_base_search(query)
# Log full trajectory when verbose
if self.config.agent.verbose:
logger.info("=" * 80)
logger.info(f"TOOL EXECUTION: {tool_name}")
logger.info("-" * 80)
logger.info(f"Query: {query}")
logger.info("-" * 80)
if not results:
if self.config.agent.verbose:
logger.info("Results: No relevant documents found")
logger.info("=" * 80)
return {"status": "no_results", "message": "No relevant documents found"}
# Format results for agent - KEEP ALL RESULTS
formatted_results = []
for i, r in enumerate(results, 1):
formatted_results.append({
"doc_id": r["doc_id"],
"chunk_id": r["chunk_id"],
"text": r["text"],
"score": r["score"]
})
# Log each result in full detail
if self.config.agent.verbose:
logger.info(f"Result {i}/{len(results)}:")
logger.info(f" Document ID: {r['doc_id']}")
logger.info(f" Chunk ID: {r['chunk_id']}")
logger.info(f" Score: {r['score']:.4f}")
logger.info(f" Text (full):\n{'-' * 40}")
logger.info(r['text'])
logger.info("-" * 40)
if self.config.agent.verbose:
logger.info(f"Total results found: {len(results)}")
logger.info("=" * 80)
return {
"status": "su***ess",
"results": formatted_results[:3], # Limit to top 3 for LLM context
"total_found": len(results),
"all_results": formatted_results # Keep all for logging
}
Tool abstraction layer design
This function is the single entry point for tool execution and reflects several important design patterns:
1. Strategy Pattern
if tool_name == "knowledge_base_search":
# run the search
elif tool_name == "get_document":
# fetch the document
- Chooses a different execution strategy based on the tool name
- Easy to extend with new tools
2. Result Formatting
formatted_results = []
for i, r in enumerate(results, 1):
formatted_results.append({
"doc_id": r["doc_id"],
"chunk_id": r["chunk_id"],
"text": r["text"],
"score": r["score"]
})
- Unified return format
- Includes the necessary metadata (doc_id, chunk_id, score)
- Easy for the LLM to understand and cite
3. Context Window Management
return {
"status": "su***ess",
"results": formatted_results[:3], # Limit to top 3 for LLM context
"total_found": len(results),
"all_results": formatted_results # Keep all for logging
}
Deeper point: why return only the Top 3?
- Token limits: LLMs have a bounded context window (e.g. 128K tokens for GPT-4)
- Attention dilution: too much information degrades the LLM's comprehension
- Cost control: fewer tokens mean lower API cost
- Quality first: the Top 3 usually already contain the most relevant information
7. Document Retrieval Tool
elif tool_name == "get_document":
doc_id = arguments.get("doc_id", "")
# Log full trajectory when verbose
if self.config.agent.verbose:
logger.info("=" * 80)
logger.info(f"TOOL EXECUTION: {tool_name}")
logger.info("-" * 80)
logger.info(f"Document ID: {doc_id}")
logger.info("-" * 80)
document = self.kb_tools.get_document(doc_id)
if "error" in document:
if self.config.agent.verbose:
logger.info(f"Error: {document['error']}")
logger.info("=" * 80)
return {"status": "error", "message": document["error"]}
# Log full document content
if self.config.agent.verbose:
logger.info("Document Retrieved:")
logger.info(f" Doc ID: {document.get('doc_id', doc_id)}")
if document.get('metadata'):
logger.info(f" Metadata: {json.dumps(document['metadata'], indent=2, ensure_ascii=False)}")
logger.info(" Content (full):\n" + "=" * 40)
logger.info(document.get('content', ''))
logger.info("=" * 80)
return {
"status": "su***ess",
"document": {
"doc_id": document.get("doc_id", doc_id),
"content": document.get("content", ""),
"metadata": document.get("metadata", {})
}
}
Two-stage retrieval strategy
This tool embodies the core advantage of Agentic RAG:
Stage one: knowledge_base_search
- Quickly retrieves relevant chunks
- Returns the Top-K results
- Suitable for answering simple questions
Stage two: get_document
- Retrieves the full document
- Provides more context
- Suitable for complex questions or scenarios that need deeper understanding
Why two tools?
- Efficiency: not every question needs the full document
- Cost control: full documents consume more tokens
- Flexibility: the AI can choose its strategy as needed
- Progressive retrieval: coarse first, then fine, mimicking how humans think
8. Main Query Processing Function
def query(self, user_query: str, stream: bool = None) -> Any:
"""
Process a user query using the ReAct pattern.
Args:
user_query: The user's question
stream: Whether to stream the response
Returns:
The agent's response (string or generator for streaming)
"""
if stream is None:
stream = self.config.llm.stream
# Build messages
messages = self._build_messages(user_query)
# Track iterations
iterations = 0
max_iterations = self.config.agent.max_iterations
# Process with ReAct loop
while iterations < max_iterations:
iterations += 1
if self.config.agent.verbose:
logger.info("\n" + "=" * 100)
logger.info(f"ITERATION {iterations}/{max_iterations}")
logger.info("=" * 100)
1. Iteration Limit (Max Iterations)
max_iterations = self.config.agent.max_iterations
Why is a limit needed?
- Prevents infinite loops: the AI could get stuck repeating retrievals
- Cost control: every iteration consumes API calls
- User experience: excessively long waits are unacceptable
- In practice: most questions are resolved within 3-5 iterations
2. Message Construction
messages = self._build_messages(user_query)
What this function does:
- Adds the system prompt
- Includes the conversation history (supporting multi-turn dialogue)
- Appends the current user query (a sketch of what it likely looks like follows below)
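A minimal sketch of _build_messages as a method of AgenticRAG, since its actual implementation is not shown in this post; the body below is an assumption based on the description above:
def _build_messages(self, user_query: str) -> List[Dict[str, Any]]:
    # System prompt first, then prior turns, then the new user query
    messages = [{"role": "system", "content": self._get_system_prompt()}]
    messages.extend(self.conversation_history)
    messages.append({"role": "user", "content": user_query})
    return messages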
9. LLM Call and Tool Use
try:
# Call LLM with tools
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.tools,
tool_choice="auto",
temperature=self.config.llm.temperature,
max_tokens=self.config.llm.max_tokens,
stream=False # We handle streaming separately
)
message = response.choices[0].message
# Add assistant message to history
assistant_msg = {"role": "assistant", "content": message.content or ""}
if message.tool_calls:
assistant_msg["tool_calls"] = [
{
"id": tc.id,
"type": tc.type,
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments
}
} for tc in message.tool_calls
]
messages.append(assistant_msg)
The OpenAI Function Calling mechanism
This code uses OpenAI's Function Calling feature, the key technology behind agentic behavior:
1. The tools parameter
tools=self.tools,
- Tells the LLM which tools are available
- Contains each tool's name, description, and parameter definitions
- The LLM decides, based on the user's question, whether to call a tool
2. tool_choice="auto"
Three modes:
- "auto": the LLM decides on its own whether to use tools (the most common setting)
- "none": tools are never used
- {"type": "function", "function": {"name": "xxx"}}: force a specific tool
3. Temperature
temperature=self.config.llm.temperature,
Effect of temperature:
- 0.0-0.3: strongly deterministic, suited to factual tasks (such as RAG)
- 0.7-1.0: more creative, suited to creative writing tasks
- >1.0: highly random, generally not recommended
4. Data Structure of a Tool Call
if message.tool_calls:
assistant_msg["tool_calls"] = [
{
"id": tc.id,
"type": tc.type,
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments
}
} for tc in message.tool_calls
]
The OpenAI Function Calling workflow:
- The LLM analyzes the user's question
- Decides which tool needs to be called
- Generates a tool call request (including arguments)
- The system executes the tool
- The result is returned to the LLM
- The LLM produces the final answer based on the result
10. Tool Call Handling
# Process tool calls if present
if message.tool_calls:
for tool_call in message.tool_calls:
tool_name = tool_call.function.name
try:
arguments = json.loads(tool_call.function.arguments)
except json.JSONDecodeError:
arguments = {}
if self.config.agent.verbose:
logger.info("\n" + "#" * 80)
logger.info(f"TOOL CALL: {tool_name}")
logger.info(f"Arguments: {json.dumps(arguments, indent=2, ensure_ascii=False)}")
logger.info("#" * 80)
# Execute tool
result = self._execute_tool(tool_name, arguments)
# Log full tool result when verbose
if self.config.agent.verbose:
logger.info("\n" + "*" * 80)
logger.info("TOOL RESULT:")
logger.info("*" * 80)
# Show full result including all_results if present
if 'all_results' in result:
logger.info("All Search Results (***plete):")
for idx, res in enumerate(result['all_results'], 1):
logger.info(f"\nResult {idx}:")
logger.info(json.dumps(res, indent=2, ensure_ascii=False))
else:
logger.info(json.dumps(result, indent=2, ensure_ascii=False))
logger.info("*" * 80 + "\n")
# For messages, don't include all_results to avoid overloading LLM
result_for_llm = {k: v for k, v in result.items() if k != 'all_results'}
# Add tool result to messages
tool_message = {
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result_for_llm, ensure_ascii=False)
}
messages.append(tool_message)
# Continue loop for next iteration
continue
The full lifecycle of a tool call
1. Argument Parsing
try:
arguments = json.loads(tool_call.function.arguments)
except json.JSONDecodeError:
arguments = {}
Why the try-except?
- The JSON generated by the LLM may be malformed
- Improves system robustness
- Graceful degradation: fall back to an empty dict when the arguments are invalid
2. Result Filtering
result_for_llm = {k: v for k, v in result.items() if k != 'all_results'}
This is a key optimization:
- all_results is kept for logging (complete information)
- result_for_llm contains only the Top 3 (saves tokens)
- It balances observability and efficiency
3. Message Format of the Tool Result
tool_message = {
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result_for_llm, ensure_ascii=False)
}
messages.append(tool_message)
The format OpenAI requires:
- role: "tool": marks this as a tool result
- tool_call_id: links the result to the specific tool call
- content: the tool's output (must be a string)
4. Continuing the Loop
continue
- When there are tool calls, no answer is returned yet
- The loop proceeds to the next iteration
- The LLM reasons further based on the tool results
11. Final Answer Generation
else:
# No tool calls, we have final answer
# Update conversation history
self.conversation_history.append({"role": "user", "content": user_query})
self.conversation_history.append(assistant_msg)
# Return response
if stream:
return self._stream_response(message.content or "")
else:
return message.content or ""
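A hedged sketch of _stream_response, which is referenced above but not shown in this post; the chunking approach below is an assumption for illustration only:
def _stream_response(self, content: str) -> Generator[str, None, None]:
    # Yield the final answer in small slices so the caller can render it incrementally
    chunk_size = 20
    for i in range(0, len(content), chunk_size):
        yield content[i:i + chunk_size]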
Appendix: Tool Construction
"""Tools for knowledge base interaction"""
import json
import logging
import requests
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from config import KnowledgeBaseConfig, KnowledgeBaseType
logger = logging.getLogger(__name__)
@dataclass
class SearchResult:
"""Search result from knowledge base"""
doc_id: str
chunk_id: str
text: str
score: float
metadata: Dict[str, Any] = None
def to_dict(self) -> Dict[str, Any]:
return {
"doc_id": self.doc_id,
"chunk_id": self.chunk_id,
"text": self.text,
"score": self.score,
"metadata": self.metadata or {}
}
class KnowledgeBaseTools:
"""Tools for interacting with knowledge base"""
def __init__(self, config: KnowledgeBaseConfig):
self.config = config
self.document_store = {} # In-memory store for documents
# Load document store if exists
try:
with open(config.document_store_path, 'r', encoding='utf-8') as f:
self.document_store = json.load(f)
except FileNotFoundError:
logger.info("No existing document store found, starting fresh")
except Exception as e:
logger.error(f"Error loading document store: {e}")
def save_document_store(self):
"""Save document store to disk"""
try:
with open(self.config.document_store_path, 'w', encoding='utf-8') as f:
json.dump(self.document_store, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Error saving document store: {e}")
def knowledge_base_search(self, query: str) -> List[Dict[str, Any]]:
"""
Search the knowledge base with a natural language query.
Args:
query: Natural language query string
Returns:
List of matching document chunks with scores
"""
try:
if self.config.type == KnowledgeBaseType.LOCAL:
return self._search_local(query)
elif self.config.type == KnowledgeBaseType.DIFY:
return self._search_dify(query)
elif self.config.type == KnowledgeBaseType.RAPTOR:
return self._search_raptor(query)
elif self.config.type == KnowledgeBaseType.GRAPHRAG:
return self._search_graphrag(query)
else:
raise ValueError(f"Unsupported knowledge base type: {self.config.type}")
except Exception as e:
logger.error(f"Error in knowledge base search: {e}")
return []
def _search_local(self, query: str) -> List[Dict[str, Any]]:
"""Search using local retrieval pipeline"""
try:
response = requests.post(
f"{self.config.local_base_url}/search",
json={
"query": query,
"mode": "hybrid",
"top_k": self.config.local_top_k,
"rerank": True
}
)
response.raise_for_status()
results = []
data = response.json()
# The retrieval pipeline returns results in different keys based on mode
# For hybrid mode, we want the reranked_results
search_results = data.get("reranked_results", [])
# If no reranked results, fall back to dense or sparse results
if not search_results:
search_results = data.get("dense_results", [])
if not search_results:
search_results = data.get("sparse_results", [])
for item in search_results:
# Extract doc_id and chunk_id from the result
doc_id = item.get("doc_id", "")
chunk_id = item.get("chunk_id", f"{doc_id}_chunk_{len(results)}")
# Get the text field and score based on result type
text = item.get("text", "")
score = item.get("rerank_score", item.get("score", 0.0))
result = SearchResult(
doc_id=doc_id,
chunk_id=chunk_id,
text=text,
score=score,
metadata=item.get("metadata", {})
)
results.append(result.to_dict())
logger.info(f"Local search returned {len(results)} results")
return results
except requests.exceptions.RequestException as e:
logger.error(f"Error connecting to local retrieval pipeline: {e}")
return []
def _search_dify(self, query: str) -> List[Dict[str, Any]]:
"""Search using Dify API"""
if not self.config.dify_api_key:
logger.error("Dify API key not configured")
return []
try:
headers = {
"Authorization": f"Bearer {self.config.dify_api_key}",
"Content-Type": "application/json"
}
payload = {
"query": query,
"top_k": self.config.dify_top_k
}
if self.config.dify_dataset_id:
payload["dataset_id"] = self.config.dify_dataset_id
response = requests.post(
f"{self.config.dify_base_url}/datasets/search",
headers=headers,
json=payload
)
response.raise_for_status()
results = []
data = response.json()
for item in data.get("data", {}).get("records", []):
doc_id = item.get("document_id", "")
chunk_id = item.get("segment_id", f"{doc_id}_chunk_{len(results)}")
result = SearchResult(
doc_id=doc_id,
chunk_id=chunk_id,
text=item.get("content", ""),
score=item.get("score", 0.0),
metadata=item.get("metadata", {})
)
results.append(result.to_dict())
logger.info(f"Dify search returned {len(results)} results")
return results
except requests.exceptions.RequestException as e:
logger.error(f"Error connecting to Dify API: {e}")
return []
def _search_raptor(self, query: str) -> List[Dict[str, Any]]:
"""Search using RAPTOR tree-based index"""
try:
response = requests.post(
f"{self.config.raptor_base_url}/query",
json={
"query": query,
"index_type": "raptor",
"top_k": self.config.raptor_top_k
}
)
response.raise_for_status()
results = []
data = response.json()
for i, item in enumerate(data.get("results", [])):
# RAPTOR returns tree nodes with levels and summaries
doc_id = item.get("node_id", f"raptor_node_{i}")
chunk_id = f"{doc_id}_level_{item.get('level', 0)}"
# Use summary if available, otherwise use text
text_content = item.get("summary", item.get("text", ""))
result = SearchResult(
doc_id=doc_id,
chunk_id=chunk_id,
text=text_content,
score=item.get("score", 0.0),
metadata={
"level": item.get("level", 0),
"source": "raptor"
}
)
results.append(result.to_dict())
logger.info(f"RAPTOR search returned {len(results)} results")
return results
except requests.exceptions.RequestException as e:
logger.error(f"Error connecting to RAPTOR index: {e}")
return []
def _search_graphrag(self, query: str) -> List[Dict[str, Any]]:
"""Search using GraphRAG knowledge graph index"""
try:
response = requests.post(
f"{self.config.graphrag_base_url}/query",
json={
"query": query,
"index_type": "graphrag",
"top_k": self.config.graphrag_top_k,
"search_type": self.config.graphrag_search_type
}
)
response.raise_for_status()
results = []
data = response.json()
for i, item in enumerate(data.get("results", [])):
# GraphRAG returns entities or communities
result_type = item.get("type", "unknown")
if result_type == "entity":
doc_id = item.get("id", f"entity_{i}")
chunk_id = f"{doc_id}_{item.get('entity_type', 'unknown')}"
text_content = f"{item.get('name', '')}. {item.get('description', '')}"
metadata = {
"type": "entity",
"entity_type": item.get("entity_type"),
"related_entities": item.get("related_entities", [])
}
else: # community
doc_id = item.get("id", f"community_{i}")
chunk_id = f"{doc_id}_level_{item.get('level', 0)}"
text_content = item.get("summary", "")
metadata = {
"type": "community",
"level": item.get("level", 0),
"entity_count": item.get("entity_count", 0),
"sample_entities": item.get("sample_entities", [])
}
result = SearchResult(
doc_id=doc_id,
chunk_id=chunk_id,
text=text_content,
score=item.get("score", 0.0),
metadata={**metadata, "source": "graphrag"}
)
results.append(result.to_dict())
logger.info(f"GraphRAG search returned {len(results)} results")
return results
except requests.exceptions.RequestException as e:
logger.error(f"Error connecting to GraphRAG index: {e}")
return []
def get_document(self, doc_id: str) -> Dict[str, Any]:
"""
Retrieve the entire document from the knowledge base.
Args:
doc_id: Document ID
Returns:
Full document content and metadata
"""
try:
# First check local document store
if doc_id in self.document_store:
return self.document_store[doc_id]
if self.config.type == KnowledgeBaseType.LOCAL:
return self._get_document_local(doc_id)
elif self.config.type == KnowledgeBaseType.DIFY:
return self._get_document_dify(doc_id)
elif self.config.type == KnowledgeBaseType.RAPTOR:
return self._get_document_raptor(doc_id)
elif self.config.type == KnowledgeBaseType.GRAPHRAG:
return self._get_document_graphrag(doc_id)
else:
raise ValueError(f"Unsupported knowledge base type: {self.config.type}")
except Exception as e:
logger.error(f"Error retrieving document {doc_id}: {e}")
return {"error": f"Document {doc_id} not found"}
def _get_document_local(self, doc_id: str) -> Dict[str, Any]:
"""Get document from local retrieval pipeline"""
try:
response = requests.get(
f"{self.config.local_base_url}/documents/{doc_id}"
)
if response.status_code == 404:
return {"error": f"Document {doc_id} not found"}
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Error getting document from local pipeline: {e}")
return {"error": str(e)}
def _get_document_dify(self, doc_id: str) -> Dict[str, Any]:
"""Get document from Dify"""
if not self.config.dify_api_key:
return {"error": "Dify API key not configured"}
try:
headers = {
"Authorization": f"Bearer {self.config.dify_api_key}",
"Content-Type": "application/json"
}
response = requests.get(
f"{self.config.dify_base_url}/documents/{doc_id}",
headers=headers
)
if response.status_code == 404:
return {"error": f"Document {doc_id} not found"}
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Error getting document from Dify: {e}")
return {"error": str(e)}
def _get_document_raptor(self, doc_id: str) -> Dict[str, Any]:
"""Get document/node from RAPTOR index"""
try:
# For RAPTOR, we perform a targeted search for the specific node
response = requests.post(
f"{self.config.raptor_base_url}/query",
json={
"query": f"node:{doc_id}", # Specific node query
"index_type": "raptor",
"top_k": 1
}
)
if response.status_code == 404:
return {"error": f"Document {doc_id} not found"}
response.raise_for_status()
data = response.json()
if data.get("results"):
result = data["results"][0]
return {
"doc_id": doc_id,
"content": result.get("text", ""),
"metadata": {
"summary": result.get("summary", ""),
"level": result.get("level", 0),
"source": "raptor"
}
}
return {"error": f"Document {doc_id} not found"}
except requests.exceptions.RequestException as e:
logger.error(f"Error getting document from RAPTOR: {e}")
return {"error": str(e)}
def _get_document_graphrag(self, doc_id: str) -> Dict[str, Any]:
"""Get entity or ***munity from GraphRAG index"""
try:
# For GraphRAG, we perform a targeted search for the specific entity/***munity
response = requests.post(
f"{self.config.graphrag_base_url}/query",
json={
"query": f"id:{doc_id}", # Specific ID query
"index_type": "graphrag",
"top_k": 1,
"search_type": "hybrid"
}
)
if response.status_code == 404:
return {"error": f"Document {doc_id} not found"}
response.raise_for_status()
data = response.json()
if data.get("results"):
result = data["results"][0]
content = ""
metadata = {"source": "graphrag"}
if result.get("type") == "entity":
content = f"{result.get('name', '')}\n\n{result.get('description', '')}"
metadata.update({
"type": "entity",
"entity_type": result.get("entity_type"),
"related_entities": result.get("related_entities", [])
})
else: # community
content = result.get("summary", "")
metadata.update({
"type": "community",
"level": result.get("level", 0),
"entity_count": result.get("entity_count", 0),
"sample_entities": result.get("sample_entities", [])
})
return {
"doc_id": doc_id,
"content": content,
"metadata": metadata
}
return {"error": f"Document {doc_id} not found"}
except requests.exceptions.RequestException as e:
logger.error(f"Error getting document from GraphRAG: {e}")
return {"error": str(e)}
def add_document(self, doc_id: str, content: str, metadata: Optional[Dict] = None):
"""Add a document to the local store"""
self.document_store[doc_id] = {
"doc_id": doc_id,
"content": content,
"metadata": metadata or {}
}
self.save_document_store()
# Tool function definitions for agent
def get_tool_definitions() -> List[Dict[str, Any]]:
"""Get OpenAI-format tool definitions"""
return [
{
"type": "function",
"function": {
"name": "knowledge_base_search",
"description": "Search the knowledge base for relevant information using a natural language query. Returns top-matching document chunks.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Natural language search query to find relevant information"
}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "get_document",
"description": "Retrieve the ***plete content of a specific document from the knowledge base using its document ID.",
"parameters": {
"type": "object",
"properties": {
"doc_id": {
"type": "string",
"description": "The unique identifier of the document to retrieve"
}
},
"required": ["doc_id"]
}
}
}
]
Acknowledgments
Thank you for reading. The above is my summary of the core components involved in building an Agentic RAG system. Corrections are welcome in the comments; if this content helped you, a like or a bookmark means a lot, and your support is what keeps me going!
"Grant me the serenity to accept the things I cannot change, and the courage to change the things I can."