Implementing an ORS Client
Learn how to build clients that interact with ORS servers for running episodes, training agents, or evaluation.
Two Approaches
Option 1: Use OpenReward Python SDK
Simplest approach: the SDK handles all protocol details.
from openreward import OpenReward
# Create the SDK client and look up the environment served at base_url.
client = OpenReward()
env = client.environments.get(name="myenv", base_url="http://localhost:8080")

# Run episode
# NOTE(review): `task` is not defined in this snippet; it is assumed to be a
# task object obtained earlier -- confirm against the SDK documentation.
with env.session(task=task) as session:
    prompt = session.get_prompt()
    result = session.call_tool("submit", {"answer": "42"})
Option 2: Custom HTTP Client
For other languages, implement the HTTP protocol directly:
Python HTTP Client Example
Copy
import httpx
import json
class ORSClient:
    """Minimal synchronous HTTP client for an ORS server.

    Every request raises ``httpx.HTTPStatusError`` on a 4xx/5xx response
    (via ``raise_for_status()``), so callers can handle HTTP failures
    explicitly instead of hitting a ``KeyError`` on an error payload.

    The client owns an ``httpx.Client``; call :meth:`close` when done, or use
    the instance as a context manager (``with ORSClient(url) as client:``).
    """

    def __init__(self, base_url: str):
        """Store the server base URL and open the underlying HTTP client.

        Args:
            base_url: Root URL of the ORS server, e.g. ``http://localhost:8080``.
                Trailing slashes are dropped so paths can be appended with ``/``.
        """
        self.base_url = base_url.rstrip("/")
        self.client = httpx.Client(timeout=60.0)

    def close(self):
        """Close the underlying HTTP client and its pooled connections."""
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def list_tools(self, env_name: str):
        """Get available tools: the ``"tools"`` entry of ``GET /{env_name}/tools``."""
        response = self.client.get(f"{self.base_url}/{env_name}/tools")
        response.raise_for_status()
        return response.json()["tools"]

    def list_tasks(self, env_name: str, split: str):
        """Get tasks for split: the ``"tasks"`` entry of ``POST /{env_name}/tasks``."""
        response = self.client.post(
            f"{self.base_url}/{env_name}/tasks",
            json={"split": split},
        )
        response.raise_for_status()
        return response.json()["tasks"]

    def create_session(self):
        """Generate session ID: the ``"sid"`` entry of ``POST /create_session``."""
        response = self.client.post(f"{self.base_url}/create_session")
        response.raise_for_status()
        return response.json()["sid"]

    def create_episode(self, session_id: str, env_name: str, task_spec: dict):
        """Create episode instance for ``task_spec`` under ``session_id``.

        Sends the session ID in the ``X-Session-ID`` header and an empty
        ``secrets`` mapping in the body. Returns the decoded JSON response.
        """
        response = self.client.post(
            f"{self.base_url}/create",
            headers={"X-Session-ID": session_id},
            json={
                "env_name": env_name,
                "task_spec": task_spec,
                "secrets": {},
            },
        )
        response.raise_for_status()
        return response.json()

    def get_prompt(self, session_id: str, env_name: str):
        """Get initial prompt: the decoded JSON of ``GET /{env_name}/prompt``."""
        response = self.client.get(
            f"{self.base_url}/{env_name}/prompt",
            headers={"X-Session-ID": session_id},
        )
        response.raise_for_status()
        return response.json()

    def call_tool(self, session_id: str, env_name: str, tool_name: str, tool_input: dict):
        """Call tool (handles SSE) and return the tool output.

        The response is read as a Server-Sent Events stream:

        * ``chunk`` events: their data is accumulated.
        * ``end`` event: its data is appended to the accumulated chunks and
          the whole payload is parsed as JSON. If ``ok`` is truthy the
          ``output`` entry is returned, otherwise an error is raised.
        * ``error`` event: raises with the event data as the message.

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx response.
            RuntimeError: If the server reports an error, or if the stream
                ends before an ``end`` event arrives.
        """
        with self.client.stream(
            "POST",
            f"{self.base_url}/{env_name}/call",
            headers={
                "X-Session-ID": session_id,
                "Accept": "text/event-stream",
            },
            json={"name": tool_name, "input": tool_input},
        ) as response:
            response.raise_for_status()
            chunks = []
            # Start with no event type so a data line that arrives before any
            # event line is ignored instead of hitting an unbound variable.
            event_type = ""
            for raw_line in response.iter_lines():
                line = raw_line.rstrip("\r\n")
                if not line:
                    continue  # blank separator between events
                # SSE fields are "name:value" with one optional space after
                # the colon; lines starting with ":" are comments.
                field, _, value = line.partition(":")
                if value.startswith(" "):
                    value = value[1:]
                if field == "event":
                    event_type = value
                elif field == "data":
                    if event_type == "end":
                        chunks.append(value)
                        result = json.loads("".join(chunks))
                        if result["ok"]:
                            return result["output"]
                        raise RuntimeError(result["error"])
                    if event_type == "chunk":
                        chunks.append(value)
                    elif event_type == "error":
                        raise RuntimeError(value)
        # Falling out of the loop means the connection closed early; fail
        # loudly rather than returning None to a caller that expects a dict.
        raise RuntimeError("SSE stream ended before an 'end' event was received")

    def delete_episode(self, session_id: str):
        """Clean up episode: ``POST /delete`` for ``session_id``."""
        response = self.client.post(
            f"{self.base_url}/delete",
            headers={"X-Session-ID": session_id},
        )
        response.raise_for_status()
        return response.json()
