代码重构:优化查询性能并解决超时问题- 引入线程池来处理并发查询请求,以提升查询性能。

- 使用AtomicInteger保证实例数量统计的安全性。
- 利用CountDownLatch确保所有并行查询任务完成后返回结果。
- 主要针对集群和实例数量查询接口的性能优化,解决超时问题。
This commit is contained in:
kyriewhluo 2024-08-02 11:01:42 +08:00
parent 6320272406
commit c14b127911
6 changed files with 442 additions and 0 deletions

175
Tc.md Normal file
View File

@ -0,0 +1,175 @@
```java
public List<Instance> findInstanceList(List<Instance> instanceList, TsfBaseEntity entity) {
if (CollectionUtils.isEmpty(instanceList)) {
return new ArrayList<>();
}
List<Instance> instanceRunList = new ArrayList<>();
//虚机节点
long start, end;
start = System.currentTimeMillis();
List<Instance> instanceCVMList = instanceList.stream().filter(item ->
ClusterConstant.CLUSTER_TYPE.CVM.equals(item.getClusterType())).collect(Collectors.toList());
List<String> instanceIdList = instanceCVMList.stream().map(Instance::getInstanceId).collect(Collectors.toList());
List<InstanceAgent> instanceAgentList = instanceAgentJpaRepository.findByInstanceIdIn(instanceIdList);
Map<String, InstanceAgent> instanceAgentMap = TsfMapUtil.list2HashMap(instanceAgentList, "getInstanceId");
if (!CollectionUtils.isEmpty(instanceAgentMap)) {
for (Instance ins : instanceCVMList) {
InstanceAgent instanceAgent = instanceAgentMap.get(ins.getInstanceId());
if (instanceAgent != null) {
Long currentTimestamp = System.currentTimeMillis();
Long heartbeatTimestamp = instanceAgent.getUpdateTime() != null ? instanceAgent.getUpdateTime().getTime() : 0L;
if (heartbeatTimestamp + 18 * 60 * 1000 > currentTimestamp) {
instanceRunList.add(ins);
}
}
}
}
end = System.currentTimeMillis();
logger.info("call cvm clusterInstances cost {}ms", (end - start));
//容器节点
start = System.currentTimeMillis();
List<Instance> instanceCCSList = instanceList.stream().filter(item ->
ClusterConstant.CLUSTER_TYPE.CCS.equals(item.getClusterType())).collect(Collectors.toList());
Map<String, List<Instance>> instanceCcsMap = instanceCCSList.stream().collect(Collectors.groupingBy(Instance::getClusterId));
instanceCcsMap.forEach((clusterId,instanceCcsList) -> {
Cluster cluster = new Cluster();
cluster.transferBase(entity);
cluster.setClusterId(clusterId);
List<Instance> ccsRunInstanceList = commonContainerInstanceService.findRunInstanceList(cluster, instanceCcsList);
instanceRunList.addAll(ccsRunInstanceList);
});
end = System.currentTimeMillis();
logger.info("call ccs clusterInstances cost {}ms", (end - start));
return instanceRunList;
}
```
117789438
CountDownLatch 的使用
```java
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
final int numberOfTasks = 3; // 假设有3个任务需要完成
CountDownLatch latch = new CountDownLatch(numberOfTasks);
for (int i = 0; i < numberOfTasks; i++) {
new Thread(() -> {
System.out.println("任务 " + Thread.currentThread().getName() + " 开始执行...");
// 模拟任务执行需要一些时间
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务 " + Thread.currentThread().getName() + " 执行完成");
latch.countDown(); // 任务完成计数减1
}).start();
}
System.out.println("主线程等待所有任务完成...");
latch.await(); // 主线程等待所有任务完成
System.out.println("所有任务已完成,主线程继续执行...");
}
}
```
**使用AtomicInteger、CountDownLatch、线程池的并发方式优化查询集群数量接口和实例数量资源接口的性能解决超时问题。在查询实例或集群数量过多时查询DB还需要查询容器平台、会导致查询时延达到15s秒**
使用redis封装避免短时间缓存击穿
线程工厂
```java
package com.tencent.tsf.resource.common.util;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public CustomThreadFactory(String prefix) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = prefix + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
```
线程池
```java
private ExecutorService overviewResourceUsageExecutor =
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2,0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2048),
new CustomThreadFactory("overviewResourceUsageThreadPool"),
new ThreadPoolExecutor.AbortPolicy());
```
固定线程数量的线程池我是8核的cpu此时能用8个cpu处理16个线程当线程满了时就进入阻塞队列ArrayBlockingQueue
1. 核心线程数corePoolSizeRuntime.getRuntime().availableProcessors() * 2。这意味着线程池会创建一个核心线程数等于当前系统可用处理器的两倍。这样的设置通常是为了确保线程池能够充分利用系统资源同时避免线程过多导致上下文切换开销过大。
2. 最大线程数maximumPoolSize同样设置为Runtime.getRuntime().availableProcessors() * 2。这意味着线程池允许存在的最大线程数也是系统可用处理器的两倍。由于核心线程数和最大线程数相等这个线程池实际上是一个固定大小的线程池它不会根据任务的多少动态地增加或减少线程数量。
3. 线程空闲时间keepAliveTime设置为0秒。这意味着线程一旦空闲下来就会被立即回收不会等待其他任务的到来。这个设置适用于任务持续不断地到来的场景可以节省资源。
4. 时间单位unit设置为TimeUnit.SECONDS表示线程空闲时间的单位是秒。
5. 工作队列workQueue使用ArrayBlockingQueue作为工作队列容量为2048。这意味着当线程池中的线程都在忙碌时新来的任务会进入这个队列等待执行。队列的容量较大可以容纳较多的待执行任务有助于减少线程创建和销毁的频率。
6. 线程工厂threadFactory使用CustomThreadFactory自定义线程工厂来创建线程。这样可以自定义线程的名称、优先级等属性便于管理和调试。
7. 拒绝策略RejectedExecutionHandler使用ThreadPoolExecutor.AbortPolicy()作为拒绝策略。当线程池和工作队列都满时新提交的任务会导致抛出RejectedExecutionException异常。这种策略比较直接可以让调用者感知到任务无法执行的情况并进行相应处理。
接口超时超过15s了
【此接口内部有一个循环查询逻辑循环里面需要查询DB还需要查询容器平台故性能会随着用户集群的增多而下降】
两个接口超时的原因基本是一样的:
1、DescribeGroupResourceUsage 首先会查询出所有的集群,包括虚拟机集群和容器集群,之后再将所有的集群进行遍历:
如果是容器集群先根据集群ID在DB里面查询出关联的所有group列表然后就会调用apiserver获取所有的pod信息然后累计计算部署组中的pod个数和健康状况。
如果是虚拟机集群先根据集群ID在DB里面查询出关联的所有group列表然后再去调用masterapi的相关接口获取group的实例信息最后再统计。
最终再将所有集群计算出来的数据进行累加,这里的耗时体现在,每个集群的查询耗时是串行累加的,不是并行,所以集群数量越多,累加时间肯定越大。
2、DescribeInstanceResourceUsage 首先会查询出所有的集群,包括虚拟机集群和容器集群,之后再将所有的集群进行遍历:
如果是容器集群先根据集群ID在DB里面查询出关联的所有group列表然后就会调用apiserver获取所有的pod信息然后累计计算部署组中的pod个数。
如果是虚拟机集群根据clusterID去调用masterapi的相关接口获取集群下面的instance列表然后再统计各个维度的instance个数。
然后再将所有集群计算出来的数据进行累加,这里的耗时体现在,每个集群的查询耗时是串行累加的,不是并行,所以集群数量越多,累加时间肯定越大。
最后还需要统计所有容器集群的健康状态此时需要调用TKE的接口去批量查询容器集群健康状态。
- 优化策略:将串行查询优化为并行查询
- 问题: 在查询实例或集群数量过多时查询DB还需要查询容器平台、会导致查询时延达到15s秒
- 原因查询出所有的集群包括虚拟机集群和容器集群之后再将所有的集群进行遍历如果是容器集群先根据集群ID在DB里面查询出关联的所有group列表然后就会调用apiserver获取所有的pod信息然后累计计算部署组中的pod个数和健康状况
如果是虚拟机集群先根据集群ID在DB里面查询出关联的所有group列表然后再去调用masterapi的相关接口获取group的实例信息最后再统计。
最终再将所有集群计算出来的数据进行累加,这里的耗时体现在,每个集群的查询耗时是串行累加的,不是并行,所以集群数量越多,累加时间肯定越大。
- 解决引入线程池来并发处理查询请求使用AtomicInteger来安全地统计实例数量并利用CountDownLatch确保所有并发任务完成后才返回结果从而提高了接口的并发处理能力并解决了超时问题。

