需求场景

为在线文档创建副本。在线文档中包含很多图片文件,这些图片通过 objectId 关联到对象存储中的文件对象。对象存储提供了支持文件拷贝的 REST API。
image.png

优化前

通过 for 循环逐个调用 HTTP 接口拷贝文件,各次拷贝串行执行。

    /**
     * Copies the given objects one at a time, in list order.
     *
     * <p>Each entry is copied by a call to {@code serverCopyPrivateObject}. The first
     * {@code ObjectCopyException} aborts the loop: it is logged and converted into an error
     * response carrying the exception's code and message. Entries after the failing one are not
     * attempted.
     *
     * @param desFid              forwarded unchanged to {@code serverCopyPrivateObject}
     * @param sourceDesObjectList source/destination object id pairs to copy
     * @param desOwner            forwarded unchanged to {@code serverCopyPrivateObject}
     * @param uid                 forwarded unchanged to {@code serverCopyPrivateObject}
     * @return a success response wrapping the copied objects in input order, or an error response
     *         built from the first copy failure
     */
    @Override
    public RtData serverCopyPrivateObjects(String desFid, List<SourceDesObjectVm> sourceDesObjectList, Long desOwner, Long uid) {
        List<SourceDesObjectVm> objectList = new ArrayList<>(sourceDesObjectList.size());
        for (SourceDesObjectVm sourceDesObjectVm : sourceDesObjectList) {
            try {
                SourceDesObjectVm resObject = serverCopyPrivateObject(desFid, sourceDesObjectVm.getDesObjectId(), desOwner, sourceDesObjectVm.getSourceObjectId(), uid);
                objectList.add(resObject);
            } catch (ObjectCopyException e) {
                // The logger already records the stack trace; no separate printStackTrace() needed.
                log.error("对象拷贝异常", e);
                return ResponseBuilder.build(e.getRtCode().getCode(), e.getMessage());
            }
        }
        return ResponseBuilder.success(objectList);
    }

如果总共有150个图片文件,每个图片文件请求耗时2s,则需要300s才能拷贝完成。

线程池优化

任务类

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.Date;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;

/**
 * @author yuwb
 * @description 拷贝对象任务
 * @date 2021.11.29
 */
@Slf4j
public class CopyObjectCalculator implements Callable<SourceDesObjectVm> {
    private String desFid;
    private String desObjectId;
    private Long desOwner;
    private String sourceObjectId;
    private Long uid;
    private OssObjectRepository ossObjectRepository = (OssObjectRepository) SpringContextUtil.getBean("ossObjectRepository");
    private CsspConfig csspConfig = (CsspConfig) SpringContextUtil.getBean("csspConfig");
    private OssServiceImpl ossServiceImpl = (OssServiceImpl) SpringContextUtil.getBean("ossServiceImpl");

    public CopyObjectCalculator(String desFid, String desObjectId, Long desOwner, String sourceObjectId, Long uid) {
        this.desFid = desFid;
        this.desObjectId = desObjectId;
        this.desOwner = desOwner;
        this.sourceObjectId = sourceObjectId;
        this.uid = uid;
    }

    @Override
    public SourceDesObjectVm call() throws ObjectCopyException {
        
        try {
            //do copy
            CopyObjectResult copyObjectResult = csspClient.copyObject(sourceContainer, sourceObjectId, desObjectId);
            log.info("copy object res:" + JSON.toJSONString(copyObjectResult));
            log.info("拷贝对象完成 sourceContainer:{}, sourceObjectId:{}, desObjectId:{}", sourceContainer, sourceObjectId, desObjectId);
            //插入记录
            //...
        } catch (CSSPException e) {
            e.printStackTrace();
            log.error("拷贝对象异常{}", "sourceContainer:" + sourceContainer + " sourceObjectId:" + sourceObjectId + " desObjectId:" + desObjectId, desOwner, e);
            throw new ObjectCopyException(RtCode.REQUEST_FAIL, "拷贝对象CSSP异常");
        } catch (IOException e) {
            log.error("拷贝对象异常{}", "sourceContainer:" + sourceContainer + " sourceObjectId:" + sourceObjectId + " desObjectId:" + desObjectId, desOwner, e);
            e.printStackTrace();
            throw new ObjectCopyException(RtCode.REQUEST_FAIL, "拷贝对象IO异常");
        }
    }
}

