背景

TAP项目上传的镜像有的有几百G大小,直接上传会导致很多问题,如:浏览器内存不够、网络问题导致中断、nginx连接超时、后端文件缓存导致内存不够等,为此我们需要用到分片上传技术

实现

思路:先对文件切片进行hash计算,利用hash和文件名校验文件是否已存在;若不存在,则用模拟线程池对缺失的分片并发上传

主要代码逻辑

const uploadBigFile = async (option) => {
    const { file } = option;
    // Entry tracked in state.fileList for this upload (pushed there by the caller).
    const fileData = state.fileList.find((item) => item.uid === file.uid);

    // Slice the file and compute its MD5 in a worker thread.
    const { md5: md5Value, chunkList } = await getWorkMd5(file, sliceSize);
    // Ask the backend whether this file (md5 + name) exists, fully or partially.
    // fileSliceDOS lists slices already on the server (for resume).
    const { id, merge, fileSliceDOS, path } = await initFile({
      md5Value,
      nameCn: file.name,
    });

    // merge === 0: file already complete server-side — record ids, skip upload.
    // Reuse fileData instead of a second find() with loose `==` over the same list.
    if (merge === 0) {
      fileData.fileId = id;
      fileData.path = path;
      fileData.totalMd5 = md5Value;
      return true;
    }

    // Attach upload metadata to the tracked entry and to every chunk.
    fileData.fileId = id;
    fileData.path = path;
    chunkList.forEach((chunk) => {
      chunk.fileId = id;
      chunk.totalMd5 = md5Value;
      chunk.path = path;
    });
    // Indices of slices the server already has; empty on a fresh upload.
    const uploadedChunks = !fileSliceDOS
      ? []
      : fileSliceDOS.map((slice) => slice.currentIndex);
    const totalSize = fileData.size;
    const { stopUpload, result } = uploadChunks(
      chunkList,
      uploadedChunks,
      3,
      option.onProgress,
      totalSize
    );
    // Expose cancellation to the UI layer.
    fileData.stopUpload = stopUpload;
    // On any chunk failure, stop the remaining in-flight uploads.
    result.catch((error) => {
      console.log('error', error);
      stopUpload();
    });
    return result;
  };

worker线程计算md5

调用方法

  /**
   * Compute the file's MD5 and chunk list in a dedicated worker thread.
   * @param {File} file - file to slice and hash
   * @param {number} [size] - chunk size in bytes (default 100 MiB)
   * @returns {Promise<{md5: string, chunkList: Array, sliceNum: number}>}
   */
  const getWorkMd5 = (file, size = 100 * 1024 * 1024) =>
    new Promise((resolve, reject) => {
      // type: 'module' so the worker can import packages from node_modules.
      const worker = new Worker(new URL('./worker.js', import.meta.url), {
        type: 'module',
      });
      worker.postMessage({
        file: file,
        size: size,
      });
      worker.onmessage = (e) => {
        const {
          data: { md5, chunkList, sliceNum },
        } = e;
        resolve({ md5, chunkList, sliceNum });
        // One-shot worker: release the thread once the result arrives.
        worker.terminate();
      };
      // Without these handlers a worker failure would leave the Promise
      // pending forever and the worker thread alive.
      worker.onerror = (err) => {
        reject(err);
        worker.terminate();
      };
      worker.onmessageerror = (err) => {
        reject(err);
        worker.terminate();
      };
    });

worker.js

