diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncBase.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncBase.java index c0f292e1e2d72..4369f29193672 100644 --- a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncBase.java +++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncBase.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -34,12 +34,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; abstract class LogWriterRollOverUpdateAsyncBase extends LogTestBase { private static final Logger LOG = LoggerFactory.getLogger(LogWriterRollOverUpdateAsyncBase.class); protected BlockingQueue entryInfos; - protected CountDownLatch latch = new CountDownLatch(1); LogWriter logWriter; File reportFile; @@ -79,41 +79,62 @@ protected void runTest(int queueCapacity) throws IOException, InterruptedExcepti entryInfos = new ArrayBlockingQueue<>(queueCapacity); final ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + final Future generateTask = executorService.submit(this::asyncGenerate); - final Future generateTask = executorService.submit(this::asyncGenerate); + final Future updateTask = executorService.submit(this::markRecordsAsCommitted); - final Future updateTask = executorService.submit(this::markRecordsAsCommitted); + executorService.shutdown(); - Assertions.assertTrue(latch.await(1, TimeUnit.MINUTES), "Failed to generate records within 1 minute"); + // Wait for both tasks to complete using Awaitility instead of only + // waiting for the generate task via a latch, which left a race window + // where the update task could still be running when verification began. + await().atMost(2, TimeUnit.MINUTES) + .until(executorService::isTerminated); - try (LogReader reader = new LogReader(reportFile, (int) RECORD_COUNT * 100)) { + // Propagate any exceptions from the tasks + try { + generateTask.get(); + updateTask.get(); + } catch (ExecutionException e) { + Assertions.fail("Task failed: " + e.getCause().getMessage(), e.getCause()); + } - Header fileHeader = reader.getHeader(); - assertEquals(Header.FORMAT_NAME, fileHeader.getFormatName().trim()); - assertEquals(Header.CURRENT_FILE_VERSION, fileHeader.getFileVersion()); + // Flush to ensure all data is persisted to disk before verification + logWriter.flush(); - int count = 0; - PersistedLogEntry entry = reader.readEntry(); - while (entry != null) { - LOG.debug("Read state: {}", entry.getEntryState()); - assertEquals(LogEntry.EntryState.PROCESSED, entry.getEntryState()); - assertEquals(0, entry.getKeyMetadata()); - assertEquals(0, entry.getValueMetadata()); + try (LogReader reader = new LogReader(reportFile, (int) RECORD_COUNT * 100)) { - String key = new String(entry.getKey()); - LOG.debug("Read record: {}", key); - Assertions.assertTrue(key.startsWith("record-")); + Header fileHeader = reader.getHeader(); + assertEquals(Header.FORMAT_NAME, fileHeader.getFormatName().trim()); + assertEquals(Header.CURRENT_FILE_VERSION, fileHeader.getFileVersion()); - ByteBuffer buffer = ByteBuffer.wrap(entry.getValue()); + int count = 0; + PersistedLogEntry entry = reader.readEntry(); + while (entry != null) { + LOG.debug("Read state: {}", entry.getEntryState()); + assertEquals(LogEntry.EntryState.PROCESSED, entry.getEntryState()); + assertEquals(0, entry.getKeyMetadata()); + assertEquals(0, entry.getValueMetadata()); - Assertions.assertTrue(buffer.getLong() > 0); + String key = new String(entry.getKey()); + LOG.debug("Read record: {}", key); + Assertions.assertTrue(key.startsWith("record-")); - count++; + ByteBuffer buffer = ByteBuffer.wrap(entry.getValue()); - entry = reader.readEntry(); - } + Assertions.assertTrue(buffer.getLong() > 0); - Assertions.assertEquals(100, count, "The number of records don't match"); + count++; + + entry = reader.readEntry(); + } + + Assertions.assertEquals(100, count, "The number of records don't match"); + } + } finally { + executorService.shutdownNow(); + executorService.awaitTermination(5, TimeUnit.SECONDS); } } diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncTest.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncTest.java index 60ebd79194082..3d1f792df6f91 100644 --- a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncTest.java +++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncTest.java @@ -38,8 +38,6 @@ protected void asyncGenerate() { } catch (IOException e) { LOG.error("Failed to generate records: {}", e.getMessage(), e); throw new RuntimeException(e); - } finally { - latch.countDown(); } } diff --git a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncWithContentionTest.java b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncWithContentionTest.java index 0e19adee0438c..ad5c78a35d02c 100644 --- a/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncWithContentionTest.java +++ b/components/camel-wal/src/test/java/org/apache/camel/component/wal/LogWriterRollOverUpdateAsyncWithContentionTest.java @@ -44,8 +44,6 @@ protected void asyncGenerate() { } catch (IOException e) { LOG.error("Failed to generate records: {}", e.getMessage(), e); throw new RuntimeException(e); - } finally { - latch.countDown(); } }