RAG系统架构模式
引言
RAG系统的架构设计是确保系统高效、可扩展和可维护的关键。不同的架构模式适用于不同的应用场景和需求。本文将深入探讨RAG系统的各种架构模式,包括单体架构、微服务架构、分层架构和事件驱动架构等,帮助开发者选择最适合的架构模式。
RAG系统架构概述
什么是RAG系统架构
RAG系统架构是指RAG系统各个组件之间的组织方式、交互模式和部署结构。它决定了系统的性能、可扩展性、可维护性和可靠性。
架构设计的目标
架构设计的原则
- 单一职责原则:每个组件只负责一个功能
- 开闭原则:对扩展开放,对修改关闭
- 依赖倒置原则:依赖抽象而不是具体实现
- 接口隔离原则:使用多个专门的接口
- 最小知识原则:组件之间只了解必要的信息
单体架构模式
1. 传统单体架构
架构特点
实现示例
python
# 单体RAG系统架构
class MonolithicRAGSystem:
    """Single-process RAG pipeline that owns every component directly.

    One instance holds the document processor, vectorizer, retriever,
    generator and database, and exposes query plus document CRUD methods.
    """

    def __init__(self):
        self.document_processor = DocumentProcessor()
        self.vectorizer = Vectorizer()
        self.retriever = Retriever()
        self.generator = Generator()
        self.database = Database()
        # DocumentProcessor.process_documents reads ``self.database`` and
        # Retriever.retrieve reads ``self.vectorizer`` / ``self.database``;
        # share this system's instances so those lookups resolve.
        self.document_processor.database = self.database
        self.retriever.vectorizer = self.vectorizer
        self.retriever.database = self.database

    def process_query(self, query: str) -> str:
        """Answer ``query`` by running process -> vectorize -> retrieve -> generate.

        NOTE(review): every call re-processes and re-vectorizes the whole
        corpus; consider caching the vectors if this becomes a hot path.
        """
        processed_docs = self.document_processor.process_documents()
        vectors = self.vectorizer.vectorize(processed_docs)
        relevant_docs = self.retriever.retrieve(query, vectors)
        return self.generator.generate(query, relevant_docs)

    def add_document(self, document: str) -> bool:
        """Process, vectorize and store ``document``.

        Returns ``True`` on success; on any exception the error is printed
        and ``False`` is returned.
        """
        try:
            processed_doc = self.document_processor.process_document(document)
            vector = self.vectorizer.vectorize_document(processed_doc)
            self.database.store_document(processed_doc, vector)
            return True
        except Exception as e:
            print(f"添加文档失败: {e}")
            return False

    def update_document(self, doc_id: str, document: str) -> bool:
        """Replace the stored document ``doc_id`` with ``document``.

        The replacement is processed and vectorized *before* the old record
        is deleted, so a processing failure no longer destroys the existing
        document. Returns ``True`` on success, ``False`` (after printing the
        error) otherwise.
        """
        try:
            processed_doc = self.document_processor.process_document(document)
            vector = self.vectorizer.vectorize_document(processed_doc)
            # Only touch storage once the new content is known to be valid.
            self.database.delete_document(doc_id)
            self.database.store_document(processed_doc, vector)
            return True
        except Exception as e:
            print(f"更新文档失败: {e}")
            return False

    def delete_document(self, doc_id: str) -> bool:
        """Delete ``doc_id``; returns ``False`` (after printing) on error."""
        try:
            self.database.delete_document(doc_id)
            return True
        except Exception as e:
            print(f"删除文档失败: {e}")
            return False
class DocumentProcessor:
    """Parses raw documents with a per-format parser.

    Args:
        database: optional store providing ``get_all_documents()``; required
            only by :meth:`process_documents`. It may also be assigned to
            the ``database`` attribute after construction.
    """

    def __init__(self, database=None):
        self.parsers = {
            'pdf': PDFParser(),
            'docx': DocxParser(),
            'txt': TextParser(),
            'html': HTMLParser()
        }
        # Previously never assigned, which made process_documents fail with
        # AttributeError.
        self.database = database

    def process_document(self, document: str, doc_type: str = 'txt') -> Dict[str, any]:
        """Parse ``document`` with the parser registered for ``doc_type``.

        Unknown types fall back to the ``'txt'`` parser.
        """
        parser = self.parsers.get(doc_type, self.parsers['txt'])
        return parser.parse(document)

    def process_documents(self) -> List[Dict[str, any]]:
        """Parse every document returned by ``database.get_all_documents()``.

        Records without a ``'type'`` key are treated as ``'txt'``.

        Raises:
            RuntimeError: if no database has been configured.
        """
        if self.database is None:
            raise RuntimeError(
                "DocumentProcessor.process_documents requires a database"
            )
        return [
            self.process_document(doc['content'], doc.get('type', 'txt'))
            for doc in self.database.get_all_documents()
        ]
class Vectorizer:
    """Turns processed documents into embedding vectors."""

    def __init__(self):
        self.embedding_model = EmbeddingModel()

    def vectorize_document(self, document: Dict[str, any]) -> List[float]:
        """Encode the ``'content'`` field of a single document.

        Returns whatever ``embedding_model.encode`` yields for that text.
        """
        return self.embedding_model.encode(document['content'])

    def vectorize(self, documents: List[Dict[str, any]]) -> List[List[float]]:
        """Encode every document, preserving the input order."""
        return [self.vectorize_document(item) for item in documents]
