LC/Tc.md
kyriewhluo c14b127911 代码重构:优化查询性能并解决超时问题- 引入线程池来处理并发查询请求,以提升查询性能。
- 使用AtomicInteger保证实例数量统计的安全性。
- 利用CountDownLatch确保所有并行查询任务完成后返回结果。
- 主要针对集群和实例数量查询接口的性能优化,解决超时问题。
2024-08-02 11:01:42 +08:00

10 KiB
Raw Blame History

    public List<Instance> findInstanceList(List<Instance> instanceList, TsfBaseEntity entity) {
    if (CollectionUtils.isEmpty(instanceList)) {
        return new ArrayList<>();
    }
    List<Instance> instanceRunList = new ArrayList<>();
    //虚机节点
    long start, end;
    start = System.currentTimeMillis();
    List<Instance> instanceCVMList = instanceList.stream().filter(item ->
            ClusterConstant.CLUSTER_TYPE.CVM.equals(item.getClusterType())).collect(Collectors.toList());
    List<String> instanceIdList = instanceCVMList.stream().map(Instance::getInstanceId).collect(Collectors.toList());
    List<InstanceAgent> instanceAgentList = instanceAgentJpaRepository.findByInstanceIdIn(instanceIdList);
    Map<String, InstanceAgent> instanceAgentMap = TsfMapUtil.list2HashMap(instanceAgentList, "getInstanceId");
    if (!CollectionUtils.isEmpty(instanceAgentMap)) {
        for (Instance ins : instanceCVMList) {
            InstanceAgent instanceAgent = instanceAgentMap.get(ins.getInstanceId());
            if (instanceAgent != null) {
                Long currentTimestamp = System.currentTimeMillis();
                Long heartbeatTimestamp = instanceAgent.getUpdateTime() != null ? instanceAgent.getUpdateTime().getTime() : 0L;
                if (heartbeatTimestamp + 18 * 60 * 1000 > currentTimestamp) {
                    instanceRunList.add(ins);
                }
            }
        }
    }
    end = System.currentTimeMillis();
    logger.info("call cvm clusterInstances cost {}ms", (end - start));

    //容器节点
    start = System.currentTimeMillis();
    List<Instance> instanceCCSList = instanceList.stream().filter(item ->
            ClusterConstant.CLUSTER_TYPE.CCS.equals(item.getClusterType())).collect(Collectors.toList());
    Map<String, List<Instance>> instanceCcsMap = instanceCCSList.stream().collect(Collectors.groupingBy(Instance::getClusterId));
    instanceCcsMap.forEach((clusterId,instanceCcsList) -> {
        Cluster cluster = new Cluster();
        cluster.transferBase(entity);
        cluster.setClusterId(clusterId);
        List<Instance> ccsRunInstanceList = commonContainerInstanceService.findRunInstanceList(cluster, instanceCcsList);
        instanceRunList.addAll(ccsRunInstanceList);
    });
    end = System.currentTimeMillis();
    logger.info("call ccs clusterInstances cost {}ms", (end - start));
    return instanceRunList;
}

117789438

CountDownLatch 的使用

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        final int numberOfTasks = 3; // 假设有3个任务需要完成
        CountDownLatch latch = new CountDownLatch(numberOfTasks);

        for (int i = 0; i < numberOfTasks; i++) {
            new Thread(() -> {
                System.out.println("任务 " + Thread.currentThread().getName() + " 开始执行...");
                // 模拟任务执行需要一些时间
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务 " + Thread.currentThread().getName() + " 执行完成");
                latch.countDown(); // 任务完成计数减1
            }).start();
        }

        System.out.println("主线程等待所有任务完成...");
        latch.await(); // 主线程等待所有任务完成
        System.out.println("所有任务已完成,主线程继续执行...");
    }
}

使用AtomicInteger、CountDownLatch、线程池的并发方式优化查询集群数量接口和实例数量资源接口的性能解决超时问题。在查询实例或集群数量过多时查询DB还需要查询容器平台、会导致查询时延达到15s秒 使用redis封装避免短时间缓存击穿

线程工厂

package com.tencent.tsf.resource.common.util;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadFactory implements ThreadFactory {
	private final ThreadGroup group;
	private final AtomicInteger threadNumber = new AtomicInteger(1);
	private final String namePrefix;

	public CustomThreadFactory(String prefix) {
		 SecurityManager s = System.getSecurityManager();
         group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
         namePrefix = prefix + "-thread-";
	}

	@Override
	public Thread newThread(Runnable r) {
		Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
		if (t.isDaemon())
			t.setDaemon(false);
		if (t.getPriority() != Thread.NORM_PRIORITY)
			t.setPriority(Thread.NORM_PRIORITY);
		return t;
	}

}

