Server-Sent Events

Tool execution in ORS utilises Server-Sent Events (SSE) to deliver results. SSE keeps connections alive during long-running tool calls and chunks large responses for reliable delivery.

Why SSE for Tool Calls?

Traditional request-response doesn’t work well for tool execution.

Problems with standard HTTP:
  • Bash commands can run for minutes
  • LLM calls take 10-30 seconds
  • File operations vary in duration
  • Connections may time out before the tool finishes
Benefits of SSE:
  • Keeps connections alive with periodic pings while tools execute
  • Chunks large responses (>4KB) into smaller pieces for reliable delivery
  • Enables client reconnection via task IDs if the connection drops
  • Built into browsers and HTTP libraries
  • Simpler than WebSockets (one-way, server → client)

SSE Basics

What is SSE?

Server-Sent Events is a standard for server-to-client streaming over HTTP:
GET /stream
Accept: text/event-stream

HTTP/1.1 200 OK
Content-Type: text/event-stream

event: message
data: {"status": "processing"}

event: message
data: {"status": "complete"}

event: done
data: finished
Format:
  • Each event has an event type and data payload
  • Events separated by blank lines
  • Connection stays open for multiple events
  • Client closes when done
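The wire format above is simple enough to parse by hand. A minimal sketch in Python (it assumes one `data:` line per event, which matches the examples in this document; full SSE also allows multi-line data):

```python
def parse_sse(raw: str):
    """Parse raw SSE text into (event, data) pairs."""
    events = []
    event_type = "message"  # SSE default when no event: line is present
    for line in raw.splitlines():
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            events.append((event_type, line[len("data:"):].strip()))
        elif line == "":
            event_type = "message"  # a blank line terminates the event
    return events
```

Running it on the stream above yields `[("message", '{"status": "processing"}'), ("message", '{"status": "complete"}'), ("done", "finished")]`.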

Tool Call SSE Flow

Request

POST /gsm8k/call
X-Session-ID: abc-123
Accept: text/event-stream
Content-Type: application/json

{
  "name": "submit",
  "input": {"answer": 42},
  "task_id": "optional-trace-id"
}
Headers:
  • Accept: text/event-stream - Recommended (server returns SSE regardless)
  • X-Session-ID - Session identifier
Body:
  • name: Tool to call
  • input: Tool parameters
  • task_id: Optional ID for reconnection/tracing

Response Stream

The server sends events in this sequence:

1. Task ID Event

First event identifies the task:
event: task_id
data: abc-123-def-456
Purpose: Client can use this ID to reconnect if disconnected.

2. Chunk Events (if result is large)

For results > 4KB, sent in chunks:
event: chunk
data: {"ok": true, "output": {"blocks": [{"type": "text", "text": "Part 1...

event: chunk
data: ...continuing text...

event: chunk
data: ...more text..."}], "reward": 1.0, "finished": true}}
Purpose: Deliver large results in 4KB chunks.

3. End Event

Final event with complete result:
event: end
data: {"ok": true, "output": {"blocks": [{"type": "text", "text": "Complete!"}], "reward": 1.0, "finished": true}}
For small results (under 4KB), this is the only data event.

4. Error Event (if error occurs)

If tool execution fails:
event: error
data: Tool execution failed: Invalid answer format
Note: This is an HTTP-level error (tool execution itself failed), not a tool logic error.

Complete Example

Successful tool call:
POST /gsm8k/call
Accept: text/event-stream
X-Session-ID: abc-123
Content-Type: application/json

{"name": "submit", "input": {"answer": 42}}
Response:
event: task_id
data: task-xyz-789

event: end
data: {"ok": true, "output": {"blocks": [{"type": "text", "text": "Correct!"}], "reward": 1.0, "finished": true}}
Failed tool call:
event: task_id
data: task-xyz-790

event: error
data: Session not found

Event Types

task_id

event: task_id
data: <task-identifier>
Purpose: Provide the task ID for reconnection.
Data: String task ID (UUID format).
When: First event in every stream.
Usage:
# Save task_id for reconnection
task_id = None
for event in stream:
    if event.event == "task_id":
        task_id = event.data

chunk

event: chunk
data: <partial-json>
Purpose: Deliver large results in 4KB chunks.
Data: Partial JSON string (may not be valid JSON until fully assembled).
When: Sent progressively for results > 4KB.
Usage:
# Accumulate chunks
buffer = ""
for event in stream:
    if event.event == "chunk":
        buffer += event.data

end

event: end
data: <complete-json>
Purpose: Final result (either a complete small result or the last chunk of a large one).
Data: Complete JSON for RunToolOutput:
{
  "ok": true,
  "output": {
    "blocks": [...],
    "reward": 1.0,
    "finished": true
  }
}
When: Last event in a successful stream.
Usage:
for event in stream:
    if event.event == "end":
        result = json.loads(event.data)
        return result

error

event: error
data: <error-message>
Purpose: Indicate tool execution failure.
Data: String error message.
When: Tool execution failed at the HTTP/server level.
Note: This is different from tool logic errors, which return {"ok": false, "error": "..."} in an end event.
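The distinction matters in client code: an error event should raise immediately, while an end event still needs its ok flag checked. A sketch (ToolLogicError is a hypothetical exception name, not part of ORS):

```python
import json

class ToolLogicError(Exception):
    """Hypothetical exception for results with ok: false."""

def handle_end(data: str) -> dict:
    """Parse an end event and surface tool-logic failures."""
    result = json.loads(data)
    if not result.get("ok"):
        raise ToolLogicError(result.get("error", "unknown tool error"))
    return result["output"]
```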

Handling SSE in Clients

Python (httpx)

import httpx
import json

def call_tool(base_url, session_id, tool_name, tool_input):
    url = f"{base_url}/gsm8k/call"

    with httpx.stream(
        "POST",
        url,
        headers={
            "X-Session-ID": session_id,
            "Accept": "text/event-stream"
        },
        json={
            "name": tool_name,
            "input": tool_input
        },
        timeout=60.0
    ) as response:
        buffer = ""
        event_type = ""

        for line in response.iter_lines():
            if line.startswith("event: "):
                event_type = line[7:]
            elif line.startswith("data: "):
                data = line[6:]

                if event_type == "end":
                    # Combine buffer + data for complete result
                    complete_data = buffer + data
                    return json.loads(complete_data)

                elif event_type == "chunk":
                    buffer += data

                elif event_type == "error":
                    raise Exception(f"Tool error: {data}")

                elif event_type == "task_id":
                    task_id = data  # Save for reconnection

Python (ORS SDK)

The Python SDK handles SSE automatically:
from ors.client import ORS

client = ORS(base_url="http://localhost:8080")
env = client.environment("gsm8k")

with env.session(task=task) as session:
    result = session.call_tool("submit", {"answer": 42})
    # SDK handles SSE internally
    print(result.reward)  # 1.0

JavaScript (fetch)

async function callTool(baseUrl, sessionId, toolName, toolInput) {
  const response = await fetch(`${baseUrl}/gsm8k/call`, {
    method: 'POST',
    headers: {
      'X-Session-ID': sessionId,
      'Accept': 'text/event-stream',
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      name: toolName,
      input: toolInput
    })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let pending = '';   // partial line carried across reads
  let buffer = '';
  let eventType = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Network reads can end mid-line (or mid-character), so decode in
    // streaming mode and keep the trailing partial line for the next read.
    pending += decoder.decode(value, { stream: true });
    const lines = pending.split(/\r?\n/);
    pending = lines.pop();

    for (const line of lines) {
      if (line.startsWith('event: ')) {
        eventType = line.slice(7).trim();
      } else if (line.startsWith('data: ')) {
        const data = line.slice(6).trim();

        if (eventType === 'end') {
          return JSON.parse(buffer + data);
        } else if (eventType === 'chunk') {
          buffer += data;
        } else if (eventType === 'error') {
          throw new Error(data);
        }
      }
    }
  }
}

curl

# Stream events to stdout
curl -X POST http://localhost:8080/gsm8k/call \
  -H "X-Session-ID: abc-123" \
  -H "Accept: text/event-stream" \
  -H "Content-Type: application/json" \
  -d '{"name": "submit", "input": {"answer": 42}}' \
  -N

# Output:
# event: task_id
# data: task-123
#
# event: end
# data: {"ok": true, "output": {...}}

Reconnection

If the SSE connection drops, clients can reconnect using the task_id:

Save Task ID

task_id = None

# First connection
for event in stream:
    if event.event == "task_id":
        task_id = event.data
        # Save for reconnection

Reconnect with Task ID

# If connection drops, reconnect with saved task_id
response = httpx.post(
    f"{base_url}/gsm8k/call",
    headers={"X-Session-ID": session_id},
    json={
        "name": tool_name,
        "input": tool_input,
        "task_id": task_id  # ← Use saved ID
    }
)
What happens:
  • The server checks whether the task is already running
  • If still running, it streams events from the current state
  • If completed, it immediately sends the cached result
  • If the task_id is unknown, it sends an error event
Caching: Completed task results are cached for 60 seconds.
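Since task_id may also be supplied by the client (see the request body above), one way to make reconnection robust is to pick the ID up front and reuse it on every attempt. A sketch, where `call_once(task_id)` stands in for a single SSE request like the httpx client shown earlier:

```python
import uuid

def call_with_reconnect(call_once, max_attempts=3):
    """Retry a tool call with a fixed task_id, so the server can resume
    a running task or replay a cached result after a dropped connection."""
    task_id = str(uuid.uuid4())
    last_error = None
    for _ in range(max_attempts):
        try:
            return call_once(task_id)
        except ConnectionError as e:
            last_error = e  # connection dropped; retry with the same ID
    raise last_error
```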

Server-Side Considerations

Implementing SSE in Your Server

If implementing an ORS server from scratch:
import json

from sse_starlette import EventSourceResponse

@app.post("/{env_name}/call")
async def call_tool(env_name: str, tool_call: ToolCall, env: Environment):
    async def event_generator():
        # 1. Send task ID
        task_id = generate_task_id()
        yield {"event": "task_id", "data": task_id}

        # 2. Execute tool
        try:
            result = await env.call_tool(
                tool_call.name,
                tool_call.input
            )

            # 3. Serialize result
            result_json = json.dumps(result)

            # 4. Chunk if large (>4KB)
            if len(result_json) > 4096:
                for i in range(0, len(result_json), 4096):
                    chunk = result_json[i:i+4096]
                    event = "end" if i + 4096 >= len(result_json) else "chunk"
                    yield {"event": event, "data": chunk}
            else:
                # 5. Send complete result
                yield {"event": "end", "data": result_json}

        except Exception as e:
            # 6. Send error
            yield {"event": "error", "data": str(e)}

    return EventSourceResponse(event_generator())
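The chunking rule in steps 4–5 can be pulled into a small helper, which makes the edge cases easy to check (the 4096-byte size matches the protocol; the helper name is illustrative):

```python
def chunk_events(result_json: str, size: int = 4096):
    """Split a serialized result into SSE events: every slice is a
    'chunk' except the last, which is labeled 'end'."""
    events = []
    for i in range(0, max(len(result_json), 1), size):
        piece = result_json[i:i + size]
        event = "end" if i + size >= len(result_json) else "chunk"
        events.append((event, piece))
    return events
```

A small result produces a single end event; anything larger is split into chunk events followed by a final end event whose pieces concatenate back to the original JSON.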

Keep-Alive Pings

The server sends periodic pings (every 10 seconds) to keep the connection alive:
: ping

: ping
These are SSE comments (lines starting with :) and are ignored by clients.
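In a hand-rolled client, comment lines just need to be dropped before event parsing; for example:

```python
def strip_comments(lines):
    """Drop SSE comment lines (e.g. keep-alive ': ping') from a stream."""
    return [line for line in lines if not line.startswith(":")]
```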

Error Handling

HTTP-Level Errors

Tool execution fails at server level:
event: error
data: Session not found
Causes:
  • Session doesn’t exist
  • Session timed out
  • Tool name not recognized
  • Server internal error
Client handling:
for event in stream:
    if event.event == "error":
        raise ToolExecutionError(event.data)

Tool Logic Errors

Tool executes but returns error:
event: end
data: {"ok": false, "error": "Invalid answer format"}
Causes:
  • Tool input validation failed
  • Tool logic error (e.g., file not found)
  • Expected failure (e.g., incorrect answer)
Client handling:
result = json.loads(event.data)
if not result["ok"]:
    print(f"Tool failed: {result['error']}")

Performance Considerations

Chunking

Results > 4KB are automatically chunked:
  • Chunk size: 4KB (4096 bytes)
  • Purpose: Prevent memory issues with large outputs
  • Client: Reassemble chunks before parsing JSON

Timeouts

SSE streams can run indefinitely:
  • Connection: Set reasonable timeout on client (e.g., 60s for quick tools, 600s for bash)
  • Keep-alive: Server sends pings every 10s to prevent idle timeout
  • Session: 15-minute session timeout still applies
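With httpx, these can be set per-phase rather than as one blanket timeout. A sketch of one reasonable configuration (the specific values are examples, not ORS requirements):

```python
import httpx

# Fail fast on connect, but allow long reads: a bash tool may take
# minutes, and keep-alive pings arrive only every 10 seconds.
timeout = httpx.Timeout(10.0, read=600.0)

with httpx.Client(timeout=timeout) as client:
    ...  # issue streaming tool calls here
```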

Connection Limits

Be mindful of concurrent SSE connections:
  • Each connection holds server resources
  • Limit concurrent tool calls per agent
  • Use connection pooling in clients

Debugging SSE

View Raw SSE Stream

# See all events
curl -N http://localhost:8080/gsm8k/call \
  -H "X-Session-ID: abc-123" \
  -H "Accept: text/event-stream" \
  -H "Content-Type: application/json" \
  -d '{"name": "submit", "input": {"answer": 42}}'

Common Issues

Issue: “Connection closed immediately”
  • Cause: Missing Accept: text/event-stream header
  • Fix: Add header to request
Issue: “Response not streaming”
  • Cause: Client buffering responses
  • Fix: Use streaming API (e.g., httpx.stream(), not httpx.post())
Issue: “Incomplete JSON in chunk”
  • Cause: Not accumulating chunks before parsing
  • Fix: Buffer all chunks until event: end

Next Steps

Implementing a Client

Build a client that handles SSE responses

Implementing a Server

Implement SSE responses in your server

HTTP API Reference

See complete endpoint documentation

Key Takeaway: SSE keeps connections alive during tool execution and chunks large results for reliable delivery. The protocol is simple: task_id → chunks (optional) → end/error. Clients reassemble chunks and parse the final JSON result.