class Retriever:
    """Ranks pre-computed document vectors against a query.

    Args:
        vectorizer: object exposing ``vectorize_document(dict)``; used to
            embed the query.
        database: object exposing ``get_document(key)``; used to load the
            winning documents.

    Both collaborators were previously read but never assigned. They can be
    passed here or set as attributes after construction.
    """

    def __init__(self, vectorizer=None, database=None):
        self.similarity_calculator = SimilarityCalculator()
        self.vectorizer = vectorizer
        self.database = database

    def vectorize_query(self, query: str) -> List[float]:
        """Embed ``query`` by wrapping it as a ``{'content': query}`` document.

        Raises:
            RuntimeError: if no vectorizer has been configured.
        """
        if self.vectorizer is None:
            raise RuntimeError("Retriever requires a vectorizer")
        return self.vectorizer.vectorize_document({'content': query})

    def retrieve(self, query: str, vectors: List[List[float]],
                 top_k: int = 5) -> List[Dict[str, any]]:
        """Return the documents for the ``top_k`` most similar vectors.

        Each vector is scored against the query vector and the best
        ``top_k`` positions are looked up via ``database.get_document``.
        A non-positive ``top_k`` yields an empty list.

        NOTE(review): the lookup key is the vector's position in
        ``vectors`` — confirm the database accepts positions as ids.

        Raises:
            RuntimeError: if the vectorizer or database is not configured.
        """
        if self.database is None:
            raise RuntimeError("Retriever requires a database")
        if top_k <= 0:
            return []
        query_vector = self.vectorize_query(query)
        scored = [
            (position, self.similarity_calculator.calculate(query_vector, vector))
            for position, vector in enumerate(vectors)
        ]
        # Stable sort: equal scores keep their original relative order.
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return [self.database.get_document(position) for position, _ in scored[:top_k]]
class Generator:
    """Builds a prompt from retrieved documents and asks the LLM to answer."""

    def __init__(self):
        self.llm = LargeLanguageModel()

    def generate(self, query: str, relevant_docs: List[Dict[str, any]]) -> str:
        """Return the LLM output for ``query`` grounded in ``relevant_docs``."""
        prompt = self._build_prompt(query, self._build_context(relevant_docs))
        return self.llm.generate(prompt)

    def _build_context(self, docs: List[Dict[str, any]]) -> str:
        """Join each document's ``'content'`` with blank lines between them."""
        return '\n\n'.join(entry['content'] for entry in docs)

    def _build_prompt(self, query: str, context: str) -> str:
        """Fill the fixed prompt template with ``context`` and ``query``."""
        return f"""
基于以下上下文回答问题:
上下文:
{context}
问题:
{query}
回答:
"""
class Database:
def __init__(self):
self.connection = self._create_connection()
def _create_connection(self):
"""创建数据库连接"""
# 实际应用中连接到真实数据库
return None
def store_document(self, document: Dict[str, any], vector: List[float]) -> str:
"""存储文档"""
doc_id = self._generate_id()
# 存储文档和向量
return doc_id
def get_document(self, doc_id: str) -> Dict[str, any]:
"""获取文档"""
# 从数据库获取文档
return {'id': doc_id, 'content': '示例文档内容'}
def get_all_documents(self) -> List[Dict[str, any]]:
"""获取所有文档"""
# 从数据库获取所有文档
return []
def delete_document(self, doc_id: str) -> bool:
"""删除文档"""
# 从数据库删除文档
return True
def _generate_id(self) -> str:
"""生成ID"""
import uuid
return str(uuid.uuid4())2. 模块化单体架构
架构特点
实现示例
python
# 模块化单体RAG系统
class ModularRAGSystem:
    """Monolith split into service modules reachable through an API gateway."""

    def __init__(self):
        self.user_service = UserService()
        self.document_service = DocumentService()
        self.retrieval_service = RetrievalService()
        self.generation_service = GenerationService()
        self.api_gateway = APIGateway()
        self._register_services()

    def _register_services(self):
        """Expose each service module on the gateway under a short name."""
        named_services = (
            ('user', self.user_service),
            ('document', self.document_service),
            ('retrieval', self.retrieval_service),
            ('generation', self.generation_service),
        )
        for label, module in named_services:
            self.api_gateway.register_service(label, module)

    def process_query(self, query: str, user_id: str) -> Dict[str, any]:
        """Authenticate the user, retrieve documents, generate and log an answer.

        Returns an error dict when authentication fails; otherwise a dict
        with the response, the retrieved documents and a timestamp.
        """
        authenticated = self.user_service.authenticate(user_id)
        if not authenticated:
            return {'error': '用户认证失败'}

        relevant_docs = self.retrieval_service.retrieve(query)
        answer = self.generation_service.generate(query, relevant_docs)
        self.user_service.record_query_history(user_id, query, answer)

        return {
            'response': answer,
            'relevant_docs': relevant_docs,
            'timestamp': time.time()
        }
class APIGateway:
    """Routes request dicts to registered services through a middleware chain."""

    def __init__(self):
        self.services = {}
        self.middleware = []

    def register_service(self, name: str, service):
        """Make ``service`` reachable under ``name`` (replaces any previous one)."""
        self.services[name] = service

    def add_middleware(self, middleware):
        """Append ``middleware`` to the end of the chain."""
        self.middleware.append(middleware)

    def handle_request(self, request: Dict[str, any]) -> Dict[str, any]:
        """Run request middleware, dispatch to the service, run response middleware.

        Request hooks run in registration order and response hooks in
        reverse order. A request naming an unregistered service produces an
        error dict, which still passes through the response hooks.
        """
        for hook in self.middleware:
            request = hook.process_request(request)

        target_name = request.get('service')
        try:
            target = self.services[target_name]
        except KeyError:
            reply = {'error': f'服务 {target_name} 不存在'}
        else:
            reply = target.handle_request(request)

        for hook in self.middleware[::-1]:
            reply = hook.process_response(reply)
        return reply
class UserService:
    """User module: authentication and query-history bookkeeping."""

    def __init__(self):
        self.user_db = UserDatabase()
        self.auth_manager = AuthManager()

    def authenticate(self, user_id: str) -> bool:
        """Return the auth manager's verdict for ``user_id``."""
        verdict = self.auth_manager.authenticate(user_id)
        return verdict

    def record_query_history(self, user_id: str, query: str, response: str):
        """Store one query/response pair for ``user_id``."""
        self.user_db.add_query_history(user_id, query, response)

    def handle_request(self, request: Dict[str, any]) -> Dict[str, any]:
        """Dispatch a gateway request by its ``'action'`` field.

        Supports ``'authenticate'`` and ``'get_history'``; any other action
        returns an error dict.
        """
        action = request.get('action')
        if action == 'authenticate':
            return {'authenticated': self.authenticate(request.get('user_id'))}
        if action == 'get_history':
            return {'history': self.user_db.get_query_history(request.get('user_id'))}
        return {'error': '未知操作'}
class DocumentService:
    """Document module: processes incoming documents and stores them."""

    def __init__(self):
        self.document_db = DocumentDatabase()
        self.processor = DocumentProcessor()

    def add_document(self, document: Dict[str, any]) -> str:
        """Process ``document``, store it and return the id from the store."""
        prepared = self.processor.process_document(document)
        return self.document_db.store_document(prepared)

    def handle_request(self, request: Dict[str, any]) -> Dict[str, any]:
        """Dispatch a gateway request by its ``'action'`` field.

        Supports ``'add_document'`` and ``'get_document'``; any other action
        returns an error dict.
        """
        action = request.get('action')
        if action == 'add_document':
            return {'doc_id': self.add_document(request.get('document'))}
        if action == 'get_document':
            fetched = self.document_db.get_document(request.get('doc_id'))
            return {'document': fetched}
        return {'error': '未知操作'}
class RetrievalService:
    """Retrieval module: embeds the query and searches the vector store."""

    def __init__(self):
        self.vector_db = VectorDatabase()
        self.retriever = Retriever()

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict[str, any]]:
        """Return the vector store's ``top_k`` matches for ``query``."""
        embedded = self.retriever.vectorize_query(query)
        return self.vector_db.search(embedded, top_k)

    def handle_request(self, request: Dict[str, any]) -> Dict[str, any]:
        """Dispatch a gateway request; only ``'retrieve'`` is supported.

        ``top_k`` defaults to 5 when the request omits it.
        """
        if request.get('action') != 'retrieve':
            return {'error': '未知操作'}
        wanted = request.get('query')
        limit = request.get('top_k', 5)
        return {'documents': self.retrieve(wanted, limit)}
