From dffa243c1e4ee16256a4851b5d0335d08b18c7e8 Mon Sep 17 00:00:00 2001 From: MahmoudHaouachi2 Date: Thu, 2 Jul 2026 11:50:11 +0200 Subject: [PATCH 1/4] feat: add CI artifact context injection for GitHub Action Adds a registry-based artifact parser system that reads a CI-produced file (e.g. terraform plan output, test report) and injects its content into tool extra_instructions as extra review context. New files: - pr_agent/algo/artifacts.py: core module with resolve_artifact_path, load_artifact_content, and three built-in parsers (generic, terraform_plan, test_report) - tests/unittest/test_artifacts.py: full unit test suite Configuration (off by default, opt-in via artifact_path action input): [artifacts] enable = false artifact_path = "" artifact_type = "generic" target_tools = ["pr_reviewer", "pr_description", "pr_code_suggestions"] max_artifact_size = 50000 --- pr_agent/algo/artifacts.py | 132 ++++++++++++++++++++ pr_agent/settings/configuration.toml | 14 +++ tests/unittest/test_artifacts.py | 176 +++++++++++++++++++++++++++ 3 files changed, 322 insertions(+) create mode 100644 pr_agent/algo/artifacts.py create mode 100644 tests/unittest/test_artifacts.py diff --git a/pr_agent/algo/artifacts.py b/pr_agent/algo/artifacts.py new file mode 100644 index 0000000000..fc81a00023 --- /dev/null +++ b/pr_agent/algo/artifacts.py @@ -0,0 +1,132 @@ +import os +from pathlib import Path +from typing import Optional + +from pr_agent.config_loader import get_settings +from pr_agent.log import get_logger + + +ARTIFACT_PARSERS = {} + + +def register_parser(artifact_type: str): + def decorator(func): + ARTIFACT_PARSERS[artifact_type] = func + return func + return decorator + + +@register_parser("generic") +def parse_generic(content: str, label: str) -> str: + header = f"CI Artifact: {label}" if label else "CI Artifact" + return ( + f"{header}\n" + f"=====\n" + f"{content}\n" + f"=====\n" + f"Consider this artifact as additional context when analyzing the PR. " + f"It was produced by a prior CI step." + ) + + +@register_parser("terraform_plan") +def parse_terraform_plan(content: str, label: str) -> str: + header = f"Terraform Plan Output: {label}" if label else "Terraform Plan Output" + return ( + f"{header}\n" + f"=====\n" + f"{content}\n" + f"=====\n" + f"This is the Terraform execution plan for the infrastructure changes in this PR. " + f"Use it to verify that the code changes produce the intended infrastructure modifications. " + f"Flag any unexpected resource deletions or risky changes." + ) + + +@register_parser("test_report") +def parse_test_report(content: str, label: str) -> str: + header = f"Test Results: {label}" if label else "Test Results" + return ( + f"{header}\n" + f"=====\n" + f"{content}\n" + f"=====\n" + f"These are the test results from the CI pipeline for this PR. " + f"If there are failures, correlate them with the code changes in the diff. " + f"Note any tests that are newly failing." + ) + + +def resolve_artifact_path(path: str) -> Optional[Path]: + if not path: + return None + + artifact_path = Path(path) + if artifact_path.is_absolute(): + return artifact_path if artifact_path.is_file() else None + + workspace = os.environ.get("GITHUB_WORKSPACE", "") + if workspace: + resolved = Path(workspace) / artifact_path + if resolved.is_file(): + return resolved + + resolved = artifact_path.resolve() + if resolved.is_file(): + return resolved + + return None + + +def _read_and_truncate(path: Path, max_size: int) -> str: + try: + content = path.read_text(encoding="utf-8", errors="replace") + except (OSError, IOError) as e: + get_logger().warning(f"Failed to read artifact file {path}: {e}") + return "" + + if len(content) > max_size: + content = content[:max_size] + "\n\n[... content truncated due to size limit ...]" + return content + + +def load_artifact_content(tool_key: str) -> str: + try: + artifacts_settings = get_settings().get("ARTIFACTS", {}) + except AttributeError: + return "" + + if not artifacts_settings: + return "" + + enabled = artifacts_settings.get("enable", False) + if not enabled: + return "" + + artifact_path_str = artifacts_settings.get("artifact_path", "") + if not artifact_path_str: + return "" + + target_tools = artifacts_settings.get("target_tools", + ["pr_reviewer", "pr_description", "pr_code_suggestions"]) + if tool_key not in target_tools: + return "" + + artifact_path = resolve_artifact_path(artifact_path_str) + if not artifact_path: + get_logger().warning( + f"Artifact file not found: '{artifact_path_str}' " + f"(GITHUB_WORKSPACE={os.environ.get('GITHUB_WORKSPACE', 'not set')})" + ) + return "" + + max_size = int(artifacts_settings.get("max_artifact_size", 50000)) + content = _read_and_truncate(artifact_path, max_size) + if not content: + return "" + + artifact_type = artifacts_settings.get("artifact_type", "generic") + label = artifacts_settings.get("artifact_label", "") or artifact_path.name + + parser = ARTIFACT_PARSERS.get(artifact_type, ARTIFACT_PARSERS["generic"]) + return parser(content, label) diff --git a/pr_agent/settings/configuration.toml b/pr_agent/settings/configuration.toml index 675f5dad9e..2f9ae0fdf4 100644 --- a/pr_agent/settings/configuration.toml +++ b/pr_agent/settings/configuration.toml @@ -368,6 +368,20 @@ extra_instructions = "" # public - extra instructions to the auto best practices content = "" max_patterns = 5 # max number of patterns to be detected +[artifacts] +# Enable artifact injection into tool prompts (off by default; auto-enabled when artifact_path input is set) +enable = false +# File path to the artifact (relative to GITHUB_WORKSPACE, or absolute) +artifact_path = "" +# Parser type: "generic", "terraform_plan", "test_report" +artifact_type = "generic" +# Label shown to the AI — defaults to the filename when empty +artifact_label = "" +# Which tools receive artifact context +target_tools = ["pr_reviewer", "pr_description", "pr_code_suggestions"] +# Max artifact size in characters (content is truncated if exceeded) +max_artifact_size = 50000 + [azure_devops] default_comment_status = "closed" diff --git a/tests/unittest/test_artifacts.py b/tests/unittest/test_artifacts.py new file mode 100644 index 0000000000..0b3fb0109a --- /dev/null +++ b/tests/unittest/test_artifacts.py @@ -0,0 +1,176 @@ +import os +import tempfile +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +from pr_agent.algo.artifacts import ( + resolve_artifact_path, + load_artifact_content, + _read_and_truncate, + ARTIFACT_PARSERS, + parse_generic, + parse_terraform_plan, + parse_test_report, +) + + +class TestResolveArtifactPath: + def test_empty_path_returns_none(self): + assert resolve_artifact_path("") is None + assert resolve_artifact_path(None) is None + + def test_absolute_path_existing_file(self, tmp_path): + f = tmp_path / "plan.txt" + f.write_text("content") + assert resolve_artifact_path(str(f)) == f + + def test_absolute_path_missing_file(self): + assert resolve_artifact_path("/nonexistent/path/file.txt") is None + + def test_relative_path_with_github_workspace(self, tmp_path): + f = tmp_path / "output" / "plan.txt" + f.parent.mkdir(parents=True) + f.write_text("terraform plan") + + with patch.dict(os.environ, {"GITHUB_WORKSPACE": str(tmp_path)}): + result = resolve_artifact_path("output/plan.txt") + assert result == f + + def test_relative_path_without_workspace_falls_back_to_cwd(self, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + f = tmp_path / "plan.txt" + f.write_text("content") + + with patch.dict(os.environ, {}, clear=True): + os.environ.pop("GITHUB_WORKSPACE", None) + result = resolve_artifact_path("plan.txt") + assert result == f + + def test_relative_path_not_found_returns_none(self): + with patch.dict(os.environ, {"GITHUB_WORKSPACE": "/tmp/nonexistent_workspace_xyz"}): + assert resolve_artifact_path("missing.txt") is None + + +class TestReadAndTruncate: + def test_reads_file_content(self, tmp_path): + f = tmp_path / "artifact.txt" + f.write_text("hello world") + assert _read_and_truncate(f, 50000) == "hello world" + + def test_truncates_large_content(self, tmp_path): + f = tmp_path / "big.txt" + f.write_text("x" * 1000) + result = _read_and_truncate(f, 100) + assert len(result) < 1000 + assert result.startswith("x" * 100) + assert "[... content truncated due to size limit ...]" in result + + def test_returns_empty_on_read_error(self, tmp_path): + missing = tmp_path / "no_such_file.txt" + assert _read_and_truncate(missing, 50000) == "" + + +class TestParsers: + def test_parser_registry_has_expected_types(self): + assert "generic" in ARTIFACT_PARSERS + assert "terraform_plan" in ARTIFACT_PARSERS + assert "test_report" in ARTIFACT_PARSERS + + def test_generic_parser(self): + result = parse_generic("some output", "build.log") + assert "CI Artifact: build.log" in result + assert "some output" in result + assert "additional context" in result + + def test_generic_parser_no_label(self): + result = parse_generic("output", "") + assert "CI Artifact\n" in result + + def test_terraform_plan_parser(self): + result = parse_terraform_plan("+ aws_instance.web", "plan.txt") + assert "Terraform Plan Output: plan.txt" in result + assert "+ aws_instance.web" in result + assert "infrastructure" in result + + def test_test_report_parser(self): + result = parse_test_report("FAILED: test_login", "results.xml") + assert "Test Results: results.xml" in result + assert "FAILED: test_login" in result + assert "failures" in result + + +class TestLoadArtifactContent: + def _mock_settings(self, artifacts_config): + settings = MagicMock() + settings.get.side_effect = lambda key, default=None: ( + artifacts_config if key == "ARTIFACTS" else default + ) + return settings + + def test_returns_empty_when_no_config(self): + with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: + mock_gs.return_value.get.return_value = {} + assert load_artifact_content("pr_reviewer") == "" + + def test_returns_empty_when_disabled(self): + with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: + mock_gs.return_value.get.return_value = {"enable": False, "artifact_path": "plan.txt"} + assert load_artifact_content("pr_reviewer") == "" + + def test_returns_empty_when_no_path(self): + with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: + mock_gs.return_value.get.return_value = {"enable": True, "artifact_path": ""} + assert load_artifact_content("pr_reviewer") == "" + + def test_returns_empty_when_tool_not_targeted(self): + with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: + mock_gs.return_value.get.return_value = { + "enable": True, + "artifact_path": "plan.txt", + "target_tools": ["pr_reviewer"], + } + assert load_artifact_content("pr_description") == "" + + def test_returns_empty_when_file_not_found(self): + with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: + mock_gs.return_value.get.return_value = { + "enable": True, + "artifact_path": "/nonexistent/file.txt", + "target_tools": ["pr_reviewer"], + } + assert load_artifact_content("pr_reviewer") == "" + + def test_loads_and_formats_artifact(self, tmp_path): + f = tmp_path / "plan.txt" + f.write_text("+ aws_s3_bucket.data") + + with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: + mock_gs.return_value.get.return_value = { + "enable": True, + "artifact_path": str(f), + "artifact_type": "terraform_plan", + "artifact_label": "", + "target_tools": ["pr_reviewer", "pr_description", "pr_code_suggestions"], + "max_artifact_size": 50000, + } + result = load_artifact_content("pr_reviewer") + assert "Terraform Plan Output: plan.txt" in result + assert "+ aws_s3_bucket.data" in result + + def test_falls_back_to_generic_for_unknown_type(self, tmp_path): + f = tmp_path / "output.log" + f.write_text("some output") + + with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: + mock_gs.return_value.get.return_value = { + "enable": True, + "artifact_path": str(f), + "artifact_type": "unknown_type", + "artifact_label": "", + "target_tools": ["pr_reviewer"], + "max_artifact_size": 50000, + } + result = load_artifact_content("pr_reviewer") + assert "CI Artifact: output.log" in result From 029d654ea20f3186b7f57a648d95f2727412118c Mon Sep 17 00:00:00 2001 From: MahmoudHaouachi2 Date: Thu, 2 Jul 2026 11:51:25 +0200 Subject: [PATCH 2/4] feat: handle workflow_run events and inject artifact context in runner Two additions to github_action_runner.py: 1. Artifact injection: reads ARTIFACT_PATH / ARTIFACT_TYPE env vars before event dispatch and appends parsed artifact content to extra_instructions for pr_description, pr_code_suggestions, and pr_reviewer. Delegates to pr_agent/algo/artifacts.py. No-op when ARTIFACT_PATH is not set. 2. workflow_run handler: new elif branch that handles GITHUB_EVENT_NAME=workflow_run. Extracts the PR API URL from workflow_run.pull_requests[0].url and runs the same auto tools (PRDescription, PRReviewer, PRCodeSuggestions) as the pull_request handler. Guards: skips non-pull_request origins and empty pull_requests arrays (fork PRs). This enables the common pattern of downloading cross-workflow artifacts before running PR-Agent: on: workflow_run: workflows: ["CI"] types: [completed] steps: - uses: actions/download-artifact@v4 with: run-id: ${{ github.event.workflow_run.id }} name: terraform-plan - uses: The-PR-Agent/pr-agent@main with: artifact_path: plan.txt artifact_type: terraform_plan --- pr_agent/servers/github_action_runner.py | 77 +++++++++++++++ .../test_github_action_runner_core.py | 95 +++++++++++++++++++ 2 files changed, 172 insertions(+) diff --git a/pr_agent/servers/github_action_runner.py b/pr_agent/servers/github_action_runner.py index bae6c943b9..38c5599e3e 100644 --- a/pr_agent/servers/github_action_runner.py +++ b/pr_agent/servers/github_action_runner.py @@ -106,6 +106,40 @@ async def run_action(): setting.extra_instructions = updated_instructions except Exception as e: get_logger().info(f"github action: failed to apply language-specific instructions: {e}") + + # Inject artifact content into extra_instructions for configured tools + try: + ARTIFACT_PATH = os.environ.get('ARTIFACT_PATH') or os.environ.get('PR_AGENT_ARTIFACT_PATH') + ARTIFACT_TYPE = os.environ.get('ARTIFACT_TYPE') or os.environ.get('PR_AGENT_ARTIFACT_TYPE') + if ARTIFACT_PATH: + get_settings().set("ARTIFACTS.ENABLE", True) + get_settings().set("ARTIFACTS.ARTIFACT_PATH", ARTIFACT_PATH) + if ARTIFACT_TYPE: + get_settings().set("ARTIFACTS.ARTIFACT_TYPE", ARTIFACT_TYPE) + + artifacts_enabled = get_settings().get("ARTIFACTS.ENABLE", False) + if is_true(artifacts_enabled): + from pr_agent.algo.artifacts import load_artifact_content + + get_logger().info("Artifact injection enabled, processing artifacts") + for key in get_settings(): + setting = get_settings().get(key) + if str(type(setting)) == "": + if key.lower() in ['pr_description', 'pr_code_suggestions', 'pr_reviewer']: + artifact_text = load_artifact_content(key.lower()) + if artifact_text: + if hasattr(setting, 'extra_instructions'): + extra_instructions = setting.extra_instructions + separator = "\n======\n\nCI Artifact Context:\n" + updated_instructions = ( + str(extra_instructions) + separator + artifact_text + if extra_instructions else artifact_text + ) + setting.extra_instructions = updated_instructions + get_logger().info(f"Injected artifact context into {key}") + except Exception as e: + get_logger().info(f"github action: failed to process artifacts: {e}") + # Handle pull request opened event if GITHUB_EVENT_NAME == "pull_request" or GITHUB_EVENT_NAME == "pull_request_target": action = event_payload.get("action") @@ -189,6 +223,49 @@ async def run_action(): else: await PRAgent().handle_request(url, body) + # Handle workflow_run event (triggered after another workflow completes, e.g. after a terraform plan) + elif GITHUB_EVENT_NAME == "workflow_run": + workflow_run = event_payload.get("workflow_run", {}) + if workflow_run.get("event") != "pull_request": + get_logger().info(f"Skipping workflow_run: originating event is '{workflow_run.get('event')}', not 'pull_request'") + return + + pull_requests = workflow_run.get("pull_requests", []) + if not pull_requests: + get_logger().info("Skipping workflow_run: no pull_requests found in payload (fork PRs are not supported)") + return + + pr_url = pull_requests[0].get("url") + if not pr_url: + get_logger().info("Skipping workflow_run: pull_requests[0] has no url") + return + + try: + apply_repo_settings(pr_url) + except Exception as e: + get_logger().info(f"github action: failed to apply repo settings for workflow_run: {e}") + + auto_review = get_setting_or_env("GITHUB_ACTION.AUTO_REVIEW", None) + if auto_review is None: + auto_review = get_setting_or_env("GITHUB_ACTION_CONFIG.AUTO_REVIEW", None) + auto_describe = get_setting_or_env("GITHUB_ACTION.AUTO_DESCRIBE", None) + if auto_describe is None: + auto_describe = get_setting_or_env("GITHUB_ACTION_CONFIG.AUTO_DESCRIBE", None) + auto_improve = get_setting_or_env("GITHUB_ACTION.AUTO_IMPROVE", None) + if auto_improve is None: + auto_improve = get_setting_or_env("GITHUB_ACTION_CONFIG.AUTO_IMPROVE", None) + + get_settings().config.is_auto_command = True + get_settings().pr_description.final_update_message = False + get_logger().info(f"Running auto actions for workflow_run: auto_describe={auto_describe}, auto_review={auto_review}, auto_improve={auto_improve}") + + if auto_describe is None or is_true(auto_describe): + await PRDescription(pr_url).run() + if auto_review is None or is_true(auto_review): + await PRReviewer(pr_url).run() + if auto_improve is None or is_true(auto_improve): + await PRCodeSuggestions(pr_url).run() + if __name__ == '__main__': asyncio.run(run_action()) diff --git a/tests/unittest/test_github_action_runner_core.py b/tests/unittest/test_github_action_runner_core.py index dc23a382e3..adb58e5395 100644 --- a/tests/unittest/test_github_action_runner_core.py +++ b/tests/unittest/test_github_action_runner_core.py @@ -186,3 +186,98 @@ async def test_issue_comment_from_user_is_processed(monkeypatch, tmp_path, resto await github_action_runner.run_action() assert handled == [("https://api.github.com/repos/org/repo/pulls/1", "/review")] + + +def _write_workflow_run_event(tmp_path, originating_event="pull_request", pull_requests=None): + if pull_requests is None: + pull_requests = [{"url": "https://api.github.com/repos/org/repo/pulls/42", "number": 42}] + event_path = tmp_path / "event.json" + event_path.write_text(json.dumps({ + "action": "completed", + "workflow_run": { + "id": 9999, + "event": originating_event, + "conclusion": "success", + "pull_requests": pull_requests, + }, + })) + return event_path + + +def _patch_workflow_run_deps(monkeypatch, runs): + monkeypatch.setattr(github_action_runner, "apply_repo_settings", lambda pr_url: None) + + class FakeTool: + name = "base" + + def __init__(self, pr_url): + self.pr_url = pr_url + + async def run(self): + runs.append((self.name, self.pr_url)) + + class FakeDescription(FakeTool): + name = "describe" + + class FakeReviewer(FakeTool): + name = "review" + + class FakeSuggestions(FakeTool): + name = "improve" + + monkeypatch.setattr(github_action_runner, "PRDescription", FakeDescription) + monkeypatch.setattr(github_action_runner, "PRReviewer", FakeReviewer) + monkeypatch.setattr(github_action_runner, "PRCodeSuggestions", FakeSuggestions) + + +@pytest.mark.asyncio +async def test_workflow_run_runs_auto_tools(monkeypatch, tmp_path, restore_github_settings): + runs = [] + _patch_workflow_run_deps(monkeypatch, runs) + monkeypatch.setenv("GITHUB_EVENT_NAME", "workflow_run") + monkeypatch.setenv("GITHUB_EVENT_PATH", str(_write_workflow_run_event(tmp_path))) + monkeypatch.setenv("GITHUB_TOKEN", "token") + + def fake_get_setting_or_env(key, default=None): + values = { + "GITHUB_ACTION.AUTO_DESCRIBE": True, + "GITHUB_ACTION.AUTO_REVIEW": True, + "GITHUB_ACTION.AUTO_IMPROVE": False, + "GITHUB_ACTION_CONFIG.ENABLE_OUTPUT": True, + } + return values.get(key, default) + + monkeypatch.setattr(github_action_runner, "get_setting_or_env", fake_get_setting_or_env) + + await github_action_runner.run_action() + + assert runs == [ + ("describe", "https://api.github.com/repos/org/repo/pulls/42"), + ("review", "https://api.github.com/repos/org/repo/pulls/42"), + ] + + +@pytest.mark.asyncio +async def test_workflow_run_skips_non_pull_request_origin(monkeypatch, tmp_path, restore_github_settings): + runs = [] + _patch_workflow_run_deps(monkeypatch, runs) + monkeypatch.setenv("GITHUB_EVENT_NAME", "workflow_run") + monkeypatch.setenv("GITHUB_EVENT_PATH", str(_write_workflow_run_event(tmp_path, originating_event="push"))) + monkeypatch.setenv("GITHUB_TOKEN", "token") + + await github_action_runner.run_action() + + assert runs == [] + + +@pytest.mark.asyncio +async def test_workflow_run_skips_when_pull_requests_empty(monkeypatch, tmp_path, restore_github_settings): + runs = [] + _patch_workflow_run_deps(monkeypatch, runs) + monkeypatch.setenv("GITHUB_EVENT_NAME", "workflow_run") + monkeypatch.setenv("GITHUB_EVENT_PATH", str(_write_workflow_run_event(tmp_path, pull_requests=[]))) + monkeypatch.setenv("GITHUB_TOKEN", "token") + + await github_action_runner.run_action() + + assert runs == [] From 732efc20c475fe18102bc5b6e4a89726bd52edbb Mon Sep 17 00:00:00 2001 From: MahmoudHaouachi2 Date: Thu, 2 Jul 2026 11:54:25 +0200 Subject: [PATCH 3/4] feat: add artifact_path/artifact_type inputs to action.yaml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Exposes two optional inputs for the GitHub Action: - artifact_path: path to a CI artifact file (relative to GITHUB_WORKSPACE or absolute). When set, artifact injection is automatically enabled. - artifact_type: parser to apply — generic (default), terraform_plan, or test_report. Both default to empty/generic so existing users see no behavior change. Also adds `set -e` to entrypoint.sh for fail-fast behavior. --- action.yaml | 12 ++++++++++++ github_action/entrypoint.sh | 1 + 2 files changed, 13 insertions(+) diff --git a/action.yaml b/action.yaml index 077ec41ff1..205fb7e227 100644 --- a/action.yaml +++ b/action.yaml @@ -3,6 +3,18 @@ description: 'Summarize, review and suggest improvements for pull requests' branding: icon: 'award' color: 'green' +inputs: + artifact_path: + description: 'Path to a CI artifact file (relative to GITHUB_WORKSPACE or absolute) to include as extra context in PR analysis. Leave empty to disable artifact injection.' + required: false + default: '' + artifact_type: + description: 'Parser type for the artifact content. Supported values: generic, terraform_plan, test_report' + required: false + default: 'generic' runs: using: 'docker' image: 'Dockerfile.github_action_dockerhub' + env: + ARTIFACT_PATH: ${{ inputs.artifact_path }} + ARTIFACT_TYPE: ${{ inputs.artifact_type }} diff --git a/github_action/entrypoint.sh b/github_action/entrypoint.sh index 4d493c7c79..1ed53d29a3 100644 --- a/github_action/entrypoint.sh +++ b/github_action/entrypoint.sh @@ -1,2 +1,3 @@ #!/bin/bash +set -e python /app/pr_agent/servers/github_action_runner.py From 19a1c0add0b1dd10cf1addb718472506d638ba86 Mon Sep 17 00:00:00 2001 From: MahmoudHaouachi2 Date: Sat, 4 Jul 2026 17:30:07 +0200 Subject: [PATCH 4/4] feat: simplify artifact context injection Replace parser registry (generic/terraform_plan/test_report) with a single format_artifact_content() and user-configurable artifact_instructions input. Fix path traversal security bug, full-file-read perf bug, and broad exception swallowing flagged in code review. Co-Authored-By: Claude Sonnet 4.6 --- action.yaml | 8 +- pr_agent/algo/artifacts.py | 114 +++++++----------- pr_agent/servers/github_action_runner.py | 51 ++++---- pr_agent/settings/configuration.toml | 4 +- tests/unittest/test_artifacts.py | 142 ++++++++++++----------- 5 files changed, 146 insertions(+), 173 deletions(-) diff --git a/action.yaml b/action.yaml index 205fb7e227..b84a27c7df 100644 --- a/action.yaml +++ b/action.yaml @@ -8,13 +8,13 @@ inputs: description: 'Path to a CI artifact file (relative to GITHUB_WORKSPACE or absolute) to include as extra context in PR analysis. Leave empty to disable artifact injection.' required: false default: '' - artifact_type: - description: 'Parser type for the artifact content. Supported values: generic, terraform_plan, test_report' + artifact_instructions: + description: 'Custom instructions telling the AI how to interpret the artifact. Leave empty for a sensible default.' required: false - default: 'generic' + default: '' runs: using: 'docker' image: 'Dockerfile.github_action_dockerhub' env: ARTIFACT_PATH: ${{ inputs.artifact_path }} - ARTIFACT_TYPE: ${{ inputs.artifact_type }} + ARTIFACT_INSTRUCTIONS: ${{ inputs.artifact_instructions }} diff --git a/pr_agent/algo/artifacts.py b/pr_agent/algo/artifacts.py index fc81a00023..b798f8f457 100644 --- a/pr_agent/algo/artifacts.py +++ b/pr_agent/algo/artifacts.py @@ -6,81 +6,45 @@ from pr_agent.log import get_logger -ARTIFACT_PARSERS = {} - - -def register_parser(artifact_type: str): - def decorator(func): - ARTIFACT_PARSERS[artifact_type] = func - return func - return decorator - - -@register_parser("generic") -def parse_generic(content: str, label: str) -> str: - header = f"CI Artifact: {label}" if label else "CI Artifact" - return ( - f"{header}\n" - f"=====\n" - f"{content}\n" - f"=====\n" - f"Consider this artifact as additional context when analyzing the PR. " - f"It was produced by a prior CI step." - ) - - -@register_parser("terraform_plan") -def parse_terraform_plan(content: str, label: str) -> str: - header = f"Terraform Plan Output: {label}" if label else "Terraform Plan Output" - return ( - f"{header}\n" - f"=====\n" - f"{content}\n" - f"=====\n" - f"This is the Terraform execution plan for the infrastructure changes in this PR. " - f"Use it to verify that the code changes produce the intended infrastructure modifications. " - f"Flag any unexpected resource deletions or risky changes." - ) - - -@register_parser("test_report") -def parse_test_report(content: str, label: str) -> str: - header = f"Test Results: {label}" if label else "Test Results" - return ( - f"{header}\n" - f"=====\n" - f"{content}\n" - f"=====\n" - f"These are the test results from the CI pipeline for this PR. " - f"If there are failures, correlate them with the code changes in the diff. " - f"Note any tests that are newly failing." - ) +DEFAULT_ARTIFACT_INSTRUCTIONS = ( + "Consider this CI artifact as additional context when analyzing the PR. " + "It was produced by a prior CI step." +) def resolve_artifact_path(path: str) -> Optional[Path]: if not path: return None + workspace = os.environ.get("GITHUB_WORKSPACE", "") + artifact_path = Path(path) if artifact_path.is_absolute(): - return artifact_path if artifact_path.is_file() else None + resolved = artifact_path.resolve() + elif workspace: + resolved = (Path(workspace) / artifact_path).resolve() + else: + resolved = artifact_path.resolve() - workspace = os.environ.get("GITHUB_WORKSPACE", "") if workspace: - resolved = Path(workspace) / artifact_path - if resolved.is_file(): - return resolved - - resolved = artifact_path.resolve() - if resolved.is_file(): - return resolved + workspace_resolved = Path(workspace).resolve() + under_workspace = ( + str(resolved).startswith(str(workspace_resolved) + os.sep) + or resolved == workspace_resolved + ) + if not under_workspace: + get_logger().warning( + f"Artifact path '{path}' resolves outside GITHUB_WORKSPACE: {resolved}" + ) + return None - return None + return resolved if resolved.is_file() else None def _read_and_truncate(path: Path, max_size: int) -> str: try: - content = path.read_text(encoding="utf-8", errors="replace") + with open(path, "r", encoding="utf-8", errors="replace") as f: + content = f.read(max_size + 1) except (OSError, IOError) as e: get_logger().warning(f"Failed to read artifact file {path}: {e}") return "" @@ -90,7 +54,19 @@ def _read_and_truncate(path: Path, max_size: int) -> str: return content -def load_artifact_content(tool_key: str) -> str: +def format_artifact_content(content: str, label: str, instructions: str) -> str: + header = f"CI Artifact: {label}" if label else "CI Artifact" + instructions = instructions or DEFAULT_ARTIFACT_INSTRUCTIONS + return ( + f"{header}\n" + f"=====\n" + f"{content}\n" + f"=====\n" + f"{instructions}" + ) + + +def load_artifact() -> str: try: artifacts_settings = get_settings().get("ARTIFACTS", {}) except AttributeError: @@ -99,23 +75,17 @@ def load_artifact_content(tool_key: str) -> str: if not artifacts_settings: return "" - enabled = artifacts_settings.get("enable", False) - if not enabled: + if not artifacts_settings.get("enable", False): return "" artifact_path_str = artifacts_settings.get("artifact_path", "") if not artifact_path_str: return "" - target_tools = artifacts_settings.get("target_tools", - ["pr_reviewer", "pr_description", "pr_code_suggestions"]) - if tool_key not in target_tools: - return "" - artifact_path = resolve_artifact_path(artifact_path_str) if not artifact_path: get_logger().warning( - f"Artifact file not found: '{artifact_path_str}' " + f"Artifact file not found or path rejected: '{artifact_path_str}' " f"(GITHUB_WORKSPACE={os.environ.get('GITHUB_WORKSPACE', 'not set')})" ) return "" @@ -125,8 +95,6 @@ def load_artifact_content(tool_key: str) -> str: if not content: return "" - artifact_type = artifacts_settings.get("artifact_type", "generic") label = artifacts_settings.get("artifact_label", "") or artifact_path.name - - parser = ARTIFACT_PARSERS.get(artifact_type, ARTIFACT_PARSERS["generic"]) - return parser(content, label) + instructions = artifacts_settings.get("artifact_instructions", "") + return format_artifact_content(content, label, instructions) diff --git a/pr_agent/servers/github_action_runner.py b/pr_agent/servers/github_action_runner.py index 38c5599e3e..5cd5866937 100644 --- a/pr_agent/servers/github_action_runner.py +++ b/pr_agent/servers/github_action_runner.py @@ -109,36 +109,37 @@ async def run_action(): # Inject artifact content into extra_instructions for configured tools try: - ARTIFACT_PATH = os.environ.get('ARTIFACT_PATH') or os.environ.get('PR_AGENT_ARTIFACT_PATH') - ARTIFACT_TYPE = os.environ.get('ARTIFACT_TYPE') or os.environ.get('PR_AGENT_ARTIFACT_TYPE') - if ARTIFACT_PATH: + artifact_path_env = os.environ.get('ARTIFACT_PATH') or os.environ.get('PR_AGENT_ARTIFACT_PATH') + artifact_instructions_env = os.environ.get('ARTIFACT_INSTRUCTIONS') or os.environ.get('PR_AGENT_ARTIFACT_INSTRUCTIONS') + if artifact_path_env: get_settings().set("ARTIFACTS.ENABLE", True) - get_settings().set("ARTIFACTS.ARTIFACT_PATH", ARTIFACT_PATH) - if ARTIFACT_TYPE: - get_settings().set("ARTIFACTS.ARTIFACT_TYPE", ARTIFACT_TYPE) + get_settings().set("ARTIFACTS.ARTIFACT_PATH", artifact_path_env) + if artifact_instructions_env: + get_settings().set("ARTIFACTS.ARTIFACT_INSTRUCTIONS", artifact_instructions_env) artifacts_enabled = get_settings().get("ARTIFACTS.ENABLE", False) if is_true(artifacts_enabled): - from pr_agent.algo.artifacts import load_artifact_content - - get_logger().info("Artifact injection enabled, processing artifacts") - for key in get_settings(): - setting = get_settings().get(key) - if str(type(setting)) == "": - if key.lower() in ['pr_description', 'pr_code_suggestions', 'pr_reviewer']: - artifact_text = load_artifact_content(key.lower()) - if artifact_text: - if hasattr(setting, 'extra_instructions'): - extra_instructions = setting.extra_instructions - separator = "\n======\n\nCI Artifact Context:\n" - updated_instructions = ( - str(extra_instructions) + separator + artifact_text - if extra_instructions else artifact_text - ) - setting.extra_instructions = updated_instructions - get_logger().info(f"Injected artifact context into {key}") + from pr_agent.algo.artifacts import load_artifact + + artifact_text = load_artifact() + if artifact_text: + target_tools = get_settings().get( + "ARTIFACTS.TARGET_TOOLS", + ["pr_reviewer", "pr_description", "pr_code_suggestions"] + ) + separator = "\n======\n\n" + for key in get_settings(): + setting = get_settings().get(key) + if str(type(setting)) == "": + if key.lower() in target_tools and hasattr(setting, 'extra_instructions'): + extra_instructions = setting.extra_instructions + setting.extra_instructions = ( + str(extra_instructions) + separator + artifact_text + if extra_instructions else artifact_text + ) + get_logger().info(f"Injected artifact context into tools: {target_tools}") except Exception as e: - get_logger().info(f"github action: failed to process artifacts: {e}") + get_logger().warning(f"github action: failed to process artifacts: {e}", exc_info=True) # Handle pull request opened event if GITHUB_EVENT_NAME == "pull_request" or GITHUB_EVENT_NAME == "pull_request_target": diff --git a/pr_agent/settings/configuration.toml b/pr_agent/settings/configuration.toml index 2f9ae0fdf4..6deff76e00 100644 --- a/pr_agent/settings/configuration.toml +++ b/pr_agent/settings/configuration.toml @@ -373,8 +373,8 @@ max_patterns = 5 # max number of patterns to be detected enable = false # File path to the artifact (relative to GITHUB_WORKSPACE, or absolute) artifact_path = "" -# Parser type: "generic", "terraform_plan", "test_report" -artifact_type = "generic" +# Custom instructions appended after the artifact content (leave empty for a sensible default) +artifact_instructions = "" # Label shown to the AI — defaults to the filename when empty artifact_label = "" # Which tools receive artifact context diff --git a/tests/unittest/test_artifacts.py b/tests/unittest/test_artifacts.py index 0b3fb0109a..9ae9679e65 100644 --- a/tests/unittest/test_artifacts.py +++ b/tests/unittest/test_artifacts.py @@ -6,13 +6,11 @@ import pytest from pr_agent.algo.artifacts import ( + DEFAULT_ARTIFACT_INSTRUCTIONS, + format_artifact_content, + load_artifact, resolve_artifact_path, - load_artifact_content, _read_and_truncate, - ARTIFACT_PARSERS, - parse_generic, - parse_terraform_plan, - parse_test_report, ) @@ -24,10 +22,12 @@ def test_empty_path_returns_none(self): def test_absolute_path_existing_file(self, tmp_path): f = tmp_path / "plan.txt" f.write_text("content") - assert resolve_artifact_path(str(f)) == f + with patch.dict(os.environ, {"GITHUB_WORKSPACE": str(tmp_path)}): + assert resolve_artifact_path(str(f)) == f.resolve() - def test_absolute_path_missing_file(self): - assert resolve_artifact_path("/nonexistent/path/file.txt") is None + def test_absolute_path_missing_file(self, tmp_path): + with patch.dict(os.environ, {"GITHUB_WORKSPACE": str(tmp_path)}): + assert resolve_artifact_path(str(tmp_path / "nonexistent.txt")) is None def test_relative_path_with_github_workspace(self, tmp_path): f = tmp_path / "output" / "plan.txt" @@ -36,7 +36,7 @@ def test_relative_path_with_github_workspace(self, tmp_path): with patch.dict(os.environ, {"GITHUB_WORKSPACE": str(tmp_path)}): result = resolve_artifact_path("output/plan.txt") - assert result == f + assert result == f.resolve() def test_relative_path_without_workspace_falls_back_to_cwd(self, tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) @@ -46,12 +46,35 @@ def test_relative_path_without_workspace_falls_back_to_cwd(self, tmp_path, monke with patch.dict(os.environ, {}, clear=True): os.environ.pop("GITHUB_WORKSPACE", None) result = resolve_artifact_path("plan.txt") - assert result == f + assert result == f.resolve() def test_relative_path_not_found_returns_none(self): with patch.dict(os.environ, {"GITHUB_WORKSPACE": "/tmp/nonexistent_workspace_xyz"}): assert resolve_artifact_path("missing.txt") is None + def test_rejects_path_traversal_above_workspace(self, tmp_path): + outside = tmp_path / "outside" / "secret.txt" + outside.parent.mkdir(parents=True) + outside.write_text("secret") + + workspace = tmp_path / "workspace" + workspace.mkdir() + + with patch.dict(os.environ, {"GITHUB_WORKSPACE": str(workspace)}): + result = resolve_artifact_path("../outside/secret.txt") + assert result is None + + def test_rejects_absolute_path_outside_workspace(self, tmp_path): + outside = tmp_path / "outside.txt" + outside.write_text("secret") + + workspace = tmp_path / "workspace" + workspace.mkdir() + + with patch.dict(os.environ, {"GITHUB_WORKSPACE": str(workspace)}): + result = resolve_artifact_path(str(outside)) + assert result is None + class TestReadAndTruncate: def test_reads_file_content(self, tmp_path): @@ -63,7 +86,6 @@ def test_truncates_large_content(self, tmp_path): f = tmp_path / "big.txt" f.write_text("x" * 1000) result = _read_and_truncate(f, 100) - assert len(result) < 1000 assert result.startswith("x" * 100) assert "[... content truncated due to size limit ...]" in result @@ -71,78 +93,57 @@ def test_returns_empty_on_read_error(self, tmp_path): missing = tmp_path / "no_such_file.txt" assert _read_and_truncate(missing, 50000) == "" + def test_does_not_read_entire_large_file(self, tmp_path): + f = tmp_path / "huge.txt" + f.write_text("x" * 1_000_000) + result = _read_and_truncate(f, 100) + # Should contain exactly 100 chars of content + truncation marker + assert len(result) < 200 + -class TestParsers: - def test_parser_registry_has_expected_types(self): - assert "generic" in ARTIFACT_PARSERS - assert "terraform_plan" in ARTIFACT_PARSERS - assert "test_report" in ARTIFACT_PARSERS +class TestFormatArtifactContent: + def test_with_label_and_custom_instructions(self): + result = format_artifact_content("plan output", "plan.txt", "Check for deletions.") + assert "CI Artifact: plan.txt" in result + assert "plan output" in result + assert "Check for deletions." in result - def test_generic_parser(self): - result = parse_generic("some output", "build.log") + def test_with_label_uses_default_instructions_when_empty(self): + result = format_artifact_content("some output", "build.log", "") assert "CI Artifact: build.log" in result - assert "some output" in result - assert "additional context" in result + assert DEFAULT_ARTIFACT_INSTRUCTIONS in result - def test_generic_parser_no_label(self): - result = parse_generic("output", "") + def test_without_label(self): + result = format_artifact_content("output", "", "") assert "CI Artifact\n" in result + assert DEFAULT_ARTIFACT_INSTRUCTIONS in result - def test_terraform_plan_parser(self): - result = parse_terraform_plan("+ aws_instance.web", "plan.txt") - assert "Terraform Plan Output: plan.txt" in result - assert "+ aws_instance.web" in result - assert "infrastructure" in result - - def test_test_report_parser(self): - result = parse_test_report("FAILED: test_login", "results.xml") - assert "Test Results: results.xml" in result - assert "FAILED: test_login" in result - assert "failures" in result - - -class TestLoadArtifactContent: - def _mock_settings(self, artifacts_config): - settings = MagicMock() - settings.get.side_effect = lambda key, default=None: ( - artifacts_config if key == "ARTIFACTS" else default - ) - return settings +class TestLoadArtifact: def test_returns_empty_when_no_config(self): with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: mock_gs.return_value.get.return_value = {} - assert load_artifact_content("pr_reviewer") == "" + assert load_artifact() == "" def test_returns_empty_when_disabled(self): with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: mock_gs.return_value.get.return_value = {"enable": False, "artifact_path": "plan.txt"} - assert load_artifact_content("pr_reviewer") == "" + assert load_artifact() == "" def test_returns_empty_when_no_path(self): with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: mock_gs.return_value.get.return_value = {"enable": True, "artifact_path": ""} - assert load_artifact_content("pr_reviewer") == "" - - def test_returns_empty_when_tool_not_targeted(self): - with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: - mock_gs.return_value.get.return_value = { - "enable": True, - "artifact_path": "plan.txt", - "target_tools": ["pr_reviewer"], - } - assert load_artifact_content("pr_description") == "" + assert load_artifact() == "" def test_returns_empty_when_file_not_found(self): with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: mock_gs.return_value.get.return_value = { "enable": True, "artifact_path": "/nonexistent/file.txt", - "target_tools": ["pr_reviewer"], } - assert load_artifact_content("pr_reviewer") == "" + assert load_artifact() == "" - def test_loads_and_formats_artifact(self, tmp_path): + def test_loads_and_formats_with_default_instructions(self, tmp_path): f = tmp_path / "plan.txt" f.write_text("+ aws_s3_bucket.data") @@ -150,27 +151,30 @@ def test_loads_and_formats_artifact(self, tmp_path): mock_gs.return_value.get.return_value = { "enable": True, "artifact_path": str(f), - "artifact_type": "terraform_plan", + "artifact_instructions": "", "artifact_label": "", - "target_tools": ["pr_reviewer", "pr_description", "pr_code_suggestions"], "max_artifact_size": 50000, } - result = load_artifact_content("pr_reviewer") - assert "Terraform Plan Output: plan.txt" in result + with patch.dict(os.environ, {"GITHUB_WORKSPACE": str(tmp_path)}): + result = load_artifact() + assert "CI Artifact: plan.txt" in result assert "+ aws_s3_bucket.data" in result + assert DEFAULT_ARTIFACT_INSTRUCTIONS in result - def test_falls_back_to_generic_for_unknown_type(self, tmp_path): - f = tmp_path / "output.log" - f.write_text("some output") + def test_loads_and_formats_with_custom_instructions(self, tmp_path): + f = tmp_path / "results.xml" + f.write_text("FAILED: test_login") with patch("pr_agent.algo.artifacts.get_settings") as mock_gs: mock_gs.return_value.get.return_value = { "enable": True, "artifact_path": str(f), - "artifact_type": "unknown_type", - "artifact_label": "", - "target_tools": ["pr_reviewer"], + "artifact_instructions": "Flag any test failures.", + "artifact_label": "Test Results", "max_artifact_size": 50000, } - result = load_artifact_content("pr_reviewer") - assert "CI Artifact: output.log" in result + with patch.dict(os.environ, {"GITHUB_WORKSPACE": str(tmp_path)}): + result = load_artifact() + assert "CI Artifact: Test Results" in result + assert "FAILED: test_login" in result + assert "Flag any test failures." in result