Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 50 additions & 16 deletions s15_agent_teams/code.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
Teammate: inbox → LLM → bash/read/write/send → loop (max 10 turns)
"""

import os, subprocess, json, time, random, threading
import os, subprocess, json, time, random, threading, queue
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict
Expand Down Expand Up @@ -617,6 +617,13 @@ def read_inbox(self, agent: str) -> list[dict]:
inbox.unlink() # consume: read + delete
return msgs

def peek(self, agent: str) -> bool:
"""Non-destructive: True if the agent has unread inbox messages.
The Lead's inbox poller uses this to decide whether to wake a turn
without consuming the mailbox."""
inbox = MAILBOX_DIR / f"{agent}.jsonl"
return inbox.exists() and inbox.stat().st_size > 0


BUS = MessageBus()

Expand Down Expand Up @@ -903,26 +910,53 @@ def agent_loop(messages: list, context: dict):
print("Enter a question, press Enter to send. Type q to quit.\n")
history = []
context = update_context({}, [])

# input() and a 1s inbox poller feed one event queue (issue #291).
events = queue.Queue()

def input_reader():
while True:
try:
line = input("\033[36ms15 >> \033[0m")
except (EOFError, KeyboardInterrupt):
events.put(("quit", None))
return
events.put(("user", line))

def inbox_poller():
# Poll ~1s and submit the Lead's inbox as a new turn. Don't gate on
# active_teammates: a teammate sends its result and then removes itself,
# so the final message can outlive its registry entry.
while True:
time.sleep(1)
if BUS.peek("lead"):
events.put(("inbox", None))

threading.Thread(target=input_reader, daemon=True).start()
threading.Thread(target=inbox_poller, daemon=True).start()

while True:
try:
query = input("\033[36ms15 >> \033[0m")
except (EOFError, KeyboardInterrupt):
break
if query.strip().lower() in ("q", "exit", ""):
kind, payload = events.get()
if kind == "quit":
break
history.append({"role": "user", "content": query})
if kind == "user":
if payload.strip().lower() in ("q", "exit", ""):
break
history.append({"role": "user", "content": payload})
else: # "inbox": a teammate message woke the Lead
inbox = BUS.read_inbox("lead")
if not inbox:
continue # already drained by an earlier wake (idempotent)
inbox_text = "\n".join(
f"From {m['from']}: {m['content'][:200]}" for m in inbox)
history.append({"role": "user",
"content": f"[Inbox]\n{inbox_text}"})
print(f"\n\033[33m[Inbox: {len(inbox)} messages → new turn]\033[0m")

# One turn for whichever source woke us.
agent_loop(history, context)
context = update_context(context, history)
for block in history[-1]["content"]:
if getattr(block, "type", None) == "text":
print(block.text)

# Check inbox for teammate results → inject into history
inbox = BUS.read_inbox("lead")
if inbox:
inbox_text = "\n".join(
f"From {m['from']}: {m['content'][:200]}" for m in inbox)
history.append({"role": "user",
"content": f"[Inbox]\n{inbox_text}"})
print(f"\n\033[33m[Inbox: {len(inbox)} messages injected]\033[0m")
print()