背景
TAP项目中上传的镜像有的有几百G大小,直接上传会导致很多问题,如:浏览器内存不够、网络问题导致中断、nginx连接超时、后端文件缓存导致内存不够等,为此我们需要用到分片上传技术。
实现
思路:先对文件切片并计算hash,利用hash和文件名校验文件是否已存在于服务端;若不存在,则用模拟线程池并发上传缺失的分片。
主要代码逻辑
/**
 * Chunked upload entry point for large files.
 * @param {Object} option - upload-component option object ({ file, onProgress, ... }).
 * @returns {Promise<true|Array>} `true` when the server already has the file
 *   (instant upload), otherwise the Promise.all result of the chunk uploads.
 */
const uploadBigFile = async (option) => {
  const { file } = option;
  const fileData = state.fileList.find((item) => item.uid === file.uid);
  // Slice the file and compute its MD5 in a worker thread (fix: was "MDS").
  const { md5: md5Value, chunkList } = await getWorkMd5(file, sliceSize);
  const { id, merge, fileSliceDOS, path } = await initFile({
    md5Value,
    nameCn: file.name,
  });
  if (merge === 0) {
    // Server already holds a file with this MD5: instant upload.
    // (Was a second find() with loose `==`; it located the same element
    // `fileData` already references, so reuse it.)
    fileData.fileId = id;
    fileData.path = path;
    fileData.totalMd5 = md5Value;
    return true;
  }
  // Attach upload metadata to the tracked file entry and every chunk.
  fileData.fileId = id;
  fileData.path = path;
  chunkList.forEach((chunk) => {
    chunk.fileId = id;
    chunk.totalMd5 = md5Value;
    chunk.path = path;
  });
  // Indices already present on the server (resumable upload skips these).
  const uploadedChunks = fileSliceDOS
    ? fileSliceDOS.map((slice) => slice.currentIndex)
    : [];
  const totalSize = fileData.size;
  const { stopUpload, result } = uploadChunks(
    chunkList,
    uploadedChunks,
    3,
    option.onProgress,
    totalSize
  );
  fileData.stopUpload = stopUpload;
  // On any chunk failure, cancel the remaining in-flight requests.
  result.catch((error) => {
    console.log('error', error);
    stopUpload();
  });
  return result;
};
worker线程计算md5
调用方法
/**
 * Compute the file's MD5 and slice list in a dedicated Web Worker so the
 * (potentially multi-hundred-GB) hashing work never blocks the UI thread.
 * @param {File} file - the file to slice and hash.
 * @param {number} [size] - slice size in bytes (default 100 MB).
 * @returns {Promise<{md5: string, chunkList: Array, sliceNum: number}>}
 */
const getWorkMd5 = (file, size = 100 * 1024 * 1024) =>
  new Promise((resolve, reject) => {
    const worker = new Worker(new URL('./worker.js', import.meta.url), {
      type: 'module', // lets the worker import packages from node_modules
    });
    worker.postMessage({ file, size });
    worker.onmessage = (e) => {
      const { md5, chunkList, sliceNum } = e.data;
      resolve({ md5, chunkList, sliceNum });
      // Hashing is one-shot: release the worker thread.
      worker.terminate();
    };
    // Without this handler a worker crash would leave the promise pending
    // forever and leak the worker thread.
    worker.onerror = (err) => {
      reject(err);
      worker.terminate();
    };
  });
worker.js
import SparkMD5 from 'spark-md5';
/**
 * Compute a sampled MD5 over the chunk list.
 * NOTE(review): only the first 100 bytes of each chunk are hashed
 * (blob.slice(0, 100)) — a speed trade-off, not a full-content MD5;
 * confirm the backend computes the fingerprint the same way.
 * @param {Array<{multipartFile: Blob}>} chunks - slices from createChunk.
 * @returns {Promise<string>} hex MD5 digest of the sampled bytes.
 */