View File

@ -0,0 +1,68 @@
package cn.whaifree.redo.redo_all_240721;
import cn.whaifree.leetCode.model.TreeNode;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
public class LeetCode145 {
public static void main(String[] args) {
TreeNode root = TreeNode.constructTreeByArray(1, 2, 3, 4, 5);
TreeNode.printTree(root);
postorderTraversal(root).forEach(
(x) -> {
System.out.println(x);
}
);
}
public static List<Integer> postorderTraversal1(TreeNode root) {
List<Integer> res = new LinkedList<>();
if (root == null) {
return res;
}
Deque<TreeNode> stack = new LinkedList<>();
stack.push(root);
while (!stack.isEmpty()) {
TreeNode pop = stack.pop();
if (pop.left != null) {
stack.push(pop.left);
}
if (pop.right != null) {
stack.push(pop.right);
}
res.add(pop.val);
}
Collections.reverse(res);
return res;
}
public static List<Integer> postorderTraversal(TreeNode root) {
List<Integer> res = new LinkedList<>();
if (root == null) {
return res;
}
Deque<TreeNode> stack = new LinkedList<>();
stack.push(root);
while (!stack.isEmpty()) {
TreeNode pop = stack.pop();
if (pop == null) {
res.add(stack.pop().val);
} else {
stack.push(pop);
stack.push(null);
if (pop.right != null) {
stack.push(pop.right);
}
if (pop.left != null) {
stack.push(pop.left);
}
}
}
return res;
}
}

