Skip to content

Webhook 接入点

Webhook 允许你的 Nekro Agent 插件接收来自外部系统(如 GitHub, GitLab, CI/CD 工具, IoT 设备, 自定义服务等)的实时 HTTP 推送通知。这使得插件能够响应外部事件,例如代码提交、监控告警、数据更新等,并触发 Agent 内的相应操作,如发送消息、更新数据或调用其他插件功能。

Webhook 基础

当外部系统发生特定事件时,它会向预先配置好的 URL(即插件提供的 Webhook 接入点)发送一个 HTTP 请求(通常是 POST 请求,但也支持 GET 等其他方法)。插件的 Webhook 处理器接收这个请求,解析其内容(通常是 JSON 或表单数据),然后执行相应的逻辑。

注册 Webhook

插件通过 @plugin.mount_webhook_method() 装饰器注册一个异步函数作为 Webhook 处理器。这个装饰器会将该函数与一个特定的 HTTP 路径关联起来。

python
from fastapi import Request, HTTPException # FastAPI 用于处理 HTTP 请求
from nekro_agent.api import core, message, context # Agent API
import hmac
import hashlib
import json

# 假设 plugin 实例已定义

@plugin.mount_webhook_method(
    endpoint="/github", # 相对于插件 Webhook 根路径的端点
    name="GitHub Webhook Handler",
    description="接收并处理来自 GitHub 的 Webhook 事件。"
)
async def handle_github_webhook(request: Request):
    """处理 GitHub Webhook 请求。
    
    验证签名,解析 payload,并根据事件类型采取行动。
    """
    core.logger.info(f"插件 '{plugin.name}' 收到 GitHub Webhook 请求。")

    # 1. 安全验证 (推荐)
    github_signature = request.headers.get("X-Hub-Signature-256")
    if not github_signature:
        core.logger.warning("Webhook 请求缺少 X-Hub-Signature-256 头部。")
        raise HTTPException(status_code=400, detail="Missing signature")

    # 从插件配置中获取 Webhook 秘密 (secret)
    webhook_secret = plugin.config.GITHUB_WEBHOOK_SECRET # 假设配置项存在
    if not webhook_secret:
        core.logger.error("GitHub Webhook secret 未在插件配置中设置!")
        raise HTTPException(status_code=500, detail="Webhook secret not configured")

    request_body = await request.body()
    if not verify_github_signature(request_body, webhook_secret, github_signature):
        core.logger.warning("Webhook 签名验证失败!")
        raise HTTPException(status_code=403, detail="Invalid signature")

    # 2. 解析 Payload
    try:
        payload = await request.json() # 或者 request.form() 取决于外部系统发送的 Content-Type
    except json.JSONDecodeError:
        core.logger.warning("Webhook 请求体不是有效的 JSON。")
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    # 3. 处理事件
    event_type = request.headers.get("X-GitHub-Event")
    core.logger.info(f"已验证的 GitHub 事件: {event_type}")

    if event_type == "push":
        repo_name = payload.get("repository", {}).get("full_name", "未知仓库")
        pusher_name = payload.get("pusher", {}).get("name", "未知推送者")
        commits = payload.get("commits", [])
        commit_count = len(commits)
        
        notification_message = (
            f"📦 代码推送事件\n"
            f"仓库: {repo_name}\n"
            f"推送者: {pusher_name}\n"
            f"提交数: {commit_count}\n"
        )
        if commits:
            latest_commit_message = commits[0].get("message", "无提交信息")
            notification_message += f"最新提交: {latest_commit_message.splitlines()[0]}"
        
        # 假设插件配置了通知的目标会话
        target_chat_key = plugin.config.NOTIFICATION_CHAT_KEY
        if target_chat_key:
            _ctx = await context.create_temp_ctx(target_chat_key) # 创建临时上下文用于发消息
            await message.send_text(target_chat_key, notification_message, _ctx)
            core.logger.info(f"已向 {target_chat_key} 发送 GitHub 推送通知。")
        else:
            core.logger.warning("未配置 GitHub 通知的目标会话 (NOTIFICATION_CHAT_KEY)。")

    elif event_type == "issues":
        # 处理 issues 事件...
        pass
    
    # 4. 返回响应
    # 通常,Webhook 处理器应快速返回 2xx 响应,表示已收到事件。
    # 耗时操作应异步处理。
    return {"status": "success", "message": f"Event '{event_type}' received and acknowledged."}


