JamJet

Conceptos Fundamentales

Comprende agentes, nodos, estado y durabilidad en JamJet.

Conceptos Fundamentales

JamJet es un runtime duradero para agentes de IA. La idea clave es simple: los flujos de trabajo se ejecutan como transiciones de estado recuperables, no como llamadas de función efímeras. Eso es lo que hace posible la reproducción, la recuperación ante fallos, las pausas de aprobación y los rastreos exactos. Esta página cubre los primitivos clave.

Elegir un enfoque de creación

JamJet tiene tres formas de construir agentes. Las tres se compilan al mismo IR y se ejecutan en el mismo runtime de Rust — difieren en cuándo usar cada una.

Diagrama de decisión

¿Tu agente llama a LLMs con estrategias de razonamiento
(react, plan-and-execute, critic)?

├─ SÍ → Usa el SDK de Agent + Workflow
│        (from jamjet import Agent, Workflow, tool)

└─ NO → ¿El flujo es principalmente orquestación de herramientas, enrutamiento,
        o configuración que quieres cambiar sin despliegues de código?

        ├─ SÍ → Usa workflows YAML

        └─ NO → Usa la API de decoradores de Python
                 (@workflow, @node)

Los tres enfoques

EnfoqueIdeal paraEstadoEjemplo
SDK de Agent + WorkflowSistemas multi-agente con razonamientoPydantic BaseModel (tipado, validado)claims-processing, wealth-management
Workflows YAMLPipelines de herramientas, flujos basados en configuraciónSchema en YAML (tipado)rag-assistant
API de decoradores de PythonLógica personalizada, acceso a bibliotecas de PythonState basado en diccionarioDocumentación del SDK de Python

SDK de Agent + Workflow

Usa esto cuando tus agentes necesiten pensar — elegir herramientas, razonar sobre resultados, refinar salidas. Cada agente obtiene una estrategia de razonamiento adecuada para su tarea.

from jamjet import Agent, Workflow, tool
from pydantic import BaseModel

@tool
def search_web(query: str) -> dict:
    """Buscar información en la web."""
    ...

researcher = Agent(
    name="researcher",
    model="claude-sonnet-4-6",
    tools=[search_web],
    instructions="Encuentra información precisa sobre el tema dado.",
    strategy="react",        # bucle ajustado de uso de herramientas
    max_iterations=5,
)

writer = Agent(
    name="writer",
    model="claude-sonnet-4-6",
    instructions="Escribe un resumen claro a partir de la investigación.",
    strategy="critic",       # borrador → evaluar → refinar
    max_iterations=3,
)

workflow = Workflow("research-writer", version="0.1.0")

@workflow.state
class ResearchState(BaseModel):
    topic: str
    research: str | None = None
    summary: str | None = None

@workflow.step
async def research(state: ResearchState) -> ResearchState:
    result = await researcher.run(f"Investigar: {state.topic}")
    return state.model_copy(update={"research": result.output})

@workflow.step
async def write(state: ResearchState) -> ResearchState:
    result = await writer.run(f"Resumir:\n{state.research}")
    return state.model_copy(update={"summary": result.output})

Elige esto cuando: los agentes necesiten estrategias (react, plan-and-execute, critic, consensus, debate), quieras estado tipado con Pydantic, necesites human-in-the-loop (human_approval=True), o múltiples agentes especialistas colaboren.

Flujos de trabajo YAML

Usa esto cuando el flujo sea orquestación determinista — llama a esta herramienta, luego a ese modelo, enruta según una condición. No se necesita razonamiento del agente.

id: summarize-url
version: "0.1.0"

nodes:
  fetch:
    type: tool
    server: web-fetcher
    tool: fetch_page
    arguments:
      url: "{{ state.url }}"
    output_key: page_content
    next: summarize

  summarize:
    type: model
    model: claude-haiku-4-5-20251001
    prompt: "Summarize: {{ state.page_content }}"
    output_key: summary

Elige esto cuando: el flujo sea un pipeline fijo de herramientas y modelos, quieras que personas no técnicas editen el flujo, o necesites cambiar el enrutamiento sin redesplegar código.

API de decoradores Python

Usa esto cuando necesites lógica Python personalizada dentro de los nodos — transformaciones de datos, llamadas a bibliotecas, lógica condicional demasiado compleja para YAML.

from jamjet import workflow, node, State

@workflow(id="data-processor", version="0.1.0")
class DataProcessor:
    @node(start=True)
    async def process(self, state: State) -> State:
        import pandas as pd
        df = pd.read_csv(state["file_path"])
        summary = df.describe().to_dict()
        return {"analysis": summary}

    @node
    async def report(self, state: State) -> State:
        response = await self.model(
            model="claude-sonnet-4-6",
            prompt=f"Write a report from this analysis:\n{state['analysis']}",
        )
        return {"report": response.text}

