需求场景
为在线文档创建副本。在线文档中包含很多图片文件,这些图片通过 objectId 关联到对象存储中的文件对象。对象存储提供了支持文件拷贝的 REST API。
优化前
通过 for 循环逐个(串行)调用 HTTP 接口拷贝文件
/**
 * Copies every source/destination object pair one after another on the calling thread.
 *
 * <p>This is the serial variant: each pair is handed to {@code serverCopyPrivateObject} in list
 * order, and the first {@link ObjectCopyException} aborts the loop.
 *
 * @param desFid              destination file id, forwarded unchanged to each single-object copy
 * @param sourceDesObjectList source/destination object id pairs to copy
 * @param desOwner            destination owner, forwarded unchanged to each single-object copy
 * @param uid                 user id, forwarded unchanged to each single-object copy
 * @return a success response wrapping the copied objects in input order, or an error response
 *         built from the code and message of the first {@link ObjectCopyException}
 */
@Override
public RtData serverCopyPrivateObjects(String desFid, List<SourceDesObjectVm> sourceDesObjectList, Long desOwner, Long uid) {
    List<SourceDesObjectVm> objectList = new ArrayList<>(sourceDesObjectList.size());
    for (SourceDesObjectVm sourceDesObjectVm : sourceDesObjectList) {
        try {
            SourceDesObjectVm resObject = serverCopyPrivateObject(desFid, sourceDesObjectVm.getDesObjectId(),
                    desOwner, sourceDesObjectVm.getSourceObjectId(), uid);
            objectList.add(resObject);
        } catch (ObjectCopyException e) {
            // The logger already records the stack trace; no separate printStackTrace() to stderr.
            log.error("对象拷贝异常", e);
            return ResponseBuilder.build(e.getRtCode().getCode(), e.getMessage());
        }
    }
    return ResponseBuilder.success(objectList);
}
如果总共有150个图片文件,每个图片文件请求耗时2s,则需要300s才能拷贝完成。
线程池优化
任务类
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.util.Date;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
/**
* @author yuwb
* @description 拷贝对象任务
* @date 2021.11.29
*/
@Slf4j
public class CopyObjectCalculator implements Callable<SourceDesObjectVm> {
private String desFid;
private String desObjectId;
private Long desOwner;
private String sourceObjectId;
private Long uid;
private OssObjectRepository ossObjectRepository = (OssObjectRepository) SpringContextUtil.getBean("ossObjectRepository");
private CsspConfig csspConfig = (CsspConfig) SpringContextUtil.getBean("csspConfig");
private OssServiceImpl ossServiceImpl = (OssServiceImpl) SpringContextUtil.getBean("ossServiceImpl");
public CopyObjectCalculator(String desFid, String desObjectId, Long desOwner, String sourceObjectId, Long uid) {
this.desFid = desFid;
this.desObjectId = desObjectId;
this.desOwner = desOwner;
this.sourceObjectId = sourceObjectId;
this.uid = uid;
}
@Override
public SourceDesObjectVm call() throws ObjectCopyException {
try {
//do copy
CopyObjectResult copyObjectResult = csspClient.copyObject(sourceContainer, sourceObjectId, desObjectId);
log.info("copy object res:" + JSON.toJSONString(copyObjectResult));
log.info("拷贝对象完成 sourceContainer:{}, sourceObjectId:{}, desObjectId:{}", sourceContainer, sourceObjectId, desObjectId);
//插入记录
//...
} catch (CSSPException e) {
e.printStackTrace();
log.error("拷贝对象异常{}", "sourceContainer:" + sourceContainer + " sourceObjectId:" + sourceObjectId + " desObjectId:" + desObjectId, desOwner, e);
throw new ObjectCopyException(RtCode.REQUEST_FAIL, "拷贝对象CSSP异常");
} catch (IOException e) {
log.error("拷贝对象异常{}", "sourceContainer:" + sourceContainer + " sourceObjectId:" + sourceObjectId + " desObjectId:" + desObjectId, desOwner, e);
e.printStackTrace();
throw new ObjectCopyException(RtCode.REQUEST_FAIL, "拷贝对象IO异常");
}
}
}
创建线程池
/**
 * Fixed-size pool that runs the copy tasks.
 * The work is I/O bound (network calls, database reads/writes), so the pool is sized at
 * twice the number of available processors.
 */
