diff --git a/core/src/main/java/com/taobao/arthas/core/command/basic1000/Base64Command.java b/core/src/main/java/com/taobao/arthas/core/command/basic1000/Base64Command.java index 16f344cd0ac..23778b9dd12 100644 --- a/core/src/main/java/com/taobao/arthas/core/command/basic1000/Base64Command.java +++ b/core/src/main/java/com/taobao/arthas/core/command/basic1000/Base64Command.java @@ -126,8 +126,9 @@ public void process(CommandProcess process) { if (this.output != null) { int readableBytes = convertResult.readableBytes(); - OutputStream out = new FileOutputStream(this.output); - convertResult.readBytes(out, readableBytes); + try (OutputStream out = new FileOutputStream(this.output)) { + convertResult.readBytes(out, readableBytes); + } process.appendResult(new Base64Model(null)); } else { String base64Str = convertResult.toString(CharsetUtil.UTF_8); diff --git a/core/src/main/java/com/taobao/arthas/core/command/basic1000/KeymapCommand.java b/core/src/main/java/com/taobao/arthas/core/command/basic1000/KeymapCommand.java index 0d19640ef0c..28f740af26d 100644 --- a/core/src/main/java/com/taobao/arthas/core/command/basic1000/KeymapCommand.java +++ b/core/src/main/java/com/taobao/arthas/core/command/basic1000/KeymapCommand.java @@ -47,7 +47,7 @@ public void process(CommandProcess process) { label("Description").style(Decoration.bold.bold()), label("Name").style(Decoration.bold.bold())); - BufferedReader br = new BufferedReader(new InputStreamReader(inputrc)); + try (BufferedReader br = new BufferedReader(new InputStreamReader(inputrc))) { String line; while ((line = br.readLine()) != null) { line = line.trim(); diff --git a/core/src/main/java/com/taobao/arthas/core/command/monitor200/ThreadContentionCommand.java b/core/src/main/java/com/taobao/arthas/core/command/monitor200/ThreadContentionCommand.java new file mode 100644 index 00000000000..7acd5e9e5e0 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/monitor200/ThreadContentionCommand.java @@ -0,0 +1,321 @@ +package com.taobao.arthas.core.command.monitor200; + +import com.taobao.arthas.core.command.Constants; +import com.taobao.arthas.core.shell.command.AnnotatedCommand; +import com.taobao.arthas.core.shell.command.CommandProcess; +import com.taobao.arthas.core.shell.command.ExitStatus; +import com.taobao.arthas.core.util.StringUtils; +import com.taobao.arthas.core.util.ThreadUtil; +import com.taobao.middleware.cli.annotations.Description; +import com.taobao.middleware.cli.annotations.Name; +import com.taobao.middleware.cli.annotations.Option; +import com.taobao.middleware.cli.annotations.Summary; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.*; +import java.util.stream.Collectors; + +/** + * Command to analyze thread lock contention, detect deadlock candidates, + * and visualize thread wait chains. + * + * @author spatchava + */ +@Name("thread-contention") +@Summary("Analyze thread lock contention and detect potential deadlocks") +@Description(Constants.EXAMPLE + + " thread-contention\n" + + " thread-contention --top 5\n" + + " thread-contention --deadlock\n" + + " thread-contention --interval 1000\n" + + Constants.WIKI + Constants.WIKI_HOME + "thread-contention") +public class ThreadContentionCommand extends AnnotatedCommand { + + private static final String ANSI_RESET = "\033[0m"; + private static final String ANSI_RED = "\033[31m"; + private static final String ANSI_YELLOW = "\033[33m"; + private static final String ANSI_GREEN = "\033[32m"; + private static final String ANSI_CYAN = "\033[36m"; + private static final String ANSI_BOLD = "\033[1m"; + + private int topN = 10; + private boolean deadlockOnly = false; + private long samplingInterval = 500; + + @Option(shortName = "n", longName = "top") + @Description("Show top N contended locks (default: 10)") + public void setTopN(int topN) { + this.topN = topN; + } + + @Option(shortName = "d", longName = "deadlock", flag = true) + @Description("Check for deadlocks only") + public void setDeadlockOnly(boolean deadlockOnly) { + this.deadlockOnly = deadlockOnly; + } + + @Option(shortName = "i", longName = "interval") + @Description("Sampling interval in milliseconds (default: 500)") + public void setSamplingInterval(long samplingInterval) { + this.samplingInterval = samplingInterval; + } + + @Override + public void process(CommandProcess process) { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] threadInfos = null; + // Capture original monitoring state so finally can restore it; otherwise we + // could leave per-JVM contention monitoring permanently enabled, which adds + // measurable overhead to every monitorenter/monitorexit. + boolean originalContentionEnabled = threadMXBean.isThreadContentionMonitoringEnabled(); + boolean weEnabledContention = false; + try { + if (!originalContentionEnabled) { + threadMXBean.setThreadContentionMonitoringEnabled(true); + weEnabledContention = true; + } + + // Allow contention data to accumulate over the sampling interval + try { + Thread.sleep(samplingInterval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + process.end(-1, "Interrupted during sampling"); + return; + } + + threadInfos = threadMXBean.dumpAllThreads(true, true); + + if (threadInfos == null || threadInfos.length == 0) { + process.write("No thread information available.\n"); + process.end(); + return; + } + + StringBuilder sb = new StringBuilder(4096); + + // Always check for deadlocks + appendDeadlockAnalysis(sb, threadMXBean, threadInfos); + + if (!deadlockOnly) { + appendContentionAnalysis(sb, threadInfos); + appendWaitChainVisualization(sb, threadInfos); + } + + process.write(sb.toString()); + process.end(); + + } catch (Exception e) { + process.end(-1, "Error analyzing thread contention: " + e.getMessage()); + } finally { + // Restore monitoring state if we changed it. Without this we leave + // contention monitoring permanently enabled when the user runs the + // command on a JVM that had it disabled, defeating the perf-cost + // intent of the JVM's default-off setting. + if (weEnabledContention) { + try { + threadMXBean.setThreadContentionMonitoringEnabled(false); + } catch (Exception ignore) { + // best-effort restore; do not mask the original error + } + } + threadInfos = null; + } + } + + private void appendDeadlockAnalysis(StringBuilder sb, ThreadMXBean threadMXBean, + ThreadInfo[] threadInfos) { + sb.append(ANSI_BOLD).append("\n=== Deadlock Analysis ===\n").append(ANSI_RESET); + + long[] deadlockedThreadIds = threadMXBean.findDeadlockedThreads(); + if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) { + sb.append(ANSI_RED).append("DEADLOCK DETECTED! ") + .append(deadlockedThreadIds.length).append(" threads involved:\n").append(ANSI_RESET); + + ThreadInfo[] deadlockedInfos = threadMXBean.getThreadInfo(deadlockedThreadIds, true, true); + for (ThreadInfo info : deadlockedInfos) { + if (info != null) { + sb.append(ANSI_RED).append(" Thread: ").append(info.getThreadName()) + .append(" (id=").append(info.getThreadId()).append(")") + .append(" - ").append(info.getThreadState()) + .append(ANSI_RESET).append("\n"); + if (info.getLockName() != null) { + sb.append(" Waiting on: ").append(info.getLockName()).append("\n"); + } + if (info.getLockOwnerName() != null) { + sb.append(" Held by: ").append(info.getLockOwnerName()) + .append(" (id=").append(info.getLockOwnerId()).append(")\n"); + } + } + } + } else { + // Check for potential deadlock candidates + // dumpAllThreads can return null entries for threads that died between + // the snapshot call and the data fill - filter them before deref. + List blockedThreads = Arrays.stream(threadInfos) + .filter(java.util.Objects::nonNull) + .filter(t -> t.getThreadState() == Thread.State.BLOCKED) + .collect(Collectors.toList()); + + if (blockedThreads.isEmpty()) { + sb.append(ANSI_GREEN).append("No deadlocks or deadlock candidates detected.\n") + .append(ANSI_RESET); + } else { + sb.append(ANSI_YELLOW).append("No deadlocks, but ") + .append(blockedThreads.size()).append(" blocked thread(s) found (potential candidates):\n") + .append(ANSI_RESET); + for (ThreadInfo blocked : blockedThreads) { + sb.append(" ").append(blocked.getThreadName()) + .append(" blocked on ").append(blocked.getLockName()) + .append(" owned by ").append(blocked.getLockOwnerName()).append("\n"); + } + } + } + sb.append("\n"); + } + + private void appendContentionAnalysis(StringBuilder sb, ThreadInfo[] threadInfos) { + sb.append(ANSI_BOLD).append("=== Lock Contention Analysis ===\n").append(ANSI_RESET); + + // Group threads by the lock they are waiting on + Map> lockWaiters = new LinkedHashMap<>(); + for (ThreadInfo info : threadInfos) { + if (info == null) { + continue; // dumpAllThreads may return null entries for dead threads + } + if (info.getLockName() != null) { + lockWaiters.computeIfAbsent(info.getLockName(), k -> new ArrayList<>()).add(info); + } + } + + if (lockWaiters.isEmpty()) { + sb.append(ANSI_GREEN).append("No lock contention detected.\n").append(ANSI_RESET); + sb.append("\n"); + return; + } + + // Sort by contention count (most contended first) and limit to topN + List>> sorted = lockWaiters.entrySet().stream() + .sorted((a, b) -> Integer.compare(b.getValue().size(), a.getValue().size())) + .limit(topN) + .collect(Collectors.toList()); + + // Table header + sb.append(String.format("%-50s %-10s %-30s\n", "Lock", "Waiters", "Owner")); + sb.append(String.format("%-50s %-10s %-30s\n", + repeat("-", 50), repeat("-", 10), repeat("-", 30))); + + for (Map.Entry> entry : sorted) { + String lockName = entry.getKey(); + List waiters = entry.getValue(); + String ownerName = waiters.get(0).getLockOwnerName(); + if (ownerName == null) { + ownerName = "N/A"; + } + + String color = waiters.size() >= 5 ? ANSI_RED + : waiters.size() >= 2 ? ANSI_YELLOW : ANSI_CYAN; + + sb.append(color); + sb.append(String.format("%-50s %-10d %-30s", + truncate(lockName, 50), waiters.size(), truncate(ownerName, 30))); + sb.append(ANSI_RESET).append("\n"); + + // Show waiting threads + for (ThreadInfo waiter : waiters) { + sb.append(" ").append(ANSI_CYAN).append("-> ") + .append(waiter.getThreadName()) + .append(" (blocked ").append(waiter.getBlockedTime()).append("ms)") + .append(ANSI_RESET).append("\n"); + } + } + sb.append("\n"); + } + + private void appendWaitChainVisualization(StringBuilder sb, ThreadInfo[] threadInfos) { + sb.append(ANSI_BOLD).append("=== Thread Wait Chains ===\n").append(ANSI_RESET); + + // Build adjacency: thread -> thread it's waiting on + Map threadMap = new HashMap<>(); + for (ThreadInfo info : threadInfos) { + threadMap.put(info.getThreadId(), info); + } + + // Find chains: follow lockOwnerId links + Set visited = new HashSet<>(); + List> chains = new ArrayList<>(); + + for (ThreadInfo info : threadInfos) { + if (info.getLockOwnerId() > 0 && !visited.contains(info.getThreadId())) { + List chain = buildWaitChain(info, threadMap, visited); + if (chain.size() > 1) { + chains.add(chain); + } + } + } + + if (chains.isEmpty()) { + sb.append(ANSI_GREEN).append("No wait chains detected.\n").append(ANSI_RESET); + } else { + sb.append("Found ").append(chains.size()).append(" wait chain(s):\n\n"); + int chainNum = 1; + for (List chain : chains) { + sb.append(ANSI_YELLOW).append("Chain #").append(chainNum++).append(":") + .append(ANSI_RESET).append("\n"); + for (int i = 0; i < chain.size(); i++) { + ThreadInfo t = chain.get(i); + String indent = repeat(" ", i); + sb.append(indent); + if (i > 0) { + sb.append(ANSI_CYAN).append("\\-> ").append(ANSI_RESET); + } + sb.append(t.getThreadName()) + .append(" [").append(t.getThreadState()).append("]"); + if (t.getLockName() != null) { + sb.append(" waiting on ").append(t.getLockName()); + } + sb.append("\n"); + } + sb.append("\n"); + } + } + } + + private List buildWaitChain(ThreadInfo start, Map threadMap, + Set visited) { + List chain = new ArrayList<>(); + ThreadInfo current = start; + Set chainVisited = new HashSet<>(); + + while (current != null && !chainVisited.contains(current.getThreadId())) { + chainVisited.add(current.getThreadId()); + visited.add(current.getThreadId()); + chain.add(current); + + long ownerId = current.getLockOwnerId(); + if (ownerId > 0) { + current = threadMap.get(ownerId); + } else { + break; + } + } + return chain; + } + + private static String truncate(String str, int maxLen) { + if (str == null) { + return ""; + } + return str.length() > maxLen ? str.substring(0, maxLen - 3) + "..." : str; + } + + private static String repeat(String str, int times) { + StringBuilder sb = new StringBuilder(str.length() * times); + for (int i = 0; i < times; i++) { + sb.append(str); + } + return sb.toString(); + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/command/monitor200/TimeTunnelCommand.java b/core/src/main/java/com/taobao/arthas/core/command/monitor200/TimeTunnelCommand.java index da7c7909a35..7fbf4013b9d 100644 --- a/core/src/main/java/com/taobao/arthas/core/command/monitor200/TimeTunnelCommand.java +++ b/core/src/main/java/com/taobao/arthas/core/command/monitor200/TimeTunnelCommand.java @@ -22,6 +22,7 @@ import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -52,7 +53,7 @@ public class TimeTunnelCommand extends EnhancerCommand { // 时间隧道(时间碎片的集合) // TODO 并非线程安全? - private static final Map timeFragmentMap = new LinkedHashMap(); + private static final Map timeFragmentMap = Collections.synchronizedMap(new LinkedHashMap()); // 时间碎片序列生成器 private static final AtomicInteger sequence = new AtomicInteger(1000); // TimeTunnel the method call @@ -400,14 +401,19 @@ private void processSearch(CommandProcess process) { try { // 匹配的时间片段 Map matchingTimeSegmentMap = new LinkedHashMap(); - for (Map.Entry entry : timeFragmentMap.entrySet()) { - int index = entry.getKey(); - TimeFragment tf = entry.getValue(); - Advice advice = tf.getAdvice(); - - // 搜索出匹配的时间片段 - if ((ExpressFactory.threadLocalExpress(advice)).is(searchExpress)) { - matchingTimeSegmentMap.put(index, tf); + // Collections.synchronizedMap requires the user to manually synchronize on + // the returned map when iterating its collection views, otherwise iteration + // can throw ConcurrentModificationException if another thread mutates the map. + synchronized (timeFragmentMap) { + for (Map.Entry entry : timeFragmentMap.entrySet()) { + int index = entry.getKey(); + TimeFragment tf = entry.getValue(); + Advice advice = tf.getAdvice(); + + // 搜索出匹配的时间片段 + if ((ExpressFactory.threadLocalExpress(advice)).is(searchExpress)) { + matchingTimeSegmentMap.put(index, tf); + } } } diff --git a/core/src/main/java/com/taobao/arthas/core/util/NetUtils.java b/core/src/main/java/com/taobao/arthas/core/util/NetUtils.java index 8ebff13db86..9b8be1b621c 100644 --- a/core/src/main/java/com/taobao/arthas/core/util/NetUtils.java +++ b/core/src/main/java/com/taobao/arthas/core/util/NetUtils.java @@ -75,14 +75,13 @@ public static Response request(String urlString) { * @return the response string of given url */ public static String simpleRequest(String url) { - BufferedReader br = null; try { URL obj = new URL(url); HttpURLConnection con = (HttpURLConnection) obj.openConnection(); con.setRequestProperty("Accept", "application/json"); int responseCode = con.getResponseCode(); - br = new BufferedReader(new InputStreamReader(con.getInputStream())); + BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream())); StringBuilder sb = new StringBuilder(); String line = null; while ((line = br.readLine()) != null) { @@ -102,14 +101,6 @@ public static String simpleRequest(String url) { } catch (Exception e) { return null; - } finally { - if (br != null) { - try { - br.close(); - } catch (IOException e) { - // ignore - } - } } } @@ -128,16 +119,14 @@ public static String simpleRequest(String url) { * @return the qos response in string format */ public static Response requestViaSocket(String path) { - BufferedReader br = null; - try { - Socket s = new Socket(QOS_HOST, QOS_PORT); + try (Socket s = new Socket(QOS_HOST, QOS_PORT)) { PrintWriter pw = new PrintWriter(s.getOutputStream()); pw.println("GET " + path + " HTTP/1.1"); pw.println("Host: " + QOS_HOST + ":" + QOS_PORT); pw.println(""); pw.flush(); - br = new BufferedReader(new InputStreamReader(s.getInputStream())); + BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream())); StringBuilder sb = new StringBuilder(); String line = null; boolean start = false; @@ -153,14 +142,6 @@ public static Response requestViaSocket(String path) { return new Response(result); } catch (Exception e) { return new Response(e.getMessage(), false); - } finally { - if (br != null) { - try { - br.close(); - } catch (IOException e) { - // ignore - } - } } } diff --git a/core/src/test/java/com/taobao/arthas/core/command/monitor200/ThreadContentionCommandTest.java b/core/src/test/java/com/taobao/arthas/core/command/monitor200/ThreadContentionCommandTest.java new file mode 100644 index 00000000000..8e401d2fae5 --- /dev/null +++ b/core/src/test/java/com/taobao/arthas/core/command/monitor200/ThreadContentionCommandTest.java @@ -0,0 +1,168 @@ +package com.taobao.arthas.core.command.monitor200; + +import org.junit.Test; +import static org.junit.Assert.*; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; + +/** + * Tests for ThreadContentionCommand functionality. + * + * @author spatchava + */ +public class ThreadContentionCommandTest { + + @Test + public void testDeadlockDetectionWithNoDeadlock() { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + long[] deadlocked = threadMXBean.findDeadlockedThreads(); + // In a normal test environment, there should be no deadlocks + assertNull("No deadlocks should exist in a clean test environment", deadlocked); + } + + @Test + public void testDeadlockDetectionWithSyntheticDeadlock() throws Exception { + final Object lockA = new Object(); + final Object lockB = new Object(); + final boolean[] deadlockCreated = {false}; + + Thread t1 = new Thread(() -> { + synchronized (lockA) { + deadlockCreated[0] = true; + try { Thread.sleep(100); } catch (InterruptedException ignored) {} + synchronized (lockB) { + // This should never be reached in a deadlock + } + } + }, "DeadlockTest-Thread-1"); + + Thread t2 = new Thread(() -> { + synchronized (lockB) { + while (!deadlockCreated[0]) { + try { Thread.sleep(10); } catch (InterruptedException ignored) {} + } + synchronized (lockA) { + // This should never be reached in a deadlock + } + } + }, "DeadlockTest-Thread-2"); + + t1.setDaemon(true); + t2.setDaemon(true); + t1.start(); + t2.start(); + + // Wait for deadlock to form + Thread.sleep(500); + + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + long[] deadlockedThreads = threadMXBean.findDeadlockedThreads(); + + // Clean up - interrupt threads (they won't actually unblock from deadlock, + // but since they're daemon threads they'll be cleaned up on JVM exit) + t1.interrupt(); + t2.interrupt(); + + // Verify deadlock was detected + assertNotNull("Deadlock should be detected between two threads", deadlockedThreads); + assertEquals("Exactly 2 threads should be in deadlock", 2, deadlockedThreads.length); + } + + @Test + public void testContentionAnalysisOutputFormatting() { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true); + + assertNotNull("Thread dump should not be null", threadInfos); + assertTrue("Thread dump should contain at least one thread", threadInfos.length > 0); + + // Verify ThreadInfo contains expected data + boolean foundMainThread = false; + for (ThreadInfo info : threadInfos) { + assertNotNull("Thread name should not be null", info.getThreadName()); + assertNotNull("Thread state should not be null", info.getThreadState()); + if ("main".equals(info.getThreadName())) { + foundMainThread = true; + } + } + assertTrue("Should find the main thread in thread dump", foundMainThread); + } + + @Test + public void testThreadWaitChainBuilding() { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true); + + // Build a map of thread ID -> ThreadInfo + java.util.Map threadMap = new java.util.HashMap<>(); + for (ThreadInfo info : threadInfos) { + threadMap.put(info.getThreadId(), info); + } + + // Verify we can look up threads by ID + for (ThreadInfo info : threadInfos) { + ThreadInfo lookedUp = threadMap.get(info.getThreadId()); + assertNotNull("Should find thread in map by ID", lookedUp); + assertEquals("Thread names should match", info.getThreadName(), lookedUp.getThreadName()); + } + + // Verify wait chain links are valid (if a thread is waiting on a lock owner, + // that owner should exist in the thread map) + for (ThreadInfo info : threadInfos) { + if (info.getLockOwnerId() > 0) { + ThreadInfo owner = threadMap.get(info.getLockOwnerId()); + assertNotNull("Lock owner thread should exist in thread map for thread: " + + info.getThreadName(), owner); + } + } + } + + @Test + public void testNoContentionScenario() { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true); + + // Count threads with lock contention (waiting on a lock) + int blockedCount = 0; + for (ThreadInfo info : threadInfos) { + if (info.getThreadState() == Thread.State.BLOCKED) { + blockedCount++; + } + } + + // In a normal test environment without deliberate contention, + // we expect no or very few blocked threads + assertTrue("In a clean test environment, blocked thread count should be low", + blockedCount < threadInfos.length / 2); + } + + @Test + public void testThreadMXBeanContentionMonitoring() { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + + // Test that contention monitoring can be enabled/disabled safely + boolean originalState = threadMXBean.isThreadContentionMonitoringEnabled(); + try { + threadMXBean.setThreadContentionMonitoringEnabled(true); + assertTrue("Contention monitoring should be enabled", + threadMXBean.isThreadContentionMonitoringEnabled()); + + // Verify blocked time is available when monitoring is enabled + ThreadInfo[] infos = threadMXBean.dumpAllThreads(false, false); + for (ThreadInfo info : infos) { + // Blocked time should be >= 0 when monitoring is enabled + assertTrue("Blocked time should be non-negative", + info.getBlockedTime() >= 0); + assertTrue("Waited time should be non-negative", + info.getWaitedTime() >= 0); + } + } finally { + // Restore original state - validates the fix for resource leak + threadMXBean.setThreadContentionMonitoringEnabled(originalState); + assertEquals("Contention monitoring should be restored to original state", + originalState, threadMXBean.isThreadContentionMonitoringEnabled()); + } + } +}