前文说到,自适应负载均衡与流控,将 CPU 负载、RT、上线时间通过一个公式来计算一个权重值或阈值,但是这个公式通用型可能不够,服务节点的性能存在差异,因为在不同的机器有不同的硬件资源,比如有些机器是 I/ O 特别强,有比较高的的 IOPS,有些机器有更高的 CPU 频率,CPU处理性能比较高,所以通过一个算法去计算最大并发量,通用性比较低,计算出来的最大并发量可能不够准确,并且这几个的指标并不能完全体现真实的服务节点负载,那么还有那些指标可以更加科学得体现节点负载呢?

有哪些指标

Java 程序的负载和状态可以通过多个指标体现,这些指标分为操作系统级别、JVM 级别和应用程序级别。以下是常用指标的分类:

Java 对象

系统指标

public class SystemMetrics {
    /**
     * CPU 核心数
     */
    private int cpuCount;
    /**
     * 最近1分钟的CPU负载
     */
    private double cpuLoad1m;
    /**
     * 最近5分钟的CPU负载
     */
    private double cpuLoad5m;
    /**
     * 最近15分钟的CPU负载
     */
    private double cpuLoad15m;
    /**
     * 当前CPU使用情况(用户、系统等)
     */
    private CpuUsage cpuUsage;
    /**
     * 上下文切换次数
     */
    private double contextSwitches;
    /**
     * 中断次数
     */
    private double interrupts;
    /**
     * 可用内存(单位:字节)
     */
    private double memoryAvailable;
    /**
     * 总内存(单位:字节)
     */
    private double memoryTotal;
    /**
     * 已使用的交换空间(单位:字节)
     */
    private double swapUsed;
    /**
     * 总交换空间(单位:字节)
     */
    private double swapTotal;
    /**
     * 当前进程数量
     */
    private int processCount;
    /**
     * 当前线程数量
     */
    private int threadCount;
    /**
     * 打开的文件描述符数量
     */
    private long openFdCount;
    /**
     * 每秒输入输出操作次数(IOPS)
     */
    private long iops;
}

class CpuUsage {
    /**
     * 用户使用率
     */
    private double cpuUsrUsage;
    /**
     * 系统使用率
     */
    private double cpuSysUsage;
    /**
     * 空闲率
     */
    private double cpuIdle;
}

CPU负载(load)与使用率(usage)的区别

  • CPU 负载:系统当前的负载,表示正在等待 CPU 的进程数量。通常为 1 分钟、5 分钟、15 分钟的平均值。 负载为 1.0,表示 CPU 核心处于满负载,负载为 2.0,表示任务过多,可能超载。

  • CPU 使用率:CPU 正在实际执行任务的时间与总时间的比值。通常以百分比表示。CPU 使用率为 50%,表示 CPU 空闲时间较多,100% 表示 CPU 完全占用。

网络指标

public class NetworkMetrics {
    // 网络性能指标
    /**
     * 网络延迟(单位:毫秒)
     */
    private double latency;
    /**
     * 网络带宽(单位:字节/秒)
     */
    private double bandwidth;
    /**
     * 最大传输单元(MTU,单位:字节)
     */
    private long mtu;
    /**
     * 网络速度(单位:比特/秒)
     */
    private long speed;
    /**
     * 网络接口的MAC地址
     */
    private String macaddr;
    /**
     * IPv4地址数组
     */
    private String[] iPv4addr;
    /**
     * IPv6地址数组
     */
    private String[] iPv6addr;
    /**
     * 是否有网络数据
     */
    private boolean hasData;
    /**
     * 接收的数据包数量
     */
    private long packetsRecv;
    /**
     * 接收的字节数
     */
    private long bytesRecv;
    /**
     * 输入错误的数量
     */
    private long inErrors;
    /**
     * 输出错误的数量
     */
    private long outErrors;
    /**
     * 发送的字节数
     */
    private long bytesSent;
    /**
     * 发送的数据包数量
     */
    private long packetsSent;
}

JVM GC 指标

public class GcMetrics {
    /**
     * JVM GC 次数
     */
    private int gcCount1m;
    /**
     * JVM GC 吞吐(%)
     */
    private double gcThroughput1m;
    /**
     * JVM 暂停 时间(ms)
     */
    private double gcPauseTime1m;
    /**
     * JVM 内存使用情况
     */
    private double gcMemoryPoolUsage;
    /**
     * JVM堆内存分配速率(MBS)
     */
    private double gcHeapAllocationRate;
    /**
     * JVM堆内存晋升速率(MBS)
     */
    private double gcHeapPromotedRate;
}

JVM 线程指标

public class ThreadMetrics {
    /**
     * 当前活动线程数量
     */
    private double liveThreads;

    /**
     * 线程峰值数量
     */
    private double peakThreads;

    /**
     * 被阻塞的线程数量
     */
    private double blockedThreads;

    /**
     * 正在运行的线程数量
     */
    private double runnableThreads;

    /**
     * 新创建的线程数量
     */
    private double newThreads;

    /**
     * 正在等待的线程数量(包括休眠、I/O等待等)
     */
    private double timedWaitingThreads;

    /**
     * 已终止的线程数量
     */
    private double terminatedThreads;

    /**
     * 处于等待状态的线程数量(例如:wait、join、lock等)
     */
    private double waitingThreads;
}

