Python SDK
使用装饰器和工作流构建器在 Python 中编写 JamJet 工作流。
Python SDK
JamJet Python SDK 让你可以用 Python 编写工作流,而不是 YAML。两者都会编译为相同的 IR,并在相同的 Rust 运行时上运行。
安装
pip install jamjet装饰器 API
装饰器 API 是编写工作流最简洁的方式。用 @node 装饰一个函数,JamJet 会根据函数签名和返回类型推断节点类型。
from jamjet import workflow, node, State
@workflow(id="hello-agent", version="0.1.0")
class HelloWorkflow:
@node(start=True)
async def think(self, state: State) -> State:
response = await self.model(
model="claude-haiku-4-5-20251001",
prompt=f"Answer clearly: {state['query']}",
)
return {"answer": response.text}运行它:
import asyncio
from jamjet import JamJetClient
async def main():
client = JamJetClient() # connects to http://localhost:7700
result = await client.run(
HelloWorkflow,
input={"query": "What is JamJet?"}
)
print(result.state["answer"])
asyncio.run(main())工作流构建器
如需更多控制,使用构建器 API:
from jamjet.workflows import WorkflowBuilder, ModelNode, ToolNode, BranchNode
wf = (
WorkflowBuilder("research-agent", version="0.2.0")
.state_schema(query=str, results=list, answer=str)
.add_node(
ToolNode("search")
.server("brave-search")
.tool("web_search")
.arguments({"query": "{{ state.query }}", "count": 5})
.output_key("results")
.next("draft")
)
.add_node(
ModelNode("draft")
.model("claude-sonnet-4-6")
.prompt("""
Search results: {{ state.results | join('\\n') }}
Answer: {{ state.query }}
""")
.output_key("answer")
.next("end")
)
.start("search")
.build()
)状态访问
State 是一个类似字典的类型化对象。通过 state["key"] 或 state.key 访问键:
@node
async def process(self, state: State) -> State:
query = state["query"] # raises KeyError if missing
context = state.get("context") # returns None if missing
# Return a partial state patch — only keys you include are updated
return {"answer": "...", "confidence": 0.95}提示: 节点返回状态补丁,而不是完整状态。你只需返回想要更新的键。现有的键会被保留。
模型调用
在任何节点内使用 self.model() 来调用 LLM:
@node
async def think(self, state: State) -> State:
response = await self.model(
model="claude-sonnet-4-6",
prompt=f"回答:{state['query']}",
system="你简洁且准确。",
temperature=0.3,
max_tokens=512,
)
# response.text — 完整文本
# response.usage.input_tokens
# response.usage.output_tokens
# response.model
return {"answer": response.text}工具调用(MCP)
使用 self.tool() 调用已连接的 MCP 服务器中的工具:
@node
async def search(self, state: State) -> State:
result = await self.tool(
server="brave-search",
tool="web_search",
arguments={"query": state["query"], "count": 5},
)
return {"results": result.content}HTTP 调用
@node
async def fetch(self, state: State) -> State:
result = await self.http(
method="GET",
url=f"https://api.example.com/items/{state['item_id']}",
headers={"Authorization": f"Bearer {self.env('API_KEY')}"},
)
return {"raw": result.json()}分支
from jamjet import node, branch
@node
@branch(
conditions=[
("state['confidence'] >= 0.9", "done"),
("state['confidence'] >= 0.5", "refine"),
],
default="escalate",
)
async def route(self, state: State) -> State:
return {} # 分支节点读取现有状态 — 无需输出并行执行
from jamjet.workflows import ParallelNode
.add_node(
ParallelNode("gather")
.branches(["search", "fetch-docs", "check-cache"])
.join("synthesize")
)重试策略
from jamjet.workflows import RetryPolicy
ModelNode("think")
.model("claude-haiku-4-5-20251001")
.prompt("...")
.retry(RetryPolicy(
max_attempts=3,
backoff="exponential",
delay_ms=500,
))运行工作流
from jamjet import JamJetClient
client = JamJetClient(base_url="http://localhost:7700")
# 运行并等待完成
result = await client.run(wf, input={"query": "..."})
print(result.state)
print(result.execution_id)
print(result.duration_ms)
# 提交后即返回 — 立即获得执行 ID
exec_id = await client.submit(wf, input={"query": "..."})
# 轮询状态
status = await client.get_execution(exec_id)
print(status.status) # running | completed | failed
# 实时流式接收事件
async for event in client.stream(wf, input={"query": "..."}):
print(event.type, event.node_id)类型注解
SDK 附带完整的类型存根。在严格模式下:
from jamjet import State, NodeResult
from typing import TypedDict
class MyState(TypedDict):
query: str
answer: str
confidence: float
@node(start=True)
async def think(self, state: MyState) -> NodeResult[MyState]:
...
return NodeResult(answer="...", confidence=0.9)配置
from jamjet import JamJetClient, JamJetConfig
client = JamJetClient(config=JamJetConfig(
base_url="http://localhost:7700",
api_key="YOUR_API_KEY", # 用于托管/生产环境
timeout_ms=30_000,
default_model="claude-haiku-4-5-20251001",
))或通过环境变量:
export JAMJET_URL=http://localhost:7700
export JAMJET_API_KEY=...