Elige esto cuando: necesites bibliotecas Python dentro de los nodos, la lógica sea demasiado compleja para YAML pero no requiera razonamiento del agente, o prefieras flujos de trabajo basados en clases.

¿Puedo combinarlos?

Sí. Los tres se compilan a la misma IR. Un flujo de trabajo YAML puede llamar a un agente como herramienta. Un paso de flujo Python puede invocar un sub-flujo definido en YAML. El runtime no distingue qué superficie de creación produjo la IR.


Flujos de trabajo

Un flujo de trabajo es un grafo dirigido de nodos. Tiene:

  • Un id y version únicos
  • Un state_schema — la estructura tipada de los datos que fluyen por el grafo
  • Un nodo start donde comienza la ejecución
  • Uno o más nodos end
workflow:
  id: my-agent
  version: 0.1.0
  state_schema:
    query: str
    answer: str
    confidence: float
  start: think

Los flujos de trabajo se compilan a un grafo IR (Representación Intermedia) antes de ejecutarse. La IR es lo que el planificador Rust realmente ejecuta — YAML y Python son solo superficies de creación.

Nodos

Los nodos son las unidades de cómputo en un flujo de trabajo. Cada nodo tiene un type que determina qué hace:

Tipo de nodoQué hace
modelLlama a un LLM (Claude, GPT-4, Gemini, etc.)
toolLlama a una herramienta externa vía MCP
httpRealiza una solicitud HTTP
branchEnruta la ejecución según una condición
parallelDistribuye hacia múltiples ramas simultáneamente
waitPausa hasta que ocurra un evento externo
evalEvalúa la calidad del output (rúbrica, aserción, latencia)
endTermina el flujo de trabajo

Cada nodo lee y escribe en el estado.

Estado

El estado es el almacén de datos compartido para la ejecución de un flujo de trabajo. Persiste a través de los nodos y a través de reinicios.

state_schema:
  query: str        # entrada del usuario
  search_results: list[str]  # datos intermedios
  answer: str       # salida final

El estado está tipado — el esquema se valida en tiempo de compilación. En tiempo de ejecución, cada nodo puede leer cualquier clave de estado y escribir en su output_key.

consejo: El estado se almacena en la base de datos, no en memoria. Si el runtime falla durante la ejecución, el estado se recupera completamente y la ejecución se reanuda desde el último checkpoint.

Ejecuciones

Una ejecución es una única corrida de un flujo de trabajo con una entrada específica. Cada ejecución obtiene un ID único (por ejemplo, exec_01JM4X8NKWP2).

Las ejecuciones son:

  • Durables — almacenadas en la base de datos, sobreviven reinicios
  • Observables — cada transición de estado se registra como un evento
  • Inspeccionables — visualiza el estado completo, la línea de tiempo de eventos y el uso de tokens con jamjet inspect

Durabilidad

La durabilidad es la garantía central de JamJet: las ejecuciones siempre se completan, incluso si el runtime falla.

Esto funciona mediante event sourcing:

  1. Antes de que cada nodo se ejecute, se escribe un evento node_started en la base de datos
  2. Después de que cada nodo se completa, se escribe un evento node_completed con el parche de estado
  3. Al reiniciar, el scheduler reproduce el registro de eventos para reconstruir exactamente dónde se detuvo la ejecución
  4. La ejecución se reanuda desde el primer nodo incompleto

No se pierde trabajo. Ningún nodo se ejecuta dos veces.

nota: Esto es diferente de la entrega "al menos una vez". El scheduler de JamJet usa bloqueos distribuidos para garantizar que cada nodo se ejecute exactamente una vez, incluso con múltiples procesos worker.

Agentes

Un agente es un workflow que puede:

  • Ser descubierto y llamado por otros agentes (mediante Agent Cards)
  • Delegar tareas a otros agentes (mediante el protocolo A2A)
  • Mantener estado de larga duración a través de múltiples interacciones con el usuario

Cada agente tiene una Agent Card — una descripción legible por máquina de sus capacidades, endpoints y esquema de entrada/salida. Esta es la base del protocolo A2A.

El Scheduler

El scheduler de JamJet está escrito en Rust y se ejecuta como parte de jamjet dev (localmente) o el runtime alojado (en producción).

Realiza:

  1. Polling de la cola de ejecución en busca de trabajo pendiente
  2. Adquisición de un bloqueo sobre la ejecución para prevenir ejecuciones duplicadas
  3. Despacho de nodos a hilos worker
  4. Escritura de checkpoints después de cada nodo

El scheduler es la razón por la cual los workflows de JamJet son durables por defecto — nunca olvida una ejecución.

Local vs. Producción

Característicajamjet dev (local)Alojado / auto-alojado
AlmacenamientoSQLitePostgreSQL
WorkersProceso únicoDistribuido
Servidores MCPstdio localSSE/HTTP remoto
AutenticaciónNingunamTLS / claves API

El modelo de programación es idéntico — el mismo código YAML o Python se ejecuta sin cambios en ambos entornos.


Próximos pasos

On this page