创建线程池

    /**
     * Pool that runs the copy tasks. The work is I/O-bound (network calls, database reads and
     * writes), so the pool is sized at twice the number of available processors.
     */
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());

使用线程池处理拷贝任务

@Override
    // Copies the given objects concurrently on the shared executor and collects the results in
    // input order.
    //
    // All tasks share one 60-second deadline measured from the moment submission finishes.
    // The method returns early with an error response when
    //   - the deadline passes before every task has finished,
    //   - the calling thread is interrupted while waiting, or
    //   - any task fails (a business ObjectCopyException keeps its own code and message).
    // On every early return the tasks that are still pending or running are cancelled so they
    // stop occupying pool threads.
    public RtData serverCopyPrivateObjects(String desFid, List<SourceDesObjectVm> sourceDesObjectList, Long desOwner, Long uid) {
        // One future per submitted copy task, in input order.
        List<Future<SourceDesObjectVm>> futureList = new ArrayList<>(sourceDesObjectList.size());
        for (SourceDesObjectVm sourceDesObjectVm : sourceDesObjectList) {
            CopyObjectCalculator copyObjectCalculator = new CopyObjectCalculator(desFid, sourceDesObjectVm.getDesObjectId(), desOwner, sourceDesObjectVm.getSourceObjectId(), uid);
            futureList.add(executor.submit(copyObjectCalculator));
        }

        // nanoTime is monotonic, so the deadline is unaffected by wall-clock adjustments.
        final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(60);
        List<SourceDesObjectVm> objectList = new ArrayList<>(futureList.size());
        try {
            for (Future<SourceDesObjectVm> future : futureList) {
                // Block on the task itself instead of polling isDone(); a non-positive wait
                // still returns the result of a task that has already completed.
                long remainingNanos = Math.max(deadline - System.nanoTime(), 0L);
                objectList.add(future.get(remainingNanos, TimeUnit.NANOSECONDS));
            }
            return ResponseBuilder.success(objectList);
        } catch (java.util.concurrent.TimeoutException e) {
            // More than 60 seconds elapsed: give up on the whole batch.
            log.error("拷贝对象超时", e);
            return ResponseBuilder.serverError("拷贝对象超时");
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("对象拷贝异常", e);
            return ResponseBuilder.build(RtCode.SERVER_ERROR.getCode(), e.getMessage());
        } catch (ExecutionException e) {
            log.error("对象拷贝异常", e);
            if (e.getCause() instanceof ObjectCopyException) {
                // Surface the business exception's own code and message.
                ObjectCopyException ce = (ObjectCopyException) e.getCause();
                return ResponseBuilder.build(ce.getRtCode().getCode(), ce.getMessage());
            }
            return ResponseBuilder.build(RtCode.SERVER_ERROR.getCode(), e.getMessage());
        } finally {
            // No-op on the success path (every task is done); on early exits this interrupts
            // running tasks and drops queued ones.
            for (Future<SourceDesObjectVm> future : futureList) {
                if (!future.isDone()) {
                    future.cancel(true);
                }
            }
        }
    }

实际测试:

24核服务器、单个服务节点:170张图片的拷贝耗时由63秒优化至3秒。

处理细节:

通过future对象获取拷贝请求是否处理完成。
在一个图片文件拷贝异常时,打断其他拷贝请求任务,直接返回结果,释放线程池线程资源。

主要影响因素:

服务器核数、线程池配置、图片数量和大小、对象存储服务处理拷贝请求的速度。