# Usage
client = ORSClient("http://localhost:8080")
env_name = "math"

# List available tools
tools = client.list_tools(env_name)
print(f"Available tools: {[t['name'] for t in tools]}")

# Get tasks
tasks = client.list_tasks(env_name, "train")

# Run episode
task = tasks[0]
session_id = client.create_session()
try:
    client.create_episode(session_id, env_name, task)
    prompt = client.get_prompt(session_id, env_name)
    print(f"Prompt: {prompt[0]['text']}")
    result = client.call_tool(session_id, env_name, "submit", {"answer": "4"})
    print(f"Reward: {result['reward']}")
    print(f"Finished: {result['finished']}")
finally:
    # Always release the episode, even when one of the calls above failed.
    client.delete_episode(session_id)
JavaScript/TypeScript Client
Copy
/**
 * Minimal ORS client built on the Fetch API.
 *
 * Every method throws an `Error` when the server answers with a non-2xx
 * status, so failures surface immediately instead of as `undefined` fields.
 */
class ORSClient {
  constructor(private baseUrl: string) {}

  /** Throw a descriptive error for any non-2xx response. */
  private static ensureOk(response: Response): void {
    if (!response.ok) {
      throw new Error(
        `ORS request failed: HTTP ${response.status} ${response.statusText}`
      );
    }
  }

  /** Return the `tools` entry of `GET /{envName}/tools`. */
  async listTools(envName: string) {
    const response = await fetch(`${this.baseUrl}/${envName}/tools`);
    ORSClient.ensureOk(response);
    const data = await response.json();
    return data.tools;
  }

  /** Return the `sid` entry of `POST /create_session`. */
  async createSession(): Promise<string> {
    const response = await fetch(`${this.baseUrl}/create_session`, {
      method: 'POST'
    });
    ORSClient.ensureOk(response);
    const data = await response.json();
    return data.sid;
  }