获取指标的框架

有了指标,如何去获取呢?或者说,如何在 Java 进程中获取这些信息?

JMX

JDK 提供了多种 MXBean(Managed Bean),它们通过 JMX(Java Management Extensions)接口暴露 JVM 的运行时信息,方便开发者监控和管理 Java 应用程序。以下是主要的 MXBean 及其作用和详细解释:

  1. MemoryMXBean:提供 JVM 内存使用情况,包括堆内存和非堆内存的管理信息;支持垃圾回收的基本操作。

  2. GarbageCollectorMXBean:提供 JVM 中垃圾回收器的性能数据。

  3. ThreadMXBean:提供 JVM 中线程的状态信息和性能数据;支持检测死锁。

  4. RuntimeMXBean:提供 JVM 的运行时信息。

  5. OperatingSystemMXBean:提供底层操作系统的相关信息。

  6. CompilationMXBean:提供 JIT 编译器的相关信息。

  7. ClassLoadingMXBean:提供 JVM 中类加载器的状态信息。

  8. MemoryPoolMXBean:提供 JVM 中直接内存的使用信息。

代码实战如下:

public class JMXBeanExample {
    public static void main(String[] args) {

        System.out.println("================================操作系统=================================");
        OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();
        System.out.println("OS Name: " + osMXBean.getName());
        System.out.println("OS Architecture: " + osMXBean.getArch());
        System.out.println("Available Processors: " + osMXBean.getAvailableProcessors());
        System.out.println("System Load Average: " + osMXBean.getSystemLoadAverage());

        MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

        System.out.println("=================================内存==================================");
        // 堆内存使用情况
        MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();
        System.out.println("Heap Memory Used: " + heapMemoryUsage.getUsed());
        System.out.println("Heap Memory Max: " + heapMemoryUsage.getMax());

        // 非堆内存使用情况
        MemoryUsage nonHeapMemoryUsage = memoryMXBean.getNonHeapMemoryUsage();
        System.out.println("Non-Heap Memory Used: " + nonHeapMemoryUsage.getUsed());
        System.out.println("Non-Heap Memory Max: " + nonHeapMemoryUsage.getMax());


        System.out.println("=================================线程数=================================");
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        // 活跃线程数
        System.out.println("Thread Count: " + threadMXBean.getThreadCount());
        System.out.println("Daemon Thread Count: " + threadMXBean.getDaemonThreadCount());
        // 获取所有线程信息
        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
        for (ThreadInfo threadInfo : threadInfos) {
            System.out.println("Thread Name: " + threadInfo.getThreadName());
            System.out.println("Thread State: " + threadInfo.getThreadState());
        }

        System.out.println("=================================类加载=================================");
        ClassLoadingMXBean classLoadingMXBean = ManagementFactory.getClassLoadingMXBean();
        System.out.println("Loaded Class Count: " + classLoadingMXBean.getLoadedClassCount());
        System.out.println("Total Loaded Class Count: " + classLoadingMXBean.getTotalLoadedClassCount());
        System.out.println("Unloaded Class Count: " + classLoadingMXBean.getUnloadedClassCount());

        System.out.println("================================垃圾回收=================================");
        List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
        System.out.println("Garbage Collector Details:");
        for (GarbageCollectorMXBean gcMXBean : gcMXBeans) {
            System.out.println("- " + gcMXBean.getName() + ": " + gcMXBean.getCollectionCount() + " collections, "
                    + gcMXBean.getCollectionTime() + "ms");
        }

        System.out.println("================================运行时==================================");
        RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
        System.out.println("runtimeMXBean.getVmName() = " + runtimeMXBean.getVmName());
        System.out.println("runtimeMXBean.getVmVersion() = " + runtimeMXBean.getVmVersion());
        System.out.println("runtimeMXBean.getStartTime() = " + runtimeMXBean.getStartTime());
        System.out.println("runtimeMXBean.getUptime() = " + runtimeMXBean.getUptime());
    }
}

Sigar

Sigar 是一个跨平台的系统性能监控库,可以用来收集操作系统级别的性能指标。它支持多种编程语言,包括 Java、C、Python 和 Perl 等,常用于实时监控系统的运行状态。

使用 Sigar 的注意事项:

  1. Sigar 要将 Sigar 的三个运行库(sigar-amd64-winnt.dll,sigar-x86-winnt.dll,sigar-x86-winnt.lib)安装到 JDK 环境的 /bin 目录下,Sigar 官网
  2. Sigar 的 JDK 要使用 Oracle 的 openJDK,微软的 Zulu JDK 运行不了。
  3. Sigar 目前没有适配 Macbook 的 M 系列芯片。

Sigar 实战:

public class SigarExample {
    public static void main(String[] args) {
        Runtime r = Runtime.getRuntime();
        Properties props = System.getProperties();
        InetAddress addr = InetAddress.getLocalHost();
        String ip = addr.getHostAddress();
        Map<String, String> map = System.getenv();
        String userName = map.get("USERNAME");// 获取用户名
        String computerName = map.get("COMPUTERNAME");// 获取计算机名
        String userDomain = map.get("USERDOMAIN");// 获取计算机域名
        System.out.println("用户名:" + userName);
        System.out.println("本地ip地址:" + ip);
        System.out.println("本地主机名:" + addr.getHostName());
        System.out.println("JVM可以使用的总内存:" + r.totalMemory());
        System.out.println("JVM可以使用的剩余内存:" + r.freeMemory());
        System.out.println("JVM可以使用的处理器个数:" + r.availableProcessors());
        System.out.println("Java运行时环境规范版本:" + props.getProperty("java.specification.version"));
        System.out.println("Java运行时环境规范供应商:" + props.getProperty("java.specification.vender"));
        System.out.println("Java运行时环境规范名称:" + props.getProperty("java.specification.name"));
        System.out.println("Java的类格式版本号:" + props.getProperty("java.class.version"));
        System.out.println("Java的类路径:" + props.getProperty("java.class.path"));
    }
}

Siigar 可以获取 CPU 主频

OSHI

OSHI (Operating System and Hardware Information) 是一个轻量级的、基于 Java 的开源库,用于收集操作系统和硬件的实时性能数据。它是纯 Java 实现的,不依赖 JNI,因此具有跨平台性和易于集成的优点。OSHI 的名称取自其功能:操作系统 (Operating System) 和硬件信息 (Hardware Information)。

public class OshiExample {

    public static void main(String[] args) {
        SystemInfo si = new SystemInfo();
        HardwareAbstractionLayer hal = si.getHardware();
        OperatingSystem os = printOS(si);
        printNetworkInterfaces(hal.getNetworkIFs());
    }

    private static OperatingSystem printOS(SystemInfo si) {
        OperatingSystem os = si.getOperatingSystem();
        System.out.println("OS Name: " + os);
        System.out.println("Boot Time: " + os.getSystemBootTime());
        System.out.println("Up Time: " + os.getSystemUptime() / 1000 + " s");
        return os;
    }
  
    private static void printNetworkInterfaces(List<NetworkIF> list) {
        for (NetworkIF net : list) {
            System.out.format(" Name: %s (%s)%n", net.getName(), net.getDisplayName());
            System.out.format("   MAC Address: %s %n", net.getMacaddr());
            System.out.format("   MTU: %s, Speed: %s %n", net.getMTU(), FormatUtil.formatValue(net.getSpeed(), "bps"));
            System.out.format("   IPv4: %s %n", Arrays.toString(net.getIPv4addr()));
            System.out.format("   IPv6: %s %n", Arrays.toString(net.getIPv6addr()));
            boolean hasData = net.getBytesRecv() > 0 || net.getBytesSent() > 0 || net.getPacketsRecv() > 0
                    || net.getPacketsSent() > 0;
            System.out.format("   Traffic: received %s/%s%s; transmitted %s/%s%s %n",
                    hasData ? net.getPacketsRecv() + " packets" : "?",
                    hasData ? FormatUtil.formatBytes(net.getBytesRecv()) : "?",
                    hasData ? " (" + net.getInErrors() + " err)" : "",
                    hasData ? net.getPacketsSent() + " packets" : "?",
                    hasData ? FormatUtil.formatBytes(net.getBytesSent()) : "?",
                    hasData ? " (" + net.getOutErrors() + " err)" : "");
        }
    }
}

Micrometer

Micrometer 是一款监控数据采集、输出框架,其将自己称为监控界的 slf4j,提供了监控服务商(如适配监控数据存储的时序数据库或者提供完整解决方案的监控框架)无关的 API 门面。了解到它是因为在项目实践过程中采用了 Spring-actuator 这款监控工具,而其中就使用了 Micrometer。Micrometer 也将与 Spring 良好集成作为宣传点,SpringBoot 相关的监控解决方案从 SpringBoot 2.0 开始全面更改为 Micrometer,不过要知道 Micrometer 与 Spring 属于同门,都是 Pivotal 旗下的产品。Micrometer 这么强大,那么获取指标信息不是轻轻松松?但是网上介绍 Micrometer 大都是如何接入 Prometheus,将收集的服务指标数据发布到 Prometheus 中。或者接入其他监控系统,并没有介绍如何获取系统负载指标。那么,就让本文来介绍如何通过 Micrometer 出来获取指标吧。

Micrometer 由 Meter、Registry、Meter Filters等核心概念组成,这些内容你可以看这篇文章,本文不做详细介绍。本文主要介绍如何直接使用 Micrometer。

Micrometer 内置了许多常用的 JVM 和 系统级指标,同时支持自定义指标。以 JvmMemoryMetrics 为例:

public class MetricsExample {
    public static void main(String[] args) {
        // 创建一个简单的MeterRegistry
        MeterRegistry registry = new SimpleMeterRegistry();

        // 绑定JVM Memory Metrics到Registry
        new JvmMemoryMetrics().bindTo(registry);

        // 查询Eden Space的使用量
        double edenSpaceUsed = registry.get("jvm.memory.used")
                .tag("area", "heap")
                .tag("id", "PS Eden Space")
                .gauge()
                .value();

        System.out.println("Eden Space Used: " + edenSpaceUsed / (1024 * 1024) + " MB");
    }
}

可以直接获取 JVM EdenSpace 已使用的大小。也可以自定义 Metrics,例如可以获取到的 IOPS 数值,绑定到 registry 的 iops 属性上,

