This guide shows how to implement an ORS server, either through the Python SDK or from scratch in any language.

Two Approaches

Option 1: Use ORS Python SDK

  • Pros: Fast, handles protocol details, well-tested
  • Cons: Python only

Option 2: Implement HTTP Protocol

  • Pros: Any language, full control, no dependencies
  • Cons: More work, must handle all protocol details

Option 1: Python SDK Implementation

from ors import Environment, Server, tool, ToolOutput, TextBlock, Split
from pydantic import BaseModel

class MyToolParams(BaseModel):
    param1: str
    param2: int

class MyEnvironment(Environment):
    """Your environment description"""

    @classmethod
    def list_splits(cls):
        """Return available splits"""
        return [Split(name="train", type="train"), Split(name="test", type="test")]

    @classmethod
    def list_tasks(cls, split: str):
        """Return tasks for split"""
        if split == "train":
            return [{"description": "Task 1"}, {"description": "Task 2"}]
        return [{"description": "Test task"}]

    def get_prompt(self):
        """Generate prompt from task"""
        return [TextBlock(text=f"Task: {self.task_spec['description']}")]

    @tool
    def my_tool(self, params: MyToolParams) -> ToolOutput:
        """Tool implementation"""
        return ToolOutput(
            blocks=[TextBlock(text="Result")],
            reward=1.0,
            finished=True
        )

# Run server
if __name__ == "__main__":
    server = Server([MyEnvironment])
    server.run(port=8080)

Key Methods

list_splits(cls) - Required:
  • Class method (no instance needed)
  • Returns list of split names or Split objects
  • Example: return ["train", "validation", "test"]
list_tasks(cls, split) - Required:
  • Class method
  • Returns list of task objects (JSON)
  • Can be async (return Awaitable)
  • Task structure is environment-specific
get_prompt(self) - Required:
  • Instance method (has access to self.task_spec)
  • Returns Blocks (list of TextBlock/ImageBlock)
  • Called once per episode
  • Can be async
setup(self) - Optional:
  • Called when episode starts
  • Initialize environment state
  • Can be async
teardown(self) - Optional:
  • Called when episode ends
  • Cleanup resources
  • Can be async
num_tasks(cls, split) - Optional override:
  • Class method, async
  • Returns count of tasks for a split
  • Default: calls list_tasks() and returns len()
  • Override for efficiency with large task sets
get_task(cls, split, index) - Optional override:
  • Class method, async
  • Returns a single task by index
  • Default: calls list_tasks() and indexes into result
  • Override for lazy loading
get_task_range(cls, split, start, stop) - Optional override:
  • Class method, async
  • Returns tasks for range(start, stop), supports negative indices and None
  • Default: calls get_task() for each index
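
The default behaviors described for num_tasks, get_task, and get_task_range can be pictured in plain Python. The sketch below is not the SDK's code: it uses synchronous module-level functions instead of async class methods, the TASKS table is hypothetical, and it assumes the range bounds follow Python slice semantics for negative indices and None.

```python
from typing import Any, Optional

# Hypothetical in-memory task table standing in for list_tasks().
TASKS: dict[str, list[dict[str, Any]]] = {
    "train": [{"description": f"Task {i}"} for i in range(5)],
    "test": [{"description": "Test task"}],
}


def num_tasks(split: str) -> int:
    """Count tasks for a split (a real override might query a database)."""
    return len(TASKS[split])


def get_task(split: str, index: int) -> dict[str, Any]:
    """Return a single task by index."""
    return TASKS[split][index]


def get_task_range(
    split: str, start: Optional[int], stop: Optional[int]
) -> list[dict[str, Any]]:
    """Return tasks for range(start, stop), assuming slice-style bounds."""
    # slice.indices() resolves None and negative values and clamps to the size.
    lo, hi, _ = slice(start, stop).indices(num_tasks(split))
    return [get_task(split, i) for i in range(lo, hi)]
```

With this normalization, get_task_range("train", -2, None) yields the last two training tasks, and an out-of-range stop is clamped to the number of tasks rather than raising.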

Tool Decorator

@tool
def submit(self, params: SubmitParams) -> ToolOutput:
    """Tool docstring becomes description"""
    return ToolOutput(...)
Requirements:
  • Decorated with @tool
  • Takes self and optionally one Pydantic model parameter
  • Returns ToolOutput
  • Can be async
Tools can also take zero parameters (no Pydantic model needed). In that case input_schema is null in the tool listing:
@tool
def reset(self) -> ToolOutput:
    """Reset the environment"""
    return ToolOutput(
        blocks=[TextBlock(text="Environment reset")],
        reward=0.0,
        finished=False
    )

Task-Specific Tools

By default, tools are shared and visible to all tasks via GET /{env_name}/tools. You can mark tools as task-specific so they only appear within an active session:
@tool(shared=False)
def task_only_action(self, params: ActionParams) -> ToolOutput:
    """Only visible via /{env_name}/task_tools in an active session"""
    return ToolOutput(...)
You can also override list_task_tools() to return tools dynamically based on the current task:
def list_task_tools(self) -> ListToolsOutput:
    """Override to provide task-specific tools"""
    # Return different tools based on self.task_spec
    return ListToolsOutput(tools=[...])
Clients use GET /{env_name}/task_tools (with an active session) to get the combined set of shared + task-specific tools.
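
For a from-scratch server, the split between the two listings might look like the sketch below. It is only an illustration: the REGISTRY structure and its shared flag are hypothetical, and the {"tools": [...]} response shape is taken from the Node.js example in this guide.

```python
from typing import Any

# Hypothetical tool registry; "shared" mirrors the @tool(shared=False) idea.
REGISTRY: list[dict[str, Any]] = [
    {
        "name": "submit",
        "description": "Submit answer",
        "shared": True,
        "input_schema": {
            "type": "object",
            "properties": {"answer": {"type": "string"}},
            "required": ["answer"],
        },
    },
    # Zero-parameter tool: input_schema is null in the listing.
    {"name": "reset", "description": "Reset the environment",
     "shared": True, "input_schema": None},
    {"name": "task_only_action", "description": "Task-specific action",
     "shared": False, "input_schema": {"type": "object", "properties": {}}},
]


def _public(tool: dict[str, Any]) -> dict[str, Any]:
    """Strip internal fields before returning a tool to the client."""
    return {key: tool[key] for key in ("name", "description", "input_schema")}


def list_tools() -> dict[str, Any]:
    """Body for GET /{env_name}/tools: shared tools only."""
    return {"tools": [_public(t) for t in REGISTRY if t["shared"]]}


def list_task_tools() -> dict[str, Any]:
    """Body for GET /{env_name}/task_tools: shared plus task-specific tools."""
    return {"tools": [_public(t) for t in REGISTRY]}
```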

Complete Example

See Quick Start for a working GSM8K environment.

Option 2: Custom HTTP Implementation

Required Endpoints

Implement these HTTP endpoints:
Discovery (no session required):
  • GET /health
  • GET /list_environments
  • GET /{env_name}/tools
  • GET /{env_name}/splits
  • POST /{env_name}/tasks
  • POST /{env_name}/num_tasks
  • POST /{env_name}/task
  • POST /{env_name}/task_range
  • POST /create_session
Session Management (requires X-Session-ID):
  • POST /create
  • POST /delete
  • POST /delete_session
  • POST /ping
Episode Interaction (requires X-Session-ID):
  • GET /{env_name}/prompt
  • GET /{env_name}/task_tools
  • POST /{env_name}/call (returns result via SSE)
See HTTP API Reference for complete specs.

Example: Node.js/TypeScript

import express from 'express';
import crypto from 'crypto';

const app = express();
app.use(express.json());

const CHUNK_SIZE = 4096;

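// In-memory session store.
// EnvironmentInstance and getTaskByIndex (used below) are application-defined and not shown here.
const sessions = new Map<string, EnvironmentInstance>();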

// Health check
app.get('/health', (req, res) => {
  res.json({ status: 'ok' });
});

// List environments
app.get('/list_environments', (req, res) => {
  res.json(['myenvironment']);
});

// List tools
app.get('/:envName/tools', (req, res) => {
  res.json({
    tools: [
      {
        name: 'submit',
        description: 'Submit answer',
        input_schema: {
          type: 'object',
          properties: {
            answer: { type: 'string' }
          },
          required: ['answer']
        }
      }
    ]
  });
});

// Create session (no X-Session-ID required)
app.post('/create_session', (req, res) => {
  const sid = crypto.randomUUID();
  res.json({ sid });
});

// Create episode
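app.post('/create', (req, res) => {
  const sid = req.headers['x-session-id'];
  // Reject requests without a session ID (400, as listed under Error Handling)
  if (typeof sid !== 'string' || !sid) {
    return res.status(400).json({ error: 'Missing X-Session-ID header' });
  }
  const { env_name, task_spec, split, index, secrets } = req.body;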

  // Resolve task_spec from split/index if not provided directly
  let resolvedTaskSpec = task_spec;
  if (!task_spec && split !== undefined && index !== undefined) {
    resolvedTaskSpec = getTaskByIndex(split, index);
  }

  const env = new EnvironmentInstance(resolvedTaskSpec);
  sessions.set(sid, env);

  res.json({ sid });
});

// Call tool (SSE with chunking)
app.post('/:envName/call', async (req, res) => {
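  const sid = req.headers['x-session-id'];
  // A missing header is a bad request; an unknown session is a 404
  if (typeof sid !== 'string' || !sid) {
    return res.status(400).json({ error: 'Missing X-Session-ID header' });
  }
  const { name, input } = req.body;

  const env = sessions.get(sid);
  if (!env) {
    return res.status(404).json({ error: 'Session not found' });
  }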

  // Set SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Send task ID
  const taskId = crypto.randomUUID();
  res.write(`event: task_id\ndata: ${taskId}\n\n`);

  // Execute tool
  try {
    const result = await env.callTool(name, input);
    const resultJson = JSON.stringify({ ok: true, output: result });

    // Chunk large responses (>4KB)
    if (resultJson.length > CHUNK_SIZE) {
      for (let i = 0; i < resultJson.length; i += CHUNK_SIZE) {
        const chunk = resultJson.slice(i, i + CHUNK_SIZE);
        const event = i + CHUNK_SIZE >= resultJson.length ? 'end' : 'chunk';
        res.write(`event: ${event}\ndata: ${chunk}\n\n`);
      }
    } else {
      res.write(`event: end\ndata: ${resultJson}\n\n`);
    }
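  } catch (error) {
    // `error` is `unknown` under strict TypeScript, so narrow it before reading .message
    const message = error instanceof Error ? error.message : String(error);
    res.write(`event: error\ndata: ${message}\n\n`);
  }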

  res.end();
});

app.listen(8080, () => {
  console.log('ORS server running on port 8080');
});
The SSE chunking protocol is important for interoperability. Results larger than 4KB must be split into chunk events followed by a final end event. Clients concatenate all chunk data with the end data to reconstruct the full JSON. See the SSE specification for details.
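
To make the reassembly rule concrete, here is a minimal Python sketch of both sides: a server-side encoder that mirrors the chunking loop in the Node.js example, and a client-side decoder that concatenates chunk data with the end data. The function names are hypothetical, and the parser handles only the single-line event/data framing used in this guide, not the full SSE grammar.

```python
import json

CHUNK_SIZE = 4096  # same threshold as the Node.js example


def encode_sse(result: dict, chunk_size: int = CHUNK_SIZE) -> str:
    """Serialize a tool result as zero or more chunk events plus one end event."""
    # json.dumps escapes newlines and non-ASCII by default, so each piece
    # fits on a single SSE data line and characters equal bytes.
    payload = json.dumps({"ok": True, "output": result})
    pieces = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]
    events = [f"event: chunk\ndata: {piece}\n\n" for piece in pieces[:-1]]
    events.append(f"event: end\ndata: {pieces[-1]}\n\n")
    return "".join(events)


def decode_sse(stream: str) -> dict:
    """Concatenate chunk data with the end data and parse the full JSON."""
    buffer = []
    for block in stream.split("\n\n"):
        if not block.strip():
            continue
        fields = dict(line.split(": ", 1) for line in block.split("\n"))
        if fields["event"] in ("chunk", "end"):
            buffer.append(fields["data"])
        if fields["event"] == "end":
            return json.loads("".join(buffer))
        # Other events (such as task_id) are ignored in this sketch.
    raise ValueError("stream ended without an end event")
```

A round trip such as decode_sse(encode_sse(output)) should return {"ok": True, "output": output} regardless of how many chunk events were needed.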

Session Management

Key points:
  • Store session ID → environment instance mapping
  • /create_session generates a session ID (no headers required)
  • /create binds a session to an environment + task (requires X-Session-ID)
  • /create accepts either task_spec directly, or split + index to resolve the task server-side
  • Implement a 15-minute inactivity timeout (reset on any request with that session ID)
  • Clean up on /delete (call environment teardown)
  • Handle concurrent sessions
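
One way to approach the session mapping and inactivity timeout is sketched below. The SessionStore class and its method names are hypothetical, teardown is assumed to be synchronous, and the clock is injectable so the expiry logic can be exercised without waiting 15 minutes.

```python
import time
from typing import Any, Callable, Optional

SESSION_TIMEOUT_SECONDS = 15 * 60  # 15-minute inactivity window


class SessionStore:
    """Maps session IDs to environment instances and expires idle ones."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._sessions: dict[str, tuple[Any, float]] = {}

    def put(self, sid: str, env: Any) -> None:
        """Bind a session ID to an environment instance."""
        self._sessions[sid] = (env, self._clock())

    def get(self, sid: str) -> Optional[Any]:
        """Return the environment and reset its inactivity timer."""
        entry = self._sessions.get(sid)
        if entry is None:
            return None
        env, _ = entry
        self._sessions[sid] = (env, self._clock())
        return env

    def reap(self) -> list[str]:
        """Remove sessions idle longer than the timeout; return their IDs."""
        now = self._clock()
        expired = [
            sid for sid, (_, last_seen) in self._sessions.items()
            if now - last_seen > SESSION_TIMEOUT_SECONDS
        ]
        for sid in expired:
            env, _ = self._sessions.pop(sid)
            teardown = getattr(env, "teardown", None)
            if callable(teardown):
                teardown()  # assumed synchronous in this sketch
        return expired
```

A server would call reap() periodically, for example from a background timer. A multi-threaded server would also need a lock around the dictionary to handle concurrent sessions safely.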

Error Handling

Return proper HTTP status codes:
  • 400 - Bad request (invalid input, missing X-Session-ID)
  • 404 - Not found (session, tool, environment)
  • 410 - Gone (session was deleted but still referenced)
  • 500 - Server error
For tool errors, use the SSE error event.
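
As an illustration of the session-related codes, the sketch below picks a status for a request that requires X-Session-ID. It assumes the server keeps a set of recently deleted session IDs so it can tell 410 apart from 404; the function name and that bookkeeping are hypothetical.

```python
from typing import Optional


def session_status(
    session_id: Optional[str], active: set[str], deleted: set[str]
) -> int:
    """Choose an HTTP status for a request that requires X-Session-ID."""
    if not session_id:
        return 400  # missing X-Session-ID header
    if session_id in active:
        return 200  # session is live; continue handling the request
    if session_id in deleted:
        return 410  # session was deleted but is still referenced
    return 404  # session was never created on this server
```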

Testing Your Server

Quick test:
# Start server
python server.py

# Test in another terminal
curl http://localhost:8080/health
curl http://localhost:8080/list_environments
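
The same checks can be scripted. This is a minimal sketch using only the Python standard library; the smoke_test name is hypothetical, and it assumes /list_environments returns a JSON array of names, as in the Node.js example in this guide.

```python
import json
import urllib.request


def smoke_test(base_url: str = "http://localhost:8080") -> list[str]:
    """Check the discovery endpoints and return the environment names."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=5) as resp:
        assert resp.status == 200, "health check failed"
    with urllib.request.urlopen(f"{base_url}/list_environments", timeout=5) as resp:
        environments = json.load(resp)
    assert isinstance(environments, list) and environments, "no environments listed"
    return environments
```

Call smoke_test() while the server is running; it raises if either endpoint is unreachable or returns an unexpected body.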

Deployment

Local Development

python server.py

Production Deployment

Docker:
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["python", "server.py"]
Run:
docker build -t my-ors-server .
docker run -p 8080:8080 my-ors-server

Cloud Deployment

Deploy to any cloud platform:
  • AWS: ECS, Lambda, EC2
  • GCP: Cloud Run, Compute Engine
  • Azure: Container Instances, App Service
  • Fly.io: fly launch
  • Railway: Connect GitHub repo

Next Steps

  • HTTP API: Complete endpoint documentation
  • Quick Start: See a complete working example

Key Takeaway: Implementing an ORS server is straightforward. Use the Python SDK for quick development, or implement the HTTP protocol in any language for full control. Focus on proper reward design and episode termination.