Files
server-configs/openclaw_api.py
2026-02-13 22:24:27 +08:00

280 lines
8.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
OpenClaw API Server - 暴露所有功能供外部调用
基于 OpenAPI v3 标准
"""
import os
import sys
import json
import subprocess
from datetime import datetime
from typing import Optional, Dict, Any
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.security import APIKeyHeader
from pydantic import BaseModel, Field
import uvicorn
# ============= Configuration =============
# Shared secret that verify_api_key compares against the caller-supplied key.
# Read once, at import time, from the OPENCLAW_API_KEY environment variable.
# NOTE(review): when that variable is absent the placeholder string below
# becomes the effective key — make sure it is always set in real deployments.
API_KEY = os.getenv("OPENCLAW_API_KEY", "your-api-key-change-me")
# The FastAPI application object; interactive documentation is enabled at
# /docs (Swagger UI) and /redoc.
app = FastAPI(
title="OpenClaw API",
description="OpenClaw AI助手 API 接口",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# Security-scheme object for the X-API-Key header.
# NOTE(review): nothing in the visible part of this file references it;
# verify_api_key reads the header through Header(...) instead — confirm
# whether it is still needed.
api_key_header = APIKeyHeader(name="X-API-Key")
# ============= Authentication =============
async def verify_api_key(x_api_key: str = Header(..., description="API Key")):
    """Validate the API key sent with a request.

    Used by the endpoints in this module as a FastAPI dependency
    (``Depends(verify_api_key)``).

    Args:
        x_api_key: Key supplied by the caller; the header is required.

    Returns:
        The supplied key, unchanged, when it matches ``API_KEY``.

    Raises:
        HTTPException: 401 when the supplied key does not match.
    """
    # Function-scope import keeps this block independent of the module's
    # import list.
    import hmac

    # Constant-time comparison: a plain ``!=`` returns as soon as the first
    # differing character is found, which can leak through response timing
    # how much of a guessed key is correct. Both sides are encoded to bytes
    # because compare_digest rejects str arguments containing non-ASCII text.
    supplied = x_api_key.encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return x_api_key
# ============= Request / response models =============
# JSON body accepted by send_message (POST /api/v1/messages/send).
# The description strings are published in the generated API schema and are
# therefore kept verbatim.
class MessageSendRequest(BaseModel):
    # Message text; required.
    message: str = Field(default=..., description="消息内容")
    # Target channel; "whatsapp" when the caller omits the field.
    channel: Optional[str] = Field(default="whatsapp", description="频道")
# Search parameters expressed as a request body.
# NOTE(review): no endpoint in the visible part of this file uses this model;
# search_memory takes its inputs as query parameters instead.
class MemorySearchRequest(BaseModel):
    # Search keyword; required.
    query: str = Field(default=..., description="搜索关键词")
    # Upper bound on the number of results; 10 when omitted.
    max_results: Optional[int] = Field(default=10, description="最大结果数")
# JSON body accepted by write_memory (PUT /api/v1/memory/{path}).
class MemoryWriteRequest(BaseModel):
    # Text to store in the file; required.
    content: str = Field(default=..., description="要写入的内容")
    # File path; required by validation.
    # NOTE(review): write_memory takes the target from the URL path and never
    # reads this field.
    file_path: str = Field(default=..., description="文件路径")
# JSON body accepted by exec_command (POST /api/v1/exec).
class ExecRequest(BaseModel):
    # Shell command line to run; required.
    command: str = Field(default=..., description="Shell 命令")
    # Timeout in seconds; 30 when omitted. The field is Optional, so an
    # explicit null is accepted and passed through as None.
    timeout: Optional[int] = Field(default=30, description="超时秒数")
# Stock lookup parameters expressed as a request body.
# NOTE(review): no endpoint in the visible part of this file uses this model;
# get_stock takes the symbol from the URL path instead.
class StockQueryRequest(BaseModel):
    # Ticker symbol; required.
    symbol: str = Field(default=..., description="股票代码")
# JSON body accepted by web_search (POST /api/v1/web/search).
class WebSearchRequest(BaseModel):
    # Search keyword; required.
    query: str = Field(default=..., description="搜索关键词")
    # Number of results requested; 5 when omitted.
    count: Optional[int] = Field(default=5, description="结果数量")
# JSON body accepted by git_commit (POST /api/v1/git/commit).
class GitCommitRequest(BaseModel):
    # Commit message; required.
    message: str = Field(default=..., description="提交信息")
    # Files to commit; required. Annotated as a bare ``list``, so the element
    # type is not validated by the model.
    files: list = Field(default=..., description="要提交的文件列表")
# ============= Helper functions =============
def read_memory_file(path: str) -> str:
    """Return the UTF-8 text content of the file at *path*.

    Read failures are reported in-band instead of being raised, so callers
    always receive a string.

    Args:
        path: Location of the file to read.

    Returns:
        The file content, or ``"Error: <reason>"`` when the file cannot be
        opened or decoded.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()
    except (OSError, ValueError) as e:
        # OSError covers a missing file, a directory, or a permission problem;
        # ValueError covers undecodable bytes (UnicodeDecodeError) and paths
        # containing a NUL byte. Anything else is a programming error and is
        # allowed to propagate instead of being disguised as file content.
        return f"Error: {e}"
