From 3bb838d10c4ed0bbd5e38bdd8dd438cbe8337dc7 Mon Sep 17 00:00:00 2001 From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com> Date: Fri, 3 Jul 2026 22:09:39 +0200 Subject: [PATCH 1/2] [FLINK-40069][tests] Wait for OTel collector readiness in OtelTestContainer The default HostPortWaitStrategy reports the shell-less collector image as ready before its OTLP receiver accepts connections, so the test's first export could fail against a not-yet-listening receiver. Wait for the collector's readiness log line, which is only emitted after all components including the receivers have started, with a bounded startup timeout. Generated-by: Claude Opus 4.8 (1M context) --- .../apache/flink/metrics/otel/OtelTestContainer.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OtelTestContainer.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OtelTestContainer.java index d246d42fc99443..602a14be89f405 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OtelTestContainer.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OtelTestContainer.java @@ -20,6 +20,7 @@ import com.github.dockerjava.api.command.InspectContainerResponse; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.Base58; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile; @@ -27,6 +28,7 @@ import java.io.File; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.Locale; /** {@link OtelTestContainer} provides an {@code Otel} test instance. */ @@ -53,6 +55,14 @@ public OtelTestContainer(Path outputDir) { new File(outputDir.toFile(), LOG_FILE).getAbsolutePath(), 755), getOutputLogPath().toString()); withCommand("--config", "otel-config.yaml"); + // The default HostPortWaitStrategy reports this shell-less image as ready before the OTLP + // receiver accepts connections; the readiness log line is only emitted after all + // components, including the receivers, have started. + waitingFor( + Wait.forLogMessage( + ".*Everything is ready\\. Begin running and processing data\\..*", + 1) + .withStartupTimeout(Duration.ofMinutes(1))); } public Path getOutputLogPath() { From 82eb1cd90f4044f1b9ebebbe8d74875190dd16b6 Mon Sep 17 00:00:00 2001 From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com> Date: Fri, 3 Jul 2026 22:09:39 +0200 Subject: [PATCH 2/2] [FLINK-40069][tests] Re-export metrics inside the retry loop The metric protocol test exported exactly once before polling the collector output file, so any single failed export (racing collector startup, or a transient HTTP 404) left the file empty and doomed the whole retry budget. Re-invoke report() inside each eventually() iteration via a pre-attempt hook; the assertion reads only the last line, so repeated exports are safe. Generated-by: Claude Opus 4.8 (1M context) --- .../OpenTelemetryMetricReporterProtocolTest.java | 6 ++++-- .../flink/metrics/otel/OpenTelemetryTestBase.java | 12 ++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterProtocolTest.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterProtocolTest.java index 61da17ecf61bf6..4a9a1f2de3aa0a 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterProtocolTest.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterProtocolTest.java @@ -57,13 +57,15 @@ protected void setupAndReport(MetricConfig config) { reporter.open(config); SimpleCounter counter = new SimpleCounter(); reporter.notifyOfAddedMetric(counter, TEST_METRIC_NAME, metricGroup); - reporter.report(); - reporter.waitForLastReportToComplete(); } @Override protected void assertReported() throws Exception { eventuallyConsumeJson( + () -> { + reporter.report(); + reporter.waitForLastReportToComplete(); + }, json -> assertThat(extractMetricNames(json)).contains(EXPECTED_METRIC_NAME)); } diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java index 7fba6b23e6a260..741266f8d595f5 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java @@ -90,8 +90,20 @@ public static MetricConfig createMetricConfig() { public static void eventuallyConsumeJson(ThrowingConsumer jsonConsumer) throws Exception { + eventuallyConsumeJson(() -> {}, jsonConsumer); + } + + /** + * Runs {@code beforeAttempt} at the start of every retry iteration, so on-demand reporters can + * re-export instead of letting a single failed export doom the whole retry budget. + */ + public static void eventuallyConsumeJson( + ThrowingRunnable beforeAttempt, + ThrowingConsumer jsonConsumer) + throws Exception { eventually( () -> { + beforeAttempt.run(); // otel-collector dumps every report in a new line, so in order to re-use the // same collector across multiple tests, let's read only the last line getOtelContainer()