public class IOPSMetrics {

    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();
        IOPSMetrics iopsMetrics = new IOPSMetrics(registry);
        iopsMetrics.recordIOPS();
        System.out.println("iops = "
                + registry.get("disk.iops").counter().count());
    }

    private final Counter counter;
    private final HardwareAbstractionLayer hal;

    public IOPSMetrics(MeterRegistry registry) {
        SystemInfo systemInfo = new SystemInfo();
        hal = systemInfo.getHardware();
        this.counter = Counter.builder("disk.iops")
                .description("IOPS (Input/Output Operations Per Second)")
                .register(registry);
    }

    public void recordIOPS() {
        Map<String, Map<String, Long>> IOPSMap = new ConcurrentHashMap<>();
        for (HWDiskStore hwDiskStore : hal.getDiskStores()) {
            String name = hwDiskStore.getName();
            long readsBefore = hwDiskStore.getReads();
            long writesBefore = hwDiskStore.getWrites();
            Map<String, Long> beforeMap = new HashMap<>();
            beforeMap.put("readsBefore", readsBefore);
            beforeMap.put("writesBefore", writesBefore);
            IOPSMap.put(name, beforeMap);
        }

        try {
            TimeUnit.SECONDS.sleep(1); // 1秒内的IOPS
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (HWDiskStore hwDiskStore : hal.getDiskStores()) {
            String name = hwDiskStore.getName();
            Map<String, Long> afterMap = IOPSMap.get(name);
            long readsAfter = hwDiskStore.getReads();
            long writesAfter = hwDiskStore.getWrites();
            afterMap.put("readsAfter", readsAfter);
            afterMap.put("writesAfter", writesAfter);
        }
        long iops = 0;
        for (Map.Entry<String, Map<String, Long>> stringMapEntry : IOPSMap.entrySet()) {
            String name = stringMapEntry.getKey();
            Map<String, Long> value = stringMapEntry.getValue();
            Long readsBefore = value.get("readsBefore");
            Long writesBefore = value.get("writesBefore");
            Long readsAfter = value.get("readsAfter");
            Long writesAfter = value.get("writesAfter");
            iops += (readsAfter - readsBefore) + (writesAfter - writesBefore);
        }
        counter.increment(iops); // 记录次数
    }
}

Micrometer 并不会去获取指标,更像是对获取指标的去做了一层封装,将系统获取指标的能力做了一层收口。统一交给调用方使用,有点像日志库(log4j)与日志门面(slf4j)的关系。如下图(图源《码出高效#p150》):

门面设计模式是面向对象设计模式中的一种,日志框架采用的就是这种模式,它只提供一套接口规范,自身不负责日志功能的实现,目的是让使用者不需要关注细节。使用最广泛的日志门面是 slf4j。日志库具体实现了日志的相关功能,主流的日志库有 log4j、logback。开发者在使用时需要关注所使用的日志库的具体细节。logback 是 log4j 的升级版,且本身就实现了slf4j 的接口。

Micrometer 可以看作是监控指标的门面,JMX 可以看作其中一个日志库。Micrometer 官方也实现了一些 日志适配器,可以直接使用指标,比如 JvmMemoryMetrics,我们也可以自定义一些 Metrics,比如上面的 CustomCounter,再交给 Micrometer 这个“监控门面”统一管理。

可以看到,Micrometer 并不生产指标,它是指标的搬运工。比如要获取 GC 的暂停信息,在 Micrometer 的 io.micrometer.core.instrument.binder.jvm.JvmGcMetrics.GcMetricsNotificationListener#handleNotification 方法处,有获取 GC 信息的代码:

public void handleNotification(Notification notification, Object ref) {
  CompositeData cd = (CompositeData)notification.getUserData();
  GarbageCollectionNotificationInfo notificationInfo = GarbageCollectionNotificationInfo.from(cd);
  String gcCause = notificationInfo.getGcCause();
  String gcAction = notificationInfo.getGcAction();
  GcInfo gcInfo = notificationInfo.getGcInfo();
  long duration = gcInfo.getDuration();
  if (JvmMemory.isConcurrentPhase(gcCause, notificationInfo.getGcName())) {
    Timer.builder("jvm.gc.concurrent.phase.time").tags(JvmGcMetrics.this.tags).tags(new String[]{"action", gcAction, "cause", gcCause}).description("Time spent in concurrent phase").register(this.registry).record(duration, TimeUnit.MILLISECONDS);
  } else {
    Timer.builder("jvm.gc.pause").tags(JvmGcMetrics.this.tags).tags(new String[]{"action", gcAction, "cause", gcCause}).description("Time spent in GC pause").register(this.registry).record(duration, TimeUnit.MILLISECONDS);
  }}

从 GarbageCollectionNotificationInfo 获取 GC 信息。如果想要获取 GC 信息,可以从中借鉴,有如下实现:

public class NotificationExample {
    public static void main(String[] args) throws InterruptedException {
        for (GarbageCollectorMXBean gcBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println(gcBean.getName());
            NotificationEmitter emitter = (NotificationEmitter) gcBean;
            NotificationListener notificationListener = new NotificationListener() {
                @Override
                public void handleNotification(Notification notification, Object handback) {
                    if (notification.getType().equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION)) {
                        GarbageCollectionNotificationInfo from = GarbageCollectionNotificationInfo.from((CompositeData) notification.getUserData());
                        GcInfo gcInfo = from.getGcInfo();
                        String gcName = from.getGcName();
                        String gcCause = from.getGcCause();
                        String gcType = from.getGcAction();
                        long duration = gcInfo.getDuration();
                        long startTime = gcInfo.getStartTime();
                        long endTime = gcInfo.getEndTime();
                        System.out.println("gcName: " + gcName +
                                " gcCause: [" + gcCause +
                                "] gcType: [" + gcType +
                                "] duration: " + duration +
                                " ms startTime: " + startTime +
                                " endTime: " + endTime);
                    }
                }
            };

            emitter.addNotificationListener(notificationListener, null, null);
        }
        for (int i = 1; i < 20000000; i++) {
            byte[] bytes = new byte[1024];
            if (i % 2 == 0) {
                bytes = null;
            }
        }
        TimeUnit.MILLISECONDS.sleep(4000);
    }
}

