From 464a615b3856148b20a4fb9c61adc9bb4fac86ec Mon Sep 17 00:00:00 2001 From: Haoran Date: Thu, 25 Jun 2026 22:57:20 +0800 Subject: [PATCH] Lead loses teammate results when a turn ends without a tool call (#291) After the Lead spawns teammate agents, if the current round terminates with plain text (stop_reason != tool_use), agent_loop returns and the REPL falls back to a blocking input(). Teammates run as daemon threads and finish later, so the results they send to the Lead's inbox stay unconsumed until the user submits their next input. The cause is that result delivery is bound to the turn loop, whose lifetime ends when the model stops calling tools, while a teammate completes on its own clock. Blocking a teammate is harmless because it runs as a background thread; the problem is specific to the Lead, which owns the user prompt. The fix follows real Claude Code (useInboxPoller, described in the appendix): decouple input() from turn execution. MessageBus.peek(agent) reports whether the inbox has unread messages without consuming them. In __main__, input() moves to a dedicated daemon thread and an inbox poller thread peeks the Lead's inbox every second; both push to one shared event queue. The main thread runs one turn per event, woken by either user input or an incoming teammate message, so teammate results become new turns without waiting for the user. Repeated inbox wakeups are idempotent: an empty read is skipped when a prior read_inbox already drained the messages. Only the Lead's main loop changes; the teammates' idle loops are untouched. --- s15_agent_teams/code.py | 66 +++++++++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 16 deletions(-) diff --git a/s15_agent_teams/code.py b/s15_agent_teams/code.py index 53324b5a1..f48f60ffc 100644 --- a/s15_agent_teams/code.py +++ b/s15_agent_teams/code.py @@ -20,7 +20,7 @@ Teammate: inbox → LLM → bash/read/write/send → loop (max 10 turns) """ -import os, subprocess, json, time, random, threading +import os, subprocess, json, time, random, threading, queue from pathlib import Path from datetime import datetime from dataclasses import dataclass, asdict @@ -617,6 +617,13 @@ def read_inbox(self, agent: str) -> list[dict]: inbox.unlink() # consume: read + delete return msgs + def peek(self, agent: str) -> bool: + """Non-destructive: True if the agent has unread inbox messages. + The Lead's inbox poller uses this to decide whether to wake a turn + without consuming the mailbox.""" + inbox = MAILBOX_DIR / f"{agent}.jsonl" + return inbox.exists() and inbox.stat().st_size > 0 + BUS = MessageBus() @@ -903,26 +910,53 @@ def agent_loop(messages: list, context: dict): print("Enter a question, press Enter to send. Type q to quit.\n") history = [] context = update_context({}, []) + + # input() and a 1s inbox poller feed one event queue (issue #291). + events = queue.Queue() + + def input_reader(): + while True: + try: + line = input("\033[36ms15 >> \033[0m") + except (EOFError, KeyboardInterrupt): + events.put(("quit", None)) + return + events.put(("user", line)) + + def inbox_poller(): + # Poll ~1s and submit the Lead's inbox as a new turn. Don't gate on + # active_teammates: a teammate sends its result and then removes itself, + # so the final message can outlive its registry entry. + while True: + time.sleep(1) + if BUS.peek("lead"): + events.put(("inbox", None)) + + threading.Thread(target=input_reader, daemon=True).start() + threading.Thread(target=inbox_poller, daemon=True).start() + while True: - try: - query = input("\033[36ms15 >> \033[0m") - except (EOFError, KeyboardInterrupt): - break - if query.strip().lower() in ("q", "exit", ""): + kind, payload = events.get() + if kind == "quit": break - history.append({"role": "user", "content": query}) + if kind == "user": + if payload.strip().lower() in ("q", "exit", ""): + break + history.append({"role": "user", "content": payload}) + else: # "inbox": a teammate message woke the Lead + inbox = BUS.read_inbox("lead") + if not inbox: + continue # already drained by an earlier wake (idempotent) + inbox_text = "\n".join( + f"From {m['from']}: {m['content'][:200]}" for m in inbox) + history.append({"role": "user", + "content": f"[Inbox]\n{inbox_text}"}) + print(f"\n\033[33m[Inbox: {len(inbox)} messages → new turn]\033[0m") + + # One turn for whichever source woke us. agent_loop(history, context) context = update_context(context, history) for block in history[-1]["content"]: if getattr(block, "type", None) == "text": print(block.text) - - # Check inbox for teammate results → inject into history - inbox = BUS.read_inbox("lead") - if inbox: - inbox_text = "\n".join( - f"From {m['from']}: {m['content'][:200]}" for m in inbox) - history.append({"role": "user", - "content": f"[Inbox]\n{inbox_text}"}) - print(f"\n\033[33m[Inbox: {len(inbox)} messages injected]\033[0m") print()