JamJet

Python SDK

使用装饰器和工作流构建器在 Python 中编写 JamJet 工作流。

Python SDK

JamJet Python SDK 让你可以用 Python 编写工作流,而不是 YAML。两者都会编译为相同的 IR,并在相同的 Rust 运行时上运行。

安装

pip install jamjet

装饰器 API

装饰器 API 是编写工作流最简洁的方式。用 @node 装饰一个函数,JamJet 会根据函数签名和返回类型推断节点类型。

from jamjet import workflow, node, State

@workflow(id="hello-agent", version="0.1.0")
class HelloWorkflow:
    @node(start=True)
    async def think(self, state: State) -> State:
        response = await self.model(
            model="claude-haiku-4-5-20251001",
            prompt=f"Answer clearly: {state['query']}",
        )
        return {"answer": response.text}

运行它:

import asyncio
from jamjet import JamJetClient

async def main():
    client = JamJetClient()  # connects to http://localhost:7700
    result = await client.run(
        HelloWorkflow,
        input={"query": "What is JamJet?"}
    )
    print(result.state["answer"])

asyncio.run(main())

工作流构建器

如需更多控制,使用构建器 API:

from jamjet.workflows import WorkflowBuilder, ModelNode, ToolNode, BranchNode

wf = (
    WorkflowBuilder("research-agent", version="0.2.0")
    .state_schema(query=str, results=list, answer=str)
    .add_node(
        ToolNode("search")
        .server("brave-search")
        .tool("web_search")
        .arguments({"query": "{{ state.query }}", "count": 5})
        .output_key("results")
        .next("draft")
    )
    .add_node(
        ModelNode("draft")
        .model("claude-sonnet-4-6")
        .prompt("""
            Search results: {{ state.results | join('\\n') }}

            Answer: {{ state.query }}
        """)
        .output_key("answer")
        .next("end")
    )
    .start("search")
    .build()
)

状态访问

State 是一个类似字典的类型化对象。通过 state["key"]state.key 访问键:

@node
async def process(self, state: State) -> State:
    query = state["query"]          # raises KeyError if missing
    context = state.get("context")  # returns None if missing

    # Return a partial state patch — only keys you include are updated
    return {"answer": "...", "confidence": 0.95}

提示: 节点返回状态补丁,而不是完整状态。你只需返回想要更新的键。现有的键会被保留。

模型调用

在任何节点内使用 self.model() 来调用 LLM:

@node
async def think(self, state: State) -> State:
    response = await self.model(
        model="claude-sonnet-4-6",
        prompt=f"回答:{state['query']}",
        system="你简洁且准确。",
        temperature=0.3,
        max_tokens=512,
    )

    # response.text — 完整文本
    # response.usage.input_tokens
    # response.usage.output_tokens
    # response.model

    return {"answer": response.text}

工具调用(MCP)

使用 self.tool() 调用已连接的 MCP 服务器中的工具:

@node
async def search(self, state: State) -> State:
    result = await self.tool(
        server="brave-search",
        tool="web_search",
        arguments={"query": state["query"], "count": 5},
    )
    return {"results": result.content}

HTTP 调用

@node
async def fetch(self, state: State) -> State:
    result = await self.http(
        method="GET",
        url=f"https://api.example.com/items/{state['item_id']}",
        headers={"Authorization": f"Bearer {self.env('API_KEY')}"},
    )
    return {"raw": result.json()}

分支

from jamjet import node, branch

@node
@branch(
    conditions=[
        ("state['confidence'] >= 0.9", "done"),
        ("state['confidence'] >= 0.5", "refine"),
    ],
    default="escalate",
)
async def route(self, state: State) -> State:
    return {}  # 分支节点读取现有状态 — 无需输出

并行执行

from jamjet.workflows import ParallelNode

.add_node(
    ParallelNode("gather")
    .branches(["search", "fetch-docs", "check-cache"])
    .join("synthesize")
)

重试策略

from jamjet.workflows import RetryPolicy

ModelNode("think")
    .model("claude-haiku-4-5-20251001")
    .prompt("...")
    .retry(RetryPolicy(
        max_attempts=3,
        backoff="exponential",
        delay_ms=500,
    ))

运行工作流

from jamjet import JamJetClient

client = JamJetClient(base_url="http://localhost:7700")

# 运行并等待完成

result = await client.run(wf, input={"query": "..."})
print(result.state)
print(result.execution_id)
print(result.duration_ms)

# 提交后即返回 — 立即获得执行 ID

exec_id = await client.submit(wf, input={"query": "..."})

# 轮询状态

status = await client.get_execution(exec_id)
print(status.status)  # running | completed | failed

# 实时流式接收事件

async for event in client.stream(wf, input={"query": "..."}):
    print(event.type, event.node_id)

类型注解

SDK 附带完整的类型存根。在严格模式下:

from jamjet import State, NodeResult
from typing import TypedDict

class MyState(TypedDict):
    query: str
    answer: str
    confidence: float

@node(start=True)
async def think(self, state: MyState) -> NodeResult[MyState]:
    ...
    return NodeResult(answer="...", confidence=0.9)

配置

from jamjet import JamJetClient, JamJetConfig

client = JamJetClient(config=JamJetConfig(
    base_url="http://localhost:7700",
    api_key="YOUR_API_KEY",  # 用于托管/生产环境
    timeout_ms=30_000,
    default_model="claude-haiku-4-5-20251001",
))

或通过环境变量:

export JAMJET_URL=http://localhost:7700
export JAMJET_API_KEY=...

On this page