Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion docs/realtime/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ Bare `RealtimeAgent` handoffs are auto-wrapped, and `realtime_handoff(...)` lets

### Guardrails

Realtime agents support output guardrails on agent responses and input guardrails on function-tool calls. Output guardrails run on debounced transcript accumulation rather than on every partial token, and they emit `guardrail_tripped` instead of raising an exception.
Realtime agents support output guardrails on agent responses and input guardrails on the user's
transcribed audio. (Function-tool calls have their own, separate tool input guardrails, which are a
distinct feature from the transcript input guardrails described here.) Output guardrails run on
debounced transcript accumulation rather than on every partial token, and they emit
`guardrail_tripped` instead of raising an exception.

```python
from agents.guardrail import GuardrailFunctionOutput, OutputGuardrail
Expand All @@ -270,6 +274,36 @@ triggered guardrail so the model can produce a replacement response. Your audio
listen for `audio_interrupted` and stop local playback immediately, because guardrails run on
debounced transcript text and some audio may already be buffered when the tripwire fires.

Realtime agents also support **input guardrails** that run on the user's transcribed audio. Configure
them via `RealtimeAgent.input_guardrails` or `RealtimeRunConfig["input_guardrails"]`; the two lists
are combined and de-duplicated per turn. They run once on the completed user transcript (the
`input_audio_transcription_completed` event), and when one trips the session emits an
`input_guardrail_tripped` event, forces `response.cancel`, and sends a follow-up user message that
names the triggered guardrail.

```python
from agents.guardrail import GuardrailFunctionOutput, InputGuardrail


# Example input guardrail: trips when the transcript contains "jailbreak" (case-insensitive).
def no_jailbreak(context, agent, user_input):
return GuardrailFunctionOutput(
tripwire_triggered="jailbreak" in user_input.lower(),
output_info=None,
)


agent = RealtimeAgent(
name="Assistant",
instructions="...",
input_guardrails=[InputGuardrail(guardrail_function=no_jailbreak)],
)
```

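Three limitations are worth noting. Input guardrails only run on transcribed audio, so text sent
through `session.send_message()` is not checked. Guardrails run in a background task, so timing
matters: the forced cancel reliably interrupts a response that is already in flight, but a response
created in the narrow window after the guardrail resolves may not be cancelled. Finally, the forced
cancel is not tied to the guarded transcript, so a guardrail that finishes only after a later turn
has started can interrupt that later turn's response.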

## SIP and telephony

The Python SDK includes a first-class SIP attach flow via [`OpenAIRealtimeSIPModel`][agents.realtime.openai_realtime.OpenAIRealtimeSIPModel].
Expand Down
1 change: 1 addition & 0 deletions docs/ref/realtime/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

### Guardrail Events
::: agents.realtime.events.RealtimeGuardrailTripped
::: agents.realtime.events.RealtimeInputGuardrailTripped

### History Events
::: agents.realtime.events.RealtimeHistoryAdded
Expand Down
4 changes: 4 additions & 0 deletions examples/realtime/app/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
base_event["guardrail_results"] = [
{"name": result.guardrail.name} for result in event.guardrail_results
]
elif event.type == "input_guardrail_tripped":
base_event["guardrail_results"] = [
{"name": result.guardrail.name} for result in event.guardrail_results
]
elif event.type == "raw_model_event":
base_event["raw_model_event"] = {
"type": event.data.type,
Expand Down
2 changes: 2 additions & 0 deletions src/agents/realtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
RealtimeHandoffEvent,
RealtimeHistoryAdded,
RealtimeHistoryUpdated,
RealtimeInputGuardrailTripped,
RealtimeRawModelEvent,
RealtimeSessionEvent,
RealtimeToolApprovalRequired,
Expand Down Expand Up @@ -132,6 +133,7 @@
"RealtimeHandoffEvent",
"RealtimeHistoryAdded",
"RealtimeHistoryUpdated",
"RealtimeInputGuardrailTripped",
"RealtimeRawModelEvent",
"RealtimeSessionEvent",
"RealtimeToolApprovalRequired",
Expand Down
9 changes: 8 additions & 1 deletion src/agents/realtime/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from agents.prompts import Prompt

from ..agent import AgentBase
from ..guardrail import OutputGuardrail
from ..guardrail import InputGuardrail, OutputGuardrail
from ..handoffs import Handoff
from ..lifecycle import AgentHooksBase, RunHooksBase
from ..logger import logger
Expand Down Expand Up @@ -79,6 +79,13 @@ class RealtimeAgent(AgentBase, Generic[TContext]):
"""A class that receives callbacks on various lifecycle events for this agent.
"""

input_guardrails: list[InputGuardrail[TContext]] = field(default_factory=list)
"""A list of checks that run on the user's transcribed audio input. They run once on the
completed user transcript and, when tripped, force a cancel of the in-progress response. This
reliably interrupts a response that is already in flight, but a response created after the
guardrail resolves may not be interrupted. Text input sent via `send_message` is not checked.
"""

def __post_init__(self) -> None:
if not isinstance(self.name, str):
raise TypeError(f"RealtimeAgent name must be a string, got {type(self.name).__name__}")
Expand Down
5 changes: 4 additions & 1 deletion src/agents/realtime/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from agents.prompts import Prompt

from ..guardrail import OutputGuardrail
from ..guardrail import InputGuardrail, OutputGuardrail
from ..handoffs import Handoff
from ..model_settings import ToolChoice
from ..run_config import ToolErrorFormatter
Expand Down Expand Up @@ -279,6 +279,9 @@ class RealtimeRunConfig(TypedDict):
tool_error_formatter: NotRequired[ToolErrorFormatter]
"""Optional callback that formats tool error messages returned to the model."""

input_guardrails: NotRequired[list[InputGuardrail[Any]]]
"""List of input guardrails to run on the user's transcribed audio input."""

# TODO (rm) Add history audio storage config


Expand Down
26 changes: 25 additions & 1 deletion src/agents/realtime/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataclasses import dataclass
from typing import Any, Literal, TypeAlias

from ..guardrail import OutputGuardrailResult
from ..guardrail import InputGuardrailResult, OutputGuardrailResult
from ..run_context import RunContextWrapper
from ..tool import Tool
from .agent import RealtimeAgent
Expand Down Expand Up @@ -243,6 +243,29 @@ class RealtimeGuardrailTripped:
type: Literal["guardrail_tripped"] = "guardrail_tripped"


@dataclass
class RealtimeInputGuardrailTripped:
"""An input guardrail has been tripped on the user's transcribed input.

When a guardrail trips, the session forces a cancel of the in-progress response. This
reliably interrupts a response that is already in flight. Because guardrails run in a
background task, a response that is created in the narrow window after the guardrail
resolves but before the cancel can take effect may not be interrupted.
"""

guardrail_results: list[InputGuardrailResult]
"""The tripped input guardrail result(s). The session cancels on the first tripwire, so this
normally contains a single result."""

message: str
"""The user transcript that triggered the guardrail."""

info: RealtimeEventInfo
"""Common info for all events, such as the context."""

# Discriminator tag for this event; consumers branch on
# `event.type == "input_guardrail_tripped"`.
type: Literal["input_guardrail_tripped"] = "input_guardrail_tripped"


@dataclass
class RealtimeInputAudioTimeoutTriggered:
"""Called when the model detects a period of inactivity/silence from the user."""
Expand All @@ -268,6 +291,7 @@ class RealtimeInputAudioTimeoutTriggered:
| RealtimeHistoryUpdated
| RealtimeHistoryAdded
| RealtimeGuardrailTripped
| RealtimeInputGuardrailTripped
| RealtimeInputAudioTimeoutTriggered
)
"""An event emitted by the realtime session."""
123 changes: 123 additions & 0 deletions src/agents/realtime/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
)
from ..agent import Agent
from ..exceptions import ToolInputGuardrailTripwireTriggered, UserError
from ..guardrail import InputGuardrail, InputGuardrailResult
from ..handoffs import Handoff
from ..items import ToolApprovalItem
from ..logger import logger
Expand All @@ -43,6 +44,7 @@
RealtimeHistoryAdded,
RealtimeHistoryUpdated,
RealtimeInputAudioTimeoutTriggered,
RealtimeInputGuardrailTripped,
RealtimeRawModelEvent,
RealtimeSessionEvent,
RealtimeToolApprovalRequired,
Expand Down Expand Up @@ -202,6 +204,8 @@ def __init__(

# Guardrails state tracking
self._interrupted_response_ids: set[str] = set()
# User item_ids for which an input guardrail has already interrupted the response.
self._interrupted_input_item_ids: set[str] = set()
self._item_transcripts: dict[str, str] = {} # item_id -> accumulated transcript
self._item_guardrail_run_counts: dict[str, int] = {} # item_id -> run count
self._debounce_text_length = self._run_config.get("guardrails_settings", {}).get(
Expand Down Expand Up @@ -365,6 +369,10 @@ async def on_event(self, event: RealtimeModelEvent) -> None:
await self._put_event(
RealtimeHistoryUpdated(info=self._event_info, history=self._history)
)
# Run input guardrails on the finalized user transcript. The transcription completes
# around the time the server begins generating a response, so we mirror the
# output-guardrail trip behavior and force a response cancel when a guardrail trips.
self._enqueue_input_guardrail_task(event.transcript, event.item_id)
elif event.type == "input_audio_timeout_triggered":
await self._put_event(
RealtimeInputAudioTimeoutTriggered(
Expand Down Expand Up @@ -1263,6 +1271,94 @@ async def _run_output_guardrails(self, text: str, response_id: str) -> bool:

return False

async def _run_input_guardrails(
    self,
    text: str,
    item_id: str,
    agent: RealtimeAgent,
    input_guardrails: list[InputGuardrail[Any]],
) -> bool:
    """Run input guardrails on the user's transcribed input.

    The guardrails run concurrently and the first tripwire wins: as soon as one trips, the
    remaining guardrails are asked to cancel, the item is marked as interrupted, and the forced
    response cancel is sent. Cleanup of the cancelled sibling tasks is awaited only afterwards,
    so a guardrail that is slow to acknowledge cancellation cannot delay the interrupt.

    ``agent`` and ``input_guardrails`` are snapshotted when the transcription event is handled
    so that a concurrent ``update_agent()`` or handoff cannot swap in a different agent's
    guardrails before this background task runs.

    Args:
        text: The completed user transcript to check.
        item_id: ID of the user item the transcript belongs to; used to guarantee at most one
            interrupt per item.
        agent: The agent snapshot passed to each guardrail.
        input_guardrails: The guardrails to run against ``text``.

    Returns:
        True if a guardrail tripped and this call interrupted the response. False if there was
        nothing to run, nothing tripped, or the item had already been interrupted.
    """
    # If we've already interrupted the response for this user item, skip.
    if not input_guardrails or item_id in self._interrupted_input_item_ids:
        return False

    async def _run_one(guardrail: InputGuardrail[Any]) -> InputGuardrailResult | None:
        # A failing guardrail is logged and skipped (best effort) so that one broken check
        # cannot prevent the remaining guardrails from being evaluated.
        try:
            return await guardrail.run(
                # TODO (rm) Remove this cast, it's wrong
                cast(Agent[Any], agent),
                text,
                self._context_wrapper,
            )
        except Exception as exc:
            logger.warning(
                "Input guardrail %r raised %s: %s; skipping it.",
                guardrail.get_name(),
                type(exc).__name__,
                exc,
            )
            logger.debug("Input guardrail failure details.", exc_info=True)
            return None

    guardrail_tasks = [
        asyncio.create_task(_run_one(guardrail)) for guardrail in input_guardrails
    ]

    def _cancel_pending() -> None:
        # Task.cancel() only *requests* cancellation; it never blocks, so it is safe to call
        # on the hot path right before the interrupt is sent.
        for task in guardrail_tasks:
            if not task.done():
                task.cancel()

    try:
        for completed in asyncio.as_completed(guardrail_tasks):
            result = await completed
            if result is None or not result.output.tripwire_triggered:
                continue

            # First tripwire: stop the remaining guardrails, but do not wait for them yet.
            _cancel_pending()
            triggered_results: list[InputGuardrailResult] = [result]

            # Double-check: bail if already interrupted for this user item.
            if item_id in self._interrupted_input_item_ids:
                return False

            # Mark as interrupted immediately (before any awaits) to minimize the race window.
            self._interrupted_input_item_ids.add(item_id)

            # Emit input guardrail tripped event.
            await self._put_event(
                RealtimeInputGuardrailTripped(
                    guardrail_results=triggered_results,
                    message=text,
                    info=self._event_info,
                )
            )

            # Interrupt the model, forcing a cancel of any in-progress response.
            # NOTE: known limitation, accepted for parity with output guardrails: the interrupt
            # carries no item or response identifier, so a guardrail that resolves only after a
            # later turn has started can cancel that later turn's response.
            await self._model.send_event(RealtimeModelSendInterrupt(force_response_cancel=True))

            # Send guardrail triggered message.
            guardrail_names = [item.guardrail.get_name() for item in triggered_results]
            await self._model.send_event(
                RealtimeModelSendUserInput(
                    user_input=f"input guardrail triggered: {', '.join(guardrail_names)}"
                )
            )

            return True

        return False
    finally:
        # Reap every guardrail task on all exit paths (trip, no trip, error, cancellation of
        # this task). On the trip path this runs after the interrupt has already been sent.
        _cancel_pending()
        await asyncio.gather(*guardrail_tasks, return_exceptions=True)

def _enqueue_guardrail_task(self, text: str, response_id: str) -> None:
# Runs the guardrails in a separate task to avoid blocking the main loop

Expand All @@ -1272,6 +1368,33 @@ def _enqueue_guardrail_task(self, text: str, response_id: str) -> None:
# Add callback to remove completed tasks and handle exceptions
task.add_done_callback(self._on_guardrail_task_done)

def _enqueue_input_guardrail_task(self, text: str, item_id: str) -> None:
    """Schedule the input guardrails for a completed user transcript as a background task.

    The active agent and the merged guardrail list are captured here, at enqueue time, so a
    later ``update_agent()`` or handoff cannot change which guardrails run against this
    transcript. Nothing is scheduled when no input guardrails are configured.
    """
    active_agent = self._current_agent
    candidates = active_agent.input_guardrails + self._run_config.get("input_guardrails", [])

    # De-duplicate by object identity while keeping first-seen order: agent-level guardrails
    # come first, followed by run-config guardrails that were not already listed.
    unique_by_identity: dict[int, InputGuardrail[Any]] = {}
    for candidate in candidates:
        unique_by_identity.setdefault(id(candidate), candidate)
    input_guardrails = list(unique_by_identity.values())

    # Avoid spawning a task that would have nothing to do.
    if not input_guardrails:
        return

    # Run off the main loop. The shared guardrail task set and done callback take care of
    # removing finished tasks, surfacing exceptions as events, and cancellation on close().
    guardrail_task = asyncio.create_task(
        self._run_input_guardrails(text, item_id, active_agent, input_guardrails)
    )
    self._guardrail_tasks.add(guardrail_task)
    guardrail_task.add_done_callback(self._on_guardrail_task_done)

def _on_guardrail_task_done(self, task: asyncio.Task[Any]) -> None:
"""Handle completion of a guardrail task."""
# Remove from tracking set
Expand Down
Loading