class GenerationService:
def __init__(self):
self.llm = LargeLanguageModel()
self.prompt_builder = PromptBuilder()
def generate(self, query: str, relevant_docs: List[Dict[str, any]]) -> str:
"""生成回答"""
# 构建提示
prompt = self.prompt_builder.build_prompt(query, relevant_docs)
# 生成回答
response = self.llm.generate(prompt)
return response
def handle_request(self, request: Dict[str, any]) -> Dict[str, any]:
"""处理请求"""
action = request.get('action')
if action == 'generate':
query = request.get('query')
relevant_docs = request.get('relevant_docs', [])
response = self.generate(query, relevant_docs)
return {'response': response}
else:
return {'error': '未知操作'}微服务架构模式
1. 服务拆分策略
服务划分原则
实现示例
python
# 微服务RAG系统架构
class MicroserviceRAGSystem:
    """Orchestrates a query across independently deployed services."""

    def __init__(self):
        self.service_registry = ServiceRegistry()
        self.user_client = UserServiceClient()
        self.document_client = DocumentServiceClient()
        self.vector_client = VectorServiceClient()
        self.retrieval_client = RetrievalServiceClient()
        self.generation_client = GenerationServiceClient()
        self.notification_client = NotificationServiceClient()
        self._register_services()

    def _register_services(self):
        """Register every service client with the registry by service name."""
        registrations = {
            'user-service': self.user_client,
            'document-service': self.document_client,
            'vector-service': self.vector_client,
            'retrieval-service': self.retrieval_client,
            'generation-service': self.generation_client,
            'notification-service': self.notification_client,
        }
        for label, client in registrations.items():
            self.service_registry.register(label, client)

    def process_query(self, query: str, user_id: str) -> Dict[str, any]:
        """Authenticate, retrieve, generate, record history, optionally notify.

        Any exception raised along the way is converted into an error dict
        rather than propagated.
        """
        try:
            auth = self.user_client.authenticate(user_id)
            if not auth['authenticated']:
                return {'error': '用户认证失败'}

            relevant_docs = self.retrieval_client.retrieve(query)['documents']
            answer = self.generation_client.generate(query, relevant_docs)['response']
            self.user_client.record_query_history(user_id, query, answer)

            if self._should_send_notification(user_id):
                self.notification_client.send_notification(user_id, answer)

            return {
                'response': answer,
                'relevant_docs': relevant_docs,
                'timestamp': time.time()
            }
        except Exception as e:
            return {'error': f'处理查询失败: {str(e)}'}

    def _should_send_notification(self, user_id: str) -> bool:
        """Decide whether to notify ``user_id`` (currently always ``False``)."""
        return False
class ServiceRegistry:
    """In-memory registry of service clients with a health gate on lookup."""

    def __init__(self):
        self.services = {}
        self.health_checker = HealthChecker()

    def register(self, service_name: str, service_client):
        """Record ``service_client`` as healthy, stamped with the current time."""
        entry = {
            'client': service_client,
            'status': 'healthy',
            'last_check': time.time()
        }
        self.services[service_name] = entry

    def get_service(self, service_name: str):
        """Return the client for ``service_name`` if it passes the health check.

        Unhealthy clients are delegated to ``_failover_service``. Raises
        ``Exception`` when the name was never registered.
        """
        if service_name not in self.services:
            raise Exception(f'服务 {service_name} 不存在')
        client = self.services[service_name]['client']
        if self.health_checker.is_healthy(client):
            return client
        return self._failover_service(service_name)

    def _failover_service(self, service_name: str):
        """Failover hook; not implemented yet, so it returns ``None``."""
        return None
class UserServiceClient:
    """HTTP client for the user service."""

    def __init__(self):
        self.service_url = "http://user-service:8001"
        self.http_client = HttpClient()

    def authenticate(self, user_id: str) -> Dict[str, any]:
        """POST ``user_id`` to ``/authenticate`` and return the decoded JSON body."""
        response = self.http_client.post(f"{self.service_url}/authenticate", {
            'user_id': user_id
        })
        return response.json()

    def record_query_history(self, user_id: str, query: str, response: str):
        """POST one query/response pair to ``/history``; the reply is ignored."""
        self.http_client.post(f"{self.service_url}/history", {
            'user_id': user_id,
            'query': query,
            'response': response
        })

    def get_user_preferences(self, user_id: str) -> Dict[str, any]:
        """GET ``/preferences/<user_id>`` and return the decoded JSON body.

        ``user_id`` is percent-encoded so characters such as ``/`` or ``?``
        cannot change which endpoint is addressed.
        """
        from urllib.parse import quote

        encoded_id = quote(str(user_id), safe='')
        response = self.http_client.get(f"{self.service_url}/preferences/{encoded_id}")
        return response.json()
class DocumentServiceClient:
    """HTTP client for the document service (CRUD over ``/documents``)."""

    def __init__(self):
        self.service_url = "http://document-service:8002"
        self.http_client = HttpClient()

    def _document_url(self, doc_id: str) -> str:
        """Return the URL of one document, with ``doc_id`` percent-encoded.

        Encoding stops ids containing ``/``, ``?`` or ``#`` from addressing
        a different endpoint.
        """
        from urllib.parse import quote

        return f"{self.service_url}/documents/{quote(str(doc_id), safe='')}"

    def add_document(self, document: Dict[str, any]) -> str:
        """POST ``document`` and return the ``'doc_id'`` field of the reply."""
        response = self.http_client.post(f"{self.service_url}/documents", document)
        return response.json()['doc_id']

    def get_document(self, doc_id: str) -> Dict[str, any]:
        """GET one document and return the decoded JSON body."""
        response = self.http_client.get(self._document_url(doc_id))
        return response.json()

    def update_document(self, doc_id: str, document: Dict[str, any]) -> bool:
        """PUT ``document``; ``True`` only when the reply status is 200."""
        response = self.http_client.put(self._document_url(doc_id), document)
        return response.status_code == 200

    def delete_document(self, doc_id: str) -> bool:
        """DELETE one document; ``True`` only when the reply status is 200."""
        response = self.http_client.delete(self._document_url(doc_id))
        return response.status_code == 200
class VectorServiceClient:
    """HTTP client for the vector (embedding) service."""

    def __init__(self):
        self.service_url = "http://vector-service:8003"
        self.http_client = HttpClient()

    def vectorize_document(self, document: Dict[str, any]) -> List[float]:
        """POST ``document`` to ``/vectorize`` and return the ``'vector'`` field."""
        endpoint = f"{self.service_url}/vectorize"
        reply = self.http_client.post(endpoint, document)
        return reply.json()['vector']

    def vectorize_query(self, query: str) -> List[float]:
        """POST the query to ``/vectorize-query`` and return the ``'vector'`` field."""
        endpoint = f"{self.service_url}/vectorize-query"
        payload = {'query': query}
        reply = self.http_client.post(endpoint, payload)
        return reply.json()['vector']
class RetrievalServiceClient:
    """HTTP client for the retrieval service."""

    def __init__(self):
        self.service_url = "http://retrieval-service:8004"
        self.http_client = HttpClient()

    def retrieve(self, query: str, top_k: int = 5) -> Dict[str, any]:
        """POST the query and ``top_k`` to ``/retrieve``; return the JSON body."""
        endpoint = f"{self.service_url}/retrieve"
        payload = {
            'query': query,
            'top_k': top_k
        }
        reply = self.http_client.post(endpoint, payload)
        return reply.json()
class GenerationServiceClient:
    """HTTP client for the generation service."""

    def __init__(self):
        self.service_url = "http://generation-service:8005"
        self.http_client = HttpClient()

    def generate(self, query: str, relevant_docs: List[Dict[str, any]]) -> Dict[str, any]:
        """POST the query and supporting docs to ``/generate``; return the JSON body."""
        endpoint = f"{self.service_url}/generate"
        payload = {
            'query': query,
            'relevant_docs': relevant_docs
        }
        reply = self.http_client.post(endpoint, payload)
        return reply.json()
class NotificationServiceClient:
    """HTTP client for the notification service."""

    def __init__(self):
        self.service_url = "http://notification-service:8006"
        self.http_client = HttpClient()

    def send_notification(self, user_id: str, message: str):
        """POST ``message`` for ``user_id`` to ``/notify``; the reply is ignored."""
        endpoint = f"{self.service_url}/notify"
        payload = {
            'user_id': user_id,
            'message': message
        }
        self.http_client.post(endpoint, payload)