  /**
   * Call a tool and resolve with its output, reading the response as a
   * Server-Sent Events stream.
   *
   * - `chunk` events: their data is accumulated.
   * - `end` event: its data is appended to the accumulated chunks and the
   *   whole payload is parsed as JSON; resolves with `output` when `ok` is
   *   truthy, otherwise throws with `error`.
   * - `error` event: throws with the event data as the message.
   *
   * @throws Error on a non-2xx status, a missing response body, a
   *   server-reported error, or a stream that ends before an `end` event.
   */
  async callTool(
    sessionId: string,
    envName: string,
    toolName: string,
    toolInput: any
  ) {
    const response = await fetch(`${this.baseUrl}/${envName}/call`, {
      method: 'POST',
      headers: {
        'X-Session-ID': sessionId,
        'Accept': 'text/event-stream',
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ name: toolName, input: toolInput })
    });
    ORSClient.ensureOk(response);
    if (!response.body) {
      throw new Error('ORS response has no body to stream');
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let payload = '';
    let eventType = '';
    // Text received so far that does not yet end in a newline: a network
    // read can stop in the middle of an SSE line.
    let pending = '';

    try {
      while (true) {
        const { done, value } = await reader.read();
        // `stream: true` keeps multi-byte characters that straddle two reads
        // intact; the final argument-less decode() flushes the decoder.
        pending += done ? decoder.decode() : decoder.decode(value, { stream: true });

        const lines = pending.split('\n');
        // Keep the trailing partial line for the next read, unless the
        // stream is finished and nothing more will arrive.
        pending = done ? '' : lines.pop() ?? '';

        for (const rawLine of lines) {
          const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
          if (line.startsWith('event: ')) {
            eventType = line.slice(7);
          } else if (line.startsWith('data: ')) {
            const data = line.slice(6);
            if (eventType === 'end') {
              const result = JSON.parse(payload + data);
              if (result.ok) {
                return result.output;
              }
              throw new Error(result.error);
            } else if (eventType === 'chunk') {
              payload += data;
            } else if (eventType === 'error') {
              throw new Error(data);
            }
          }
        }

        if (done) break;
      }
    } finally {
      reader.releaseLock();
    }

    // Reaching this point means the connection closed early; fail loudly
    // rather than resolving with undefined.
    throw new Error('SSE stream ended before an "end" event was received');
  }
}
// Usage
// Point the client at the ORS server's base URL.
const client = new ORSClient('http://localhost:8080');
// Top-level await: run this inside an ES module or an async function.
const sessionId = await client.createSession();
// ... create episode, call tools
Episode Runner
Build a reusable episode runner:
class EpisodeRunner:
    """Runs complete episodes against one environment.

    Subclass and override :meth:`select_action` to plug in an agent.
    """

    def __init__(self, client: "ORSClient", env_name: str):
        """Remember the client and the environment that episodes run in.

        The client annotation is a forward reference so this class can be
        defined in a module that does not itself define ``ORSClient``.
        """
        self.client = client
        self.env_name = env_name

    def run_episode(self, task: dict, max_steps: int = 100) -> dict:
        """Run one complete episode

        Creates a session and an episode for ``task``, then alternates between
        :meth:`select_action` and ``client.call_tool`` until the tool output
        reports ``finished`` or ``max_steps`` actions have been taken. The
        episode is always deleted afterwards, even when a step raises.

        Args:
            task: Task specification passed to ``client.create_episode``.
            max_steps: Safety limit on the number of tool calls.

        Returns:
            A dict with ``success`` (finished with a positive total reward),
            ``total_reward``, ``steps`` and the original ``task``.
        """
        session_id = self.client.create_session()
        try:
            # Create episode
            self.client.create_episode(session_id, self.env_name, task)

            # Get initial prompt
            observation = self.client.get_prompt(session_id, self.env_name)

            # Agent loop
            total_reward = 0.0
            steps = 0
            finished = False
            while not finished and steps < max_steps:  # Safety limit
                # Agent selects action (placeholder)
                action = self.select_action(observation)

                # Execute action
                result = self.client.call_tool(
                    session_id,
                    self.env_name,
                    action["tool"],
                    action["input"],
                )

                # Update state
                observation = result["blocks"]
                # A step may carry no reward at all, or an explicit null;
                # both count as zero.
                total_reward += result.get("reward") or 0.0
                finished = result["finished"]
                steps += 1

            return {
                "success": finished and total_reward > 0,
                "total_reward": total_reward,
                "steps": steps,
                "task": task,
            }
        finally:
            self.client.delete_episode(session_id)

    def select_action(self, observation):
        """Override this with your agent logic

        Called with the current observation (the initial prompt, then each
        tool output's ``blocks``). Must return a dict with ``"tool"`` and
        ``"input"`` keys.
        """
        raise NotImplementedError