import SparkMD5 from 'spark-md5';

  /**
   * Incrementally hash the chunk list with SparkMD5.
   * NOTE(review): only the first 100 bytes of each chunk are hashed
   * (blob.slice(0, 100)) — this is a fast fingerprint, not a true
   * full-file MD5; confirm the backend expects this sampling scheme.
   * @param {Array<{multipartFile: Blob}>} chunks
   * @returns {Promise<string>} hex digest
   */
  const getHash = (chunks) => {
    return new Promise((resolve, reject) => {
      const spark = new SparkMD5.ArrayBuffer();
      // Read chunks sequentially; FileReader is async, so recurse from onload.
      function read(i) {
        if (i >= chunks.length) {
          resolve(spark.end());
          return;
        }
        const { multipartFile: blob } = chunks[i];
        const reader = new FileReader();
        reader.onload = (e) => {
          const bytes = e.target.result;
          spark.append(bytes);
          // BUG FIX: original called the undefined `_read`, which threw a
          // ReferenceError on any file with more than one chunk.
          read(i + 1);
        };
        // Surface read failures instead of hanging the Promise forever.
        reader.onerror = () => reject(reader.error);
        reader.readAsArrayBuffer(blob.slice(0, 100));
      }

      read(0);
    });
  };

  // Split a file into fixed-size slices, each annotated with its position,
  // name (`<fileName>-<index>`), index, total slice count, and actual byte size.
  const createChunk = (file, size) => {
    const slices = [];
    const total = Math.ceil(file.size / size);

    for (let index = 0; index < total; index++) {
      const start = index * size;
      slices.push({
        multipartFile: file.slice(start, start + size),
        sliceName: `${file.name}-${index}`,
        startPosition: start,
        endPosition: start + size,
        currentIndex: index,
        percent: 0,
      });
    }

    // Second pass: stamp every slice with the total count and its real size
    // (the final slice may be shorter than `size`; Blob.slice clamps it).
    const chunkList = slices.map((slice) => ({
      ...slice,
      sliceNum: total,
      size: slice.multipartFile.size,
    }));

    return { chunkList, sliceNum: total };
  };
  // Worker entry point: slice the incoming file, hash the slices, and post
  // the result back to the main thread.
  self.onmessage = async (event) => {
    const { file, size } = event.data;
    const { chunkList, sliceNum } = createChunk(file, size);
    const md5 = await getHash(chunkList);
    self.postMessage({
      md5,
      chunkList,
      sliceNum,
      percent: 1,
    });
    // One-shot worker: shut down once the result has been delivered.
    self.close();
  };

模拟线程池方法

 /**
  * Upload chunks with a fixed-size "thread pool": `concurrency` recursive
  * chains run in parallel, chain i handling indices i, i+concurrency, ...
  * Original was structurally broken: the chain-start loop, stopUpload, and
  * the return statement were nested inside uploadChunk, and already-uploaded
  * chunks never advanced to the next index (stalling resumed uploads).
  *
  * @param {Array} chunks - chunk descriptors from createChunk
  * @param {number[]} uploadedChunks - indices already on the server (resume)
  * @param {number} [concurrency=3] - number of parallel upload chains
  * @param {Function} onUploadProgress - antd-style progress callback
  * @param {number} totalSize - total file size in bytes
  * @param {number} [uploadSize=0] - bytes already counted toward progress
  * @returns {{stopUpload: Function, result: Promise<string[]>}}
  */
 const uploadChunks = (chunks, uploadedChunks, concurrency = 3, onUploadProgress, totalSize, uploadSize = 0) => {
    const results = []
    let shouldStop = false // flipped by stopUpload() to abort all chains
    const newCancelTokens = []
    // Set for O(1) membership checks instead of Array.includes per chunk.
    const uploadedSet = new Set(uploadedChunks)
    onUploadProgress({ percent: 1 })

    const uploadChunk = async chunkIndex => {
      if (chunkIndex >= chunks.length || shouldStop) {
        return results
      }

      const currentChunk = chunks[chunkIndex]
      if (!uploadedSet.has(currentChunk.currentIndex)) {
        // Keep one cancel source per request so stopUpload() can abort them all.
        const cancelToken = axios.CancelToken.source()
        newCancelTokens.push(cancelToken)

        await uploadFn(currentChunk, cancelToken)

        // Update the progress bar.
        uploadSize += currentChunk.size
        onUploadProgress({ percent: uploadSize * 100 / totalSize })
        // BUG FIX: original `切片\`...\`` was a tagged template with an
        // undefined tag (ReferenceError) and interpolated the chunk object.
        results.push(`切片${currentChunk.currentIndex}上传成功`)
      }

      // BUG FIX: always advance to this chain's next index — the original
      // only recursed in the upload branch, so resume uploads stalled on
      // the first already-uploaded chunk.
      return uploadChunk(chunkIndex + concurrency)
    }

    // Start the concurrent chains.
    const promises = []
    for (let i = 0; i < concurrency; i++) {
      promises.push(uploadChunk(i))
    }

    // Stop function: prevent new requests and cancel in-flight ones.
    const stopUpload = () => {
      shouldStop = true
      newCancelTokens.forEach(source => {
        source.cancel()
      })
    }

    // Promise.all settles once every chain has finished (or one rejects).
    return { stopUpload, result: Promise.all(promises) }
  }