class HttpClient:
def __init__(self):
self.session = requests.Session()
self.timeout = 30
def get(self, url: str, params: Dict = None) -> requests.Response:
"""GET请求"""
return self.session.get(url, params=params, timeout=self.timeout)
def post(self, url: str, data: Dict = None) -> requests.Response:
"""POST请求"""
return self.session.post(url, json=data, timeout=self.timeout)
def put(self, url: str, data: Dict = None) -> requests.Response:
"""PUT请求"""
return self.session.put(url, json=data, timeout=self.timeout)
def delete(self, url: str) -> requests.Response:
"""DELETE请求"""
return self.session.delete(url, timeout=self.timeout)2. 服务间通信
同步通信
python
class SynchronousCommunication:
    """Blocking service calls guarded by a circuit breaker, with optional retry."""

    def __init__(self):
        self.http_client = HttpClient()
        self.circuit_breaker = CircuitBreaker()

    def call_service(self, service_name: str, endpoint: str,
                     data: Dict[str, any]) -> Dict[str, any]:
        """POST ``data`` to ``service_name + endpoint`` and return the JSON body.

        Returns an error dict, without calling the service, when the breaker
        refuses the call. Any exception is recorded as a failure on the
        breaker and turned into an error dict.

        NOTE(review): the HTTP status is not inspected, so a non-2xx reply
        with a JSON body is recorded as a success — confirm this is intended.
        """
        try:
            if not self.circuit_breaker.can_execute(service_name):
                return {'error': '服务熔断'}
            response = self.http_client.post(f"{service_name}{endpoint}", data)
            self.circuit_breaker.record_success(service_name)
            return response.json()
        except Exception as e:
            self.circuit_breaker.record_failure(service_name)
            return {'error': f'调用服务失败: {str(e)}'}

    def call_service_with_retry(self, service_name: str, endpoint: str,
                                data: Dict[str, any], max_retries: int = 3) -> Dict[str, any]:
        """Call the service up to ``max_retries`` times with exponential backoff.

        Returns the first result without an ``'error'`` key, otherwise the
        last error result. Sleeps ``2 ** attempt`` seconds between attempts.
        ``max_retries`` below 1 is treated as a single attempt; previously
        that raised ``UnboundLocalError`` because no call was ever made.
        """
        attempts = max(1, max_retries)
        result: Dict[str, any] = {}
        for attempt in range(attempts):
            result = self.call_service(service_name, endpoint, data)
            if 'error' not in result:
                return result
            if attempt < attempts - 1:
                time.sleep(2 ** attempt)  # exponential backoff
        return result
class CircuitBreaker:
def __init__(self):
self.service_states = {}
self.failure_threshold = 5
self.timeout = 60
def can_execute(self, service_name: str) -> bool:
"""检查是否可以执行"""
if service_name not in self.service_states:
self.service_states[service_name] = {
'state': 'closed',
'failure_count': 0,
'last_failure_time': 0
}
state = self.service_states[service_name]
if state['state'] == 'open':
# 检查是否应该尝试恢复
if time.time() - state['last_failure_time'] > self.timeout:
state['state'] = 'half-open'
return True
return False
return True
def record_success(self, service_name: str):
"""记录成功"""
if service_name in self.service_states:
state = self.service_states[service_name]
state['state'] = 'closed'
state['failure_count'] = 0
def record_failure(self, service_name: str):
"""记录失败"""
if service_name not in self.service_states:
self.service_states[service_name] = {
'state': 'closed',
'failure_count': 0,
'last_failure_time': 0
}
state = self.service_states[service_name]
state['failure_count'] += 1
state['last_failure_time'] = time.time()
if state['failure_count'] >= self.failure_threshold:
state['state'] = 'open'异步通信
python
import asyncio
import aiohttp
from typing import Dict, Any, Callable
class AsynchronousCommunication:
def __init__(self):
self.session = None
self.message_queue = asyncio.Queue()
self.event_handlers = {}
async def initialize(self):
"""初始化异步通信"""
self.session = aiohttp.ClientSession()
# 启动消息处理循环
asyncio.create_task(self._message_loop())
async def call_service_async(self, service_name: str, endpoint: str,
data: Dict[str, any]) -> Dict[str, any]:
"""异步调用服务"""
try:
url = f"{service_name}{endpoint}"
async with self.session.post(url, json=data) as response:
result = await response.json()
return result
except Exception as e:
return {'error': f'异步调用服务失败: {str(e)}'}
async def publish_event(self, event_type: str, data: Dict[str, any]):
"""发布事件"""
event = {
'type': event_type,
'data': data,
'timestamp': time.time()
}
await self.message_queue.put(event)
async def subscribe_event(self, event_type: str, handler: Callable):
"""订阅事件"""
if event_type not in self.event_handlers:
self.event_handlers[event_type] = []
self.event_handlers[event_type].append(handler)
async def _message_loop(self):
"""消息处理循环"""
while True:
try:
event = await self.message_queue.get()
await self._handle_event(event)
except Exception as e:
print(f"处理事件失败: {e}")
async def _handle_event(self, event: Dict[str, any]):
"""处理事件"""
event_type = event['type']
if event_type in self.event_handlers:
for handler in self.event_handlers[event_type]:
try:
await handler(event['data'])
except Exception as e:
print(f"事件处理器执行失败: {e}")
async def close(self):
"""关闭异步通信"""
if self.session:
await self.session.close()分层架构模式
1. 三层架构
架构特点
实现示例
python
# 三层架构RAG系统
class ThreeTierRAGSystem:
    """Presentation -> business logic -> data access, wired at construction."""

    def __init__(self):
        self.presentation_layer = PresentationLayer()
        self.business_logic_layer = BusinessLogicLayer()
        self.data_access_layer = DataAccessLayer()
        self._connect_layers()

    def _connect_layers(self):
        """Give each layer a reference to the layer directly beneath it."""
        upper, middle, lower = (
            self.presentation_layer,
            self.business_logic_layer,
            self.data_access_layer,
        )
        upper.set_business_logic(middle)
        middle.set_data_access(lower)

    def start(self):
        """Start the system by starting the presentation layer."""
        self.presentation_layer.start()
class PresentationLayer:
    """Top tier: validates requests and wraps business results in an envelope."""

    def __init__(self):
        self.business_logic = None
        self.web_server = None

    def set_business_logic(self, business_logic):
        """Attach the business-logic layer that requests are forwarded to."""
        self.business_logic = business_logic

    def start(self):
        """Create a web server, bind it to this layer and start it."""
        server = WebServer()
        self.web_server = server
        server.set_presentation_layer(self)
        server.start()

    def handle_request(self, request: Dict[str, any]) -> Dict[str, any]:
        """Validate ``request``, delegate it, and return the formatted response.

        Invalid requests return a bare error dict without an envelope.
        """
        if not self._validate_request(request):
            return {'error': '请求验证失败'}
        outcome = self.business_logic.process_request(request)
        return self._format_response(outcome)

    def _validate_request(self, request: Dict[str, any]) -> bool:
        """Return ``True`` when both ``'action'`` and ``'data'`` are present."""
        return 'action' in request and 'data' in request

    def _format_response(self, result: Dict[str, any]) -> Dict[str, any]:
        """Wrap ``result`` with a status flag and the current timestamp."""
        if 'error' in result:
            status = 'error'
        else:
            status = 'success'
        return {
            'status': status,
            'data': result,
            'timestamp': time.time()
        }
