diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java index 78839584924e2..5d16573181cef 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThreadsRejectedExecutionTest.java @@ -26,6 +26,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; @@ -119,16 +120,20 @@ public void configure() { NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create(); - getMockEndpoint("mock:result").expectedMinimumMessageCount(2); for (int i = 0; i < 10; i++) { template.sendBody("seda:start", "Message " + i); } - assertMockEndpointsSatisfied(); - assertTrue(notify.matchesWaitTime()); + // use Awaitility to wait for all exchanges to be done and inflight to drain + Awaitility.await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertTrue(notify.matches())); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(0, context.getInflightRepository().size())); - int inflight = context.getInflightRepository().size(); - assertEquals(0, inflight); + assertTrue(getMockEndpoint("mock:result").getReceivedCounter() >= 2, + "Expected at least 2 messages at mock:result but got " + + getMockEndpoint("mock:result") + .getReceivedCounter()); } @Test @@ -145,16 +150,18 @@ public void configure() { NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create(); - getMockEndpoint("mock:result").expectedMessageCount(10); for (int i = 0; i < 10; i++) { template.sendBody("seda:start", "Message " + i); } - assertMockEndpointsSatisfied(); - assertTrue(notify.matchesWaitTime()); + // use Awaitility to wait for all exchanges to be done and inflight to drain + Awaitility.await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertTrue(notify.matches())); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(0, context.getInflightRepository().size())); - int inflight = context.getInflightRepository().size(); - assertEquals(0, inflight); + // CallerRuns policy means all messages should eventually complete + assertEquals(10, getMockEndpoint("mock:result").getReceivedCounter()); } @Test @@ -173,20 +180,25 @@ public void configure() { NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create(); - // there should be error handling for aborted tasks (eg no redeliveries - // and no error handling) - getMockEndpoint("mock:error").expectedMessageCount(0); - - getMockEndpoint("mock:result").expectedMinimumMessageCount(2); for (int i = 0; i < 10; i++) { template.sendBody("seda:start", "Message " + i); } - assertMockEndpointsSatisfied(); - - assertTrue(notify.matchesWaitTime()); - int inflight = context.getInflightRepository().size(); - assertEquals(0, inflight); + // use Awaitility to wait for all exchanges to be done and inflight to drain + Awaitility.await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertTrue(notify.matches())); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(0, context.getInflightRepository().size())); + + // at least 2 messages should have made it through the thread pool + assertTrue(getMockEndpoint("mock:result").getReceivedCounter() >= 2, + "Expected at least 2 messages at mock:result but got " + + getMockEndpoint("mock:result") + .getReceivedCounter()); + + // there should be no error handling for aborted tasks (no redeliveries and no error handling) + assertEquals(0, getMockEndpoint("mock:error").getReceivedCounter(), + "Expected 0 messages at mock:error but got " + getMockEndpoint("mock:error").getReceivedCounter()); } }