Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions pr_agent/servers/github_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ async def get_body(request):
return body


_duplicate_push_triggers = DefaultDictWithTimeout(ttl=get_settings().github_app.push_trigger_pending_tasks_ttl)
_pending_task_duplicate_push_conditions = DefaultDictWithTimeout(asyncio.locks.Condition, ttl=get_settings().github_app.push_trigger_pending_tasks_ttl)
# No TTL eviction: these caches are bounded by the deterministic cleanup in
# handle_push_trigger_for_new_commits (entries are removed once no task remains).
# TTL-based eviction could otherwise delete state for a still-in-flight task.
_duplicate_push_triggers = DefaultDictWithTimeout(ttl=None)
_pending_task_duplicate_push_conditions = DefaultDictWithTimeout(asyncio.locks.Condition, ttl=None)

async def handle_comments_on_pr(body: Dict[str, Any],
event: str,
Expand Down Expand Up @@ -172,22 +175,24 @@ async def handle_push_trigger_for_new_commits(body: Dict[str, Any],
# We let the second event wait instead of discarding it because while the first event was being processed,
# more commits may have been pushed that led to the subsequent events,
# so we keep just one waiting as a delegate to trigger the processing for the new commits when done waiting.
current_active_tasks = _duplicate_push_triggers.setdefault(api_url, 0)
max_active_tasks = 2 if get_settings().github_app.push_trigger_pending_tasks_backlog else 1
if current_active_tasks < max_active_tasks:
# Admission and cleanup mutate the counter and condition for this api_url, so
# both run under the same per-PR condition lock to keep their lifecycle
# consistent across concurrent tasks.
async with _pending_task_duplicate_push_conditions[api_url]:
current_active_tasks = _duplicate_push_triggers.setdefault(api_url, 0)
max_active_tasks = 2 if get_settings().github_app.push_trigger_pending_tasks_backlog else 1
if current_active_tasks >= max_active_tasks:
get_logger().info(
f"Skipping push trigger for {api_url=} because another event already triggered the same processing"
)
return {}
# first task can enter, and second tasks too if backlog is enabled
get_logger().info(
f"Continue processing push trigger for {api_url=} because there are {current_active_tasks} active tasks"
)
_duplicate_push_triggers[api_url] += 1
else:
get_logger().info(
f"Skipping push trigger for {api_url=} because another event already triggered the same processing"
)
return {}
async with _pending_task_duplicate_push_conditions[api_url]:
if current_active_tasks == 1:
# second task waits
# second task waits for the in-progress task to finish
get_logger().info(
f"Waiting to process push trigger for {api_url=} because the first task is still in progress"
)
Expand All @@ -200,10 +205,20 @@ async def handle_push_trigger_for_new_commits(body: Dict[str, Any],
await _perform_auto_commands_github("push_commands", agent, body, api_url, log_context)

finally:
# release the waiting task block
# Release the next waiting task, then remove the shared per-PR state once
# no task remains. The decrement, the "is anyone left?" check, and the
# removal run under the same condition lock that guards admission, so a
# newly admitted task cannot interleave between the decision and removal.
async with _pending_task_duplicate_push_conditions[api_url]:
_pending_task_duplicate_push_conditions[api_url].notify(1)
_duplicate_push_triggers[api_url] -= 1
if _duplicate_push_triggers[api_url] <= 0:
# pop() (not del) keeps DefaultDictWithTimeout's internal
# key-time map in sync and tolerates an already-missing key.
_duplicate_push_triggers.pop(api_url, None)
_pending_task_duplicate_push_conditions.pop(api_url, None)

return {}


def handle_closed_pr(body, event, action, log_context):
Expand Down
7 changes: 7 additions & 0 deletions pr_agent/servers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,10 @@ def __setitem__(self, __key, __value):
def __delitem__(self, __key):
del self.__key_times[__key]
return super().__delitem__(__key)

def pop(self, __key, *args):
# Keep the internal key-time map in sync. Unlike __delitem__, pop must
# tolerate a missing key (callers rely on the ``default`` argument), so
# the timestamp is discarded with pop(..., None) rather than del.
self.__key_times.pop(__key, None)
return super().pop(__key, *args)
1 change: 0 additions & 1 deletion pr_agent/settings/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ push_trigger_ignore_bot_commits = true
push_trigger_ignore_merge_commits = true
push_trigger_wait_for_initial_review = true
push_trigger_pending_tasks_backlog = true
push_trigger_pending_tasks_ttl = 300
push_commands = [
"/describe",
"/review",
Expand Down
119 changes: 116 additions & 3 deletions tests/unittest/test_github_app_timeout_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,40 @@ def test_delitem_removes_key_time(self, fake_clock):
assert "a" not in d
assert "a" not in _key_times(d)

def test_pop_removes_key_time(self, fake_clock):
d = DefaultDictWithTimeout(lambda: 0, ttl=10, refresh_interval=1000)
d["a"] = 1
# Call pop() outside the assert: an assert expression must be
# side-effect free (it is skipped entirely under `python -O`).
popped = d.pop("a")
assert popped == 1
assert "a" not in d
assert "a" not in _key_times(d)

def test_pop_missing_key_with_default_does_not_raise(self, fake_clock):
d = DefaultDictWithTimeout(lambda: 0, ttl=10, refresh_interval=1000)
result = d.pop("missing", "fallback")
assert result == "fallback"
assert "missing" not in _key_times(d)

def test_pop_then_refresh_does_not_raise(self, fake_clock):
# Regression: pop() must drop the key-time too. Otherwise a later
# __refresh() builds `to_delete` from a stale key-time and runs
# `del self[key]` for a key already gone, raising KeyError.
d = DefaultDictWithTimeout(
lambda: 0, ttl=2, refresh_interval=5, update_key_time_on_get=False
)
d["a"] = 1
d.pop("a")

# Advance past both the TTL and the refresh interval so __refresh runs
# its deletion pass on the next access.
fake_clock["t"] += 10

# Must not raise while refreshing.
_ = d["fresh"]
assert "fresh" in d

def test_refresh_runs_after_long_idle_period(self, fake_clock):
d = DefaultDictWithTimeout(
lambda: 0, ttl=2, refresh_interval=5, update_key_time_on_get=False
Expand Down Expand Up @@ -417,7 +451,7 @@ def _push_body():


class TestPushTriggerDedupe:
def test_first_event_runs_perform_and_decrements_counter(self, push_trigger_env):
def test_first_event_runs_perform_and_cleans_up_entries(self, push_trigger_env):
body = _push_body()
api_url = body["pull_request"]["url"]

Expand All @@ -428,8 +462,10 @@ def test_first_event_runs_perform_and_decrements_counter(self, push_trigger_env)
)

assert push_trigger_env["count"] == 1
# Counter incremented then decremented back to 0.
assert github_app._duplicate_push_triggers[api_url] == 0
# Counter incremented to 1, decremented back to 0, then both dedupe
# entries are removed so the caches don't grow without bound.
assert api_url not in github_app._duplicate_push_triggers
assert api_url not in github_app._pending_task_duplicate_push_conditions

def test_skips_when_before_equals_after(self, push_trigger_env):
body = _push_body()
Expand Down Expand Up @@ -495,3 +531,80 @@ def test_invalid_pr_event_short_circuits(self, push_trigger_env):
)

assert push_trigger_env["count"] == 0

def test_does_not_clean_up_while_another_task_active(
self, push_trigger_env, monkeypatch
):
# A second push arriving mid-processing bumps the counter to 2. When the
# first task finishes, the counter decrements to 1 (still active), so the
# cleanup must be skipped and the dedupe entries preserved for the waiter.
body = _push_body()
api_url = body["pull_request"]["url"]

async def perform_then_simulate_concurrent(*args, **kwargs):
push_trigger_env["count"] += 1
github_app._duplicate_push_triggers[api_url] += 1

monkeypatch.setattr(
github_app, "_perform_auto_commands_github", perform_then_simulate_concurrent
)

asyncio.run(
github_app.handle_push_trigger_for_new_commits(
body, "push", "alice", "1", "synchronize", {}, agent=None
)
)

assert push_trigger_env["count"] == 1
# Counter left at 1 (the still-active concurrent task); nothing evicted.
assert github_app._duplicate_push_triggers[api_url] == 1
assert api_url in github_app._pending_task_duplicate_push_conditions

def test_backlog_second_task_waits_then_both_clean_up(
self, push_trigger_env, monkeypatch
):
# With backlog enabled, a second concurrent push for the same PR waits for
# the first task to finish, then runs. Admission and cleanup share the
# per-PR condition lock, so the shared entries are removed only once both
# tasks have drained.
github_app.get_settings().github_app.push_trigger_pending_tasks_backlog = True
api_url = _push_body()["pull_request"]["url"]

order = []
calls = {"n": 0}

async def perform(*args, **kwargs):
calls["n"] += 1
if calls["n"] == 1:
# Hold the first task until the second has been admitted and is
# waiting (counter == 2) so the wait/notify path is exercised.
for _ in range(1000):
if github_app._duplicate_push_triggers.get(api_url, 0) >= 2:
break
await asyncio.sleep(0)
order.append("first")
else:
order.append("second")

monkeypatch.setattr(github_app, "_perform_auto_commands_github", perform)

async def scenario():
await asyncio.wait_for(
asyncio.gather(
github_app.handle_push_trigger_for_new_commits(
_push_body(), "push", "alice", "1", "synchronize", {}, agent=None
),
github_app.handle_push_trigger_for_new_commits(
_push_body(), "push", "bob", "2", "synchronize", {}, agent=None
),
),
timeout=5,
)

asyncio.run(scenario())

assert calls["n"] == 2
assert order == ["first", "second"]
# Both tasks drained -> shared state fully cleaned up.
assert api_url not in github_app._duplicate_push_triggers
assert api_url not in github_app._pending_task_duplicate_push_conditions
Loading