线程池

private ExecutorService overviewResourceUsageExecutor =
            new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2,0, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(2048),
                    new CustomThreadFactory("overviewResourceUsageThreadPool"),
                    new ThreadPoolExecutor.AbortPolicy());

固定线程数量的线程池我是8核的cpu此时能用8个cpu处理16个线程当线程满了时就进入阻塞队列ArrayBlockingQueue

  1. 核心线程数corePoolSizeRuntime.getRuntime().availableProcessors() * 2。这意味着线程池会创建一个核心线程数等于当前系统可用处理器的两倍。这样的设置通常是为了确保线程池能够充分利用系统资源同时避免线程过多导致上下文切换开销过大。
  2. 最大线程数maximumPoolSize同样设置为Runtime.getRuntime().availableProcessors() * 2。这意味着线程池允许存在的最大线程数也是系统可用处理器的两倍。由于核心线程数和最大线程数相等这个线程池实际上是一个固定大小的线程池它不会根据任务的多少动态地增加或减少线程数量。
  3. 线程空闲时间keepAliveTime设置为0秒。这意味着线程一旦空闲下来就会被立即回收不会等待其他任务的到来。这个设置适用于任务持续不断地到来的场景可以节省资源。
  4. 时间单位unit设置为TimeUnit.SECONDS表示线程空闲时间的单位是秒。
  5. 工作队列workQueue使用ArrayBlockingQueue作为工作队列容量为2048。这意味着当线程池中的线程都在忙碌时新来的任务会进入这个队列等待执行。队列的容量较大可以容纳较多的待执行任务有助于减少线程创建和销毁的频率。
  6. 线程工厂threadFactory使用CustomThreadFactory自定义线程工厂来创建线程。这样可以自定义线程的名称、优先级等属性便于管理和调试。
  7. 拒绝策略RejectedExecutionHandler使用ThreadPoolExecutor.AbortPolicy()作为拒绝策略。当线程池和工作队列都满时新提交的任务会导致抛出RejectedExecutionException异常。这种策略比较直接可以让调用者感知到任务无法执行的情况并进行相应处理。

接口超时超过15s了 【此接口内部有一个循环查询逻辑循环里面需要查询DB还需要查询容器平台故性能会随着用户集群的增多而下降】

两个接口超时的原因基本是一样的:

1、DescribeGroupResourceUsage 首先会查询出所有的集群,包括虚拟机集群和容器集群,之后再将所有的集群进行遍历:

如果是容器集群先根据集群ID在DB里面查询出关联的所有group列表然后就会调用apiserver获取所有的pod信息然后累计计算部署组中的pod个数和健康状况。

如果是虚拟机集群先根据集群ID在DB里面查询出关联的所有group列表然后再去调用masterapi的相关接口获取group的实例信息最后再统计。

最终再将所有集群计算出来的数据进行累加,这里的耗时体现在,每个集群的查询耗时是串行累加的,不是并行,所以集群数量越多,累加时间肯定越大。

2、DescribeInstanceResourceUsage 首先会查询出所有的集群,包括虚拟机集群和容器集群,之后再将所有的集群进行遍历:

如果是容器集群先根据集群ID在DB里面查询出关联的所有group列表然后就会调用apiserver获取所有的pod信息然后累计计算部署组中的pod个数。

如果是虚拟机集群根据clusterID去调用masterapi的相关接口获取集群下面的instance列表然后再统计各个维度的instance个数。

然后再将所有集群计算出来的数据进行累加,这里的耗时体现在,每个集群的查询耗时是串行累加的,不是并行,所以集群数量越多,累加时间肯定越大。

最后还需要统计所有容器集群的健康状态此时需要调用TKE的接口去批量查询容器集群健康状态。

  • 优化策略:将串行查询优化为并行查询

  • 问题: 在查询实例或集群数量过多时查询DB还需要查询容器平台、会导致查询时延达到15s秒

  • 原因查询出所有的集群包括虚拟机集群和容器集群之后再将所有的集群进行遍历如果是容器集群先根据集群ID在DB里面查询出关联的所有group列表然后就会调用apiserver获取所有的pod信息然后累计计算部署组中的pod个数和健康状况 如果是虚拟机集群先根据集群ID在DB里面查询出关联的所有group列表然后再去调用masterapi的相关接口获取group的实例信息最后再统计。 最终再将所有集群计算出来的数据进行累加,这里的耗时体现在,每个集群的查询耗时是串行累加的,不是并行,所以集群数量越多,累加时间肯定越大。

  • 解决引入线程池来并发处理查询请求使用AtomicInteger来安全地统计实例数量并利用CountDownLatch确保所有并发任务完成后才返回结果从而提高了接口的并发处理能力并解决了超时问题。