From 6c0aefad6fffbc91246283a7dec69e334949ca20 Mon Sep 17 00:00:00 2001 From: 2001y Date: Thu, 25 Jun 2026 15:15:07 +0900 Subject: [PATCH 1/2] feat(proxy): expose pipeline extensions for responses payloads --- headroom/pipeline.py | 8 +- headroom/proxy/handlers/openai.py | 320 ++++++++++++++++-- ...est_openai_responses_context_compaction.py | 86 +++++ tests/test_pipeline.py | 3 + 4 files changed, 380 insertions(+), 37 deletions(-) diff --git a/headroom/pipeline.py b/headroom/pipeline.py index 868943cab..04cd89b7b 100644 --- a/headroom/pipeline.py +++ b/headroom/pipeline.py @@ -48,8 +48,9 @@ class PipelineStage(str, Enum): class PipelineEvent: """Event emitted at a canonical pipeline stage. - Extensions may mutate ``messages``, ``tools``, ``headers``, or ``metadata`` in - place, or return a replacement ``PipelineEvent`` from ``on_pipeline_event``. + Extensions may mutate ``messages``, ``tools``, ``headers``, ``payload``, or + ``metadata`` in place, or return a replacement ``PipelineEvent`` from + ``on_pipeline_event``. """ stage: PipelineStage @@ -60,6 +61,7 @@ class PipelineEvent: messages: list[dict[str, Any]] | None = None tools: list[dict[str, Any]] | None = None headers: dict[str, str] | None = None + payload: dict[str, Any] | None = None response: Any = None metadata: dict[str, Any] = field(default_factory=dict) @@ -140,6 +142,7 @@ def emit( messages: list[dict[str, Any]] | None = None, tools: list[dict[str, Any]] | None = None, headers: dict[str, str] | None = None, + payload: dict[str, Any] | None = None, response: Any = None, metadata: dict[str, Any] | None = None, ) -> PipelineEvent: @@ -154,6 +157,7 @@ def emit( messages=messages, tools=tools, headers=headers, + payload=payload, response=response, metadata=metadata or {}, ) diff --git a/headroom/proxy/handlers/openai.py b/headroom/proxy/handlers/openai.py index 1460b9db2..22c6565b9 100644 --- a/headroom/proxy/handlers/openai.py +++ b/headroom/proxy/handlers/openai.py @@ -796,6 +796,72 @@ def _resolve_openai_upstream(self, request: Request) -> str: custom = request.headers.get("x-headroom-base-url", "").strip() return custom or self.OPENAI_API_URL + def _emit_openai_responses_payload_stage( + self, + stage: PipelineStage, + *, + request_id: str, + model: str, + payload: dict[str, Any], + headers: dict[str, str] | None = None, + transport: str, + stream: bool | None = None, + frame_index: int | None = None, + frame_type: str | None = None, + metadata: dict[str, Any] | None = None, + bypass: bool = False, + ) -> tuple[dict[str, Any], dict[str, str] | None, bool]: + """Emit canonical pipeline events for OpenAI Responses payloads. + + The classic OpenAI chat and Anthropic handlers expose ``messages`` to + pipeline extensions. The Responses API is payload-native instead, so + extensions need the whole mutable Responses payload to inspect or + rewrite ``input`` items such as ``function_call_output`` before the + built-in Responses compressor runs. Explicit bypass is a full passthrough + contract, so payload extensions must not observe or mutate the payload. + """ + + if bypass: + return payload, headers, False + + manager = getattr(self, "pipeline_extensions", None) + if manager is None or not getattr(manager, "enabled", False): + return payload, headers, False + + before_payload = copy.deepcopy(payload) + before_headers = copy.deepcopy(headers) if headers is not None else None + event_metadata: dict[str, Any] = { + "path": "/v1/responses", + "api_style": "responses", + "transport": transport, + } + if stream is not None: + event_metadata["stream"] = stream + if frame_index is not None: + event_metadata["frame_index"] = frame_index + if frame_type is not None: + event_metadata["frame_type"] = frame_type + if metadata: + event_metadata.update(metadata) + + event = manager.emit( + stage, + operation="proxy.request", + request_id=request_id, + provider="openai", + model=model, + headers=headers, + payload=payload, + metadata=event_metadata, + ) + if isinstance(event.payload, dict): + payload = event.payload + if event.headers is not None: + headers = event.headers + + changed = payload != before_payload or headers != before_headers + return payload, headers, changed + @staticmethod def _strict_previous_turn_frozen_count( messages: list[dict[str, Any]], @@ -3030,6 +3096,18 @@ async def handle_openai_responses( request_id, ) + body, _, _ = self._emit_openai_responses_payload_stage( + PipelineStage.INPUT_RECEIVED, + request_id=request_id, + model=str(model or ""), + payload=body, + transport="http", + stream=bool(stream), + bypass=_bypass, + ) + model = body.get("model", model) + stream = body.get("stream", stream) + from headroom.proxy.helpers import capture_codex_wire_debug capture_codex_wire_debug( @@ -3484,6 +3562,37 @@ async def handle_openai_responses( }, ) from _e + body, headers, _ = self._emit_openai_responses_payload_stage( + PipelineStage.INPUT_COMPRESSED, + request_id=request_id, + model=str(model or ""), + payload=body, + headers=headers, + transport="http", + stream=bool(stream), + metadata={ + "modified": bool(tokens_saved), + "tokens_saved": tokens_saved, + "attempted_input_tokens": attempted_input_tokens, + "transforms_applied": transforms_applied, + }, + ) + + body, headers, _ = self._emit_openai_responses_payload_stage( + PipelineStage.PRE_SEND, + request_id=request_id, + model=str(model or ""), + payload=body, + headers=headers, + transport="http", + stream=bool(stream), + metadata={ + "tokens_saved": tokens_saved, + "transforms_applied": transforms_applied, + }, + ) + headers = headers or {} + capture_codex_wire_debug( "http_upstream_request", request_id=request_id, @@ -4308,6 +4417,32 @@ def _log_ws_passthrough( request_id, ) + _first_pipeline_changed = False + if body: + _first_wrapped = isinstance(body.get("response"), dict) + _first_inner = body["response"] if _first_wrapped else body + if isinstance(_first_inner, dict): + _first_model = str(_first_inner.get("model") or body.get("model") or "") + _first_inner, _, _first_pipeline_changed = ( + self._emit_openai_responses_payload_stage( + PipelineStage.INPUT_RECEIVED, + request_id=request_id, + model=_first_model, + payload=_first_inner, + headers=dict(ws_headers), + transport="websocket", + stream=True, + frame_index=1, + frame_type=str(body.get("type") or "response.create"), + ) + ) + if _first_pipeline_changed: + if _first_wrapped: + body["response"] = _first_inner + else: + body = _first_inner + first_msg_raw = json.dumps(body) + capture_codex_wire_debug( "ws_inbound_first_frame", request_id=request_id, @@ -4682,6 +4817,45 @@ def _prepare_ws_performance_metrics() -> tuple[float, float, dict[str, float]]: _first_frame_compression_elapsed_ms, ) _record_ws_compression_overhead(_first_frame_compression_elapsed_ms) + _compressed_pipeline_changed = False + _presend_pipeline_changed = False + if isinstance(_new_inner, dict): + _new_inner, _, _compressed_pipeline_changed = ( + self._emit_openai_responses_payload_stage( + PipelineStage.INPUT_COMPRESSED, + request_id=request_id, + model=str(_model or ""), + payload=_new_inner, + headers=dict(ws_headers), + transport="websocket", + stream=True, + frame_index=1, + frame_type=str(_send_body.get("type") or "response.create"), + metadata={ + "modified": bool(_modified), + "tokens_saved": int(_ws_saved), + "attempted_input_tokens": int(_ws_attempted_tokens), + "transforms_applied": _ws_transforms, + }, + ) + ) + _new_inner, _, _presend_pipeline_changed = ( + self._emit_openai_responses_payload_stage( + PipelineStage.PRE_SEND, + request_id=request_id, + model=str(_model or ""), + payload=_new_inner, + headers=dict(ws_headers), + transport="websocket", + stream=True, + frame_index=1, + frame_type=str(_send_body.get("type") or "response.create"), + metadata={ + "tokens_saved": int(_ws_saved), + "transforms_applied": _ws_transforms, + }, + ) + ) record_frame = getattr( getattr(self, "metrics", None), "record_codex_ws_frame", None ) @@ -4696,7 +4870,12 @@ def _prepare_ws_performance_metrics() -> tuple[float, float, dict[str, float]]: strategy_chain=_codex_ws_strategy_chain(_ws_transforms), final_strategies=_codex_ws_final_strategies(_ws_compression_timing), ) - if _modified: + _pipeline_payload_changed = ( + _first_pipeline_changed + or _compressed_pipeline_changed + or _presend_pipeline_changed + ) + if _modified or _pipeline_payload_changed: if isinstance(_new_inner, dict): _rewrite_started = time.perf_counter() if _wrapped: @@ -4710,23 +4889,29 @@ def _prepare_ws_performance_metrics() -> tuple[float, float, dict[str, float]]: _rewrite_ms, ) _record_ws_compression_overhead(_rewrite_ms) - tokens_saved += int(_ws_saved) - attempted_input_tokens_total += int(_ws_attempted_tokens) - for _t in _ws_transforms: - if _t not in transforms_applied: - transforms_applied.append(_t) - logger.info( - "[%s] WS /v1/responses compressed " - "%d→%d bytes (%d tokens saved, " - "auth_mode=%s, transforms=%s)", - request_id, - _bytes_before, - _bytes_after, - int(_ws_saved), - _ws_auth_mode.value, - transforms_applied, - ) - ws_frames_compressed += 1 + if _modified: + tokens_saved += int(_ws_saved) + attempted_input_tokens_total += int(_ws_attempted_tokens) + for _t in _ws_transforms: + if _t not in transforms_applied: + transforms_applied.append(_t) + logger.info( + "[%s] WS /v1/responses compressed " + "%d→%d bytes (%d tokens saved, " + "auth_mode=%s, transforms=%s)", + request_id, + _bytes_before, + _bytes_after, + int(_ws_saved), + _ws_auth_mode.value, + transforms_applied, + ) + ws_frames_compressed += 1 + else: + logger.info( + "[%s] WS /v1/responses pipeline extension modified first frame", + request_id, + ) else: _log_ws_passthrough( _ws_reason or "no_compression", @@ -4928,9 +5113,23 @@ async def _maybe_compress_response_create_frame( frame_type="response.create", ) return raw_msg, False, "invalid_inner_payload" + model_for_frame = str(inner_payload.get("model") or "") + inner_payload, _, pre_pipeline_changed = ( + self._emit_openai_responses_payload_stage( + PipelineStage.INPUT_RECEIVED, + request_id=request_id, + model=model_for_frame, + payload=inner_payload, + headers=dict(ws_headers), + transport="websocket", + stream=True, + frame_index=frame_index, + frame_type="response.create", + ) + ) frame_compression_elapsed_ms = 0.0 try: - model_for_frame = inner_payload.get("model") or "" + model_for_frame = str(inner_payload.get("model") or model_for_frame) _frame_auth_mode = classify_auth_mode(ws_headers) _preflight_ms = (time.perf_counter() - _preflight_started) * 1000.0 _record_ws_compression_timing( @@ -4969,6 +5168,45 @@ async def _maybe_compress_response_create_frame( frame_compression_elapsed_ms, ) _record_ws_compression_overhead(frame_compression_elapsed_ms) + compressed_pipeline_changed = False + presend_pipeline_changed = False + if isinstance(new_inner, dict): + new_inner, _, compressed_pipeline_changed = ( + self._emit_openai_responses_payload_stage( + PipelineStage.INPUT_COMPRESSED, + request_id=request_id, + model=model_for_frame, + payload=new_inner, + headers=dict(ws_headers), + transport="websocket", + stream=True, + frame_index=frame_index, + frame_type="response.create", + metadata={ + "modified": bool(modified), + "tokens_saved": int(frame_saved), + "attempted_input_tokens": int(frame_attempted_tokens), + "transforms_applied": frame_transforms, + }, + ) + ) + new_inner, _, presend_pipeline_changed = ( + self._emit_openai_responses_payload_stage( + PipelineStage.PRE_SEND, + request_id=request_id, + model=model_for_frame, + payload=new_inner, + headers=dict(ws_headers), + transport="websocket", + stream=True, + frame_index=frame_index, + frame_type="response.create", + metadata={ + "tokens_saved": int(frame_saved), + "transforms_applied": frame_transforms, + }, + ) + ) record_frame = getattr( getattr(self, "metrics", None), "record_codex_ws_frame", @@ -5015,7 +5253,12 @@ async def _maybe_compress_response_create_frame( model=str(inner_payload.get("model") or "unknown"), ) return raw_msg, False, "compression_exception" - if not modified: + pipeline_payload_changed = ( + pre_pipeline_changed + or compressed_pipeline_changed + or presend_pipeline_changed + ) + if not modified and not pipeline_payload_changed: reason = frame_reason or "no_compression" _log_ws_passthrough( reason, @@ -5047,24 +5290,31 @@ async def _maybe_compress_response_create_frame( _rewrite_ms, ) _record_ws_compression_overhead(_rewrite_ms) - tokens_saved += int(frame_saved) - attempted_input_tokens_total += int(frame_attempted_tokens) - for t in frame_transforms: - if t not in transforms_applied: - transforms_applied.append(t) - ws_frames_compressed += 1 + if modified: + tokens_saved += int(frame_saved) + attempted_input_tokens_total += int(frame_attempted_tokens) + for t in frame_transforms: + if t not in transforms_applied: + transforms_applied.append(t) + ws_frames_compressed += 1 + logger.info( + "[%s] WS /v1/responses frame compressed " + "%d→%d bytes (%d tokens saved, " + "auth_mode=%s, frame=%d)", + request_id, + bytes_before, + bytes_after, + int(frame_saved), + _frame_auth_mode.value, + ws_frames_compressed, + ) + return rewritten, True, frame_reason or "compressed" logger.info( - "[%s] WS /v1/responses frame compressed " - "%d→%d bytes (%d tokens saved, " - "auth_mode=%s, frame=%d)", + "[%s] WS /v1/responses pipeline extension modified frame=%d", request_id, - bytes_before, - bytes_after, - int(frame_saved), - _frame_auth_mode.value, - ws_frames_compressed, + frame_index, ) - return rewritten, True, frame_reason or "compressed" + return rewritten, True, "pipeline_modified" async def _client_to_upstream() -> None: nonlocal client_relay_error, ws_response_create_frames diff --git a/tests/test_openai_responses_context_compaction.py b/tests/test_openai_responses_context_compaction.py index 6f5eff3ec..77cc169e4 100644 --- a/tests/test_openai_responses_context_compaction.py +++ b/tests/test_openai_responses_context_compaction.py @@ -3,6 +3,7 @@ from types import SimpleNamespace from typing import Any +from headroom.pipeline import PipelineEvent, PipelineExtensionManager, PipelineStage from headroom.proxy.handlers.openai import ( OpenAIHandlerMixin, _compact_openai_responses_tools, @@ -224,6 +225,91 @@ class _HandlerHarness(OpenAIHandlerMixin): def __init__(self, router: ContentRouter): self.openai_pipeline: Any = _StubPipeline(router) self.openai_provider: Any = _StubProvider() + self.pipeline_extensions: Any = PipelineExtensionManager(discover=False) + + +class _RecordingResponsesExtension: + def __init__(self) -> None: + self.events: list[PipelineEvent] = [] + + def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent: + self.events.append(event) + if event.payload is not None and event.stage == PipelineStage.INPUT_RECEIVED: + event.payload["extension_marker"] = event.stage.value + if event.headers is not None and event.stage == PipelineStage.PRE_SEND: + event.headers["x-test-extension"] = "seen" + return event + + +def test_responses_pipeline_payload_stage_allows_payload_and_header_mutation() -> None: + router = ContentRouter(ContentRouterConfig()) + handler = _HandlerHarness(router) + extension = _RecordingResponsesExtension() + handler.pipeline_extensions = PipelineExtensionManager( + extensions=[extension], + discover=False, + ) + + payload: dict[str, Any] = { + "model": "gpt-5.5", + "input": [{"type": "function_call_output", "output": "build log"}], + } + headers = {"authorization": "Bearer test"} + + updated, updated_headers, changed = handler._emit_openai_responses_payload_stage( + PipelineStage.INPUT_RECEIVED, + request_id="req-responses-ext", + model="gpt-5.5", + payload=payload, + headers=headers, + transport="websocket", + stream=True, + frame_index=2, + frame_type="response.create", + ) + updated, updated_headers, presend_changed = handler._emit_openai_responses_payload_stage( + PipelineStage.PRE_SEND, + request_id="req-responses-ext", + model="gpt-5.5", + payload=updated, + headers=updated_headers, + transport="websocket", + stream=True, + frame_index=2, + frame_type="response.create", + ) + + assert changed is True + assert presend_changed is True + assert updated["extension_marker"] == "input_received" + assert updated_headers == {"authorization": "Bearer test", "x-test-extension": "seen"} + assert [event.stage for event in extension.events] == [ + PipelineStage.INPUT_RECEIVED, + PipelineStage.PRE_SEND, + ] + assert extension.events[0].payload is payload + assert extension.events[0].metadata["api_style"] == "responses" + assert extension.events[0].metadata["transport"] == "websocket" + assert extension.events[0].metadata["frame_index"] == 2 + + +def test_responses_pipeline_payload_stage_is_noop_without_extensions() -> None: + router = ContentRouter(ContentRouterConfig()) + handler = _HandlerHarness(router) + payload: dict[str, Any] = {"model": "gpt-5.5", "input": "hello"} + + updated, headers, changed = handler._emit_openai_responses_payload_stage( + PipelineStage.INPUT_RECEIVED, + request_id="req-noop", + model="gpt-5.5", + payload=payload, + transport="http", + stream=False, + ) + + assert updated is payload + assert headers is None + assert changed is False def test_codex_input_list_payload_reaches_router_without_skip() -> None: diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 1fec1f8b3..6ece458a5 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -86,6 +86,7 @@ def on_pipeline_event(self, event: PipelineEvent): # noqa: ANN001, ANN201 messages=event.messages, tools=event.tools, headers=event.headers, + payload=event.payload, response=event.response, metadata={**event.metadata, "replaced": True}, ) @@ -114,12 +115,14 @@ def on_pipeline_event(self, event: PipelineEvent): # noqa: ANN001, ANN201 provider="openai", model="gpt-4o", messages=[{"role": "user", "content": "hello"}], + payload={"model": "gpt-4o", "input": "hello"}, metadata={"start": True}, ) assert hook.seen == ["input_received"] assert event.metadata == {"start": True, "hook": True, "replaced": True} assert event.request_id == "req-1" + assert event.payload == {"model": "gpt-4o", "input": "hello"} disabled = PipelineExtensionManager(discover=False) assert disabled.enabled is False From d9bb8692566d0fd9ed003e9e95cd47f56e646475 Mon Sep 17 00:00:00 2001 From: 2001y Date: Tue, 30 Jun 2026 15:08:36 +0900 Subject: [PATCH 2/2] fix(proxy): preserve responses bypass before payload hooks --- headroom/proxy/handlers/openai.py | 10 +++ tests/test_proxy_byte_faithful_forwarding.py | 64 +++++++++++++++++++- 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/headroom/proxy/handlers/openai.py b/headroom/proxy/handlers/openai.py index 22c6565b9..bac6b1f1a 100644 --- a/headroom/proxy/handlers/openai.py +++ b/headroom/proxy/handlers/openai.py @@ -3086,6 +3086,13 @@ async def handle_openai_responses( }, ) + _bypass = self._headroom_bypass_enabled(request.headers) + if _bypass: + logger.info( + "[%s] Responses passthrough reason=bypass_header mutation=disabled", + request_id, + ) + model = body.get("model", "unknown") stream = body.get("stream", False) body_mutation_tracker = BodyMutationTracker() @@ -3576,6 +3583,7 @@ async def handle_openai_responses( "attempted_input_tokens": attempted_input_tokens, "transforms_applied": transforms_applied, }, + bypass=_bypass, ) body, headers, _ = self._emit_openai_responses_payload_stage( @@ -3590,6 +3598,7 @@ async def handle_openai_responses( "tokens_saved": tokens_saved, "transforms_applied": transforms_applied, }, + bypass=_bypass, ) headers = headers or {} @@ -4434,6 +4443,7 @@ def _log_ws_passthrough( stream=True, frame_index=1, frame_type=str(body.get("type") or "response.create"), + bypass=_ws_bypass, ) ) if _first_pipeline_changed: diff --git a/tests/test_proxy_byte_faithful_forwarding.py b/tests/test_proxy_byte_faithful_forwarding.py index b720720f8..ed6820c51 100644 --- a/tests/test_proxy_byte_faithful_forwarding.py +++ b/tests/test_proxy_byte_faithful_forwarding.py @@ -29,7 +29,7 @@ import pytest from fastapi.testclient import TestClient -from headroom.pipeline import PipelineStage +from headroom.pipeline import PipelineEvent, PipelineExtensionManager, PipelineStage from headroom.proxy.helpers import ( BodyMutationTracker, append_text_to_latest_user_chat_message, @@ -911,6 +911,68 @@ async def _fake_retry(method, url, headers, body, stream=False, **kwargs): # no assert sent["messages"][1]["content"] == "hi" +def test_openai_responses_bypass_skips_payload_extensions() -> None: + """Bypass is a full passthrough: payload extensions must not rewrite Responses.""" + config = ProxyConfig( + optimize=False, + cache_enabled=False, + rate_limit_enabled=False, + cost_tracking_enabled=False, + log_requests=False, + ccr_inject_tool=False, + ccr_handle_responses=False, + ccr_context_tracking=False, + image_optimize=False, + ) + app = create_app(config) + proxy = app.state.proxy + + class MutatingResponsesExtension: + def __init__(self) -> None: + self.events: list[PipelineEvent] = [] + + def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent: + self.events.append(event) + if event.payload is not None: + event.payload["extension_marker"] = event.stage.value + return event + + extension = MutatingResponsesExtension() + proxy.pipeline_extensions = PipelineExtensionManager(extensions=[extension], discover=False) + + captured: dict[str, object] = {} + + async def _fake_retry(method, url, headers, body, stream=False, **kwargs): # noqa: ANN001 + captured["body"] = body + return httpx.Response( + 200, + json={ + "id": "resp_1", + "object": "response", + "model": "gpt-5.5", + "output": [], + "usage": {"input_tokens": 1, "output_tokens": 1}, + }, + ) + + proxy._retry_request = _fake_retry + client = TestClient(app) + + inbound = {"model": "gpt-5.5", "input": "do not touch me", "stream": False} + resp = client.post( + "/v1/responses", + headers={ + "authorization": "Bearer sk-test", + "x-headroom-bypass": "true", + }, + json=inbound, + ) + + assert resp.status_code == 200, resp.text + assert captured["body"] == inbound + assert extension.events == [] + + # --------------------------------------------------------------------------- # Streaming forwarder byte-faithfulness # ---------------------------------------------------------------------------