就可以获取 GC 暂停信息了。

获取指标实践

综合评估,还是使用 OSHI 最合适获取系统指标,它有完善的查询接口,单有些数据无法获取时,使用 JMX 补充一些信息。比如 OSHI 无法获取 GC 信息,可以从 JMX 的 GarbageCollectionNotificationInfo 中获取。

CPU

首先是获取 CPU 信息,OSHI 可以获取 CPU 三段(1,5,15 min)的负载(load)与使用率(usage)。

CPU 负载:

  // CPU
  int cpuCount = processor.getLogicalProcessorCount();
  systemMetrics.setCpuCount(cpuCount);
  double[] loadAverage = processor.getSystemLoadAverage(3);
  systemMetrics.setCpuLoad1m(NumberTool.round(loadAverage[0] < 0 ? 0 : loadAverage[0], 2));
  systemMetrics.setCpuLoad5m(NumberTool.round(loadAverage[1] < 0 ? 0 : loadAverage[1], 2));
  systemMetrics.setCpuLoad15m(NumberTool.round(loadAverage[2] < 0 ? 0 : loadAverage[2], 2));
  systemMetrics.setContextSwitches(processor.getContextSwitches());
  systemMetrics.setInterrupts(processor.getInterrupts());
  CpuUsage cpuUsage = getCpuUsage();
  systemMetrics.setCpuUsage(cpuUsage);

CPU 使用率:

private static CpuUsage getCpuUsage(CentralProcessor processor) {
  long[] prevTicks = processor.getSystemCpuLoadTicks();
  // Wait a second
  Util.sleep(1000);
  long[] ticks = processor.getSystemCpuLoadTicks();
  long user = ticks[CentralProcessor.TickType.USER.getIndex()] - prevTicks[CentralProcessor.TickType.USER.getIndex()];
  long nice = ticks[CentralProcessor.TickType.NICE.getIndex()] - prevTicks[CentralProcessor.TickType.NICE.getIndex()];
  long sys = ticks[CentralProcessor.TickType.SYSTEM.getIndex()] - prevTicks[CentralProcessor.TickType.SYSTEM.getIndex()];
  long idle = ticks[CentralProcessor.TickType.IDLE.getIndex()] - prevTicks[CentralProcessor.TickType.IDLE.getIndex()];
  long iowait = ticks[CentralProcessor.TickType.IOWAIT.getIndex()] - prevTicks[CentralProcessor.TickType.IOWAIT.getIndex()];
  long irq = ticks[CentralProcessor.TickType.IRQ.getIndex()] - prevTicks[CentralProcessor.TickType.IRQ.getIndex()];
  long softirq = ticks[CentralProcessor.TickType.SOFTIRQ.getIndex()] - prevTicks[CentralProcessor.TickType.SOFTIRQ.getIndex()];
  long steal = ticks[CentralProcessor.TickType.STEAL.getIndex()] - prevTicks[CentralProcessor.TickType.STEAL.getIndex()];
  long totalCpu = user + nice + sys + idle + iowait + irq + softirq + steal;
  CpuUsage cpuUsage = new CpuUsage();
  cpuUsage.setCpuUsrUsage(NumberTool.div(100 * user, totalCpu, 2));
  cpuUsage.setCpuSysUsage(NumberTool.div(100 * sys, totalCpu, 2));
  cpuUsage.setCpuIdle(NumberTool.div(100 * idle, totalCpu, 2));
  return cpuUsage;
}

内存

OSHI 可以获取系统的内存与线程指标,代码如下:

// 内存
GlobalMemory memory = HAL.getMemory();
systemMetrics.setMemoryTotal(NumberTool.round((double) memory.getTotal() / MB, 2));
systemMetrics.setMemoryAvailable(NumberTool.round((double) memory.getAvailable() / MB, 2));
long swapUsed = memory.getVirtualMemory().getSwapUsed();
long swapTotal = memory.getVirtualMemory().getSwapTotal();
systemMetrics.setSwapUsed(NumberTool.round((double) swapUsed / MB, 2));
systemMetrics.setSwapTotal(NumberTool.round((double) swapTotal / MB, 2));

// 线程
systemMetrics.setProcessCount(OS.getProcessCount());
systemMetrics.setThreadCount(OS.getThreadCount());
FileSystem fileSystem = OS.getFileSystem();

磁盘

OSHI 可以获取系统的文件描述符指标,代码如下:

// 文件描述符
systemMetrics.setOpenFdCount(fileSystem.getOpenFileDescriptors());
systemMetrics.setIops(getIops1s());

