AI Agent的代码沙盒
虽然远程编码IDE已经存在多年,但一种新的远程代码执行环境正在兴起。例如,当你要求像ChatGPT这样的大型语言模型(LLM)执行一些LLMs通常不擅长的数值计算,比如计算时间间隔内的天数、创建图表或更复杂的事情时,它通常会编写一个Python脚本并运行它以生成结果。这些代码在哪里运行?在沙盒中!
沙盒是一个隔离环境,用于运行不受信任的代码,并具有严格定义的安全边界。它包含工具(如Python解释器)和有用的模块,可以在运行用户代码时使用。每次执行的隔离确保来自一个用户或请求的不受信任代码不会干扰另一个。
1、代码沙盒的工作原理
当LLM生成代码时,后台会创建一个沙盒,代码上传、执行并返回其结果:
根据用例的不同,沙盒可以在不同的代码执行调用之间持续存在,你可能希望为每个用户保留一个持久的沙盒。这对于需要访问用户上传文件的AI代理执行代码很有用。
一如既往,魔鬼在细节中……
我们当然可以将代码传递给Python并使用eval()
执行代码,但现在已经不是90年代了。虽然代码是由LLM生成的,但这并不能保证它是安全的。我们有几种方法可以实现安全的沙盒:
- 容器: 标准Linux容器(LXC)在性能上与在沙盒外运行代码没有明显差异。这就是Docker默认使用的。
- 用户模式内核: 用户模式内核进程拦截并服务于Linux系统调用,将应用程序与主机内核隔离。
- 虚拟机: 轻量级虚拟化管理程序通过硬件虚拟化提供隔离,相比容器会有轻微的性能损失。
我还应该提到WebAssembly,这是一种虚拟栈机,以及JVM等其他技术。它只支持运行专门为它编译的代码。是的,Python和Node.js可以编译成Wasm,但它们经常依赖于可能不兼容的本地模块。
我们的选择最终归结为这些具体实现:
- Linux容器
- Docker(使用默认LXC后端)
- 用户模式内核
- Docker(使用gVisor后端)
- 虚拟机
- Firecracker
- Kata Containers
- Cloud Hypervisor
尽管由于访客和主机之间的边界,虚拟机在性能上有一定损失,但它们增加了额外的安全层。你还需要提前规划容量,因为每个虚拟机都需要分配专用的CPU、RAM和根文件系统。你可以通过共享多个虚拟机的只读根文件系统并使用叠加文件系统来处理可写存储,但协调很快变得复杂。
gVisor是一个用户空间内核,通过拦截虚拟化环境中进程发出的系统调用,提供强大的隔离。它可以独立运行或作为Docker运行时使用。对于我们的用途,我认为这是运行不受信任代码的理想中间地带。集成它只需要简单的更改Docker守护进程配置。
接下来,我们需要决定沙盒环境本身,如何将代码传入其中,以及如何提取执行结果。沙盒是否应该在后台持续存在?如果是的话,何时终止它?
一种初步的方法可能是为每种受支持的语言编写小的包装器。如果LLM生成代码,它可能是Python、JavaScript、TypeScript或Bash等语言。结果可能会打印到控制台或写入文件。语言有多种框架,弄清楚如何将代码传入和结果传出每种受支持的语言可能会很麻烦。
幸运的是,有Jupyter Notebook。它允许您创建代码和数据的交互式笔记本,并且多年来在数据科学社区中非常流行。它有一个内核的概念,该内核在目标环境中执行代码(例如,Python、Node.js等)。每个代码单元可以输出文本、图像或任意HTML。
与其构建一个定制的REPL,我们可以利用Jupyter Notebook并通过其API以编程方式提交代码执行并返回结果。无需重新发明轮子!因此,我们的沙盒环境应包括一个正确配置的Jupyter Notebook安装及其适当的内核。
Jupyter Notebook后端可能不适合长时间运行的代码,如网络应用服务器,但它适用于大多数用例。无论如何,Jupyter只是一个实现细节,不需要向消费者(LLM、代理等)暴露。为此,我们需要在Jupyter Notebook前面运行一个最小化的API服务器,该服务器接受代码执行请求,将其提交给Jupyter Notebook实例,并提取结果。这样,LLM或AI代理可以消耗一组简单的HTTP端点,这些端点处理生成代码的执行。
2、概念验证
安装gVisor 并通过默认设置配置Docker。
让我们快速编写一个PoC,启动一个Jupyter内核并执行返回文本和图像形式图表的代码:
import asyncio
from jupyter_client.manager import AsyncKernelManager
async def main():
km = AsyncKernelManager()
await km.start_kernel()
kc = km.client()
kc.start_channels()
await kc.wait_for_ready()
msg_id = kc.execute("""
import matplotlib.pyplot as plt
print("hello")
fig, ax = plt.subplots()
ax.plot([1, 2])
plt.show()
print("world")
""")
while True:
reply = await kc.get_iopub_msg()
if reply["parent_header"]["msg_id"] == msg_id:
msg_type = reply["msg_type"]
if msg_type == "stream":
print(f'TEXT: {reply["content"]["text"]}')
elif msg_type == "display_data":
content = reply["content"]
if "image/png" in content["data"]:
print(f'IMAGE: {content["data"]["image/png"]}\n')
elif msg_type == "error":
print(f'ERROR: {reply["content"]["traceback"][0]}')
break
elif msg_type == "status" and reply["content"]["execution_state"] == "idle":
break
kc.shutdown()
asyncio.run(main())
运行此代码的结果如下:
TEXT: hello
IMAGE: <长BASE64字符串>
TEXT: world
漂亮。
由于未解决的问题,远程启动Jupyter Notebook内核比较困难,因此我们需要在容器内部运行上述代码。最简单的方法是在容器内部运行一个HTTP服务器,同时在外部运行一个代理服务器,将请求转发到适当的内部服务器。不过,我认为这正是你在生产环境中应该如何做的。
鉴于此,我们现在需要做的就是围绕它包装一个FastAPI服务器,通过gVisor在Docker容器中运行它,并编写另一个FastAPI服务器来协调这些容器!
3、沙盒服务器解决方案
高层设计:
在我们的设计中,沙盒管理器公开了一个轻量级API服务器,带有用于启动沙盒、执行代码和返回结果的HTTP端点。
我不想直接向消费者(例如,代理)暴露代码执行引擎(Jupyter Notebook)的接口,因为它只是实现细节,我们不想仅仅绑定于Jupyter Notebook。
3.1 沙盒内API服务器
每个沙盒都将运行自己的API服务器,该服务器提供单个/execute
端点:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from jupyter_client.manager import AsyncKernelManager
import asyncio
import json
from io import BytesIO
app = FastAPI()
async def execute_code(code: str):
km = AsyncKernelManager()
await km.start_kernel()
kc = km.client()
kc.start_channels()
await kc.wait_for_ready()
msg_id = kc.execute(code)
async def stream_results():
try:
while True:
reply = await kc.get_iopub_msg()
msg_type = reply["msg_type"]
if msg_type == 'stream':
yield json.dumps({"text": reply['content']['text']}) + "\n"
elif msg_type == 'display_data':
data = reply['content']['data']
if "image/png" in data:
yield json.dumps({"image": data["image/png"]}) + "\n"
elif msg_type == "error":
traceback = "\n".join(reply['content']['traceback'])
yield json.dumps({"error": traceback}) + "\n"
break
elif msg_type == "status" and reply["content"]["execution_state"] == "idle":
break
except asyncio.CancelledError:
pass
finally:
kc.stop_channels()
await km.shutdown_kernel()
return StreamingResponse(stream_results(), media_type="application/x-ndjson")
@app.post("/execute")
async def execute(request: dict):
if "code" not in request:
raise HTTPException(status_code=400, detail="Missing 'code' field")
return await execute_code(request["code"])
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
让我们将其打包成 Docker 镜像。我们将使用 Jupyter 镜像作为基础。Dockerfile 如下:
FROM jupyter/base-notebook
RUN pip install --no-cache-dir fastapi uvicorn jupyter_client
WORKDIR /app
COPY inside_server.py /app/server.py
EXPOSE 8000
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
让我们构建并运行它:
docker build -t fastapi-jupyter-server .
docker run -p 8000:8000 fastapi-jupyter-server
你应该看到:
INFO: Started server process [7]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
我们来 curl 一下:
curl "http://localhost:8000/execute" -H "Content-Type: application/json" -d '{"code": "print(\"hello from sandbox\")"}'
{"text": "hello from sandbox\n"}
很棒。但是图片怎么办?
curl "http://localhost:8000/execute" -H "Content-Type: application/json" -d \
'{"code": "import matplotlib.pyplot as plt\nfig, ax = plt.subplots()\nax.plot([1, 2])\nplt.show()"}'
...
崩溃了。在大量的 VT100 转义序列中,我们发现了错误: ModuleNotFoundError\u001b[0m: No module named 'matplotlib
。当然,我忘了安装依赖项!该怎么办呢?
我们可以公开另一个端点,例如 /install
,它接受 Python 依赖项列表。但这对于本 PoC 来说太复杂了,我们只需在 Dockerfile 中 pip install
命令的末尾添加 matplotlib,就可以将它们嵌入到镜像中:
RUN pip install --no-cache-dir fastapi uvicorn jupyter_client matplotlib
重新运行 curl 命令,一切恢复正常。
3.2 沙盒管理 API 服务器
现在我们已经完成了基本功能,接下来考虑如何构建“外部”服务器。它应该暴露一个简单的 HTTP 接口,类似于:
GET /sandboxes
POST /sandboxes
POST /sandboxes/<id>/execute
GET /sandboxes/<id>
DELETE /sandboxes/<id>
这是一个基础的 REST API。我们已经实现了其中一个端点。
至于其他部分,我们可以直接调用 Docker API。GET /sandboxes
基本上是一个容器列表操作(通过某些标签过滤,以防你有其他非沙盒容器在运行),等等。
至于端口转发问题,让我们使用端口 0 并让 Docker 分配一个端口,然后从容器元数据中提取它。
准备好或没准备好,以下是快速实现的代码:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from contextlib import asynccontextmanager
import asyncio
import time
import docker
import httpx
import uuid
import json
# 配置
IMAGE_NAME = "fastapi-jupyter-server"
CONTAINER_PREFIX = "sandbox_"
SANDBOX_PORT = 8000
IDLE_TIMEOUT = 60
CHECK_INTERVAL = 60
client = docker.from_env()
hx = httpx.AsyncClient()
last_active = {}
async def terminate_idle_sandboxes():
while True:
await asyncio.sleep(CHECK_INTERVAL)
now = time.time()
for container in await asyncio.to_thread(list_sandboxes):
sandbox_id = container.id
last_time = last_active.get(sandbox_id, None)
if last_time is None:
print(f"终止未跟踪的沙盒 {sandbox_id} (服务器重启了?)")
try:
container.stop()
container.remove()
except docker.errors.NotFound:
pass
continue
if now - last_time > IDLE_TIMEOUT:
print(f"终止空闲沙盒 {sandbox_id} (空闲 {now - last_time:.1f} 秒)")
try:
container.stop()
container.remove()
last_active.pop(sandbox_id, None)
except docker.errors.NotFound:
last_active.pop(sandbox_id, None)
@asynccontextmanager
async def lifespan(app: FastAPI):
asyncio.create_task(terminate_idle_sandboxes())
yield
app = FastAPI(lifespan=lifespan)
class CreateSandboxRequest(BaseModel):
lang: str
class ExecuteRequest(BaseModel):
code: str
def list_sandboxes():
return client.containers.list(filters={"label": "sbx=1"})
@app.get("/sandboxes")
async def get_sandboxes():
sandboxes = [
{"id": container.id, "name": container.name, "status": container.status}
for container in list_sandboxes()
]
return {"sandboxes": sandboxes}
@app.post("/sandboxes")
async def create_sandbox(request: CreateSandboxRequest):
if request.lang.lower() != "python":
raise HTTPException(status_code=400, detail="仅支持 Python 沙盒。")
container_name = CONTAINER_PREFIX + str(uuid.uuid4())[:8]
try:
container = client.containers.run(
IMAGE_NAME,
name=container_name,
labels={
"sbx": "1",
"sbx_lang": request.lang.lower()
},
detach=True,
stdin_open=False,
tty=False,
ports={f"{SANDBOX_PORT}/tcp": 0}, # 自动分配端口
)
last_active[container.id] = time.time()
return {"id": container.id, "name": container.name, "status": container.status}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/sandboxes/{sandbox_id}")
async def get_sandbox(sandbox_id: str):
try:
container = client.containers.get(sandbox_id)
if "sbx" not in container.labels:
raise HTTPException(status_code=404, detail="沙盒未找到")
ports = container.attrs["NetworkSettings"]["Ports"]
port_mapping = ports.get(f"{SANDBOX_PORT}/tcp", [])
if not port_mapping:
raise HTTPException(status_code=500, detail="未找到暴露的端口")
host_port = port_mapping[0]["HostPort"]
return {
"id": container.id,
"name": container.name,
"status": container.status,
"port": host_port,
}
except docker.errors.NotFound:
raise HTTPException(status_code=404, detail="沙盒未找到")
@app.post("/sandboxes/{sandbox_id}/execute")
async def execute_code(sandbox_id: str, request: ExecuteRequest):
if not request.code.strip():
raise HTTPException(status_code=400, detail="代码不能为空。")
try:
container = client.containers.get(sandbox_id)
if "sbx" not in container.labels:
raise HTTPException(status_code=404, detail="沙盒未找到")
ports = container.attrs["NetworkSettings"]["Ports"]
port_mapping = ports.get(f"{SANDBOX_PORT}/tcp", [])
if not port_mapping:
raise HTTPException(status_code=500, detail="未找到暴露的端口")
host_port = port_mapping[0]["HostPort"]
sandbox_url = f"http://localhost:{host_port}/execute"
async def stream_response():
async with hx.stream("POST", sandbox_url, json=request.dict()) as response:
if not response.is_success:
raise HTTPException(status_code=response.status_code, detail=f"执行失败")
async for chunk in response.aiter_bytes():
yield chunk
last_active[sandbox_id] = time.time()
return StreamingResponse(stream_response(), media_type="application/x-ndjson")
except docker.errors.NotFound:
raise HTTPException(status_code=404, detail="沙盒未找到")
@app.delete("/sandboxes/{sandbox_id}")
async def delete_sandbox(sandbox_id: str):
try:
container = client.containers.get(sandbox_id)
if "sbx" not in container.labels:
raise HTTPException(status_code=404, detail="沙盒未找到")
container.stop()
container.remove()
last_active.pop(sandbox_id, None)
return {"message": f"沙盒 {sandbox_id} 已删除"}
except docker.errors.NotFound:
raise HTTPException(status_code=404, detail="沙盒未找到")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
让我解释一下它是如何工作的:
在创建容器时,我们设置了标签,这样在查询运行中的沙盒时可以进行过滤。
后台任务每分钟运行一次,检查空闲的沙盒(即没有活动的沙盒)并终止它们。在实际使用中,你显然需要考虑长时间运行的沙盒等。
我无法完全摆脱服务器的状态,因为 Docker 的容器标签是静态的,不能动态更新,因此需要 last_active
字典。
5、生产环境优化
为了将其生产化,你需要做很多工作,包括:
已知问题:
- 错误处理
- 指定代码依赖项
安全性:
- 身份验证与授权
- 额外的基于虚拟机的隔离
- 审计日志
反滥用:
- 网络流量的出口过滤
- 资源限制(CPU、内存、I/O、网络等)
功能:
- 文件上传和下载
- 持久存储
- 入站连接
- 支持其他语言/Jupyter Notebook 内核
然而,这应该是一个进一步探索代码沙箱的良好起点。如果你使用了其中的任何内容,请告诉我!
6、结束语
到现在为止,你应该对在Linux上支持的不同运行不受信任代码的技术有了更好的理解,并且有一个使用Docker和gVisor的具体实现。我认为这是一个在安全性和性能之间很好的折中方案,当然,如前所述,在将其部署到生产环境之前,还有许多改进的空间。
如果你用这些内容构建了一些很棒的东西,请告诉我!🚀
原文链接:Code Sandboxes for LLMs and AI Agents
汇智网翻译整理,转载请标明出处