class BusinessLogicLayer:
    """Middle tier: routes validated requests to the matching use case."""

    def __init__(self):
        self.data_access = None
        # Set by four-tier wiring; stays None in the three-tier layout.
        self.service_layer = None
        self.user_manager = UserManager()
        self.document_manager = DocumentManager()
        self.retrieval_manager = RetrievalManager()
        self.generation_manager = GenerationManager()

    def set_data_access(self, data_access):
        """Attach the data-access layer used for document persistence."""
        self.data_access = data_access

    def set_service_layer(self, service_layer):
        """Attach the service layer.

        FourTierRAGSystem calls this while connecting its layers; the method
        was missing, so building a four-tier system raised AttributeError.
        """
        self.service_layer = service_layer

    def process_request(self, request: Dict[str, any]) -> Dict[str, any]:
        """Dispatch on ``request['action']`` with ``request['data']`` as payload.

        Supported actions are ``query``, ``add_document``,
        ``update_document`` and ``delete_document``; anything else returns
        an error dict.
        """
        action = request['action']
        data = request['data']
        if action == 'query':
            return self._process_query(data)
        if action == 'add_document':
            return self._process_add_document(data)
        if action == 'update_document':
            return self._process_update_document(data)
        if action == 'delete_document':
            return self._process_delete_document(data)
        return {'error': '未知操作'}

    def _process_query(self, data: Dict[str, any]) -> Dict[str, any]:
        """Authenticate, retrieve, generate, and record the query history."""
        query = data['query']
        user_id = data['user_id']
        if not self.user_manager.authenticate(user_id):
            return {'error': '用户认证失败'}
        relevant_docs = self.retrieval_manager.retrieve(query)
        response = self.generation_manager.generate(query, relevant_docs)
        self.user_manager.record_query_history(user_id, query, response)
        return {
            'response': response,
            'relevant_docs': relevant_docs
        }

    def _process_add_document(self, data: Dict[str, any]) -> Dict[str, any]:
        """Process ``data['document']`` and store it; returns the new id."""
        processed_doc = self.document_manager.process_document(data['document'])
        doc_id = self.data_access.store_document(processed_doc)
        return {'doc_id': doc_id}

    def _process_update_document(self, data: Dict[str, any]) -> Dict[str, any]:
        """Overwrite ``data['doc_id']`` with ``data['document']`` as given.

        NOTE(review): unlike the add path, the document is not run through
        ``document_manager.process_document`` first — confirm intended.
        """
        success = self.data_access.update_document(data['doc_id'], data['document'])
        return {'success': success}

    def _process_delete_document(self, data: Dict[str, any]) -> Dict[str, any]:
        """Delete ``data['doc_id']`` through the data-access layer."""
        success = self.data_access.delete_document(data['doc_id'])
        return {'success': success}
class DataAccessLayer:
def __init__(self):
self.user_db = UserDatabase()
self.document_db = DocumentDatabase()
self.vector_db = VectorDatabase()
self.index_db = IndexDatabase()
def store_document(self, document: Dict[str, any]) -> str:
"""存储文档"""
# 存储到文档数据库
doc_id = self.document_db.store(document)
# 存储到向量数据库
vector = document['vector']
self.vector_db.store(doc_id, vector)
# 更新索引
self.index_db.update_index(doc_id, document)
return doc_id
def get_document(self, doc_id: str) -> Dict[str, any]:
"""获取文档"""
return self.document_db.get(doc_id)
def update_document(self, doc_id: str, document: Dict[str, any]) -> bool:
"""更新文档"""
# 更新文档数据库
success = self.document_db.update(doc_id, document)
if success:
# 更新向量数据库
vector = document['vector']
self.vector_db.update(doc_id, vector)
# 更新索引
self.index_db.update_index(doc_id, document)
return success
def delete_document(self, doc_id: str) -> bool:
"""删除文档"""
# 从文档数据库删除
success = self.document_db.delete(doc_id)
if success:
# 从向量数据库删除
self.vector_db.delete(doc_id)
# 从索引删除
self.index_db.delete_index(doc_id)
return success
def search_documents(self, query_vector: List[float], top_k: int = 5) -> List[Dict[str, any]]:
"""搜索文档"""
# 从向量数据库搜索
similar_vectors = self.vector_db.search(query_vector, top_k)
# 获取文档详情
documents = []
for vector_info in similar_vectors:
doc_id = vector_info['doc_id']
document = self.document_db.get(doc_id)
if document:
documents.append(document)
return documents2. 四层架构
架构特点
实现示例
python
# 四层架构RAG系统
class FourTierRAGSystem:
    """Presentation -> business logic -> service -> data access."""

    def __init__(self):
        self.presentation_layer = PresentationLayer()
        self.business_logic_layer = BusinessLogicLayer()
        self.service_layer = ServiceLayer()
        self.data_access_layer = DataAccessLayer()
        self._connect_layers()

    def _connect_layers(self):
        """Hand each layer a reference to the layer directly beneath it."""
        presentation = self.presentation_layer
        business = self.business_logic_layer
        services = self.service_layer

        presentation.set_business_logic(business)
        business.set_service_layer(services)
        services.set_data_access(self.data_access_layer)

    def start(self):
        """Start the system by starting the presentation layer."""
        self.presentation_layer.start()
class ServiceLayer:
    """Service tier: owns the domain services and routes requests to them."""

    def __init__(self):
        self.data_access = None
        self.user_service = UserService()
        self.document_service = DocumentService()
        self.retrieval_service = RetrievalService()
        self.generation_service = GenerationService()

    def set_data_access(self, data_access):
        """Attach the data-access layer here and on every owned service.

        Previously only this layer kept the reference, so each service's own
        ``data_access`` stayed ``None`` and failed on first use.
        """
        self.data_access = data_access
        for service in (self.user_service, self.document_service,
                        self.retrieval_service, self.generation_service):
            # Skip services that do not expose the setter.
            setter = getattr(service, 'set_data_access', None)
            if callable(setter):
                setter(data_access)

    def process_request(self, request: Dict[str, any]) -> Dict[str, any]:
        """Forward ``action``/``data`` to the service named by ``service_type``.

        Returns an error dict for an unknown service type.
        """
        service_type = request['service_type']
        action = request['action']
        data = request['data']
        service = self.get_service(service_type)
        if service is None:
            return {'error': '未知服务类型'}
        return service.process_action(action, data)

    def get_service(self, service_type: str):
        """Return the service registered for ``service_type``, or ``None``."""
        services = {
            'user': self.user_service,
            'document': self.document_service,
            'retrieval': self.retrieval_service,
            'generation': self.generation_service
        }
        return services.get(service_type)
class UserService:
    """Service-tier user operations backed by the data-access layer."""

    def __init__(self):
        self.data_access = None

    def set_data_access(self, data_access):
        """Attach the data-access layer used by every operation."""
        self.data_access = data_access

    def process_action(self, action: str, data: Dict[str, any]) -> Dict[str, any]:
        """Run the user operation named by ``action`` with fields from ``data``.

        Supports ``authenticate``, ``get_profile``, ``update_profile`` and
        ``get_history``; anything else returns an error dict.
        """
        if action == 'authenticate':
            return self.authenticate(data['user_id'])
        if action == 'get_profile':
            return self.get_profile(data['user_id'])
        if action == 'update_profile':
            return self.update_profile(data['user_id'], data['profile'])
        if action == 'get_history':
            return self.get_query_history(data['user_id'])
        return {'error': '未知动作'}

    def authenticate(self, user_id: str) -> Dict[str, any]:
        """Report whether ``user_id`` resolves to a truthy user record."""
        record = self.data_access.get_user(user_id)
        if not record:
            return {'authenticated': False}
        return {'authenticated': True, 'user': record}

    def get_profile(self, user_id: str) -> Dict[str, any]:
        """Return the stored profile wrapped under ``'profile'``."""
        return {'profile': self.data_access.get_user_profile(user_id)}

    def update_profile(self, user_id: str, profile: Dict[str, any]) -> Dict[str, any]:
        """Store ``profile`` and return the store's result under ``'success'``."""
        return {'success': self.data_access.update_user_profile(user_id, profile)}

    def get_query_history(self, user_id: str) -> Dict[str, any]:
        """Return the user's query history wrapped under ``'history'``."""
        return {'history': self.data_access.get_user_query_history(user_id)}