def write_memory_file(path: str, content: str) -> dict:
    """Write *content* to *path* as UTF-8, creating parent directories.

    Any existing file at *path* is overwritten. Failures are reported in the
    returned dict instead of being raised.

    Args:
        path: Destination file path.
        content: Text to write.

    Returns:
        ``{"status": "success", "path": path}`` on success, otherwise
        ``{"status": "error", "message": <reason>}``.
    """
    try:
        parent = os.path.dirname(path)
        # A bare filename has an empty directory part, and os.makedirs("")
        # raises FileNotFoundError — which previously made every write to the
        # current directory fail. Only create directories when there are any.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            f.write(content)
        return {"status": "success", "path": path}
    except (OSError, ValueError) as e:
        # OSError: filesystem problems; ValueError: text that cannot be
        # encoded, or a path containing a NUL byte.
        return {"status": "error", "message": str(e)}
def _decode_partial(data) -> str:
    """Return captured process output as text; None becomes an empty string."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        # Output attached to TimeoutExpired can be raw bytes even when
        # text=True was requested.
        return data.decode("utf-8", errors="replace")
    return data


def run_shell(command: str, timeout: int = 30) -> dict:
    """Run *command* through the system shell and capture its output.

    Failures to run the command are reported in the returned dict instead of
    being raised.

    Args:
        command: Shell command line, executed with ``shell=True``.
        timeout: Seconds to wait before the command is killed; ``None``
            disables the limit.

    Returns:
        A dict that always contains ``returncode``, ``stdout`` and ``stderr``.
        When the command timed out or could not be run, ``returncode`` is -1
        and an additional ``error`` key holds the reason; on timeout
        ``stdout``/``stderr`` carry whatever output had been captured.
    """
    try:
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as e:
        # Keep the partial output: it is often the only clue to why the
        # command hung.
        return {
            "returncode": -1,
            "error": str(e),
            "stdout": _decode_partial(e.stdout),
            "stderr": _decode_partial(e.stderr),
        }
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # OSError: the shell could not be started; ValueError: invalid
        # arguments or undecodable output.
        return {"returncode": -1, "error": str(e), "stdout": "", "stderr": ""}
    return {
        "returncode": result.returncode,
        "stdout": result.stdout,
        "stderr": result.stderr,
    }
# ============= API endpoints =============
@app.get("/", tags=["Info"])
async def root():
    # Unauthenticated service banner: name, version and where the interactive
    # docs live. (No docstring on purpose: the original has none, and FastAPI
    # would publish one as the operation description.)
    return dict(
        name="OpenClaw API",
        version="1.0.0",
        docs="/docs",
    )
@app.get("/health", tags=["System"])
async def health():
    # Unauthenticated liveness probe. The timestamp comes from datetime.now()
    # without a timezone, i.e. naive local server time in ISO-8601 form.
    now = datetime.now()
    payload = {"status": "healthy"}
    payload["timestamp"] = now.isoformat()
    return payload
# ============ Messaging ============
@app.post("/api/v1/messages/send", tags=["Messages"])
async def send_message(request: MessageSendRequest, api_key: str = Depends(verify_api_key)):
    """
    发送消息到指定频道
    """
    # Docstring left as-is: FastAPI shows it as this operation's description.
    # Stub endpoint: nothing is sent. It answers with a "todo" status and
    # echoes the validated request body back to the caller.
    echoed = request.dict()
    return dict(
        status="todo",
        message="发送消息功能待实现",
        request=echoed,
    )
# ============ Memory ============
@app.get("/api/v1/memory/search", tags=["Memory"])
async def search_memory(
    q: str,
    max_results: int = 10,
    api_key: str = Depends(verify_api_key)
):
    """
    搜索记忆文件
    """
    # Docstring left as-is: FastAPI shows it as this operation's description.
    #
    # Case-insensitive substring search over every ``.md`` file below the
    # workspace directory.
    #   q            text to look for
    #   max_results  maximum number of hits to return; values <= 0 yield none
    # Returns {"query": q, "results": [{"file": path, "preview": first 500
    # characters}, ...]} or {"error": reason} if the directory walk fails.
    search_path = "/root/.openclaw/workspace"
    needle = q.lower()  # hoisted: lowercase the query once, not once per file
    limit = max(max_results, 0)  # a negative limit used to slice from the end
    results = []
    try:
        for dirpath, _dirnames, filenames in os.walk(search_path):
            if len(results) >= limit:
                break
            for name in filenames:
                # Stop as soon as enough hits are collected instead of reading
                # the whole tree and truncating afterwards.
                if len(results) >= limit:
                    break
                if not name.endswith('.md'):
                    continue
                fp = os.path.join(dirpath, name)
                try:
                    with open(fp, 'r', encoding='utf-8') as file:
                        content = file.read()
                except (OSError, UnicodeDecodeError):
                    # Best effort: unreadable or non-UTF-8 files are skipped.
                    # (A bare ``except:`` would also swallow task cancellation
                    # and interpreter shutdown.)
                    continue
                if needle in content.lower():
                    results.append({
                        "file": fp,
                        "preview": content[:500]
                    })
    except OSError as e:
        return {"error": str(e)}
    return {"query": q, "results": results}
@app.get("/api/v1/memory/{path:path}", tags=["Memory"])
async def read_memory(
    path: str,
    api_key: str = Depends(verify_api_key)
):
    """
    读取记忆文件
    """
    # Docstring left as-is: FastAPI shows it as this operation's description.
    #
    # Returns {"path": path, "content": text} for a file below the workspace.
    # Raises HTTPException 400 when ``path`` points outside the workspace and
    # 404 when no regular file exists at the resolved location.
    workspace = "/root/.openclaw/workspace"
    # ``path`` comes straight from the URL, so "../" segments could otherwise
    # reach any file on the host. Normalise lexically and require the result
    # to stay under the workspace (symlinks are deliberately not resolved).
    file_path = os.path.normpath(f"{workspace}/{path}")
    if os.path.commonpath([workspace, file_path]) != workspace:
        raise HTTPException(status_code=400, detail="Invalid path")
    # isfile rather than exists: a directory used to pass the check and come
    # back as a 200 whose content was an "Error: ..." string.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return {"path": path, "content": read_memory_file(file_path)}
@app.put("/api/v1/memory/{path:path}", tags=["Memory"])
async def write_memory(
    path: str,
    request: MemoryWriteRequest,
    api_key: str = Depends(verify_api_key)
):
    """
    写入记忆文件
    """
    # Docstring left as-is: FastAPI shows it as this operation's description.
    #
    # Writes request.content to the file named by the URL path, below the
    # workspace, and returns the status dict produced by write_memory_file.
    # Raises HTTPException 400 when ``path`` points outside the workspace.
    # NOTE(review): request.file_path is required by the model but is not
    # used here; the URL path alone decides the target.
    workspace = "/root/.openclaw/workspace"
    # ``path`` comes straight from the URL, so "../" segments could otherwise
    # overwrite any file the server process can write. Normalise lexically and
    # require the result to stay under the workspace (symlinks are
    # deliberately not resolved).
    file_path = os.path.normpath(f"{workspace}/{path}")
    if os.path.commonpath([workspace, file_path]) != workspace:
        raise HTTPException(status_code=400, detail="Invalid path")
    return write_memory_file(file_path, request.content)
# ============ Command execution ============
@app.post("/api/v1/exec", tags=["Execution"])
async def exec_command(request: ExecRequest, api_key: str = Depends(verify_api_key)):
    """
    执行 Shell 命令
    """
    # Docstring left as-is: FastAPI shows it as this operation's description.
    # Hands the caller-supplied command line and timeout to run_shell and
    # returns its result dict unchanged.
    # NOTE(review): run_shell executes with shell=True, so anyone holding the
    # API key can run arbitrary commands as the server's user.
    command, timeout = request.command, request.timeout
    return run_shell(command, timeout)
# ============ Web ============
@app.post("/api/v1/web/search", tags=["Web"])
async def web_search(request: WebSearchRequest, api_key: str = Depends(verify_api_key)):
    """
    Web 搜索（返回搜索结果，不实际调用）
    """
    # Docstring left as-is: FastAPI shows it as this operation's description.
    # Placeholder endpoint: no search is performed. It answers with a
    # "placeholder" status and echoes the validated request body.
    echoed = request.dict()
    return dict(
        status="placeholder",
        message="Web 搜索需要通过 agent 调用",
        request=echoed,
    )
# ============ Git ============
def _run_argv(argv, cwd):
    """Run *argv* without a shell in *cwd*; return its exit code, or -1 if it could not be started."""
    try:
        return subprocess.run(argv, cwd=cwd).returncode
    except (OSError, ValueError):
        # OSError: missing executable or working directory; ValueError: an
        # argument containing a NUL byte.
        return -1


@app.post("/api/v1/git/commit", tags=["Git"])
async def git_commit(request: GitCommitRequest, api_key: str = Depends(verify_api_key)):
    """
    Git 提交
    """
    # Docstring left as-is: FastAPI shows it as this operation's description.
    #
    # Stages the requested files that exist inside the memory repository,
    # commits them with request.message and pushes to origin/main.
    # Returns {"status", "files", "message", "pushed"}:
    #   status  "committed" if ``git commit`` succeeded, otherwise "error"
    #           (for example when there was nothing to commit)
    #   files   the requested entries that were actually staged
    #   pushed  whether ``git push`` exited successfully
    #
    # Every command is executed as an argument list with no shell: file names
    # and the commit message are caller-controlled, and interpolating them
    # into a shell string allowed arbitrary command injection.
    workspace = "/root/.openclaw/workspace/openclaw-memory"
    results = []
    for f in request.files:
        # ``files`` is an untyped list, so ignore anything that is not a
        # non-empty string.
        if not isinstance(f, str) or not f:
            continue
        # Reject absolute paths and "../" segments that leave the repository.
        fp = os.path.normpath(os.path.join(workspace, f))
        if os.path.commonpath([workspace, fp]) != workspace:
            continue
        if not os.path.exists(fp):
            continue
        # Same copy step as before (copies the file into the repository root);
        # its exit code is intentionally ignored, as it always was.
        _run_argv(["cp", "--", fp, f"{workspace}/"], workspace)
        _run_argv(["git", "add", "--", f], workspace)
        results.append(f)
    committed = _run_argv(["git", "commit", "-m", request.message], workspace) == 0
    # The push is attempted even when the commit failed, as before.
    pushed = _run_argv(["git", "push", "origin", "main"], workspace) == 0
    return {
        "status": "committed" if committed else "error",
        "files": results,
        "message": request.message,
        "pushed": pushed
    }
# ============ Stocks ============
@app.get("/api/v1/stock/{symbol}", tags=["Stocks"])
async def get_stock(symbol: str, api_key: str = Depends(verify_api_key)):
    """
    查询股票信息（返回占位符）
    """
    # Docstring left as-is: FastAPI shows it as this operation's description.
    # Placeholder endpoint: no lookup is performed. It echoes the symbol from
    # the URL together with a "placeholder" status.
    return dict(
        symbol=symbol,
        status="placeholder",
        message="需要调用 agent 执行股票查询",
    )
# ============ Files ============
@app.get("/api/v1/files/list", tags=["Files"])
async def list_files(
    path: str = "/root/.openclaw/workspace",
    api_key: str = Depends(verify_api_key)
):
    """
    列出目录文件
    """
    # Docstring left as-is: FastAPI shows it as this operation's description.
    # Lists the direct children of ``path`` (the workspace by default) as
    # {"name", "type", "path"} records; any failure becomes an HTTP 400 whose
    # detail is the exception text.
    # NOTE(review): ``path`` is not restricted to the workspace, so any
    # directory readable by the server can be listed.

    def describe(entry):
        # One record per directory entry; anything that is not a directory is
        # reported as "file".
        full = os.path.join(path, entry)
        kind = "directory" if os.path.isdir(full) else "file"
        return {"name": entry, "type": kind, "path": full}

    try:
        listing = [describe(entry) for entry in os.listdir(path)]
        return {"path": path, "items": listing}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
# ============ Cron ============
@app.get("/api/v1/cron/list", tags=["Cron"])
async def list_crons(api_key: str = Depends(verify_api_key)):
    """
    列出定时任务（返回占位符）
    """
    # Docstring left as-is: FastAPI shows it as this operation's description.
    # Placeholder endpoint: no scheduled jobs are read; the response is a
    # fixed "placeholder" status and message.
    return dict(
        status="placeholder",
        message="需要通过 agent 管理 cron",
    )
# ============ Startup ============
# Script entry point: print a short banner, then serve the app with uvicorn on
# all interfaces, port 8000.
if __name__ == "__main__":
    print("Starting OpenClaw API Server...")
    # Never echo the key itself: startup output typically ends up in terminal
    # scrollback and log files. Report only where the key came from, and warn
    # loudly when the built-in placeholder is the effective key.
    if "OPENCLAW_API_KEY" in os.environ:
        print("API Key: loaded from OPENCLAW_API_KEY")
    else:
        print("WARNING: OPENCLAW_API_KEY is not set; the built-in placeholder key is in use")
    print("Docs: http://localhost:8000/docs")
    uvicorn.run(app, host="0.0.0.0", port=8000)