动态路由
动态路由是 Nekro Agent 插件系统提供的强大功能,允许插件创建自定义的 Web API 端点。基于 FastAPI 框架,动态路由支持完整的 RESTful API 设计、请求验证、响应处理、文档生成等现代 Web API 开发的所有特性。
功能概述
通过动态路由,插件可以:
- 创建 RESTful API:支持 GET、POST、PUT、DELETE 等 HTTP 方法
- 请求验证:使用 Pydantic 模型进行请求体、查询参数验证
- 响应处理:标准化的响应格式和错误处理
- API 文档:自动生成 OpenAPI/Swagger 文档
- 路径参数:动态路由参数和查询参数支持
- 中间件支持:请求预处理和响应后处理
基础用法
注册路由
使用 @plugin.mount_router()
装饰器注册路由创建函数:
python
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from nekro_agent.services.plugin.base import NekroPlugin
# 创建插件实例
plugin = NekroPlugin(
name="API 示例插件",
module_name="api_example",
description="展示动态路由功能",
version="1.0.0",
author="your_name"
)
@plugin.mount_router()
def create_router() -> APIRouter:
"""创建并配置插件路由"""
router = APIRouter()
@router.get("/")
async def index():
return {"message": "欢迎使用插件 API"}
return router
路由访问路径
插件路由的访问路径格式为:/plugins/{author}.{module_name}/{path}
例如,上述示例的路由可通过以下路径访问:
GET /plugins/your_name.api_example/
- 插件首页
数据模型定义
使用 Pydantic 模型
python
from pydantic import BaseModel, Field
from typing import Optional
class UserModel(BaseModel):
id: int
name: str = Field(..., min_length=1, max_length=50, description="用户名称")
email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$', description="邮箱地址")
age: Optional[int] = Field(None, ge=0, le=150, description="年龄")
class CreateUserRequest(BaseModel):
name: str = Field(..., min_length=1, max_length=50)
email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$')
age: Optional[int] = Field(None, ge=0, le=150)
class UpdateUserRequest(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=50)
email: Optional[str] = Field(None, regex=r'^[\w\.-]+@[\w\.-]+\.\w+$')
age: Optional[int] = Field(None, ge=0, le=150)
完整 CRUD 示例
python
from typing import Dict, List
from fastapi import APIRouter, HTTPException, Query, Path
# 模拟数据存储
users_db: Dict[int, UserModel] = {}
next_id = 1
@plugin.mount_router()
def create_router() -> APIRouter:
router = APIRouter()
@router.get("/", summary="API 首页")
async def api_home():
"""返回 API 基本信息"""
return {
"name": plugin.name,
"version": plugin.version,
"endpoints": [
"GET / - API 首页",
"GET /users - 获取用户列表",
"POST /users - 创建用户",
"GET /users/{user_id} - 获取用户详情",
"PUT /users/{user_id} - 更新用户",
"DELETE /users/{user_id} - 删除用户"
]
}
# 获取用户列表
@router.get("/users", response_model=List[UserModel], summary="获取用户列表")
async def get_users(
limit: int = Query(10, ge=1, le=100, description="返回数量限制"),
offset: int = Query(0, ge=0, description="偏移量"),
name_filter: Optional[str] = Query(None, description="按名称筛选")
):
"""获取用户列表,支持分页和筛选"""
users = list(users_db.values())
# 按名称筛选
if name_filter:
users = [u for u in users if name_filter.lower() in u.name.lower()]
# 分页
return users[offset:offset + limit]
# 创建用户
@router.post("/users", response_model=UserModel, summary="创建用户", status_code=201)
async def create_user(user_data: CreateUserRequest):
"""创建新用户"""
global next_id
# 检查邮箱是否已存在
for user in users_db.values():
if user.email == user_data.email:
raise HTTPException(status_code=400, detail="邮箱已存在")
new_user = UserModel(
id=next_id,
name=user_data.name,
email=user_data.email,
age=user_data.age
)
users_db[next_id] = new_user
next_id += 1
return new_user
# 获取用户详情
@router.get("/users/{user_id}", response_model=UserModel, summary="获取用户详情")
async def get_user(user_id: int = Path(..., ge=1, description="用户 ID")):
"""根据 ID 获取用户详情"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="用户不存在")
return users_db[user_id]
# 更新用户
@router.put("/users/{user_id}", response_model=UserModel, summary="更新用户")
async def update_user(
user_id: int = Path(..., ge=1, description="用户 ID"),
user_data: UpdateUserRequest = ...
):
"""更新用户信息"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="用户不存在")
user = users_db[user_id]
# 检查邮箱冲突
if user_data.email and user_data.email != user.email:
for other_user in users_db.values():
if other_user.email == user_data.email and other_user.id != user_id:
raise HTTPException(status_code=400, detail="邮箱已被其他用户使用")
# 更新字段
if user_data.name is not None:
user.name = user_data.name
if user_data.email is not None:
user.email = user_data.email
if user_data.age is not None:
user.age = user_data.age
return user
# 删除用户
@router.delete("/users/{user_id}", summary="删除用户")
async def delete_user(user_id: int = Path(..., ge=1, description="用户 ID")):
"""删除用户"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="用户不存在")
deleted_user = users_db.pop(user_id)
return {"message": f"用户 '{deleted_user.name}' 已删除", "deleted_user": deleted_user}
return router
高级功能
中间件支持
python
from fastapi import Request, Response
import time
@plugin.mount_router()
def create_router() -> APIRouter:
router = APIRouter()
@router.middleware("http")
async def add_process_time_header(request: Request, call_next):
"""添加请求处理时间头"""
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
# 其他路由定义...
return router
依赖注入
python
from fastapi import Depends
def get_current_user(user_id: int = Query(...)) -> UserModel:
"""获取当前用户(示例依赖)"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="用户不存在")
return users_db[user_id]
@router.get("/profile", response_model=UserModel)
async def get_profile(current_user: UserModel = Depends(get_current_user)):
"""获取当前用户资料"""
return current_user
文件上传支持
python
from fastapi import File, UploadFile
@router.post("/upload")
async def upload_file(file: UploadFile = File(...)):
"""文件上传示例"""
if not file.filename:
raise HTTPException(status_code=400, detail="未选择文件")
# 保存文件
content = await file.read()
return {
"filename": file.filename,
"content_type": file.content_type,
"size": len(content)
}
响应模型和状态码
python
from fastapi import status
class ErrorResponse(BaseModel):
error: str
detail: str
@router.post(
"/users",
response_model=UserModel,
status_code=status.HTTP_201_CREATED,
responses={
400: {"model": ErrorResponse, "description": "请求参数错误"},
409: {"model": ErrorResponse, "description": "用户已存在"}
}
)
async def create_user_with_responses(user_data: CreateUserRequest):
"""带有详细响应定义的创建用户接口"""
# 实现逻辑...
pass
与插件功能集成
发送消息到聊天频道
python
from nekro_agent.api import message
from nekro_agent.api.schemas import AgentCtx
@router.post("/notify/{chat_key}")
async def send_notification(
chat_key: str,
notification: Dict[str, str]
):
"""向指定聊天频道发送通知"""
try:
# 创建上下文对象
ctx = await AgentCtx.create_by_chat_key(chat_key)
await message.send_text(
chat_key=chat_key,
text=notification.get("message", ""),
ctx=ctx
)
return {"status": "success", "message": "通知已发送"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"发送失败: {str(e)}")
动态路由提供了更完整、更标准的 Web API 开发体验,是构建插件外部接口的推荐方式。