#!/usr/bin/env python3 """ OpenClaw HTTP → WebSocket 代理 把 OpenAI 格式的 HTTP 请求转成 WebSocket 发送给 Gateway """ import asyncio import json import os import websockets import aiohttp from aiohttp import web from datetime import datetime import uuid # ============= 配置 ============= GATEWAY_URL = "ws://localhost:18789" API_PORT = 8081 # 换个端口,避免和现有 API 冲突 API_KEY = os.getenv("OPENCLAW_API_KEY", "your-api-key-change-me") # ============= 全局变量 ============= gateway_connection = None async def connect_gateway(): """连接 Gateway WebSocket""" global gateway_connection try: gateway_connection = await websockets.connect(GATEWAY_URL) print(f"✅ 已连接到 Gateway: {GATEWAY_URL}") return True except Exception as e: print(f"❌ 连接 Gateway 失败: {e}") return False async def send_to_gateway(message: str, session_id: str = None): """发送消息到 Gateway,返回 Agent 回复""" global gateway_connection if gateway_connection is None or gateway_connection.closed: if not await connect_gateway(): return {"error": "Gateway 未连接"} # 构建 agent turn 请求 request_id = str(uuid.uuid4()) payload = { "jsonrpc": "2.0", "method": "agent.turn", "params": { "message": message, "sessionId": session_id, "model": "default" }, "id": request_id } try: # 发送请求 await gateway_connection.send(json.dumps(payload)) # 等待回复 response = await asyncio.wait_for(gateway_connection.recv(), timeout=60) data = json.loads(response) # 提取回复内容 if "result" in data: return data["result"] elif "error" in data: return {"error": data["error"]} else: return data except asyncio.TimeoutError: return {"error": "Gateway 超时"} except Exception as e: return {"error": str(e)} async def handle_chat_completions(request): """处理 OpenAI Chat Completions 格式请求""" try: data = await request.json() # 提取用户消息 user_message = "" messages = data.get("messages", []) for msg in messages: if msg.get("role") == "user": user_message = msg.get("content", "") break if not user_message: return web.json_response( {"error": {"message": "No user message", "type": "invalid_request_error"}}, status=400 ) # 发送到 Gateway result = await send_to_gateway(user_message) if "error" in result: return web.json_response({"error": result["error"]}, status=500) # 构造 OpenAI 格式回复 reply_content = result.get("text", result.get("content", str(result))) response = { "id": f"chatcmpl-{uuid.uuid4().hex[:24]}", "object": "chat.completion", "created": int(datetime.now().timestamp()), "model": data.get("model", "openclaw"), "choices": [{ "index": 0, "message": { "role": "assistant", "content": reply_content }, "finish_reason": "stop" }], "usage": { "prompt_tokens": len(user_message.split()), "completion_tokens": len(reply_content.split()), "total_tokens": len(user_message.split()) + len(reply_content.split()) } } return web.json_response(response) except Exception as e: return web.json_response({"error": str(e)}, status=500) async def handle_health(request): """健康检查""" status = "healthy" if gateway_connection and not gateway_connection.closed else "unhealthy" return web.json_response({ "status": status, "gateway": "connected" if status == "healthy" else "disconnected", "timestamp": datetime.now().isoformat() }) async def handle_models(request): """返回模型列表""" return web.json_response({ "object": "list", "data": [{ "id": "openclaw", "object": "model", "created": int(datetime.now().timestamp()), "owned_by": "openclaw" }] }) async def handle_root(request): """API 信息""" return web.json_response({ "name": "OpenClaw HTTP Proxy", "version": "1.0.0", "gateway": GATEWAY_URL, "docs": "OpenAI Compatible API" }) # ============= HTTP 服务器 ============= async def init_app(): app = web.Application() # 路由 app.router.add_get('/', handle_root) app.router.add_get('/health', handle_health) app.router.add_get('/v1/models', handle_models) app.router.add_post('/v1/chat/completions', handle_chat_completions) return app # ============= 启动 ============= async def main(): # 先连接 Gateway print(f"🔌 正在连接 Gateway: {GATEWAY_URL}...") await connect_gateway() # 启动 HTTP 服务 app = await init_app() print(f"\n🚀 OpenClaw HTTP Proxy 启动成功!") print(f" API 地址: http://localhost:{API_PORT}/v1") print(f" Gateway: {GATEWAY_URL}") print(f" API Key: {API_KEY}") print(f"\n📡 端点:") print(f" GET /health - 健康检查") print(f" GET /v1/models - 模型列表") print(f" POST /v1/chat/completions - 对话") runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, '0.0.0.0', API_PORT) await site.start() print(f"\n✅ 服务运行中: http://0.0.0.0:{API_PORT}") print(f"\n💡 ChatBox 配置:") print(f" API 地址: http://43.163.195.176:{API_PORT}/v1") print(f" API Key: {API_KEY}") # 保持运行 try: while True: await asyncio.sleep(3600) except KeyboardInterrupt: print("\n🛑 正在关闭...") finally: await runner.cleanup() if __name__ == '__main__': # 检查 websockets 库 try: import websockets except ImportError: print("❌ 需要安装 websockets 库:") print(" pip3 install websockets aiohttp") print("\n💡 或者使用纯 Python 版本(功能简化)") exit(1) asyncio.run(main())