class DocumentService:
    """Service-tier document operations backed by the data-access layer."""

    def __init__(self):
        self.data_access = None
        self.processor = DocumentProcessor()

    def set_data_access(self, data_access):
        """Attach the data-access layer used by every operation."""
        self.data_access = data_access

    def process_action(self, action: str, data: Dict[str, any]) -> Dict[str, any]:
        """Run the document operation named by ``action``.

        Supports add, get, update, delete and search; anything else returns
        an error dict.
        """
        if action == 'add_document':
            return self.add_document(data['document'])
        if action == 'get_document':
            return self.get_document(data['doc_id'])
        if action == 'update_document':
            return self.update_document(data['doc_id'], data['document'])
        if action == 'delete_document':
            return self.delete_document(data['doc_id'])
        if action == 'search_documents':
            return self.search_documents(data['query'])
        return {'error': '未知动作'}

    def add_document(self, document: Dict[str, any]) -> Dict[str, any]:
        """Process and store ``document``; returns the new id."""
        prepared = self.processor.process_document(document)
        return {'doc_id': self.data_access.store_document(prepared)}

    def get_document(self, doc_id: str) -> Dict[str, any]:
        """Return the stored document wrapped under ``'document'``."""
        return {'document': self.data_access.get_document(doc_id)}

    def update_document(self, doc_id: str, document: Dict[str, any]) -> Dict[str, any]:
        """Process ``document`` and overwrite ``doc_id`` with the result."""
        prepared = self.processor.process_document(document)
        return {'success': self.data_access.update_document(doc_id, prepared)}

    def delete_document(self, doc_id: str) -> Dict[str, any]:
        """Delete ``doc_id``; returns the store's result under ``'success'``."""
        return {'success': self.data_access.delete_document(doc_id)}

    def search_documents(self, query: str) -> Dict[str, any]:
        """Pass ``query`` straight to ``data_access.search_documents``.

        NOTE(review): the raw query is forwarded without being vectorized
        here — confirm the data-access layer accepts that.
        """
        return {'documents': self.data_access.search_documents(query)}
class RetrievalService:
    """Service-tier retrieval: query embedding, search and similarity."""

    def __init__(self):
        self.data_access = None
        self.vectorizer = Vectorizer()
        self.similarity_calculator = SimilarityCalculator()

    def set_data_access(self, data_access):
        """Attach the data-access layer used for document search."""
        self.data_access = data_access

    def process_action(self, action: str, data: Dict[str, any]) -> Dict[str, any]:
        """Run the retrieval operation named by ``action``.

        Supports ``retrieve`` (``top_k`` defaults to 5), ``vectorize_query``
        and ``calculate_similarity``; anything else returns an error dict.
        """
        if action == 'retrieve':
            return self.retrieve(data['query'], data.get('top_k', 5))
        if action == 'vectorize_query':
            return self.vectorize_query(data['query'])
        if action == 'calculate_similarity':
            return self.calculate_similarity(data['vector1'], data['vector2'])
        return {'error': '未知动作'}

    def retrieve(self, query: str, top_k: int = 5) -> Dict[str, any]:
        """Embed ``query`` and return the data layer's ``top_k`` matches."""
        embedded = self.vectorizer.vectorize_query(query)
        matches = self.data_access.search_documents(embedded, top_k)
        return {'documents': matches}

    def vectorize_query(self, query: str) -> Dict[str, any]:
        """Return the query embedding wrapped under ``'vector'``."""
        return {'vector': self.vectorizer.vectorize_query(query)}

    def calculate_similarity(self, vector1: List[float], vector2: List[float]) -> Dict[str, any]:
        """Return the calculator's score for the two vectors."""
        return {'similarity': self.similarity_calculator.calculate(vector1, vector2)}
class GenerationService:
def __init__(self):
self.data_access = None
self.llm = LargeLanguageModel()
self.prompt_builder = PromptBuilder()
def set_data_access(self, data_access):
"""设置数据访问层"""
self.data_access = data_access
def process_action(self, action: str, data: Dict[str, any]) -> Dict[str, any]:
"""处理动作"""
if action == 'generate':
return self.generate(data['query'], data['relevant_docs'])
elif action == 'build_prompt':
return self.build_prompt(data['query'], data['context'])
else:
return {'error': '未知动作'}
def generate(self, query: str, relevant_docs: List[Dict[str, any]]) -> Dict[str, any]:
"""生成回答"""
# 构建提示
prompt = self.prompt_builder.build_prompt(query, relevant_docs)
# 生成回答
response = self.llm.generate(prompt)
return {'response': response}
def build_prompt(self, query: str, context: str) -> Dict[str, any]:
"""构建提示"""
prompt = self.prompt_builder.build_prompt(query, context)
return {'prompt': prompt}事件驱动架构模式
1. 事件驱动架构
架构特点
实现示例
python
# 事件驱动RAG系统架构
class EventDrivenRAGSystem:
    """RAG system whose components communicate through an event bus."""

    def __init__(self):
        self.event_bus = EventBus()
        self.event_producers = {
            'user': UserEventProducer(),
            'document': DocumentEventProducer(),
            'system': SystemEventProducer()
        }
        self.event_consumers = {
            'document_processing': DocumentProcessingConsumer(),
            'vectorization': VectorizationConsumer(),
            'indexing': IndexingConsumer(),
            'notification': NotificationConsumer()
        }
        self._register_events()

    def _register_events(self):
        """Give every producer and consumer the bus and subscribe the consumers.

        NOTE(review): consumers are subscribed under their own names
        (``'indexing'`` etc.) while ``process_query`` publishes events of
        type ``'query'`` — confirm which event types each consumer handles.
        """
        for producer in self.event_producers.values():
            producer.set_event_bus(self.event_bus)
        for consumer_name, consumer in self.event_consumers.items():
            consumer.set_event_bus(self.event_bus)
            self.event_bus.subscribe(consumer_name, consumer)

    def start(self):
        """Start the event bus, then every consumer.

        When the bus's ``start`` returns a coroutine it is scheduled on the
        running event loop. Previously that coroutine was created and
        dropped, so the bus never started.

        Raises:
            RuntimeError: if the bus start is a coroutine and no event loop
                is running in the calling thread.
        """
        import asyncio

        pending = self.event_bus.start()
        if asyncio.iscoroutine(pending):
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                pending.close()  # avoid a "never awaited" warning
                raise RuntimeError(
                    "EventDrivenRAGSystem.start() must be called from a "
                    "running event loop"
                ) from None
            # Keep a reference so the task is not garbage-collected early.
            self._bus_start_task = loop.create_task(pending)

        for consumer in self.event_consumers.values():
            consumer.start()

    def process_query(self, query: str, user_id: str) -> Dict[str, any]:
        """Publish a ``'query'`` event and return the awaited result."""
        query_event = {
            'type': 'query',
            'data': {
                'query': query,
                'user_id': user_id,
                'timestamp': time.time()
            }
        }
        self.event_bus.publish('query', query_event)
        return self._wait_for_result(query_event['data']['timestamp'])

    def _wait_for_result(self, timestamp: float) -> Dict[str, any]:
        """Return the result for the event stamped ``timestamp``.

        Placeholder: always returns the same sample response.
        """
        return {'response': '示例回答'}