IOPS 无法直接获取 IOPS,需要使用 JMX 的 HWDiskStore 先记录读写次数,在获取一秒后的读写次数,相减就可以获取IOPS了,代码如下:

private static long getIops() {
  Map<String, Map<String, Long>> IOPSMap = new ConcurrentHashMap<>();
  for (HWDiskStore hwDiskStore : HAL.getDiskStores()) {
    String name = hwDiskStore.getName();
    long readsBefore = hwDiskStore.getReads();
    long writesBefore = hwDiskStore.getWrites();
    Map<String, Long> beforeMap = new HashMap<>();
    beforeMap.put("readsBefore", readsBefore);
    beforeMap.put("writesBefore", writesBefore);
    IOPSMap.put(name, beforeMap);
  }

  // 记录一秒的读写差
  Util.sleep(1000);

  for (HWDiskStore hwDiskStore : HAL.getDiskStores()) {
    String name = hwDiskStore.getName();
    Map<String, Long> afterMap = IOPSMap.get(name);
    long readsAfter = hwDiskStore.getReads();
    long writesAfter = hwDiskStore.getWrites();
    afterMap.put("readsAfter", readsAfter);
    afterMap.put("writesAfter", writesAfter);
  }
  long iops = 0;
  for (Map.Entry<String, Map<String, Long>> stringMapEntry : IOPSMap.entrySet()) {
    Map<String, Long> value = stringMapEntry.getValue();
    Long readsBefore = value.get("readsBefore");
    Long writesBefore = value.get("writesBefore");
    Long readsAfter = value.get("readsAfter");
    Long writesAfter = value.get("writesAfter");
    iops += (readsAfter - readsBefore) + (writesAfter - writesBefore);
  }
  return iops;
}

网络

OSHI 可以获取系统的网络指标,代码如下:

 public static NetworkMetrics getNetworkMetrics() {
        NetworkMetrics networkMetrics = new NetworkMetrics();
        networkMetrics.setLatency(0.0D);
        networkMetrics.setBandwidth(0.0D);
        for (NetworkIF net : HAL.getNetworkIFs()) {
          // 过滤虚拟网卡
            if (null == net.getIPv4addr()
                    || net.getIPv4addr().length == 0
                    || null == net.getIPv6addr()
                    || net.getIPv6addr().length == 0) {
                continue;
            }
            networkMetrics.setMtu(net.getMTU());
            networkMetrics.setSpeed(net.getSpeed());
            networkMetrics.setMacaddr(net.getMacaddr());
            networkMetrics.setIPv4addr(net.getIPv4addr());
            networkMetrics.setIPv6addr(net.getIPv6addr());
            boolean hasData = net.getBytesRecv() > 0 || net.getBytesSent() > 0 || net.getPacketsRecv() > 0
                    || net.getPacketsSent() > 0;
            networkMetrics.setHasData(hasData);
            networkMetrics.setBytesRecv(net.getBytesRecv());
            networkMetrics.setBytesSent(net.getBytesSent());
            networkMetrics.setPacketsRecv(net.getPacketsRecv());
            networkMetrics.setPacketsSent(net.getPacketsSent());
            networkMetrics.setInErrors(net.getInErrors());
            networkMetrics.setOutErrors(net.getOutErrors());
        }
        return networkMetrics;
    }

JVM

OSHI 可以获取系统的 GC 指标,代码如下:

public static GcMetrics getGcMetrics() {
  GcMetrics gcMetrics = new GcMetrics();
  gcMetrics.setGcPauseTime1m(NumberTool.round(getGcPauseTime1m(), 2));
  gcMetrics.setGcThroughput1m(NumberTool.round(getGcThroughput1m(), 2));
  gcMetrics.setGcCount1m(gcCount);
  gcMetrics.setGcMemoryPoolUsage(0.0D);
  gcMetrics.setGcHeapAllocationRate(NumberTool.round(getAllocationRate(REGISTRY), 2));
  gcMetrics.setGcHeapPromotedRate(NumberTool.round(getPromotedRate(REGISTRY), 2));
  return gcMetrics;
}

private static double getAllocationRate(MeterRegistry registry) {
  double memoryAllocated = registry.get("jvm.gc.memory.allocated").counter().count();
  return memoryAllocated / 1024 / 1024;
}

private static double getPromotedRate(MeterRegistry registry) {
  double memoryPromoted = registry.get("jvm.gc.memory.promoted").counter().count();
  return memoryPromoted / 1024 / 1024;
}

GC 的暂停时间与吞吐要使用 GarbageCollectionNotificationInfo 获取,在计算出1分钟的平均暂停时间与吞吐,代码如下:

/**
* GC 最后更新时间
*/
private static long gcLastTime = 0;
/**
* GC 次数
*/
private static int gcCount = 0;
/**
* GC 计时 Map <time,avgLatency>
*/
static Map<Long, Long> gcTimeMap = new ConcurrentHashMap<>();