Batch Evaluation
Run multiple episodes efficiently:
def evaluate(client: "ORSClient", env_name: str, tasks: list, *, episode_fn=None) -> dict:
    """Evaluate agent on multiple tasks

    Runs one episode per task, prints a PASS/FAIL line for each, and
    aggregates the results.

    Args:
        client: ORS client handed to the episode function.
        env_name: Environment the tasks belong to.
        tasks: Task dicts to evaluate. An empty list yields all-zero metrics.
        episode_fn: Callable ``(client, env_name, task) -> dict`` that runs
            one episode and returns a dict with ``success``, ``total_reward``
            and ``steps``. When omitted, a module-level ``run_episode``
            function with that signature must exist.

    Returns:
        A dict with ``success_rate``, ``avg_reward``, ``avg_steps`` and the
        per-task ``results`` list.
    """
    if not tasks:
        # Nothing to average over; avoid dividing by zero below.
        return {
            "success_rate": 0.0,
            "avg_reward": 0.0,
            "avg_steps": 0.0,
            "results": [],
        }

    if episode_fn is None:
        episode_fn = run_episode

    results = []
    for index, task in enumerate(tasks):
        result = episode_fn(client, env_name, task)
        results.append(result)
        # Fall back to the position in the list when a task has no "id".
        task_id = task.get("id", index)
        print(f"Task {task_id}: "
              f"{'PASS' if result['success'] else 'FAIL'} "
              f"(reward={result['total_reward']:.2f}, "
              f"steps={result['steps']})")

    # Compute metrics
    count = len(results)
    success_rate = sum(r["success"] for r in results) / count
    avg_reward = sum(r["total_reward"] for r in results) / count
    avg_steps = sum(r["steps"] for r in results) / count
    return {
        "success_rate": success_rate,
        "avg_reward": avg_reward,
        "avg_steps": avg_steps,
        "results": results,
    }
# Usage
# Connect to the ORS server's base URL.
client = ORSClient("http://localhost:8080")
# Fetch the tasks of the "test" split of the "math" environment.
tasks = client.list_tasks("math", "test")
# Run every task and collect the aggregate metrics.
metrics = evaluate(client, "math", tasks)
print(f"Success rate: {metrics['success_rate']:.1%}")
print(f"Average reward: {metrics['avg_reward']:.3f}")
Error Handling
Copy
def safe_episode_run(client, env_name, task):
    """Run one episode, reporting failures instead of raising them.

    HTTP status errors, timeouts and any other ``Exception`` raised while
    creating or running the episode are printed, not propagated. The episode
    is deleted afterwards whenever a session ID was obtained; failures during
    that cleanup are ignored.

    Args:
        client: ORS client providing ``create_session``, ``create_episode``
            and ``delete_episode``.
        env_name: Environment to run the episode in.
        task: Task specification passed to ``client.create_episode``.
    """
    session_id = None
    try:
        session_id = client.create_session()
        client.create_episode(session_id, env_name, task)
        # ... episode logic
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            print("Session or environment not found")
        elif e.response.status_code == 400:
            print("Bad request - check inputs")
        else:
            print(f"HTTP error: {e}")
    except httpx.TimeoutException:
        print("Request timed out")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Compare against None so a falsy-but-valid session ID still gets
        # cleaned up.
        if session_id is not None:
            try:
                client.delete_episode(session_id)
            except Exception:
                # Best-effort cleanup: already done or session expired.
                # Catching Exception rather than everything lets
                # KeyboardInterrupt and SystemExit propagate.
                pass
Parallel Execution
Run multiple episodes concurrently:
import asyncio
async def run_episode_async(client, env_name, task):
    """Async episode runner

    Placeholder: the body is intentionally empty and the coroutine resolves
    to ``None``. Fill it in like the sync version, but with async httpx.
    """
    return None
async def parallel_evaluate(client, env_name, tasks, max_concurrent=5):
    """Run episodes in parallel

    Schedules one ``run_episode_async`` call per task and lets at most
    ``max_concurrent`` of them run at any moment. Returns the results as a
    list in the same order as ``tasks``.
    """
    limiter = asyncio.Semaphore(max_concurrent)

    async def _bounded(one_task):
        # Hold a semaphore slot for the whole duration of the episode.
        async with limiter:
            return await run_episode_async(client, env_name, one_task)

    return await asyncio.gather(*(_bounded(t) for t in tasks))
# Usage
# asyncio.run starts an event loop, runs the coroutine to completion and
# returns its result. `client` and `tasks` must already be defined.
results = asyncio.run(parallel_evaluate(client, "math", tasks))
Next Steps
HTTP API
Complete endpoint documentation
Testing Locally
Test your client implementation
Quick Start
See complete client example
Key Takeaway: Building an ORS client is straightforward HTTP programming. Handle SSE for tool calls, manage session IDs, and implement proper error handling. Use the Python SDK for quick development or implement custom clients in any language.