class EventBus:
    """In-process asyncio event bus with per-event-type subscriber lists.

    Published events are recorded in an ``EventStore`` and queued; a
    background task started by ``start()`` takes them off the queue and
    hands each one to the consumers subscribed to ``event['type']``.
    """

    def __init__(self):
        self.subscribers = {}
        self.event_queue = asyncio.Queue()
        self.event_store = EventStore()
        self.running = False
        # Strong reference to the dispatch task: the event loop itself only
        # keeps weak references to tasks.
        self._loop_task = None

    def subscribe(self, event_type: str, consumer):
        """Register ``consumer`` to receive events of ``event_type``."""
        self.subscribers.setdefault(event_type, []).append(consumer)

    def publish(self, event_type: str, event: Dict[str, any]):
        """Record ``event`` in the store and queue it for dispatch.

        Args:
            event_type: Type of the event; written to ``event['type']`` when
                the event does not already carry one.
            event: Event payload. It is modified in place.
        """
        # Dispatch keys on event['type'], so make sure it is always present.
        event.setdefault('type', event_type)
        self.event_store.store_event(event)
        # The queue is unbounded, so put_nowait cannot fail, preserves publish
        # order, and does not need a running event loop.
        self.event_queue.put_nowait(event)

    async def start(self):
        """Start the background dispatch task; a no-op if already running."""
        if self._loop_task is not None and not self._loop_task.done():
            return
        self.running = True
        self._loop_task = asyncio.create_task(self._event_loop())

    async def _event_loop(self):
        """Take events off the queue and dispatch them until stopped."""
        while self.running:
            try:
                event = await self.event_queue.get()
                await self._process_event(event)
            except Exception as e:
                # A failing event must not stop the dispatch loop.
                print(f"处理事件失败: {e}")

    async def _process_event(self, event: Dict[str, any]):
        """Deliver ``event`` to every consumer subscribed to its type."""
        for consumer in self.subscribers.get(event['type'], ()):
            try:
                await consumer.handle_event(event)
            except Exception as e:
                # One failing consumer must not block the others.
                print(f"消费者处理事件失败: {e}")

    def stop(self):
        """Stop dispatching and cancel the background task."""
        self.running = False
        task = self._loop_task
        # The loop is usually blocked in queue.get(); clearing the flag alone
        # would never wake it, so cancel the task explicitly.
        if task is not None and not task.done():
            task.cancel()
        self._loop_task = None
class EventStore:
    """Append-only, in-memory log of published events."""

    def __init__(self):
        self.events = []

    def store_event(self, event: Dict[str, any]):
        """Stamp ``event`` with an id and a storage time, then append it.

        The event dict is modified in place: ``'id'`` and ``'stored_at'``
        are added (or overwritten).
        """
        event['id'] = self._generate_event_id()
        event['stored_at'] = time.time()
        self.events.append(event)

    def get_events(self, event_type: str = None, limit: int = 100) -> List[Dict[str, any]]:
        """Return the most recent stored events, oldest first.

        Args:
            event_type: When given, only events whose ``'type'`` equals it
                are considered.
            limit: Maximum number of events to return.

        Returns:
            A new list of at most ``limit`` events; empty when ``limit`` is
            zero or negative.
        """
        # events[-0:] is the whole list, so a non-positive limit needs its
        # own branch to mean "nothing".
        if limit <= 0:
            return []
        events = self.events
        if event_type:
            # .get() keeps events stored without a 'type' from raising here.
            events = [e for e in events if e.get('type') == event_type]
        return events[-limit:]

    def _generate_event_id(self) -> str:
        """Return a fresh random UUID4 string."""
        import uuid
        return str(uuid.uuid4())
class UserEventProducer:
    """Publishes user-originated events (login, query) onto the event bus."""

    def __init__(self):
        # Assigned later through set_event_bus().
        self.event_bus = None

    def set_event_bus(self, event_bus):
        """Attach the bus that subsequent events are published on."""
        self.event_bus = event_bus

    def publish_user_login(self, user_id: str):
        """Publish a ``'user_login'`` event for ``user_id``."""
        payload = {'user_id': user_id, 'timestamp': time.time()}
        self.event_bus.publish('user_login', {'type': 'user_login', 'data': payload})

    def publish_user_query(self, user_id: str, query: str):
        """Publish a ``'user_query'`` event carrying the user's query text."""
        payload = {'user_id': user_id, 'query': query, 'timestamp': time.time()}
        self.event_bus.publish('user_query', {'type': 'user_query', 'data': payload})
class DocumentEventProducer:
    """Publishes document lifecycle events (added, updated, deleted)."""

    def __init__(self):
        # Assigned later through set_event_bus().
        self.event_bus = None

    def set_event_bus(self, event_bus):
        """Attach the bus that subsequent events are published on."""
        self.event_bus = event_bus

    def _emit(self, event_type, **fields):
        """Timestamp ``fields`` and publish them as an ``event_type`` event."""
        fields['timestamp'] = time.time()
        self.event_bus.publish(event_type, {'type': event_type, 'data': fields})

    def publish_document_added(self, doc_id: str, document: Dict[str, any]):
        """Publish a ``'document_added'`` event for a new document."""
        self._emit('document_added', doc_id=doc_id, document=document)

    def publish_document_updated(self, doc_id: str, document: Dict[str, any]):
        """Publish a ``'document_updated'`` event with the new content."""
        self._emit('document_updated', doc_id=doc_id, document=document)

    def publish_document_deleted(self, doc_id: str):
        """Publish a ``'document_deleted'`` event for ``doc_id``."""
        self._emit('document_deleted', doc_id=doc_id)
class SystemEventProducer:
    """Publishes system-level events (errors, health status)."""

    def __init__(self):
        # Assigned later through set_event_bus().
        self.event_bus = None

    def set_event_bus(self, event_bus):
        """Attach the bus that subsequent events are published on."""
        self.event_bus = event_bus

    def _emit(self, event_type, **fields):
        """Timestamp ``fields`` and publish them as an ``event_type`` event."""
        fields['timestamp'] = time.time()
        self.event_bus.publish(event_type, {'type': event_type, 'data': fields})

    def publish_system_error(self, error: str, component: str):
        """Publish a ``'system_error'`` event naming the failing component."""
        self._emit('system_error', error=error, component=component)

    def publish_system_health(self, health_status: Dict[str, any]):
        """Publish a ``'system_health'`` event carrying ``health_status``."""
        self._emit('system_health', health_status=health_status)
