Skip to content

动态路由

动态路由是 Nekro Agent 插件系统提供的强大功能,允许插件创建自定义的 Web API 端点。基于 FastAPI 框架,动态路由支持完整的 RESTful API 设计、请求验证、响应处理、文档生成等现代 Web API 开发的所有特性。

功能概述

通过动态路由,插件可以:

  • 创建 RESTful API:支持 GET、POST、PUT、DELETE 等 HTTP 方法
  • 请求验证:使用 Pydantic 模型进行请求体、查询参数验证
  • 响应处理:标准化的响应格式和错误处理
  • API 文档:自动生成 OpenAPI/Swagger 文档
  • 路径参数:动态路由参数和查询参数支持
  • 中间件支持:请求预处理和响应后处理

基础用法

注册路由

使用 @plugin.mount_router() 装饰器注册路由创建函数:

python
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from nekro_agent.services.plugin.base import NekroPlugin

# 创建插件实例
plugin = NekroPlugin(
    name="API 示例插件",
    module_name="api_example",
    description="展示动态路由功能",
    version="1.0.0",
    author="your_name"
)

@plugin.mount_router()
def create_router() -> APIRouter:
    """创建并配置插件路由"""
    router = APIRouter()

    @router.get("/")
    async def index():
        return {"message": "欢迎使用插件 API"}

    return router

路由访问路径

插件路由的访问路径格式为:/plugins/{author}.{module_name}/{path}

例如,上述示例的路由可通过以下路径访问:

  • GET /plugins/your_name.api_example/ - 插件首页

数据模型定义

使用 Pydantic 模型

python
from pydantic import BaseModel, Field
from typing import Optional

class UserModel(BaseModel):
    id: int
    name: str = Field(..., min_length=1, max_length=50, description="用户名称")
    email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$', description="邮箱地址")
    age: Optional[int] = Field(None, ge=0, le=150, description="年龄")

class CreateUserRequest(BaseModel):
    name: str = Field(..., min_length=1, max_length=50)
    email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    age: Optional[int] = Field(None, ge=0, le=150)