public static void init() {

  EXECUTOR_SERVICE.submit(() -> {
    for (GarbageCollectorMXBean gcBean : ManagementFactory.getGarbageCollectorMXBeans()) {
      NotificationEmitter emitter = (NotificationEmitter) gcBean;
      NotificationListener notificationListener = (notification, handback) -> {
        if (notification.getType().equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION)) {
          GarbageCollectionNotificationInfo from = GarbageCollectionNotificationInfo.from((CompositeData) notification.getUserData());
          GcInfo gcInfo = from.getGcInfo();
          long duration = gcInfo.getDuration();
          log.debug("{} GC duration: {}", from.getGcName(), duration);
          gcLastTime = System.currentTimeMillis();
          gcTimeMap.put(gcLastTime, duration);
        }
      };
      emitter.addNotificationListener(notificationListener, null, null);
    }
  });
}

private static double getGcPauseTime1m() {
  long now = System.currentTimeMillis();
  gcTimeMap.entrySet().removeIf((k) -> k.getKey() < now - MINUTE);
  // 去掉一分钟以前的
  gcCount = gcTimeMap.size();
  OptionalDouble average = gcTimeMap.values().stream().mapToLong(l -> l).average();
  return average.orElse(0.0D);
}

private static double getGcThroughput1m() {
  long now = System.currentTimeMillis();
  // 去掉一分钟以前的
  gcTimeMap.entrySet().removeIf((k) -> k.getKey() < now - MINUTE);
  gcCount = gcTimeMap.size();
  long sum = gcTimeMap.values().stream().mapToLong(l -> l).sum();
  return NumberTool.div(60 * 1000 - sum, 60 * 1000, 4) * 100;
}

OSHI 可以获取系统的线程指标,代码如下:

public static ThreadMetrics getThreadMetrics() {
  ThreadMetrics threadMetrics = new ThreadMetrics();
  double liveThreads = REGISTRY.get("jvm.threads.live").gauge().value();
  threadMetrics.setLiveThreads(liveThreads);
  double peakThreads = REGISTRY.get("jvm.threads.peak").gauge().value();
  threadMetrics.setPeakThreads(peakThreads);
  double blockedThreads = REGISTRY.get("jvm.threads.states").tag("state", "blocked").gauge().value();
  threadMetrics.setBlockedThreads(blockedThreads);
  double runnableThreads = REGISTRY.get("jvm.threads.states").tag("state", "runnable").gauge().value();
  threadMetrics.setRunnableThreads(runnableThreads);
  double newThreads = REGISTRY.get("jvm.threads.states").tag("state", "new").gauge().value();
  threadMetrics.setNewThreads(newThreads);
  double timedWaitingThreads = REGISTRY.get("jvm.threads.states").tag("state", "timed-waiting").gauge().value();
  threadMetrics.setTimedWaitingThreads(timedWaitingThreads);
  double terminatedThreads = REGISTRY.get("jvm.threads.states").tag("state", "terminated").gauge().value();
  threadMetrics.setTerminatedThreads(terminatedThreads);
  double waitingThreads = REGISTRY.get("jvm.threads.states").tag("state", "waiting").gauge().value();
  threadMetrics.setWaitingThreads(waitingThreads);
  return threadMetrics;
}

代码见:https://github.com/cosmoswei/hextech/blob/main/src/main/java/com/wei/metrics/PerformanceQueryUtils.java

如何让计算负载

有了上述的系统指标,我们可以获取系统的负载和状态。但是,面对这么多的指标,如何更加科学地体现系统的负载呢?首先,我想到的是通过公式来计算一个负载值,将 CPU 负载、RT、上线时间等因素通过公式加权计算出一个权重值或阈值,用来反映系统的负载。例如,Dubbo 在计算负载时使用了指数加权平均(EWMA)算法,这可以帮助估计变量的局部均值,使得变量的更新不仅依赖当前值,还与一段时间内的历史数据有关。可以参考这篇文章

然而,单一的公式可能无法适应所有情况,尤其是在服务节点的性能存在差异时。在不同的机器上,硬件资源各不相同,比如一些机器的 I/O 性能特别强,具备较高的 IOPS(每秒输入输出操作次数),而另一些机器的 CPU 性能更强,具有更高的处理能力。因此,通过单一算法计算最大并发量的通用性可能较低,计算出来的最大并发量也可能不够准确。而且,现有的这些指标并不能完全体现服务节点的真实负载。那么,还有哪些指标可以更加科学地反映节点负载呢?

我想通过配置化的方式来计算服务器的最大并发量,通过提取一些性能指标,比如 CPU 使用率、网络延迟、线程数等,组合成一个可配置的公式,从而得出服务器的最大并发量。我认为这是一个可行的方案,但问题是,如何进行配置呢?

引入规则引擎

我的想法是,借助一个评分体系来评估系统的最大并发量。如果系统的最大并发量较高,则其得分也会相应较高,反之亦然。根据这个得分,我们可以推算出机器的最大并发量。这个评分该如何得出呢?

电商风控

在电商业务中,有一种技术叫做“风控”,它根据用户的一些属性(如用户ID、设备指纹、购买频次等),给用户评定一个风控分数,进而判断这次交易是否存在风险。这种做法能够帮助电商平台降低运营中的风险,保障交易的安全。

这一思路与我们的需求非常相似:把服务器看作“用户”,而服务器的最大并发量就可以看作“风控评分”。至于计算最大并发量的算法,则可以看作“风控规则”。通过这样的方式,我们可以在开发过程中配置具体的计算算法,最终得到一个代表服务器最大并发量的评分。通过这个分数,我们就能更科学地估算服务器的真实负载能力。风控的核心是利用规则引擎,根据用户的操作行为或属性计算出用户的风控分数,而这些规则是可以由运营团队配置的。那么规则引擎又是什么?跟主题配置驱动有什么关系?接着往下看。