class DocumentProcessingConsumer:
    """Processes added or updated documents and announces the result.

    For ``'document_added'`` and ``'document_updated'`` events the document
    is run through the processor and a ``'document_processed'`` event is
    published. For ``'document_deleted'`` a
    ``'document_deletion_processed'`` event is published.
    """

    def __init__(self):
        # Assigned later through set_event_bus().
        self.event_bus = None
        self.processor = DocumentProcessor()

    def set_event_bus(self, event_bus):
        """Attach the bus that follow-up events are published on."""
        self.event_bus = event_bus

    async def handle_event(self, event: Dict[str, any]):
        """Route a document event to its handler; other types are ignored."""
        event_type = event['type']
        if event_type == 'document_added':
            await self._handle_document_added(event['data'])
        elif event_type == 'document_updated':
            await self._handle_document_updated(event['data'])
        elif event_type == 'document_deleted':
            await self._handle_document_deleted(event['data'])

    def _process_and_announce(self, data: Dict[str, any]):
        """Process ``data['document']`` and publish ``'document_processed'``."""
        doc_id = data['doc_id']
        processed_doc = self.processor.process_document(data['document'])
        processed_event = {
            'type': 'document_processed',
            'data': {
                'doc_id': doc_id,
                'processed_document': processed_doc,
                'timestamp': time.time()
            }
        }
        self.event_bus.publish('document_processed', processed_event)

    async def _handle_document_added(self, data: Dict[str, any]):
        """Process a newly added document."""
        self._process_and_announce(data)

    async def _handle_document_updated(self, data: Dict[str, any]):
        """Re-process an updated document exactly like a newly added one.

        Previously updates were silently dropped, so changed content never
        produced a ``'document_processed'`` event.
        """
        self._process_and_announce(data)

    async def _handle_document_deleted(self, data: Dict[str, any]):
        """Announce that the deletion of ``data['doc_id']`` has been handled.

        The completion event uses its own type. Re-publishing
        ``'document_deleted'``, the type this consumer itself handles, would
        feed the event straight back into this handler.
        """
        completed_event = {
            'type': 'document_deletion_processed',
            'data': {
                'doc_id': data['doc_id'],
                'timestamp': time.time()
            }
        }
        self.event_bus.publish('document_deletion_processed', completed_event)

    def start(self):
        """Start the consumer; there is nothing to start in this example."""
        pass
class VectorizationConsumer:
    """Vectorizes processed documents and announces the resulting vector."""

    def __init__(self):
        # Assigned later through set_event_bus().
        self.event_bus = None
        self.vectorizer = Vectorizer()

    def set_event_bus(self, event_bus):
        """Attach the bus that follow-up events are published on."""
        self.event_bus = event_bus

    async def handle_event(self, event: Dict[str, any]):
        """Handle ``'document_processed'`` events; ignore every other type."""
        if event['type'] != 'document_processed':
            return
        await self._handle_document_processed(event['data'])

    async def _handle_document_processed(self, data: Dict[str, any]):
        """Vectorize the processed document and publish ``'document_vectorized'``."""
        doc_id = data['doc_id']
        embedding = self.vectorizer.vectorize_document(data['processed_document'])
        payload = {'doc_id': doc_id, 'vector': embedding, 'timestamp': time.time()}
        self.event_bus.publish(
            'document_vectorized',
            {'type': 'document_vectorized', 'data': payload},
        )

    def start(self):
        """Start the consumer; there is nothing to start in this example."""
        return None
class IndexingConsumer:
    """Keeps the index in step with vectorized and deleted documents."""

    def __init__(self):
        # Assigned later through set_event_bus().
        self.event_bus = None
        self.indexer = Indexer()

    def set_event_bus(self, event_bus):
        """Attach the bus that follow-up events are published on."""
        self.event_bus = event_bus

    async def handle_event(self, event: Dict[str, any]):
        """Route vectorized/deleted events to their handlers; ignore the rest."""
        kind = event['type']
        if kind == 'document_vectorized':
            await self._handle_document_vectorized(event['data'])
            return
        if kind == 'document_deleted':
            await self._handle_document_deleted(event['data'])

    async def _handle_document_vectorized(self, data: Dict[str, any]):
        """Write the vector to the index, then publish ``'document_indexed'``."""
        doc_id = data['doc_id']
        self.indexer.update_index(doc_id, data['vector'])
        payload = {'doc_id': doc_id, 'timestamp': time.time()}
        self.event_bus.publish(
            'document_indexed',
            {'type': 'document_indexed', 'data': payload},
        )

    async def _handle_document_deleted(self, data: Dict[str, any]):
        """Remove the document's entry from the index."""
        self.indexer.delete_index(data['doc_id'])

    def start(self):
        """Start the consumer; there is nothing to start in this example."""
        return None
class NotificationConsumer:
    """Sends notifications for user queries and system errors."""

    def __init__(self):
        # Assigned later through set_event_bus().
        self.event_bus = None
        self.notifier = Notifier()

    def set_event_bus(self, event_bus):
        """Attach the event bus to this consumer."""
        self.event_bus = event_bus

    async def handle_event(self, event: Dict[str, any]):
        """Route query/error events to their handlers; ignore the rest."""
        kind = event['type']
        if kind == 'user_query':
            await self._handle_user_query(event['data'])
            return
        if kind == 'system_error':
            await self._handle_system_error(event['data'])

    async def _handle_user_query(self, data: Dict[str, any]):
        """Send a query notification for the user and query in ``data``."""
        user_id, query = data['user_id'], data['query']
        self.notifier.send_query_notification(user_id, query)

    async def _handle_system_error(self, data: Dict[str, any]):
        """Send an error notification for the error and component in ``data``."""
        error, component = data['error'], data['component']
        self.notifier.send_error_notification(error, component)

    def start(self):
        """Start the consumer; there is nothing to start in this example."""
        return None
最佳实践
1. 架构选择策略
python
def select_architecture_pattern(requirements: dict) -> str:
"""选择架构模式"""
if requirements['scale'] == 'small' and requirements['complexity'] == 'low':
return 'monolithic_architecture'
elif requirements['scale'] == 'large' and requirements['complexity'] == 'high':
return 'microservice_architecture'
elif requirements['maintainability'] == 'high':
return 'layered_architecture'
elif requirements['real_time'] == True:
return 'event_driven_architecture'
else:
return 'hybrid_architecture'2. 架构设计建议
python
class ArchitectureDesignAdvisor:
    """Turns requirement scores into a list of design recommendations."""

    def __init__(self):
        self.design_principles = {}

    def get_design_recommendations(self, requirements: Dict[str, any]) -> List[str]:
        """Return the recommendations that apply to ``requirements``.

        Args:
            requirements: Mapping that may contain ``'performance'``,
                ``'scalability'`` and ``'maintainability'`` scores (a
                recommendation is added when the score is above 0.8; a
                missing score counts as 0) and a ``'real_time'`` flag.

        Returns:
            Recommendation strings in the fixed order performance,
            scalability, maintainability, real-time.
        """
        # (requirement key, advice given when its score exceeds 0.8)
        scored_advice = (
            ('performance', "使用缓存和CDN提升性能"),
            ('scalability', "采用微服务架构支持水平扩展"),
            ('maintainability', "使用分层架构提高可维护性"),
        )
        recommendations = [
            advice
            for key, advice in scored_advice
            if requirements.get(key, 0) > 0.8
        ]
        if requirements.get('real_time', False):
            recommendations.append("采用事件驱动架构支持实时处理")
        return recommendations
总结
RAG系统架构模式的选择对系统的性能、可扩展性和可维护性具有重要影响。本文介绍了单体架构、微服务架构、分层架构和事件驱动架构等主要模式,以及它们的特点、实现方式和适用场景。
关键要点:
- 架构选择:根据系统规模和复杂度选择合适的架构模式
- 服务拆分:合理拆分服务,确保单一职责
- 通信机制:选择合适的服务间通信方式
- 事件驱动:使用事件驱动架构提高系统的响应性和可扩展性
在下一篇文章中,我们将探讨微服务RAG架构,了解如何设计和实现微服务化的RAG系统。
下一步学习建议:
- 阅读《微服务RAG架构》,了解微服务化的RAG系统设计
- 实践不同的架构模式,比较它们的特点
- 关注RAG系统架构设计的最新发展和创新方案