ThreadPoolExecutor executor =
        (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
使用线程池处理拷贝任务
/**
 * Copies every source/destination object pair in parallel on the shared executor.
 *
 * <p>One {@code CopyObjectCalculator} is submitted per pair. Results are then collected in
 * submission order under a single 60-second deadline that starts once all tasks are submitted.
 * On timeout, interruption, or a failed task, every task that has not finished is cancelled
 * (with interruption) so it stops occupying pool threads, and an error response is returned.
 *
 * @param desFid              destination file id, passed to every copy task
 * @param sourceDesObjectList source/destination object id pairs to copy
 * @param desOwner            destination owner, passed to every copy task
 * @param uid                 user id, passed to every copy task
 * @return a success response wrapping the copied objects in input order; otherwise a
 *         server-error response on timeout or interruption, or a response built from the
 *         failing task's {@link ObjectCopyException} code and message
 */
@Override
public RtData serverCopyPrivateObjects(String desFid, List<SourceDesObjectVm> sourceDesObjectList, Long desOwner, Long uid) {
    // Holds the Future of every submitted copy task, in submission order.
    List<Future<SourceDesObjectVm>> futureList = new ArrayList<>(sourceDesObjectList.size());
    for (SourceDesObjectVm sourceDesObjectVm : sourceDesObjectList) {
        CopyObjectCalculator copyObjectCalculator = new CopyObjectCalculator(desFid, sourceDesObjectVm.getDesObjectId(),
                desOwner, sourceDesObjectVm.getSourceObjectId(), uid);
        futureList.add(executor.submit(copyObjectCalculator));
    }

    // One overall budget for the whole batch rather than a per-task timeout.
    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(60);
    List<SourceDesObjectVm> objectList = new ArrayList<>(futureList.size());
    try {
        for (Future<SourceDesObjectVm> future : futureList) {
            // A non-positive wait makes get() return only results that are already available.
            long remainingNanos = Math.max(deadline - System.nanoTime(), 0L);
            objectList.add(future.get(remainingNanos, TimeUnit.NANOSECONDS));
        }
        return ResponseBuilder.success(objectList);
    } catch (java.util.concurrent.TimeoutException e) {
        // Stop processing once the 60-second budget is exhausted.
        cancelUnfinished(futureList);
        return ResponseBuilder.serverError("拷贝对象超时");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
        cancelUnfinished(futureList);
        log.error("对象拷贝异常", e);
        return ResponseBuilder.build(RtCode.SERVER_ERROR.getCode(), e.getMessage());
    } catch (ExecutionException e) {
        cancelUnfinished(futureList);
        log.error("对象拷贝异常", e);
        if (e.getCause() instanceof ObjectCopyException) {
            // Surface the business exception's own code and message.
            ObjectCopyException ce = (ObjectCopyException) e.getCause();
            return ResponseBuilder.build(ce.getRtCode().getCode(), ce.getMessage());
        }
        return ResponseBuilder.build(RtCode.SERVER_ERROR.getCode(), e.getMessage());
    }
}

/** Cancels, with interruption, every task in the list that has not completed yet. */
private void cancelUnfinished(List<Future<SourceDesObjectVm>> futures) {
    for (Future<SourceDesObjectVm> future : futures) {
        if (!future.isDone()) {
            future.cancel(true);
        }
    }
}
实际测试:
在单个服务节点(24 核服务器)上,170 张图片的拷贝完成时间由 63s 优化至 3s。
处理细节:
通过future对象获取拷贝请求是否处理完成。
在一个图片文件拷贝异常时,打断其他拷贝请求任务,直接返回结果,释放线程池线程资源。
主要影响因素:
服务器核数,线程池配置、图片数量和大小、对象存储服务处理拷贝请求速度