diff --git a/pr_agent/servers/github_app.py b/pr_agent/servers/github_app.py index b94b79e32f..59c4a91b54 100644 --- a/pr_agent/servers/github_app.py +++ b/pr_agent/servers/github_app.py @@ -74,8 +74,11 @@ async def get_body(request): return body -_duplicate_push_triggers = DefaultDictWithTimeout(ttl=get_settings().github_app.push_trigger_pending_tasks_ttl) -_pending_task_duplicate_push_conditions = DefaultDictWithTimeout(asyncio.locks.Condition, ttl=get_settings().github_app.push_trigger_pending_tasks_ttl) +# No TTL eviction: these caches are bounded by the deterministic cleanup in +# handle_push_trigger_for_new_commits (entries are removed once no task remains). +# TTL-based eviction could otherwise delete state for a still-in-flight task. +_duplicate_push_triggers = DefaultDictWithTimeout(ttl=None) +_pending_task_duplicate_push_conditions = DefaultDictWithTimeout(asyncio.locks.Condition, ttl=None) async def handle_comments_on_pr(body: Dict[str, Any], event: str, @@ -172,22 +175,24 @@ async def handle_push_trigger_for_new_commits(body: Dict[str, Any], # We let the second event wait instead of discarding it because while the first event was being processed, # more commits may have been pushed that led to the subsequent events, # so we keep just one waiting as a delegate to trigger the processing for the new commits when done waiting. - current_active_tasks = _duplicate_push_triggers.setdefault(api_url, 0) - max_active_tasks = 2 if get_settings().github_app.push_trigger_pending_tasks_backlog else 1 - if current_active_tasks < max_active_tasks: + # Admission and cleanup mutate the counter and condition for this api_url, so + # both run under the same per-PR condition lock to keep their lifecycle + # consistent across concurrent tasks. + async with _pending_task_duplicate_push_conditions[api_url]: + current_active_tasks = _duplicate_push_triggers.setdefault(api_url, 0) + max_active_tasks = 2 if get_settings().github_app.push_trigger_pending_tasks_backlog else 1 + if current_active_tasks >= max_active_tasks: + get_logger().info( + f"Skipping push trigger for {api_url=} because another event already triggered the same processing" + ) + return {} # first task can enter, and second tasks too if backlog is enabled get_logger().info( f"Continue processing push trigger for {api_url=} because there are {current_active_tasks} active tasks" ) _duplicate_push_triggers[api_url] += 1 - else: - get_logger().info( - f"Skipping push trigger for {api_url=} because another event already triggered the same processing" - ) - return {} - async with _pending_task_duplicate_push_conditions[api_url]: if current_active_tasks == 1: - # second task waits + # second task waits for the in-progress task to finish get_logger().info( f"Waiting to process push trigger for {api_url=} because the first task is still in progress" ) @@ -200,10 +205,20 @@ async def handle_push_trigger_for_new_commits(body: Dict[str, Any], await _perform_auto_commands_github("push_commands", agent, body, api_url, log_context) finally: - # release the waiting task block + # Release the next waiting task, then remove the shared per-PR state once + # no task remains. The decrement, the "is anyone left?" check, and the + # removal run under the same condition lock that guards admission, so a + # newly admitted task cannot interleave between the decision and removal. async with _pending_task_duplicate_push_conditions[api_url]: _pending_task_duplicate_push_conditions[api_url].notify(1) _duplicate_push_triggers[api_url] -= 1 + if _duplicate_push_triggers[api_url] <= 0: + # pop() (not del) keeps DefaultDictWithTimeout's internal + # key-time map in sync and tolerates an already-missing key. + _duplicate_push_triggers.pop(api_url, None) + _pending_task_duplicate_push_conditions.pop(api_url, None) + + return {} def handle_closed_pr(body, event, action, log_context): diff --git a/pr_agent/servers/utils.py b/pr_agent/servers/utils.py index 2169eb59bb..e2223277aa 100644 --- a/pr_agent/servers/utils.py +++ b/pr_agent/servers/utils.py @@ -84,3 +84,10 @@ def __setitem__(self, __key, __value): def __delitem__(self, __key): del self.__key_times[__key] return super().__delitem__(__key) + + def pop(self, __key, *args): + # Keep the internal key-time map in sync. Unlike __delitem__, pop must + # tolerate a missing key (callers rely on the ``default`` argument), so + # the timestamp is discarded with pop(..., None) rather than del. + self.__key_times.pop(__key, None) + return super().pop(__key, *args) diff --git a/pr_agent/settings/configuration.toml b/pr_agent/settings/configuration.toml index 515c66f56b..474a420f57 100644 --- a/pr_agent/settings/configuration.toml +++ b/pr_agent/settings/configuration.toml @@ -257,7 +257,6 @@ push_trigger_ignore_bot_commits = true push_trigger_ignore_merge_commits = true push_trigger_wait_for_initial_review = true push_trigger_pending_tasks_backlog = true -push_trigger_pending_tasks_ttl = 300 push_commands = [ "/describe", "/review", diff --git a/tests/unittest/test_github_app_timeout_core.py b/tests/unittest/test_github_app_timeout_core.py index 1ecbf8cb36..265454f59f 100644 --- a/tests/unittest/test_github_app_timeout_core.py +++ b/tests/unittest/test_github_app_timeout_core.py @@ -137,6 +137,40 @@ def test_delitem_removes_key_time(self, fake_clock): assert "a" not in d assert "a" not in _key_times(d) + def test_pop_removes_key_time(self, fake_clock): + d = DefaultDictWithTimeout(lambda: 0, ttl=10, refresh_interval=1000) + d["a"] = 1 + # Call pop() outside the assert: an assert expression must be + # side-effect free (it is skipped entirely under `python -O`). + popped = d.pop("a") + assert popped == 1 + assert "a" not in d + assert "a" not in _key_times(d) + + def test_pop_missing_key_with_default_does_not_raise(self, fake_clock): + d = DefaultDictWithTimeout(lambda: 0, ttl=10, refresh_interval=1000) + result = d.pop("missing", "fallback") + assert result == "fallback" + assert "missing" not in _key_times(d) + + def test_pop_then_refresh_does_not_raise(self, fake_clock): + # Regression: pop() must drop the key-time too. Otherwise a later + # __refresh() builds `to_delete` from a stale key-time and runs + # `del self[key]` for a key already gone, raising KeyError. + d = DefaultDictWithTimeout( + lambda: 0, ttl=2, refresh_interval=5, update_key_time_on_get=False + ) + d["a"] = 1 + d.pop("a") + + # Advance past both the TTL and the refresh interval so __refresh runs + # its deletion pass on the next access. + fake_clock["t"] += 10 + + # Must not raise while refreshing. + _ = d["fresh"] + assert "fresh" in d + def test_refresh_runs_after_long_idle_period(self, fake_clock): d = DefaultDictWithTimeout( lambda: 0, ttl=2, refresh_interval=5, update_key_time_on_get=False @@ -417,7 +451,7 @@ def _push_body(): class TestPushTriggerDedupe: - def test_first_event_runs_perform_and_decrements_counter(self, push_trigger_env): + def test_first_event_runs_perform_and_cleans_up_entries(self, push_trigger_env): body = _push_body() api_url = body["pull_request"]["url"] @@ -428,8 +462,10 @@ def test_first_event_runs_perform_and_decrements_counter(self, push_trigger_env) ) assert push_trigger_env["count"] == 1 - # Counter incremented then decremented back to 0. - assert github_app._duplicate_push_triggers[api_url] == 0 + # Counter incremented to 1, decremented back to 0, then both dedupe + # entries are removed so the caches don't grow without bound. + assert api_url not in github_app._duplicate_push_triggers + assert api_url not in github_app._pending_task_duplicate_push_conditions def test_skips_when_before_equals_after(self, push_trigger_env): body = _push_body() @@ -495,3 +531,80 @@ def test_invalid_pr_event_short_circuits(self, push_trigger_env): ) assert push_trigger_env["count"] == 0 + + def test_does_not_clean_up_while_another_task_active( + self, push_trigger_env, monkeypatch + ): + # A second push arriving mid-processing bumps the counter to 2. When the + # first task finishes, the counter decrements to 1 (still active), so the + # cleanup must be skipped and the dedupe entries preserved for the waiter. + body = _push_body() + api_url = body["pull_request"]["url"] + + async def perform_then_simulate_concurrent(*args, **kwargs): + push_trigger_env["count"] += 1 + github_app._duplicate_push_triggers[api_url] += 1 + + monkeypatch.setattr( + github_app, "_perform_auto_commands_github", perform_then_simulate_concurrent + ) + + asyncio.run( + github_app.handle_push_trigger_for_new_commits( + body, "push", "alice", "1", "synchronize", {}, agent=None + ) + ) + + assert push_trigger_env["count"] == 1 + # Counter left at 1 (the still-active concurrent task); nothing evicted. + assert github_app._duplicate_push_triggers[api_url] == 1 + assert api_url in github_app._pending_task_duplicate_push_conditions + + def test_backlog_second_task_waits_then_both_clean_up( + self, push_trigger_env, monkeypatch + ): + # With backlog enabled, a second concurrent push for the same PR waits for + # the first task to finish, then runs. Admission and cleanup share the + # per-PR condition lock, so the shared entries are removed only once both + # tasks have drained. + github_app.get_settings().github_app.push_trigger_pending_tasks_backlog = True + api_url = _push_body()["pull_request"]["url"] + + order = [] + calls = {"n": 0} + + async def perform(*args, **kwargs): + calls["n"] += 1 + if calls["n"] == 1: + # Hold the first task until the second has been admitted and is + # waiting (counter == 2) so the wait/notify path is exercised. + for _ in range(1000): + if github_app._duplicate_push_triggers.get(api_url, 0) >= 2: + break + await asyncio.sleep(0) + order.append("first") + else: + order.append("second") + + monkeypatch.setattr(github_app, "_perform_auto_commands_github", perform) + + async def scenario(): + await asyncio.wait_for( + asyncio.gather( + github_app.handle_push_trigger_for_new_commits( + _push_body(), "push", "alice", "1", "synchronize", {}, agent=None + ), + github_app.handle_push_trigger_for_new_commits( + _push_body(), "push", "bob", "2", "synchronize", {}, agent=None + ), + ), + timeout=5, + ) + + asyncio.run(scenario()) + + assert calls["n"] == 2 + assert order == ["first", "second"] + # Both tasks drained -> shared state fully cleaned up. + assert api_url not in github_app._duplicate_push_triggers + assert api_url not in github_app._pending_task_duplicate_push_conditions