JamJet

Python SDK

デコレータとワークフロービルダーを使用してPythonでJamJetワークフローを記述します。

Python SDK

JamJet Python SDKを使用すると、YAMLの代わりにPythonでワークフローを記述できます。両方とも同じIRにコンパイルされ、同じRustランタイム上で実行されます。

インストール

pip install jamjet

デコレータAPI

デコレータAPIは、ワークフローを記述する最も簡潔な方法です。関数を@nodeでデコレートすると、JamJetは関数シグネチャと戻り値の型からノードタイプを推論します。

from jamjet import workflow, node, State

@workflow(id="hello-agent", version="0.1.0")
class HelloWorkflow:
    @node(start=True)
    async def think(self, state: State) -> State:
        response = await self.model(
            model="claude-haiku-4-5-20251001",
            prompt=f"明確に答えてください: {state['query']}",
        )
        return {"answer": response.text}

実行方法:

import asyncio
from jamjet import JamJetClient

async def main():
    client = JamJetClient()  # http://localhost:7700に接続
    result = await client.run(
        HelloWorkflow,
        input={"query": "JamJetとは何ですか?"}
    )
    print(result.state["answer"])

asyncio.run(main())

ワークフロービルダー

より細かい制御が必要な場合は、ビルダーAPIを使用します:

from jamjet.workflows import WorkflowBuilder, ModelNode, ToolNode, BranchNode

wf = (
    WorkflowBuilder("research-agent", version="0.2.0")
    .state_schema(query=str, results=list, answer=str)
    .add_node(
        ToolNode("search")
        .server("brave-search")
        .tool("web_search")
        .arguments({"query": "{{ state.query }}", "count": 5})
        .output_key("results")
        .next("draft")
    )
    .add_node(
        ModelNode("draft")
        .model("claude-sonnet-4-6")
        .prompt("""
            検索結果: {{ state.results | join('\\n') }}

            回答: {{ state.query }}
        """)
        .output_key("answer")
        .next("end")
    )
    .start("search")
    .build()
)

Stateアクセス

Stateは型付きのdict風オブジェクトです。state["key"]またはstate.keyでキーにアクセスします:

@node
async def process(self, state: State) -> State:
    query = state["query"]          # 存在しない場合はKeyErrorを発生
    context = state.get("context")  # 存在しない場合はNoneを返す

    # 部分的なstateパッチを返す — 含めたキーのみが更新されます
    return {"answer": "...", "confidence": 0.95}

tip: ノードはstateパッチを返し、完全なstateは返しません。更新したいキーのみを返せばよく、既存のキーは保持されます。

モデル呼び出し

任意のノード内で self.model() を使用してLLMを呼び出します:

@node
async def think(self, state: State) -> State:
    response = await self.model(
        model="claude-sonnet-4-6",
        prompt=f"回答: {state['query']}",
        system="簡潔かつ正確に回答してください。",
        temperature=0.3,
        max_tokens=512,
    )

    # response.text — 完全なテキスト
    # response.usage.input_tokens
    # response.usage.output_tokens
    # response.model

    return {"answer": response.text}

ツール呼び出し(MCP)

接続されたMCPサーバーからツールを呼び出すには self.tool() を使用します:

@node
async def search(self, state: State) -> State:
    result = await self.tool(
        server="brave-search",
        tool="web_search",
        arguments={"query": state["query"], "count": 5},
    )
    return {"results": result.content}

HTTP呼び出し

@node
async def fetch(self, state: State) -> State:
    result = await self.http(
        method="GET",
        url=f"https://api.example.com/items/{state['item_id']}",
        headers={"Authorization": f"Bearer {self.env('API_KEY')}"},
    )
    return {"raw": result.json()}

分岐処理

from jamjet import node, branch

@node
@branch(
    conditions=[
        ("state['confidence'] >= 0.9", "done"),
        ("state['confidence'] >= 0.5", "refine"),
    ],
    default="escalate",
)
async def route(self, state: State) -> State:
    return {}  # branchノードは既存のstateを読み取ります — 出力は不要です

並列実行

from jamjet.workflows import ParallelNode

.add_node(
    ParallelNode("gather")
    .branches(["search", "fetch-docs", "check-cache"])
    .join("synthesize")
)

リトライポリシー

from jamjet.workflows import RetryPolicy

ModelNode("think")
    .model("claude-haiku-4-5-20251001")
    .prompt("...")
    .retry(RetryPolicy(
        max_attempts=3,
        backoff="exponential",
        delay_ms=500,
    ))

ワークフローの実行

from jamjet import JamJetClient

client = JamJetClient(base_url="http://localhost:7700")

# 実行して完了を待機

result = await client.run(wf, input={"query": "..."})
print(result.state)
print(result.execution_id)
print(result.duration_ms)

# 非同期実行 — 即座に実行IDを取得

exec_id = await client.submit(wf, input={"query": "..."})

# ステータスをポーリング

status = await client.get_execution(exec_id)
print(status.status)  # running | completed | failed

# イベントをリアルタイムでストリーミング

async for event in client.stream(wf, input={"query": "..."}):
    print(event.type, event.node_id)

型アノテーション

SDKは完全な型スタブを提供しています。strictモードでは:

from jamjet import State, NodeResult
from typing import TypedDict

class MyState(TypedDict):
    query: str
    answer: str
    confidence: float

@node(start=True)
async def think(self, state: MyState) -> NodeResult[MyState]:
    ...
    return NodeResult(answer="...", confidence=0.9)

設定

from jamjet import JamJetClient, JamJetConfig

client = JamJetClient(config=JamJetConfig(
    base_url="http://localhost:7700",
    api_key="YOUR_API_KEY",  # ホスティング/本番環境用
    timeout_ms=30_000,
    default_model="claude-haiku-4-5-20251001",
))

または環境変数経由で:

export JAMJET_URL=http://localhost:7700
export JAMJET_API_KEY=...

On this page