def verify_github_signature(payload_body: bytes, secret: str, signature_header: str) -> bool:
    """验证 GitHub Webhook 签名"""
    if not signature_header.startswith("sha256="):
        return False
    expected_signature = signature_header[7:] # 移除 "sha256=" 前缀
    
    hashed_payload = hmac.new(secret.encode('utf-8'), payload_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(hashed_payload, expected_signature)

装饰器参数:

  • endpoint (str): Webhook 的路径,相对于插件的 Webhook 根 URL。例如,如果插件名为 my_plugin,Agent 运行在 http://localhost:8000,此处的 endpoint"/github",则完整的 Webhook URL 将是 http://localhost:8000/api/plugin/my_plugin/webhook/github
  • name (str, 可选): Webhook 的可读名称,用于日志或管理界面。
  • description (str, 可选): Webhook 功能的简短描述。

Webhook 处理器 (async def 函数):

  • 必须是异步函数。
  • 接收一个 fastapi.Request 对象作为参数,可以从中获取请求头、请求体、查询参数等。
  • 通常需要进行安全验证(如检查签名、来源 IP 等)。
  • 解析请求体(如 JSON、表单数据)。
  • 根据解析的数据执行业务逻辑。
  • 快速响应:Webhook 处理器应尽快返回 HTTP 响应(通常是 200 OK 或其他 2xx 状态码),以告知外部系统事件已成功接收。任何耗时的操作都应该在后台异步执行,避免阻塞 Webhook 响应导致外部系统超时重试。
  • 可以使用 fastapi.HTTPException 来返回标准的 HTTP 错误响应。

Webhook URL 结构

插件 Webhook 的完整 URL 结构通常如下:

{BASE_URL}/api/plugin/{plugin_author}.{plugin_module_name}/webhook{endpoint}

  • {BASE_URL}: Nekro Agent 的基础 URL (例如 http://localhost:8080)。
  • {plugin_author}.{plugin_module_name}: 插件的唯一键,由作者和模块名组成。
  • {endpoint}: 在 @plugin.mount_webhook_method() 中定义的 endpoint 参数,必须以 / 开头。

例如,如果插件作者是 dev,模块名是 sample_webhookendpoint/myevent,则 URL 可能是: http://localhost:8080/api/plugin/dev.sample_webhook/webhook/myevent

建议在插件的 README.md 或配置说明中清晰地列出其提供的所有 Webhook URL。

安全验证

由于 Webhook 接入点暴露在公网上,因此实施安全验证至关重要,以确保请求来自可信的源并防止恶意攻击。

常见的验证方法:

  1. 签名验证 (推荐)
    • 外部系统使用一个共享的秘密 (Secret) 对请求体 (Payload) 进行哈希签名 (如 HMAC-SHA256)。
    • 签名结果通过特定的请求头 (如 X-Hub-Signature-256 for GitHub, X-Gitlab-Token for GitLab) 发送。
    • 插件端使用相同的秘密和算法重新计算签名,并与请求头中的签名进行比较。
    • 如上例 verify_github_signature 函数所示。
    • 秘密应存储在插件的配置中 (标记为 is_secret),绝不能硬编码在代码里。
  2. IP 地址白名单
    • 如果外部系统的出口 IP 地址固定或在已知范围内,可以在插件或网关层面配置 IP 白名单。
    • 但 IP 地址可能变化,维护成本较高。
  3. 认证令牌
    • 要求外部系统在请求头或查询参数中携带一个预共享的认证令牌。
    • 相对简单,但安全性不如签名验证,因为令牌可能泄露。

务必实施至少一种有效的安全验证机制。

处理不同 HTTP 方法

默认情况下,Webhook 处理器可以响应所有 HTTP 方法。如果需要针对特定方法(如 GET, PUT, DELETE)进行处理,可以在处理器内部检查 request.method

python
@plugin.mount_webhook_method(endpoint="/resource", name="Resource Handler")
async def handle_resource(request: Request):
    if request.method == "GET":
        # 处理 GET 请求,例如返回资源状态
        return {"status": "available", "data": await get_resource_data()}
    elif request.method == "POST":
        # 处理 POST 请求,例如创建新资源
        payload = await request.json()
        resource_id = await create_resource(payload)
        return {"status": "created", "id": resource_id}, 201 # 返回 201 Created
    elif request.method == "DELETE":
        # 处理 DELETE 请求,例如删除资源
        # ...
        return {"status": "deleted"}
    else:
        raise HTTPException(status_code=405, detail="Method Not Allowed")

从 Webhook 向会话发送通知

Webhook 的一个常见用途是将外部事件通知给 Nekro Agent 内的特定用户或会话。

python
from nekro_agent.api import message, context

async def notify_chat(chat_key: str, notification_text: str):
    if not chat_key:
        core.logger.warning("无法发送 Webhook 通知:目标 chat_key 为空。")
        return
    
    try:
        # 为发送消息创建一个临时的 AgentCtx
        # 注意:这个临时上下文可能不包含完整的用户或会话信息,仅用于消息发送
        _ctx = await context.create_temp_ctx(chat_key=chat_key)
        
        await message.send_text(chat_key, notification_text, _ctx)
        core.logger.info(f"Webhook 通知已发送至 {chat_key}。")
    except Exception as e:
        core.logger.error(f"通过 Webhook 发送通知至 {chat_key} 失败: {e}")

# 在你的 Webhook 处理器中调用:
# target_chat = plugin.config.TARGET_CHAT_FOR_ALERTS
# await notify_chat(target_chat, "紧急告警:服务器 CPU 使用率超过 90%!")

目标会话 chat_key 通常应从插件的配置中获取,允许用户指定接收通知的群组或私聊。

处理文件上传

Webhook 也可以用于接收文件上传。FastAPI 提供了处理 multipart/form-data 请求的能力。

python
from fastapi import File, UploadFile, Form

@plugin.mount_webhook_method(endpoint="/upload_report", name="Upload Report")
async def handle_report_upload(
    request: Request, 
    report_file: UploadFile = File(...), # "report_file" 是表单字段名
    report_name: str = Form(...),      # "report_name" 也是表单字段名
    target_chat: str = Form(...)
):
    core.logger.info(f"收到文件上传: {report_file.filename} (类型: {report_file.content_type}),报告名称: {report_name}")
    
    # 验证文件类型、大小等 (推荐)
    if not report_file.content_type.startswith("text/"):
        raise HTTPException(status_code=400, detail="仅支持文本文件上传。")

    # 读取文件内容
    contents = await report_file.read() # bytes
    report_text = contents.decode('utf-8')

    # 处理文件内容,例如保存到插件目录或发送给用户
    # plugin_data_path = plugin.get_plugin_path() / "uploads"
    # plugin_data_path.mkdir(exist_ok=True)
    # async with aiofiles.open(plugin_data_path / report_file.filename, "wb") as f:
    #     await f.write(contents)

    await notify_chat(target_chat, f"收到新的报告 '{report_name}' ({report_file.filename})。内容摘要:\n{report_text[:200]}...")
    
    return {"status": "success", "filename": report_file.filename, "size": len(contents)}

异步任务处理 (重要)

虽然 FastAPI 会异步执行(await)你的 async def Webhook 处理器,但这仅表示处理器本身不会阻塞服务器的其他操作。HTTP 客户端仍然会等待处理器完成其所有内部操作(包括 await 的部分)后才能收到响应。 因此,如果 Webhook 处理器内部包含不应延迟对外部系统响应的耗时操作(例如,调用多个外部 API、处理大型数据、复杂的计算任务),则强烈建议将这些特定操作放到后台任务中执行。这能确保插件快速向外部系统确认收到 Webhook(例如返回 202 Accepted),而实际的处理在后台进行。

Python 的 asyncio.create_task() 或类似机制(如 Celery, RQ 等,如果项目集成)可用于此目的。

python
import asyncio

async def long_running_process(data: dict, chat_to_notify: str):
    core.logger.info(f"开始处理耗时任务: {data.get('task_id')}")
    await asyncio.sleep(10) # 模拟耗时操作
    result_message = f"任务 {data.get('task_id')} 处理完成。结果是:XXXX"
    await notify_chat(chat_to_notify, result_message)
    core.logger.info(f"耗时任务 {data.get('task_id')} 完成并已通知。")

@plugin.mount_webhook_method(endpoint="/start_job", name="Start Long Job")
async def start_job_webhook(request: Request):
    payload = await request.json()
    task_id = payload.get("task_id", "unknown_task")
    chat_key = payload.get("notify_chat_key") # 假设 payload 包含通知目标

    if not chat_key:
        return {"status": "error", "message": "notify_chat_key 未提供"}, 400

    # 创建并启动后台任务,不等待其完成
    asyncio.create_task(long_running_process({"task_id": task_id, **payload}, chat_key))
    
    core.logger.info(f"已为任务 {task_id} 创建后台处理程序。")
    return {"status": "accepted", "task_id": task_id, "message": "作业已接收并正在后台处理。"}, 202 # 返回 202 Accepted

通过合理使用 Webhook,你的插件可以极大地扩展 Nekro Agent 与外部世界的连接和فاعل能力。