View File

@ -0,0 +1,34 @@
package cn.whaifree.redo.redo_all_240721;
import cn.whaifree.leetCode.model.TreeNode;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
public class LeetCode199 {
public static void main(String[] args) {
System.out.println(rightSideView(TreeNode.constructTreeByArray(1,2,3,null,5,null,4)));
}
public static List<Integer> rightSideView(TreeNode root) {
List<Integer> res = new LinkedList<>();
if(root == null){
return res;
}
level(res,root,0);
return res;
}
public static void level(List<Integer> res,TreeNode root,int level){
if (root==null){
return;
}
if (res.size()==level){
res.add(level,root.val);
}
level(res, root.right, level+1);
level(res, root.left, level+1);
}
}

View File

@ -0,0 +1,60 @@
package cn.whaifree.tech.thread;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class QueryTask implements Runnable {
private final String queryId;
private final int sleepTime; // 模拟查询耗时
private static final AtomicInteger counter = new AtomicInteger(0); // 计数器
private static final CountDownLatch latch = new CountDownLatch(10); // 计数器锁存器
public QueryTask(String queryId, int sleepTime) {
this.queryId = queryId;
this.sleepTime = sleepTime;
}
@Override
public void run() {
try {
Thread.sleep(sleepTime); // 模拟查询耗时
System.out.println("查询 " + queryId + " 完成");
counter.incrementAndGet(); // 安全地增加计数器
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 完成一个任务计数器减一
}
}
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS, // 存活时间单位
new LinkedBlockingQueue<>() // 任务队列
);
// 提交任务
for (int i = 0; i < 10; i++) {
executor.submit(new QueryTask("Query" + i, 3000));
}
/**
* latch.await开始
* 执行效果5个核心线程先提交等待3秒后完成并对AtomicInteger counter依次++counter此时为5
* 下一波5个线程再提交latch此时满足10个await结束
*/
// 等待所有任务完成
latch.await();
// 关闭线程池
executor.shutdown();
// 输出结果
System.out.println("所有查询已完成,查询总数:" + counter.get());
}
}

View File

@ -0,0 +1,44 @@
package cn.whaifree.test;
public class ThreadDemo1 {
static Object lock = new Object();
static int num = 0;
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 50; i++) {
synchronized (lock){
while (num%2!=0){
try {
lock.wait(); // wait会释放锁
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
num++;
System.out.println("a");
lock.notifyAll();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
synchronized (lock){ // wait释放锁后这个线程就能拿到锁
while (num%2!=1){
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
num++;
System.out.println("b");
lock.notifyAll();
}
}
}).start();
}
}

View File

@ -0,0 +1,61 @@
package cn.whaifree.test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Supplier;
public class ss {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Object o = new Object();
Object o1 = new Object();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (o) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (o1) {
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (o1) {
}
}
}).start();
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
}
});
CompletableFuture<Integer> integerCompletableFuture2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return 2;
}
});
System.out.println(integerCompletableFuture.get()+integerCompletableFuture2.get());
CompletableFuture<Integer> integerCompletableFuture1 = integerCompletableFuture.thenCombine(integerCompletableFuture2, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) {
return integer + integer2;
}
});
System.out.println(integerCompletableFuture1.get());
}
}