Skip to content
40 changes: 38 additions & 2 deletions headroom/proxy/handlers/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,10 @@ def _add_timing(name: str, started_at: float) -> None:

extraction_started = time.perf_counter()
candidates: list[tuple[int, tuple[str, int | None], str]] = []
# Excluded-tool outputs that are losslessly foldable (grep/log/json):
# (item_index, slot_ref, folded_text, original_text). Spliced after the
# normal candidate compression — no ML, byte/data-lossless only.
lossless_excluded: list[tuple[int, tuple[str, int | None], str, str]] = []
extraction_debug: list[dict[str, Any]] = []
for idx, item in enumerate(items):
if not isinstance(item, dict):
Expand Down Expand Up @@ -1004,12 +1008,27 @@ def _add_timing(name: str, started_at: float) -> None:
)
continue
if isinstance(call_id, str) and call_id in excluded_call_ids:
# Protected from lossy compression — but grep/log/json output
# can still be losslessly compacted. Reuse the router helper
# so the Responses path matches the chat/Anthropic behavior.
excl_out = item.get("output")
fold = (
router._lossless_compact_excluded(excl_out)
if isinstance(excl_out, str)
else None
)
if fold is not None:
lossless_excluded.append((idx, ("output", None), fold[0], excl_out))
if debug_enabled:
extraction_debug.append(
{
"index": idx,
"eligible": False,
"reason": "exclude_tools_protected",
"reason": (
"exclude_tools_lossless_fold"
if fold is not None
else "exclude_tools_protected"
),
"item_type": item_type,
"call_id": call_id,
"tool_name": function_name_by_call_id.get(call_id),
Expand Down Expand Up @@ -1068,7 +1087,7 @@ def _add_timing(name: str, started_at: float) -> None:
payload=payload,
extraction=extraction_debug,
)
if not candidates:
if not candidates and not lossless_excluded:
_log(
"codex_compression_payload_result",
modified=False,
Expand Down Expand Up @@ -1345,6 +1364,23 @@ def _record_routed_result(
transforms.append(transform)
_add_timing("compression_unit_apply_results", apply_started)

# Splice byte/data-lossless folds of excluded tool outputs (grep/log/
# json). These skip the ML compressor entirely — the fold is already
# information-preserving — so "excluded = no lossy" still holds.
for e_idx, e_slot, e_folded, e_orig in lossless_excluded:
e_target = updated_items[e_idx] if e_idx < len(updated_items) else None
if not isinstance(e_target, dict):
continue
_set_slot_text(e_target, e_slot, e_folded)
modified = True
e_before = tokenizer.count_text(e_orig)
e_saved = e_before - tokenizer.count_text(e_folded)
if e_saved > 0:
tokens_saved_total += e_saved
attempted_input_tokens += e_before
if "router:excluded:lossless" not in transforms:
transforms.append("router:excluded:lossless")

_log(
"codex_compression_payload_result",
modified=modified,
Expand Down
246 changes: 246 additions & 0 deletions headroom/transforms/content_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,92 @@ def _tool_call_args_text(raw: Any) -> str:
return " ".join(text.split())[:300]


def _tool_call_command_text(raw: Any) -> str:
"""Extract the raw shell command from a tool call's args, if present.

Anthropic ``input`` is a dict ({"command": "grep …"}); OpenAI ``arguments``
is a JSON string; Codex's shell uses a ``command`` list. Returns "" when
there is no command field (non-shell tools).
"""
if isinstance(raw, str):
try:
raw = json.loads(raw)
except (ValueError, TypeError):
return ""
if not isinstance(raw, dict):
return ""
cmd = raw.get("command", raw.get("cmd", ""))
if isinstance(cmd, list):
cmd = " ".join(str(c) for c in cmd)
return cmd if isinstance(cmd, str) else ""


# Shell wrappers that prefix the real program — peeled to find it. Shell
# grammar, not tunable policy: rtk (the user's token proxy), sudo/env/timeout/…
_SHELL_WRAPPERS = frozenset(
{
"rtk",
"sudo",
"env",
"time",
"nice",
"ionice",
"nohup",
"stdbuf",
"command",
"timeout",
"xargs",
}
)


def _bash_program(command: str) -> tuple[str, list[str]]:
"""Return ``(program_basename_lower, trailing_tokens)`` for a shell command.

Peels leading wrappers (``rtk grep`` -> ``grep``, ``timeout 30 rg`` -> ``rg``)
and env assignments (``FOO=1 grep`` -> ``grep``). Empty program when it can't
be determined. Whitespace-split is deliberately simple — the reversibility
guard downstream makes a parse miss harmless.
"""
toks = command.strip().split()
i = 0
while i < len(toks):
tok = toks[i]
if "=" in tok and not tok.startswith("-"): # VAR=val env assignment
i += 1
continue
base = tok.rsplit("/", 1)[-1].lower() # /usr/bin/grep -> grep
if base in _SHELL_WRAPPERS:
i += 1
# Skip this wrapper's own option/numeric args (timeout 30, nice -n 5).
while i < len(toks) and (
toks[i].startswith("-") or toks[i].replace(".", "", 1).isdigit()
):
i += 1
continue
return base, toks[i + 1 :]
return "", []


def _bash_command_is_search(command: str, search_commands: frozenset[str]) -> bool:
    """Decide whether ``command`` runs one of the configured search programs.

    The program is resolved with ``_bash_program``. When it is a shell
    (``sh``/``bash``/``zsh``/``dash``) with arguments, the text after the
    first ``-c``/``-lc``/``-lic``/``-ic`` flag is unwrapped (surrounding
    quotes stripped) and examined in its place; a shell invocation with
    arguments but no such flag followed by text is not a search.
    ``git grep`` counts as a search; otherwise the program must be in
    ``search_commands``.
    """
    shells = {"sh", "bash", "zsh", "dash"}
    inline_flags = {"-c", "-lc", "-lic", "-ic"}

    current = command
    while True:
        prog, rest = _bash_program(current)
        if not prog:
            return False

        if prog in shells and rest:
            # `bash -lc "grep …"` (Codex): the real command is the -c argument.
            # A flag in last position has no argument, hence rest[:-1].
            for pos, tok in enumerate(rest[:-1]):
                if tok in inline_flags:
                    current = " ".join(rest[pos + 1 :]).strip("'\"")
                    break
            else:
                return False
            continue  # re-examine the unwrapped inner command

        if prog == "git" and rest and rest[0].lower() == "grep":
            return True
        return prog in search_commands


def _log_router_debug(event: str, **payload: Any) -> None:
if not logger.isEnabledFor(logging.DEBUG):
return
Expand Down Expand Up @@ -889,6 +975,22 @@ class ContentRouterConfig:
# Set to None to use DEFAULT_EXCLUDE_TOOLS, or provide custom set
exclude_tools: set[str] | None = None

# Excluded tools are protected only from *lossy* compression. Their output
# is still given information-preserving compaction by detected shape (grep
# -> ripgrep --heading fold; logs -> ANSI strip + run-collapse; JSON ->
# whitespace-minify, data-lossless), in every path — see
# ``_lossless_compact_excluded``. Always recoverable, so no config gate.

# Shell tool names. Entries must be lowercase: the tool name is lowercased
# before it is looked up here. Shell tools are not excluded, so their output
# normally takes the lossy compression path. The exception is a read-only
# *search* (grep/rg/git grep) run through one of them: that output is folded
# losslessly instead of being lossy-compressed. See ``_bash_search_fold``.
# Both sets are config so new harness tool names / search programs can be
# added without code changes.
bash_tool_names: frozenset[str] = frozenset({"bash", "shell", "local_shell"})
bash_search_commands: frozenset[str] = frozenset(
{"grep", "egrep", "fgrep", "rg", "ripgrep", "ag", "ack"}
)

# Read lifecycle management (stale/superseded detection)
read_lifecycle: ReadLifecycleConfig = field(default_factory=ReadLifecycleConfig)

Expand Down Expand Up @@ -1192,6 +1294,8 @@ def __init__(
self._relevance_prewarm_started: bool = False
# tool_call_id → compact args text, populated by _build_tool_name_map.
self._tool_call_args: dict[str, str] = {}
# tool_call_id → raw shell command (bash-search fold), same population.
self._tool_call_commands: dict[str, str] = {}

# Phase 0 (#1171): cap the input size handed to kompress (ModernBERT
# ONNX). Its inference scales O(tokens) and runs synchronously on the
Expand Down Expand Up @@ -2611,6 +2715,7 @@ def _build_tool_name_map(self, messages: list[dict[str, Any]]) -> dict[str, str]
"""
mapping: dict[str, str] = {}
args_map: dict[str, str] = {}
commands_map: dict[str, str] = {}

for msg in messages:
if msg.get("role") != "assistant":
Expand All @@ -2627,6 +2732,9 @@ def _build_tool_name_map(self, messages: list[dict[str, Any]]) -> dict[str, str]
args = _tool_call_args_text(fn.get("arguments"))
if args:
args_map[tc_id] = args
command = _tool_call_command_text(fn.get("arguments"))
if command:
commands_map[tc_id] = command

# Anthropic format: content blocks with type=tool_use
content = msg.get("content", [])
Expand All @@ -2640,8 +2748,12 @@ def _build_tool_name_map(self, messages: list[dict[str, Any]]) -> dict[str, str]
args = _tool_call_args_text(block.get("input"))
if args:
args_map[tc_id] = args
command = _tool_call_command_text(block.get("input"))
if command:
commands_map[tc_id] = command

self._tool_call_args = args_map
self._tool_call_commands = commands_map
return mapping

def _net_cost_allows(
Expand Down Expand Up @@ -3080,6 +3192,17 @@ def apply(
tool_call_id = message.get("tool_call_id", "")
if tool_call_id in excluded_tool_ids:
if messages_from_end <= read_protection_window:
# Protected from lossy compression — but grep/log/json
# output can still be losslessly compacted.
compacted = self._lossless_compact_excluded(content)
if compacted is not None:
folded, kind = compacted
result_slots[i] = {**message, "content": folded}
transforms_applied.append(f"router:excluded:lossless_{kind}")
route_counts["excluded_tool_lossless"] = (
route_counts.get("excluded_tool_lossless", 0) + 1
)
continue
# Recent — protect as before
result_slots[i] = message
transforms_applied.append("router:excluded:tool")
Expand All @@ -3092,6 +3215,18 @@ def apply(
tool_name = tool_name_map.get(tool_call_id, "")
bias = self._get_tool_bias(tool_name) if tool_name else 1.0

# Bash-search lossless pre-empt: a read-only search (grep/rg/git
# grep) run via a shell tool yields byte-losslessly foldable
# output. Fold it instead of the lossy strategy path.
bash_folded = self._bash_search_fold(tool_name, tool_call_id, content)
if bash_folded is not None:
result_slots[i] = {**message, "content": bash_folded}
transforms_applied.append("router:bash:lossless_search")
route_counts["bash_lossless_search"] = (
route_counts.get("bash_lossless_search", 0) + 1
)
continue

# Protection 1: Never compress user messages (unless overridden)
if skip_user and role == "user":
result_slots[i] = message
Expand Down Expand Up @@ -3406,6 +3541,90 @@ def apply(
timing=compressor_timing,
)

def _lossless_compact_excluded(self, content: Any) -> tuple[str, str] | None:
"""Information-preserving compaction for a protected (excluded) tool output.

Excluded tools are kept out of *lossy* compression for accuracy. This
applies only reversible/data-preserving transforms, dispatched by shape:

* SEARCH (grep ``path:line:content``) -> ripgrep --heading fold.
Byte-recoverable (``search_unheading`` reproduces the original). Gated
on the dedicated ``_try_detect_search`` — the general classifier calls
grep-over-code SOURCE_CODE and would wrongly reject it.
* LOG (build/test/app logs) -> ANSI strip + run-collapse. Recoverable
modulo non-semantic ANSI color (``expand_runs`` restores the lines).
* JSON -> whitespace-minify. **Data-lossless** (``json.loads`` equals the
original object) — same information, fewer tokens. NOT byte-exact, so a
read-then-``Edit(old_string=…)`` on the *same* JSON file could miss; the
data is fully preserved.

Returns ``(compacted, kind)`` when a recognized shape actually shrinks,
else ``None``. Source code and glob path-lists match nothing -> verbatim.
Always safe to run (information-preserving) so there is no feature gate.
Never raises.
"""
if not isinstance(content, str) or len(content) < 200:
return None
try:
from .lossless_compaction import compact_lossless

det = _try_detect_search(content)
if det is not None and det.content_type is ContentType.SEARCH_RESULTS:
out = compact_lossless(content, "search")
return (out, "search") if len(out) < len(content) else None
if _try_detect_log(content) is not None:
out = compact_lossless(content, "log")
return (out, "log") if len(out) < len(content) else None
minified = self._minify_json_data_lossless(content)
return (minified, "json") if minified is not None else None
except Exception: # noqa: BLE001
return None

@staticmethod
def _minify_json_data_lossless(content: str) -> str | None:
"""Whitespace-minify a complete JSON value: data-preserving, not byte-exact.

The ``json.loads`` parse is the data-equality guarantee (identical
object). Returns the minified form only when the content is a JSON
object/array and the result is smaller; ``None`` otherwise (source code,
partial/non-JSON).
"""
stripped = content.strip()
if not stripped or stripped[0] not in "{[":
return None
obj = json.loads(stripped)
minified = json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
return minified if len(minified) < len(content) else None

def _bash_search_fold(self, tool_name: str, tool_id: str, content: Any) -> str | None:
"""Byte-lossless fold for a read-only search run through a shell tool.

``bash`` is not excluded, so its output normally takes the lossy strategy
path. But when the command is a read-only search (grep/rg/git grep/…),
its output is byte-losslessly foldable — so fold it (the same guarantee
excluded Grep gets) instead of lossy-compressing. The command whitelist
is only a *gate to attempt*: ``compact_lossless`` verifies reversibility
and returns the input unchanged when it can't safely shrink, so a mis-
gated command (``grep -l`` path-lists, ``grep -c`` counts) simply falls
through to the normal path with no accuracy risk.

Returns the folded text (smaller, recoverable) or ``None`` to fall through.
"""
if not isinstance(content, str) or len(content) < 200:
return None
if tool_name.lower() not in self.config.bash_tool_names:
return None
command = self._tool_call_commands.get(tool_id, "")
if not command or not _bash_command_is_search(command, self.config.bash_search_commands):
return None
try:
from .lossless_compaction import compact_lossless

folded = compact_lossless(content, "search")
except Exception: # noqa: BLE001
return None
return folded if len(folded) < len(content) else None

def _get_tool_bias(self, tool_name: str) -> float:
"""Look up compression bias for a tool name.

Expand Down Expand Up @@ -3528,6 +3747,19 @@ def _process_content_blocks(
tool_use_id = block.get("tool_use_id", "")
if tool_use_id in excluded_tool_ids:
if messages_from_end <= read_protection_window:
# Protected from lossy compression — but grep/log/json
# output can still be losslessly compacted.
compacted = self._lossless_compact_excluded(block.get("content"))
if compacted is not None:
folded, kind = compacted
new_blocks.append({**block, "content": folded})
transforms_applied.append(f"router:excluded:lossless_{kind}")
if route_counts is not None:
route_counts["excluded_tool_lossless"] = (
route_counts.get("excluded_tool_lossless", 0) + 1
)
any_compressed = True
continue
# Recent — protect as before
new_blocks.append(block)
transforms_applied.append("router:excluded:tool")
Expand All @@ -3551,6 +3783,20 @@ def _process_content_blocks(

tool_content = block.get("content", "")

# Bash-search lossless pre-empt (twin of the string-form path):
# fold read-only search output (grep/rg/git grep) byte-losslessly
# instead of taking the lossy strategy path.
bash_folded = self._bash_search_fold(tool_name, tool_use_id, tool_content)
if bash_folded is not None:
new_blocks.append({**block, "content": bash_folded})
transforms_applied.append("router:bash:lossless_search")
if route_counts is not None:
route_counts["bash_lossless_search"] = (
route_counts.get("bash_lossless_search", 0) + 1
)
any_compressed = True
continue

# Protection: failed tool calls / error outputs stay verbatim
# (issue #847). `is_error` is Anthropic's explicit failure
# flag and suffices alone; the indicator scan catches error
Expand Down
Loading
Loading