diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java index 80aa854e66cc1..591980548e2c7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java @@ -341,6 +341,7 @@ public void testClosedDataNodeGetConnections() throws Exception { TimeUnit.SECONDS.sleep(1); } } + TimeUnit.SECONDS.sleep(5); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java index d8fb9bb4c5072..f42d4237ea9d3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java @@ -242,7 +242,7 @@ public void executeRawDataQuery() { } @Test - public void tryIfTheServerIsRestart() { + public void tryIfTheServerIsRestart() throws InterruptedException { ISessionPool pool = EnvFactory.getEnv().getSessionPool(3); SessionDataSetWrapper wrapper = null; BaseNodeWrapper node = EnvFactory.getEnv().getDataNodeWrapper(0); @@ -268,6 +268,7 @@ public void tryIfTheServerIsRestart() { .ensureNodeStatus( Collections.singletonList(node), Collections.singletonList(NodeStatus.Running)); pool = EnvFactory.getEnv().getSessionPool(3); + TimeUnit.SECONDS.sleep(5); correctQuery(pool, DEFAULT_QUERY_TIMEOUT); pool.close(); return; @@ -335,7 +336,7 @@ public void tryIfTheServerIsRestartButDataIsGotten() { } @Test - public void restart() { + public void restart() throws InterruptedException { ISessionPool pool = EnvFactory.getEnv().getSessionPool(1); write10Data(pool, true); // stop the server. @@ -353,6 +354,7 @@ public void restart() { EnvFactory.getEnv() .ensureNodeStatus( Collections.singletonList(node), Collections.singletonList(NodeStatus.Running)); + TimeUnit.SECONDS.sleep(5); write10Data(pool, true); pool.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java index 3fac0d10a167c..b9b9e89e82c09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java @@ -29,6 +29,11 @@ import org.apache.iotdb.metrics.utils.MetricType; import org.apache.iotdb.metrics.utils.SystemType; +import com.sun.jna.Library; +import com.sun.jna.Native; +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.WinNT; +import com.sun.jna.ptr.IntByReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +57,9 @@ public SystemRelatedFileMetrics() { @Override public void bindTo(AbstractMetricService metricService) { - if ((CONFIG.getSystemType() == SystemType.LINUX || CONFIG.getSystemType() == SystemType.MAC) + if ((CONFIG.getSystemType() == SystemType.LINUX + || CONFIG.getSystemType() == SystemType.MAC + || CONFIG.getSystemType() == SystemType.WINDOWS) && !CONFIG.getPid().isEmpty()) { this.getOpenFileNumberCommand = new String[] {"/bin/sh", "-c", String.format("lsof -p %s | wc -l", CONFIG.getPid())}; @@ -88,6 +95,11 @@ private long getOpenFileHandlersNumber() { } } fdCount = Long.parseLong(result.toString().trim()); + } else if (CONFIG.getSystemType() == SystemType.WINDOWS) { + WinNT.HANDLE hProcess = Kernel32.INSTANCE.GetCurrentProcess(); + IntByReference handleCount = new IntByReference(); + boolean success = Kernel32Ext.INSTANCE.GetProcessHandleCount(hProcess, handleCount); + return success ? handleCount.getValue() : 0L; } } catch (IOException e) { LOGGER.warn("Failed to get open file number, because ", e); @@ -97,7 +109,9 @@ private long getOpenFileHandlersNumber() { @Override public void unbindFrom(AbstractMetricService metricService) { - if ((CONFIG.getSystemType() == SystemType.LINUX || CONFIG.getSystemType() == SystemType.MAC) + if ((CONFIG.getSystemType() == SystemType.LINUX + || CONFIG.getSystemType() == SystemType.MAC + || CONFIG.getSystemType() == SystemType.WINDOWS) && !CONFIG.getPid().isEmpty()) { metricService.remove( MetricType.AUTO_GAUGE, @@ -106,4 +120,10 @@ public void unbindFrom(AbstractMetricService metricService) { "open_file_handlers"); } } + + public interface Kernel32Ext extends Library { + Kernel32Ext INSTANCE = Native.load("kernel32", Kernel32Ext.class); + + boolean GetProcessHandleCount(WinNT.HANDLE hProcess, IntByReference pdwHandleCount); + } } diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java index 975576ac90439..b83e0c5aa1702 100644 --- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java +++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java @@ -19,5 +19,505 @@ package org.apache.iotdb.metrics.metricsets.disk; -/** Disk Metrics Manager for Windows system, not implemented yet. */ -public class WindowsDiskMetricsManager implements IDiskMetricsManager {} +import org.apache.iotdb.metrics.config.MetricConfigDescriptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Disk metrics manager for Windows system. + * + *

Windows does not expose Linux-like cumulative counters through procfs, so this implementation + * periodically samples Win32 performance counters and accumulates the observed per-second values + * into totals that match the Linux manager contract as closely as possible. + */ +public class WindowsDiskMetricsManager implements IDiskMetricsManager { + private static final Logger LOGGER = LoggerFactory.getLogger(WindowsDiskMetricsManager.class); + + private static final double BYTES_PER_KB = 1024.0; + private static final long UPDATE_SMALLEST_INTERVAL = 10000L; + private static final String POWER_SHELL = "powershell"; + private static final String POWER_SHELL_NO_PROFILE = "-NoProfile"; + private static final String POWER_SHELL_COMMAND = "-Command"; + private static final String TOTAL_DISK_INSTANCE = "_Total"; + private static final Charset WINDOWS_SHELL_CHARSET = getWindowsShellCharset(); + private static final String DISK_QUERY = + "Get-CimInstance Win32_PerfFormattedData_PerfDisk_PhysicalDisk | " + + "Where-Object { $_.Name -ne '_Total' } | " + + "ForEach-Object { " + + "[string]::Concat(" + + "$_.Name, [char]9, " + + "$_.DiskReadsPerSec, [char]9, " + + "$_.DiskWritesPerSec, [char]9, " + + "$_.DiskReadBytesPerSec, [char]9, " + + "$_.DiskWriteBytesPerSec, [char]9, " + + "$_.AvgDisksecPerRead, [char]9, " + + "$_.AvgDisksecPerWrite, [char]9, " + + "$_.PercentIdleTime, [char]9, " + + "$_.AvgDiskQueueLength) }"; + private static final String PROCESS_QUERY_TEMPLATE = + "Get-CimInstance Win32_PerfFormattedData_PerfProc_Process | " + + "Where-Object { $_.IDProcess -eq %s } | " + + "ForEach-Object { " + + "[string]::Concat(" + + "$_.IOReadOperationsPerSec, [char]9, " + + "$_.IOWriteOperationsPerSec, [char]9, " + + "$_.IOReadBytesPerSec, [char]9, " + + "$_.IOWriteBytesPerSec) }"; + + private final String processId; + private final Set diskIdSet = new HashSet<>(); + + private long lastUpdateTime = 0L; + private long updateInterval = 1L; + + private final Map lastReadOperationCountForDisk = new HashMap<>(); + private final Map lastWriteOperationCountForDisk = new HashMap<>(); + private final Map lastReadTimeCostForDisk = new HashMap<>(); + private final Map lastWriteTimeCostForDisk = new HashMap<>(); + private final Map lastMergedReadCountForDisk = new HashMap<>(); + private final Map lastMergedWriteCountForDisk = new HashMap<>(); + private final Map lastReadSizeForDisk = new HashMap<>(); + private final Map lastWriteSizeForDisk = new HashMap<>(); + private final Map lastIoUtilsPercentageForDisk = new HashMap<>(); + private final Map lastQueueSizeForDisk = new HashMap<>(); + private final Map lastAvgReadCostTimeOfEachOpsForDisk = new HashMap<>(); + private final Map lastAvgWriteCostTimeOfEachOpsForDisk = new HashMap<>(); + private final Map lastAvgSizeOfEachReadForDisk = new HashMap<>(); + private final Map lastAvgSizeOfEachWriteForDisk = new HashMap<>(); + + private long lastReallyReadSizeForProcess = 0L; + private long lastReallyWriteSizeForProcess = 0L; + private long lastAttemptReadSizeForProcess = 0L; + private long lastAttemptWriteSizeForProcess = 0L; + private long lastReadOpsCountForProcess = 0L; + private long lastWriteOpsCountForProcess = 0L; + + public WindowsDiskMetricsManager() { + processId = String.valueOf(MetricConfigDescriptor.getInstance().getMetricConfig().getPid()); + collectDiskId(); + } + + @Override + public Map getReadDataSizeForDisk() { + checkUpdate(); + return toKbMap(lastReadSizeForDisk); + } + + @Override + public Map getWriteDataSizeForDisk() { + checkUpdate(); + return toKbMap(lastWriteSizeForDisk); + } + + @Override + public Map getReadOperationCountForDisk() { + checkUpdate(); + return lastReadOperationCountForDisk; + } + + @Override + public Map getWriteOperationCountForDisk() { + checkUpdate(); + return lastWriteOperationCountForDisk; + } + + @Override + public Map getReadCostTimeForDisk() { + checkUpdate(); + return lastReadTimeCostForDisk; + } + + @Override + public Map getWriteCostTimeForDisk() { + checkUpdate(); + return lastWriteTimeCostForDisk; + } + + @Override + public Map getIoUtilsPercentage() { + checkUpdate(); + return lastIoUtilsPercentageForDisk; + } + + @Override + public Map getAvgReadCostTimeOfEachOpsForDisk() { + checkUpdate(); + return lastAvgReadCostTimeOfEachOpsForDisk; + } + + @Override + public Map getAvgWriteCostTimeOfEachOpsForDisk() { + checkUpdate(); + return lastAvgWriteCostTimeOfEachOpsForDisk; + } + + @Override + public Map getAvgSizeOfEachReadForDisk() { + checkUpdate(); + return lastAvgSizeOfEachReadForDisk; + } + + @Override + public Map getAvgSizeOfEachWriteForDisk() { + checkUpdate(); + return lastAvgSizeOfEachWriteForDisk; + } + + @Override + public Map getMergedWriteOperationForDisk() { + checkUpdate(); + return lastMergedWriteCountForDisk; + } + + @Override + public Map getMergedReadOperationForDisk() { + checkUpdate(); + return lastMergedReadCountForDisk; + } + + @Override + public Map getQueueSizeForDisk() { + checkUpdate(); + return lastQueueSizeForDisk; + } + + @Override + public double getActualReadDataSizeForProcess() { + checkUpdate(); + return lastReallyReadSizeForProcess / BYTES_PER_KB; + } + + @Override + public double getActualWriteDataSizeForProcess() { + checkUpdate(); + return lastReallyWriteSizeForProcess / BYTES_PER_KB; + } + + @Override + public long getReadOpsCountForProcess() { + checkUpdate(); + return lastReadOpsCountForProcess; + } + + @Override + public long getWriteOpsCountForProcess() { + checkUpdate(); + return lastWriteOpsCountForProcess; + } + + @Override + public double getAttemptReadSizeForProcess() { + checkUpdate(); + return lastAttemptReadSizeForProcess / BYTES_PER_KB; + } + + @Override + public double getAttemptWriteSizeForProcess() { + checkUpdate(); + return lastAttemptWriteSizeForProcess / BYTES_PER_KB; + } + + @Override + public Set getDiskIds() { + checkUpdate(); + return diskIdSet; + } + + private void collectDiskId() { + Map diskInfoMap = queryDiskInfo(); + if (diskInfoMap.isEmpty()) { + return; + } + diskIdSet.clear(); + diskIdSet.addAll(diskInfoMap.keySet()); + } + + private Map toKbMap(Map source) { + Map result = new HashMap<>(source.size()); + for (Map.Entry entry : source.entrySet()) { + result.put(entry.getKey(), entry.getValue() / BYTES_PER_KB); + } + return result; + } + + private void updateInfo() { + long currentTime = System.currentTimeMillis(); + updateInterval = lastUpdateTime == 0L ? 0L : currentTime - lastUpdateTime; + lastUpdateTime = currentTime; + updateDiskInfo(); + updateProcessInfo(); + } + + private void updateDiskInfo() { + Map diskInfoMap = queryDiskInfo(); + if (diskInfoMap.isEmpty()) { + return; + } + + diskIdSet.clear(); + diskIdSet.addAll(diskInfoMap.keySet()); + pruneDiskMetricMaps(); + + for (Map.Entry entry : diskInfoMap.entrySet()) { + String diskId = entry.getKey(); + String[] diskInfo = entry.getValue(); + long readOpsPerSec = parseLong(diskInfo[0]); + long writeOpsPerSec = parseLong(diskInfo[1]); + long readBytesPerSec = parseLong(diskInfo[2]); + long writeBytesPerSec = parseLong(diskInfo[3]); + double avgDiskSecPerRead = parseDouble(diskInfo[4]); + double avgDiskSecPerWrite = parseDouble(diskInfo[5]); + double percentIdleTime = parseDouble(diskInfo[6]); + double avgDiskQueueLength = parseDouble(diskInfo[7]); + + long intervalMillis = updateInterval; + lastReadOperationCountForDisk.put( + diskId, + accumulate(lastReadOperationCountForDisk.get(diskId), readOpsPerSec, intervalMillis)); + lastWriteOperationCountForDisk.put( + diskId, + accumulate(lastWriteOperationCountForDisk.get(diskId), writeOpsPerSec, intervalMillis)); + lastMergedReadCountForDisk.put(diskId, 0L); + lastMergedWriteCountForDisk.put(diskId, 0L); + lastReadSizeForDisk.put( + diskId, accumulate(lastReadSizeForDisk.get(diskId), readBytesPerSec, intervalMillis)); + lastWriteSizeForDisk.put( + diskId, accumulate(lastWriteSizeForDisk.get(diskId), writeBytesPerSec, intervalMillis)); + lastReadTimeCostForDisk.put( + diskId, + accumulateTimeCost( + lastReadTimeCostForDisk.get(diskId), + avgDiskSecPerRead, + readOpsPerSec, + intervalMillis)); + lastWriteTimeCostForDisk.put( + diskId, + accumulateTimeCost( + lastWriteTimeCostForDisk.get(diskId), + avgDiskSecPerWrite, + writeOpsPerSec, + intervalMillis)); + lastIoUtilsPercentageForDisk.put(diskId, clampPercentage(1.0 - percentIdleTime / 100.0)); + lastQueueSizeForDisk.put(diskId, avgDiskQueueLength); + lastAvgReadCostTimeOfEachOpsForDisk.put(diskId, avgDiskSecPerRead * 1000.0); + lastAvgWriteCostTimeOfEachOpsForDisk.put(diskId, avgDiskSecPerWrite * 1000.0); + lastAvgSizeOfEachReadForDisk.put( + diskId, readOpsPerSec == 0 ? 0.0 : ((double) readBytesPerSec) / readOpsPerSec); + lastAvgSizeOfEachWriteForDisk.put( + diskId, writeOpsPerSec == 0 ? 0.0 : ((double) writeBytesPerSec) / writeOpsPerSec); + } + } + + private void pruneDiskMetricMaps() { + pruneDiskMetricMap(lastReadOperationCountForDisk); + pruneDiskMetricMap(lastWriteOperationCountForDisk); + pruneDiskMetricMap(lastReadTimeCostForDisk); + pruneDiskMetricMap(lastWriteTimeCostForDisk); + pruneDiskMetricMap(lastMergedReadCountForDisk); + pruneDiskMetricMap(lastMergedWriteCountForDisk); + pruneDiskMetricMap(lastReadSizeForDisk); + pruneDiskMetricMap(lastWriteSizeForDisk); + pruneDiskMetricMap(lastIoUtilsPercentageForDisk); + pruneDiskMetricMap(lastQueueSizeForDisk); + pruneDiskMetricMap(lastAvgReadCostTimeOfEachOpsForDisk); + pruneDiskMetricMap(lastAvgWriteCostTimeOfEachOpsForDisk); + pruneDiskMetricMap(lastAvgSizeOfEachReadForDisk); + pruneDiskMetricMap(lastAvgSizeOfEachWriteForDisk); + } + + private void pruneDiskMetricMap(Map metricMap) { + metricMap.keySet().retainAll(diskIdSet); + } + + private void updateProcessInfo() { + String processInfo = queryProcessInfo(); + if (processInfo == null || processInfo.isEmpty()) { + return; + } + + String[] processMetricArray = processInfo.split("\t"); + if (processMetricArray.length < 4) { + LOGGER.warn("Unexpected windows process io info format: {}", processInfo); + return; + } + + long readOpsPerSec = parseLong(processMetricArray[0]); + long writeOpsPerSec = parseLong(processMetricArray[1]); + long readBytesPerSec = parseLong(processMetricArray[2]); + long writeBytesPerSec = parseLong(processMetricArray[3]); + + lastReadOpsCountForProcess = + accumulate(lastReadOpsCountForProcess, readOpsPerSec, updateInterval); + lastWriteOpsCountForProcess = + accumulate(lastWriteOpsCountForProcess, writeOpsPerSec, updateInterval); + lastReallyReadSizeForProcess = + accumulate(lastReallyReadSizeForProcess, readBytesPerSec, updateInterval); + lastReallyWriteSizeForProcess = + accumulate(lastReallyWriteSizeForProcess, writeBytesPerSec, updateInterval); + + // Windows does not expose attempted read/write sizes directly in these counters. + lastAttemptReadSizeForProcess = lastReallyReadSizeForProcess; + lastAttemptWriteSizeForProcess = lastReallyWriteSizeForProcess; + } + + private Map queryDiskInfo() { + Map result = new HashMap<>(); + for (String line : executePowerShell(DISK_QUERY)) { + if (line == null || line.isEmpty()) { + continue; + } + String[] values = line.split("\t"); + if (values.length < 9) { + LOGGER.warn("Unexpected windows disk io info format: {}", line); + continue; + } + String diskId = values[0].trim(); + if (diskId.isEmpty() || TOTAL_DISK_INSTANCE.equals(diskId)) { + continue; + } + String[] metricArray = new String[8]; + System.arraycopy(values, 1, metricArray, 0, metricArray.length); + result.put(diskId, metricArray); + } + return result; + } + + private String queryProcessInfo() { + for (String line : + executePowerShell( + String.format(PROCESS_QUERY_TEMPLATE, escapeSingleQuotedPowerShell(processId)))) { + if (line != null && !line.isEmpty()) { + return line; + } + } + return null; + } + + private String escapeSingleQuotedPowerShell(String value) { + return value.replace("'", "''"); + } + + private long accumulate(Long previousValue, long valuePerSec, long intervalMillis) { + if (intervalMillis <= 0L) { + return previousValue == null ? 0L : previousValue; + } + return (previousValue == null ? 0L : previousValue) + valuePerSec * intervalMillis / 1000L; + } + + private long accumulate(long previousValue, long valuePerSec, long intervalMillis) { + if (intervalMillis <= 0L) { + return previousValue; + } + return previousValue + valuePerSec * intervalMillis / 1000L; + } + + private long accumulateTimeCost( + Long previousValue, double avgTimeInSecond, long opsPerSec, long intervalMillis) { + if (intervalMillis <= 0L) { + return previousValue == null ? 0L : previousValue; + } + long previous = previousValue == null ? 0L : previousValue; + double operationCount = opsPerSec * intervalMillis / 1000.0; + return previous + Math.round(avgTimeInSecond * operationCount * 1000.0); + } + + private long parseLong(String value) { + try { + return Math.round(Double.parseDouble(value.trim())); + } catch (NumberFormatException e) { + LOGGER.warn("Failed to parse long value from windows disk metrics: {}", value, e); + return 0L; + } + } + + private double parseDouble(String value) { + try { + return Double.parseDouble(value.trim()); + } catch (NumberFormatException e) { + LOGGER.warn("Failed to parse double value from windows disk metrics: {}", value, e); + return 0.0; + } + } + + private double clampPercentage(double value) { + return Math.max(0.0, Math.min(1.0, value)); + } + + private List executePowerShell(String command) { + List result = new ArrayList<>(); + List rawOutput = new ArrayList<>(); + Process process = null; + try { + process = + new ProcessBuilder(POWER_SHELL, POWER_SHELL_NO_PROFILE, POWER_SHELL_COMMAND, command) + .redirectErrorStream(true) + .start(); + try (BufferedReader reader = + new BufferedReader( + new InputStreamReader(process.getInputStream(), WINDOWS_SHELL_CHARSET))) { + String line; + while ((line = reader.readLine()) != null) { + String trimmedLine = line.trim(); + if (!trimmedLine.isEmpty()) { + rawOutput.add(trimmedLine); + } + } + } + int exitCode = process.waitFor(); + if (exitCode != 0) { + LOGGER.warn( + "Failed to collect windows disk metrics, powershell exit code: {}, command {}, output {}", + exitCode, + command, + String.join(" | ", rawOutput)); + } else { + result.addAll(rawOutput); + } + } catch (IOException e) { + LOGGER.warn("Failed to execute powershell for windows disk metrics", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Interrupted while collecting windows disk metrics", e); + } finally { + if (process != null) { + process.destroy(); + } + } + return result; + } + + private static Charset getWindowsShellCharset() { + String nativeEncoding = System.getProperty("sun.jnu.encoding"); + if (nativeEncoding != null && Charset.isSupported(nativeEncoding)) { + return Charset.forName(nativeEncoding); + } + + String fileEncoding = System.getProperty("file.encoding"); + if (fileEncoding != null && Charset.isSupported(fileEncoding)) { + return Charset.forName(fileEncoding); + } + + if (Charset.isSupported("GBK")) { + return Charset.forName("GBK"); + } + return Charset.defaultCharset(); + } + + private void checkUpdate() { + if (System.currentTimeMillis() - lastUpdateTime > UPDATE_SMALLEST_INTERVAL) { + updateInfo(); + } + } +} diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java index c3ecb4b8d5028..7fb82ffaa0dc2 100644 --- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java +++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java @@ -19,4 +19,201 @@ package org.apache.iotdb.metrics.metricsets.net; -public class WindowsNetMetricManager implements INetMetricManager {} +import org.apache.iotdb.metrics.MetricConstant; +import org.apache.iotdb.metrics.config.MetricConfig; +import org.apache.iotdb.metrics.config.MetricConfigDescriptor; +import org.apache.iotdb.metrics.utils.MetricLevel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class WindowsNetMetricManager implements INetMetricManager { + private static final Logger LOGGER = LoggerFactory.getLogger(WindowsNetMetricManager.class); + + private static final MetricConfig METRIC_CONFIG = + MetricConfigDescriptor.getInstance().getMetricConfig(); + + private long lastUpdateTime = 0L; + + private Set ifaceSet = new HashSet<>(); + + private final Map receivedBytesMapForIface = new HashMap<>(); + + private final Map transmittedBytesMapForIface = new HashMap<>(); + + private final Map receivedPacketsMapForIface = new HashMap<>(); + + private final Map transmittedPacketsMapForIface = new HashMap<>(); + + private int connectionNum = 0; + + public WindowsNetMetricManager() { + checkUpdate(); + } + + @Override + public Set getIfaceSet() { + checkUpdate(); + return ifaceSet; + } + + @Override + public Map getReceivedByte() { + checkUpdate(); + return receivedBytesMapForIface; + } + + @Override + public Map getTransmittedBytes() { + checkUpdate(); + return transmittedBytesMapForIface; + } + + @Override + public Map getReceivedPackets() { + checkUpdate(); + return receivedPacketsMapForIface; + } + + @Override + public Map getTransmittedPackets() { + checkUpdate(); + return transmittedPacketsMapForIface; + } + + @Override + public int getConnectionNum() { + checkUpdate(); + return connectionNum; + } + + private void checkUpdate() { + if (System.currentTimeMillis() - lastUpdateTime >= MetricConstant.UPDATE_INTERVAL) { + updateNetStatus(); + } + } + + private void updateNetStatus() { + lastUpdateTime = System.currentTimeMillis(); + if (ifaceSet.isEmpty()) { + updateInterfaces(); + } + updateStatistics(); + if (MetricLevel.higherOrEqual(MetricLevel.NORMAL, METRIC_CONFIG.getMetricLevel())) { + updateConnectionNum(); + } + } + + private void updateInterfaces() { + try { + ifaceSet.clear(); + Process process = + Runtime.getRuntime() + .exec( + "cmd.exe /c chcp 65001 > nul & powershell.exe -Command \"Get-NetAdapter -IncludeHidden | Select Name | Format-List \""); + BufferedReader reader = + new BufferedReader( + new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)); + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.startsWith("Name :")) { + ifaceSet.add(line.substring("Name : ".length()).trim()); + } + } + int exitCode = process.waitFor(); + if (exitCode != 0) { + LOGGER.error("Failed to get interfaces, exit code: {}", exitCode); + } + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + LOGGER.error("Error updating interfaces", e); + ifaceSet.clear(); + } + } + + private void updateStatistics() { + try { + receivedBytesMapForIface.clear(); + transmittedBytesMapForIface.clear(); + receivedPacketsMapForIface.clear(); + transmittedPacketsMapForIface.clear(); + Process process = + Runtime.getRuntime() + .exec( + "cmd.exe /c chcp 65001 > nul & powershell.exe -Command \"Get-NetAdapterStatistics -IncludeHidden | Format-List Name,ReceivedBytes,SentBytes,ReceivedUnicastPackets,SentUnicastPackets \""); + BufferedReader reader = + new BufferedReader( + new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)); + String line; + String currentName = null; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.startsWith("Name ")) { + currentName = line.substring(line.indexOf(": ") + 2).trim(); + } else if (line.startsWith("ReceivedBytes ") && currentName != null) { + long value = Long.parseLong(line.substring(line.indexOf(": ") + 2).trim()); + receivedBytesMapForIface.put(currentName, value); + } else if (line.startsWith("SentBytes ") && currentName != null) { + long value = Long.parseLong(line.substring(line.indexOf(": ") + 2).trim()); + transmittedBytesMapForIface.put(currentName, value); + } else if (line.startsWith("ReceivedUnicastPackets ") && currentName != null) { + long value = Long.parseLong(line.substring(line.indexOf(": ") + 2).trim()); + receivedPacketsMapForIface.put(currentName, value); + } else if (line.startsWith("SentUnicastPackets ") && currentName != null) { + long value = Long.parseLong(line.substring(line.indexOf(": ") + 2).trim()); + transmittedPacketsMapForIface.put(currentName, value); + currentName = null; // Reset after processing an interface + } + } + int exitCode = process.waitFor(); + if (exitCode != 0) { + LOGGER.error("Failed to get statistics, exit code: {}", exitCode); + } + } catch (IOException | InterruptedException | NumberFormatException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + LOGGER.error("Error updating statistics", e); + } + } + + private void updateConnectionNum() { + try { + Process process = Runtime.getRuntime().exec("netstat -ano"); + BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); + int count = 0; + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (!line.isEmpty() && !line.startsWith("Active") && !line.startsWith("Proto")) { + String[] parts = line.split("\\s+"); + if (parts.length >= 5 && parts[parts.length - 1].equals(METRIC_CONFIG.getPid())) { + count++; + } + } + } + this.connectionNum = count; + int exitCode = process.waitFor(); + if (exitCode != 0) { + LOGGER.error("Failed to get connection num, exit code: {}", exitCode); + } + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + LOGGER.error("Error updating connection num", e); + } + } +}