class UpdateUserRequest(BaseModel):
    name: Optional[str] = Field(None, min_length=1, max_length=50)
    email: Optional[str] = Field(None, regex=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    age: Optional[int] = Field(None, ge=0, le=150)

完整 CRUD 示例

python
from typing import Dict, List
from fastapi import APIRouter, HTTPException, Query, Path

# 模拟数据存储
users_db: Dict[int, UserModel] = {}
next_id = 1

@plugin.mount_router()
def create_router() -> APIRouter:
    router = APIRouter()

    @router.get("/", summary="API 首页")
    async def api_home():
        """返回 API 基本信息"""
        return {
            "name": plugin.name,
            "version": plugin.version,
            "endpoints": [
                "GET / - API 首页",
                "GET /users - 获取用户列表",
                "POST /users - 创建用户",
                "GET /users/{user_id} - 获取用户详情",
                "PUT /users/{user_id} - 更新用户",
                "DELETE /users/{user_id} - 删除用户"
            ]
        }

    # 获取用户列表
    @router.get("/users", response_model=List[UserModel], summary="获取用户列表")
    async def get_users(
        limit: int = Query(10, ge=1, le=100, description="返回数量限制"),
        offset: int = Query(0, ge=0, description="偏移量"),
        name_filter: Optional[str] = Query(None, description="按名称筛选")
    ):
        """获取用户列表,支持分页和筛选"""
        users = list(users_db.values())

        # 按名称筛选
        if name_filter:
            users = [u for u in users if name_filter.lower() in u.name.lower()]

        # 分页
        return users[offset:offset + limit]

    # 创建用户
    @router.post("/users", response_model=UserModel, summary="创建用户", status_code=201)
    async def create_user(user_data: CreateUserRequest):
        """创建新用户"""
        global next_id

        # 检查邮箱是否已存在
        for user in users_db.values():
            if user.email == user_data.email:
                raise HTTPException(status_code=400, detail="邮箱已存在")

        new_user = UserModel(
            id=next_id,
            name=user_data.name,
            email=user_data.email,
            age=user_data.age
        )

        users_db[next_id] = new_user
        next_id += 1

        return new_user

    # 获取用户详情
    @router.get("/users/{user_id}", response_model=UserModel, summary="获取用户详情")
    async def get_user(user_id: int = Path(..., ge=1, description="用户 ID")):
        """根据 ID 获取用户详情"""
        if user_id not in users_db:
            raise HTTPException(status_code=404, detail="用户不存在")

        return users_db[user_id]

    # 更新用户
    @router.put("/users/{user_id}", response_model=UserModel, summary="更新用户")
    async def update_user(
        user_id: int = Path(..., ge=1, description="用户 ID"),
        user_data: UpdateUserRequest = ...
    ):
        """更新用户信息"""
        if user_id not in users_db:
            raise HTTPException(status_code=404, detail="用户不存在")

        user = users_db[user_id]

        # 检查邮箱冲突
        if user_data.email and user_data.email != user.email:
            for other_user in users_db.values():
                if other_user.email == user_data.email and other_user.id != user_id:
                    raise HTTPException(status_code=400, detail="邮箱已被其他用户使用")

        # 更新字段
        if user_data.name is not None:
            user.name = user_data.name
        if user_data.email is not None:
            user.email = user_data.email
        if user_data.age is not None:
            user.age = user_data.age

        return user

    # 删除用户
    @router.delete("/users/{user_id}", summary="删除用户")
    async def delete_user(user_id: int = Path(..., ge=1, description="用户 ID")):
        """删除用户"""
        if user_id not in users_db:
            raise HTTPException(status_code=404, detail="用户不存在")

        deleted_user = users_db.pop(user_id)
        return {"message": f"用户 '{deleted_user.name}' 已删除", "deleted_user": deleted_user}

    return router

高级功能

中间件支持

python
from fastapi import Request, Response
import time

@plugin.mount_router()
def create_router() -> APIRouter:
    router = APIRouter()

    @router.middleware("http")
    async def add_process_time_header(request: Request, call_next):
        """添加请求处理时间头"""
        start_time = time.time()
        response = await call_next(request)
        process_time = time.time() - start_time
        response.headers["X-Process-Time"] = str(process_time)
        return response

    # 其他路由定义...

    return router

依赖注入

python
from fastapi import Depends

def get_current_user(user_id: int = Query(...)) -> UserModel:
    """获取当前用户(示例依赖)"""
    if user_id not in users_db:
        raise HTTPException(status_code=404, detail="用户不存在")
    return users_db[user_id]

@router.get("/profile", response_model=UserModel)
async def get_profile(current_user: UserModel = Depends(get_current_user)):
    """获取当前用户资料"""
    return current_user

文件上传支持

python
from fastapi import File, UploadFile

@router.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """文件上传示例"""
    if not file.filename:
        raise HTTPException(status_code=400, detail="未选择文件")

    # 保存文件
    content = await file.read()

    return {
        "filename": file.filename,
        "content_type": file.content_type,
        "size": len(content)
    }

响应模型和状态码

python
from fastapi import status

class ErrorResponse(BaseModel):
    error: str
    detail: str

@router.post(
    "/users",
    response_model=UserModel,
    status_code=status.HTTP_201_CREATED,
    responses={
        400: {"model": ErrorResponse, "description": "请求参数错误"},
        409: {"model": ErrorResponse, "description": "用户已存在"}
    }
)
async def create_user_with_responses(user_data: CreateUserRequest):
    """带有详细响应定义的创建用户接口"""
    # 实现逻辑...
    pass

与插件功能集成

发送消息到聊天频道

python
from nekro_agent.api import message
from nekro_agent.api.schemas import AgentCtx

@router.post("/notify/{chat_key}")
async def send_notification(
    chat_key: str,
    notification: Dict[str, str]
):
    """向指定聊天频道发送通知"""
    try:
        # 创建上下文对象
        ctx = await AgentCtx.create_by_chat_key(chat_key)

        await message.send_text(
            chat_key=chat_key,
            text=notification.get("message", ""),
            ctx=ctx
        )

        return {"status": "success", "message": "通知已发送"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"发送失败: {str(e)}")

动态路由提供了更完整、更标准的 Web API 开发体验,是构建插件外部接口的推荐方式。