const getHash = (chunks) => {
  return new Promise((resolve, reject) => {
    const spark = new SparkMD5.ArrayBuffer();
    function read(i) {
      if (i >= chunks.length) {
        resolve(spark.end());
        return;
      }
      const { multipartFile: blob } = chunks[i];
      const reader = new FileReader();
      reader.onload = (e) => {
        spark.append(e.target.result);
        // BUG FIX: original called the undefined `_read(i + 1)`, which threw
        // after the first chunk and left the promise unresolved.
        read(i + 1);
      };
      // Surface read failures instead of hanging the promise forever.
      reader.onerror = () => reject(reader.error);
      reader.readAsArrayBuffer(blob.slice(0, 100));
    }
    read(0);
  });
};
// 文件切片
// Split `file` into fixed-size slices plus the metadata the uploader needs.
// The last slice's Blob may be shorter than `size`; `endPosition` is the
// theoretical end (start + size) and can exceed file.size on the last slice.
const createChunk = (file, size) => {
  const raw = [];
  for (let offset = 0; offset < file.size; offset += size) {
    raw.push({
      multipartFile: file.slice(offset, offset + size),
      sliceName: `${file.name}-${raw.length}`,
      startPosition: offset,
      endPosition: offset + size,
      currentIndex: raw.length,
      percent: 0,
    });
  }
  const sliceNum = raw.length;
  // Stamp every slice with the total slice count and its actual byte size.
  const chunkList = raw.map((slice) => ({
    ...slice,
    sliceNum,
    size: slice.multipartFile.size,
  }));
  return { chunkList, sliceNum };
};
// Worker entry point: slice the incoming file, compute the sampled MD5,
// post the result back to the main thread, then shut this worker down.
self.onmessage = async (event) => {
  const { file, size } = event.data;
  const { chunkList, sliceNum } = createChunk(file, size);
  const md5 = await getHash(chunkList);
  self.postMessage({
    md5,
    chunkList,
    sliceNum,
    percent: 1,
  });
  self.close();
};
模拟线程池方法
/**
 * Upload chunks with a fixed-size "lane" pool: lane i handles indices
 * i, i+concurrency, i+2*concurrency, ... so at most `concurrency` requests
 * are in flight at once.
 * @param {Array} chunks - slices from createChunk (with currentIndex/size).
 * @param {number[]} uploadedChunks - indices already on the server (skipped).
 * @param {number} [concurrency=3] - number of parallel lanes.
 * @param {Function} onUploadProgress - progress callback ({ percent }).
 * @param {number} totalSize - total file size in bytes (progress denominator).
 * @param {number} [uploadSize=0] - bytes already counted toward progress.
 * @returns {{stopUpload: Function, result: Promise<Array>}}
 */
const uploadChunks = (chunks, uploadedChunks, concurrency = 3, onUploadProgress, totalSize, uploadSize = 0) => {
  const results = []
  let shouldStop = false // set by stopUpload() to abort all lanes
  const newCancelTokens = []
  onUploadProgress({ percent: 1 }) // kick the progress bar so the UI shows activity
  // One lane: upload the chunk at chunkIndex, then advance by `concurrency`.
  const uploadChunk = async chunkIndex => {
    if (chunkIndex >= chunks.length || shouldStop) {
      return results
    }
    const currentChunk = chunks[chunkIndex]
    if (uploadedChunks.includes(currentChunk.currentIndex)) {
      // BUG FIX: the original returned without recursing here (and was
      // missing the closing brace of this function), so a lane meeting an
      // already-uploaded chunk stalled on resumed uploads. Skip ahead.
      return uploadChunk(chunkIndex + concurrency)
    }
    // Keep each request's cancel source so stopUpload can abort in-flight calls.
    const cancelToken = axios.CancelToken.source()
    newCancelTokens.push(cancelToken)
    await uploadFn(currentChunk, cancelToken)
    // Advance the progress bar by this chunk's byte size.
    uploadSize += currentChunk.size
    onUploadProgress({ percent: uploadSize * 100 / totalSize })
    // BUG FIX: `切片` sat outside the template literal (syntax error).
    results.push(`切片${currentChunk.sliceName}上传成功`)
    return uploadChunk(chunkIndex + concurrency)
  }
  // Start the lanes.
  const promises = []
  for (let i = 0; i < concurrency; i++) {
    promises.push(uploadChunk(i))
  }
  // Returned to the caller: flips the stop flag and cancels pending requests.
  const stopUpload = () => {
    shouldStop = true
    newCancelTokens.forEach(source => {
      source.cancel()
    })
  }
  // Resolves when every lane finishes (rejects on the first failed chunk).
  return { stopUpload, result: Promise.all(promises) }
}