规则引擎

规则引擎由推理引擎发展而来,是一种嵌入在应用程序中的组件,实现了将业务决策从应用程序代码中分离出来,并使用预定义的语义模块编写业务决策。接受数据输入,解释业务规则,并根据业务规则做出业务决策,需要注意的是规则引擎并不是一个具体的技术框架,而是指的一类系统,即业务规则管理系统。目前规则引擎产品有:drools、VisualRules、iLog。有关规则引擎,可以看这篇文章

服务质量(QoS)

QoS(Quality of Service)即服务质量。在有限的带宽资源下,QoS为各种业务分配带宽,为业务提供端到端的服务质量保证。例如,语音、视频和重要的数据应用在网络设备中可以通过配置QoS优先得到服务。

QoS 本来是通信领域中,用在交换机的技术,我们借用它的名字,服务质量评分作为的评价体系的结果。

public class QoS {
    /**
     * QoS 等级,体现的是服务质量的分数
     * 值域 = 1-100
     * 默认值 = 100
     */
    private int qoSLevel = 100;
}

配置化指标实战

有了规则引擎与QoS这个概念,接下来进行实战。

架构

整体的配置化自适应服务架构如下:

应用架构图

代码实战

Step1:引入 drools 的 pom

<!-- 规则引擎  -->
<dependency>
  <groupId>org.drools</groupId>
  <artifactId>drools-core</artifactId>
  <version>7.59.0.Final</version>
</dependency>
<dependency>
  <groupId>org.drools</groupId>
  <artifactId>drools-decisiontables</artifactId>
  <version>7.59.0.Final</version>
</dependency>

Step2:获取服务器的指标

public static Metrics getMetrics0() {
  if (registry == null) {
    initMeterRegistry();
  }
  Metrics metrics = new Metrics();
  MetricsUtils metricsUtils = new MetricsUtils(registry);
  metrics.setSystemMetrics(metricsUtils.getSystemMetrics());
  metrics.setNetworkMetrics(metricsUtils.getNetworkMetrics());
  metrics.setJvmMetrics(metricsUtils.getJvmMetrics());
  return metrics;
}

Step3:规则文件

dialect "java"
rule "defaultRule"

    when
        // 如果阻塞线程的数量大于10
        Metrics(jvmMetrics.getBlockedThreads() < 10)
    then
        // 则将QoS等级设为10
        qoS.setQoSLevel(201);

end

rule "bottomRule"
    when
    then
        // 如果QoS大于100,则将QoS等级设为100
      if (qoS.getQoSLevel() > 100){
          qoS.setQoSLevel(100);
      }
end

Step4:执行规则引擎

 public QoS getQoS(Metrics metrics) {
        QoS qoS = new QoS();

        // 规则文件
        String metricQoSRulePath = PalmxConfig.getMetricQoSRulePath();

        KieSession kieSession = getKieSession(metricQoSRulePath);

        kieSession.setGlobal("qoS", qoS);
        // 插入数据到 KieSession
        kieSession.insert(metrics);
        // 触发规则执行
        kieSession.fireAllRules();
        // 关闭 KieSession
        kieSession.dispose();
        return qoS;
    }

Step5:根据QoS进行限流。

/**
* 自适应控制,通过获取服务质量等级 QoS 来限制流量的百分比
*/
private void adaptiveControl() {
  if (PalmxConfig.getAdaptiveFlowControlEnable()) {
    int localQoSLevel = QoSHandler.getLocalQoSLevelFromCache("FLOW_CONTROL");
    if (localQoSLevel <= 0) {
      this.qps = 0;
    }
    this.qps = (this.qps / localQoSLevel) * 100;
  }
}

Step6:启动效果

2024-12-23 23:21:25.271  INFO --- [pool-7-thread-1] me.xuqu.palmx.qos.QoSHandler             : get local qos level is 40
2024-12-23 23:21:25.271  INFO --- [pool-7-thread-1] me.xuqu.palmx.registry.ZookeeperUpdater  : 正在更新当前主机的节点信息 当前服务 service = me.wei.service.DemoService
2024-12-23 23:21:25.291  INFO --- [pool-7-thread-1] me.xuqu.palmx.registry.ZookeeperUpdater  : 正在更新当前主机的节点信息 当前服务 service = me.wei.service.TestService

详细代码可以参考:https://github.com/cosmoswei/palmx/tree/README.md/v1.0/src/main/java/me/xuqu/palmx/qos

参考文章

  1. Sigar 教程:https://my.oschina.net/mkh/blog/312911
  2. 万字长文详解 Micrometer:https://juejin.cn/post/7051109463180181535
  3. Micrometer 使用介绍:https://www.tony-bro.com/posts/1386774700/index.html
  4. Drools 规则引擎:https://www.cnblogs.com/ityml/p/15993391.html
  5. Dubbo 框架标准监控指标:https://cn.dubbo.apache.org/zh-cn/overview/reference/metrics/standard_metrics/
  6. Drools 规则引擎应用 看这一篇就够了:https://www.cnblogs.com/ityml/p/15993391.html