# Large File Upload

## Features
- [x] Chunking of large files
- [x] Resumable uploads
- [x] Instant upload (skip files the server already has)
- [x] Pause upload
- [x] Resume upload
- [x] Per-chunk progress bar
- [x] Whole-file progress bar
## Overall Approach

### Frontend

- The core is `Blob.prototype.slice`. Like `Array.prototype.slice`, it returns a piece of the original, here a chunk of the file as a new `Blob` (see the snippet after this list).
- Split the file into chunks of a preset size, then exploit HTTP's support for concurrent requests to upload several chunks at once. Instead of transferring one large file we transfer many small chunks in parallel, which greatly reduces upload time.
- Because the chunks are uploaded concurrently, they may reach the server out of order, so each chunk must also record its position in the file.
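As a minimal illustration (not from the post's code; `input` is assumed to be a file `<input>` element):

```ts
const input = document.querySelector('input[type="file"]') as HTMLInputElement;
const file = input.files![0];                       // File inherits slice() from Blob
const firstChunk = file.slice(0, 10 * 1024 * 1024); // bytes [0, 10 MB) as a new Blob
```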
### Server

- The server receives the chunks and merges them once all of them have arrived.
  - When to merge: each chunk request carries the total chunk count, so the server can merge automatically once it has received that many chunks; alternatively, the frontend can send an extra request telling the server to merge.
  - How to merge: use Node.js read/write streams (`createReadStream`/`createWriteStream`) and pipe every chunk's stream into the final file's write stream.
## Frontend Implementation

### Basic Upload Flow

```ts
async function handleUpload() {
  if (!currentFile) {
    return message.error('No file selected');
  }
  if (!allowUpload(currentFile)) {
    return message.error('This file type or size is not allowed');
  }
  setUploadStatus(UploadStatus.UPLOADING);
  let partList: Part[] = createChunks(currentFile);
  let fileHash = await calculateHash(partList);
  // Name the file after its content hash, so identical files map to the same name
  let lastDotIndex = currentFile.name.lastIndexOf('.');
  let extName = currentFile.name.slice(lastDotIndex);
  let filename = `${fileHash}${extName}`;
  setFilename(filename);
  partList.forEach((item: Part, index) => {
    item.filename = filename;
    item.chunk_name = `${filename}-${index}`; // the chunk's position is encoded in its name
    item.loaded = 0;   // bytes already uploaded
    item.percent = 0;  // per-chunk progress
  });
  setPartList(partList);
  await uploadParts(partList, filename);
}
```
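`allowUpload` is referenced above but not shown in the post; a sketch of what such a check might look like (the 2 GB limit and the type whitelist are assumptions):

```ts
const MAX_SIZE = 1024 * 1024 * 1024 * 2; // assumed 2 GB cap
const ALLOWED_TYPES = ['video/mp4', 'image/png', 'image/jpeg']; // assumed whitelist

function allowUpload(file: File): boolean {
  return file.size <= MAX_SIZE && ALLOWED_TYPES.includes(file.type);
}
```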
### File Chunking

Slice the file and return the chunk list; each item also records a `size` property.

```ts
interface Part {
  chunk: Blob;
  size: number;
  filename?: string;
  chunk_name?: string;
  loaded?: number;
  percent?: number;
  xhr?: XMLHttpRequest;
}

const DEFAULT_SIZE = 1024 * 1024 * 100; // 100 MB per chunk

function createChunks(file: File): Part[] {
  const partList: Part[] = [];
  let current = 0;
  while (current < file.size) {
    // slice(start, end); Blob.slice clamps end to file.size on the last chunk
    let chunk = file.slice(current, current + DEFAULT_SIZE);
    partList.push({ chunk, size: chunk.size });
    current += DEFAULT_SIZE;
  }
  return partList;
}
```
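For example (a hypothetical 250 MB file; `someLargeFile` is not from the post):

```ts
// With DEFAULT_SIZE = 100 MB, a 250 MB file yields three parts.
const parts = createChunks(someLargeFile);
console.log(parts.map(p => p.size)); // [104857600, 104857600, 52428800]
```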
### Computing the File Hash

The hash is what makes instant upload possible. It is computed in a Web Worker so the (potentially heavy) hashing does not block the main thread.

```ts
function calculateHash(partList: Part[]) {
  return new Promise(resolve => {
    let worker = new Worker('./hash');
    worker.postMessage({ partList });
    worker.onmessage = function (event) {
      let { percent, hash } = event.data;
      setHashPercent(percent); // drive the hash progress bar
      if (hash) {
        resolve(hash); // the worker only sends the hash once it is done
      }
    };
  });
}
```
```js
// hash worker: computes an MD5 of all chunks with spark-md5
self.importScripts('https://cdn.bootcss.com/spark-md5/3.0.0/spark-md5.min.js');

self.onmessage = async (event) => {
  let { partList } = event.data; // the main thread posts { partList }
  const spark = new self.SparkMD5.ArrayBuffer();
  let percent = 0;
  let perSize = 100 / partList.length; // progress contributed by each chunk
  // Read every chunk into an ArrayBuffer, reporting progress as each read finishes
  let buffers = await Promise.all(partList.map(({ chunk, size }) => new Promise((resolve) => {
    const reader = new FileReader();
    reader.readAsArrayBuffer(chunk);
    reader.onload = function (event) {
      percent += perSize;
      self.postMessage({ percent: Number(percent.toFixed(2)) });
      resolve(event.target.result);
    };
  })));
  // Append the buffers in their original order so the hash is deterministic
  buffers.forEach(buffer => spark.append(buffer));
  self.postMessage({ percent: 100, hash: spark.end() });
  self.close();
};
```
### Uploading the Chunks

```ts
async function uploadParts(partList: Part[], filename: string) {
  let { needUpload, uploadList } = await verify(filename);
  if (!needUpload) {
    // The server already has this file: nothing to transfer
    return message.success('Instant upload succeeded');
  }
  try {
    let requests = createRequests(partList, uploadList, filename);
    await Promise.all(requests);
    // All chunks are on the server; ask it to merge them
    await request({ url: `/merge/${filename}` });
    message.success('Upload succeeded');
    reset();
  } catch (e) {
    message.error('Upload failed or was paused');
  }
}
```
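`uploadParts`, `verify`, and `createRequests` (below) all go through a `request` helper that the post does not show. A minimal `XMLHttpRequest`-based sketch, with the option names inferred from the calls in this post:

```ts
interface RequestOptions {
  url: string;
  method?: string;
  headers?: Record<string, string>;
  data?: Blob | null;
  setXHR?: (xhr: XMLHttpRequest) => void;      // lets the caller keep the xhr (used for pausing)
  onProgress?: (event: ProgressEvent) => void; // upload progress callback
}

function request({ url, method = 'GET', headers = {}, data = null, setXHR, onProgress }: RequestOptions): Promise<any> {
  return new Promise((resolve, reject) => {
    const xhr = new XMLHttpRequest();
    xhr.open(method, url);
    Object.entries(headers).forEach(([key, value]) => xhr.setRequestHeader(key, value));
    xhr.responseType = 'json';
    if (setXHR) setXHR(xhr);
    if (onProgress) xhr.upload.onprogress = onProgress;
    xhr.onload = () => resolve(xhr.response);
    xhr.onerror = reject;
    xhr.onabort = reject; // an aborted (paused) chunk rejects, landing in uploadParts' catch
    xhr.send(data);
  });
}
```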
### Verifying Whether the File Was Uploaded Before (Instant Upload)

- Response fields
  - `needUpload`: whether the file still needs uploading. If the server already has the complete file, nothing is transferred at all, which is all "instant upload" really is.
  - `uploadList`: every chunk the server already holds for this filename (some may be incomplete or missing); this is what makes resumable upload possible.

```ts
async function verify(filename: string) {
  return await request({ url: `/verify/${filename}` });
}
```
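To make the two fields concrete, a response for a partially uploaded file might look like this (the hash and sizes are made up):

```ts
const verifyResponse = {
  success: true,
  needUpload: true,
  uploadList: [
    { filename: 'e10adc3949ba59abbe56e057f20f883e.mp4-0', size: 104857600 }, // complete chunk
    { filename: 'e10adc3949ba59abbe56e057f20f883e.mp4-1', size: 52428800 },  // chunk stopped halfway
    // chunk 2 is absent: it was never started and must be uploaded in full
  ],
};
```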
### Building the Request Array from the Chunk List

```ts
interface Uploaded { // shape of each uploadList entry returned by verify
  filename: string;
  size: number;
}

function createRequests(partList: Part[], uploadList: Uploaded[], filename: string) {
  return partList
    .filter((part: Part) => {
      let uploadFile = uploadList.find(item => item.filename === part.chunk_name);
      if (!uploadFile) {
        // The server has nothing for this chunk: upload it from the start
        part.loaded = 0;
        part.percent = 0;
        return true;
      }
      if (uploadFile.size < part.chunk.size) {
        // Partially uploaded: resume from the byte the server already has
        part.loaded = uploadFile.size;
        part.percent = Number((uploadFile.size / part.size * 100).toFixed(2));
        return true;
      }
      return false; // chunk is already complete on the server
    })
    .map((part: Part) => request({
      url: `/upload/${filename}/${part.chunk_name}/${part.loaded}`,
      method: 'POST',
      headers: { 'Content-Type': 'application/octet-stream' },
      setXHR: (xhr: XMLHttpRequest) => part.xhr = xhr, // keep the xhr so the upload can be paused
      onProgress: (event: ProgressEvent) => {
        part.percent = Number(((part.loaded! + event.loaded) / part.chunk.size * 100).toFixed(2));
        setPartList([...partList]); // re-render the progress bars
      },
      data: part.chunk.slice(part.loaded) // send only the missing bytes
    }));
}
```
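Pause and resume are on the feature list but not shown above. The pieces are already in place: `setXHR` stores every in-flight `XMLHttpRequest` on its part, so pausing means aborting those requests, and resuming means calling `uploadParts` again, which re-verifies and uploads only the missing bytes. A sketch under those assumptions (`UploadStatus.PAUSE` is an assumed enum member):

```ts
// Pause: abort every in-flight chunk request. The server keeps the
// bytes that already arrived in each chunk's temp file.
function handlePause() {
  setUploadStatus(UploadStatus.PAUSE);
  partList.forEach((part: Part) => part.xhr && part.xhr.abort());
}

// Resume: simply run uploadParts again; verify() reports how much of
// each chunk the server has, and createRequests sends only the rest.
async function handleResume() {
  setUploadStatus(UploadStatus.UPLOADING);
  await uploadParts(partList, filename);
}
```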
## Backend Implementation
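The handlers below assume an Express app plus a few constants exported from `./utils`. A minimal sketch of that scaffolding (the directory locations and port are assumptions):

```ts
// utils.ts (sketch)
import path from 'path';
export const PUBLIC_DIR = path.resolve(__dirname, 'public'); // merged files are served from here
export const TEMP_DIR = path.resolve(__dirname, 'temp');     // chunks accumulate here
export const DEFAULT_SIZE = 1024 * 1024 * 100;               // must match the frontend chunk size

// app.ts (sketch)
import express from 'express';
const app = express();
app.listen(8000, () => console.log('upload server started on port 8000'));
```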
### Receiving Chunks

```ts
import path from 'path';
import fs from 'fs-extra';
import { TEMP_DIR, mergeChunks, PUBLIC_DIR } from './utils';

app.post('/upload/:filename/:chunk_name/:start', async function (req: Request, res: Response, _next: NextFunction) {
  let { filename, chunk_name } = req.params;
  let start: number = Number(req.params.start); // byte offset to resume from
  let chunk_dir = path.resolve(TEMP_DIR, filename);
  let exist = await fs.pathExists(chunk_dir);
  if (!exist) {
    await fs.mkdirs(chunk_dir);
  }
  let chunkFilePath = path.resolve(chunk_dir, chunk_name);
  // Append mode plus a start offset: a resumed chunk continues where it stopped
  let ws = fs.createWriteStream(chunkFilePath, { start, flags: 'a' });
  req.on('end', () => {
    ws.close();
    res.json({ success: true });
  });
  req.on('error', () => { ws.close(); });
  req.on('close', () => { ws.close(); });
  req.pipe(ws);
});
```
### Merging Chunks

```ts
import { TEMP_DIR, mergeChunks, PUBLIC_DIR } from './utils';

app.get('/merge/:filename', async function (req: Request, res: Response, _next: NextFunction) {
  let { filename } = req.params;
  await mergeChunks(filename);
  res.json({ success: true });
});
```
```ts
// in utils.ts
import path from 'path';
import fs from 'fs-extra';
import { WriteStream } from 'fs';

const pipeStream = (filePath: string, ws: WriteStream) => {
  return new Promise((resolve: Function) => {
    let rs = fs.createReadStream(filePath);
    rs.on('end', async () => {
      await fs.unlink(filePath); // remove the chunk once it has been written out
      resolve();
    });
    rs.pipe(ws);
  });
};

export const mergeChunks = async (filename: string, size: number = DEFAULT_SIZE) => {
  const filePath = path.resolve(PUBLIC_DIR, filename);
  let chunkDir = path.resolve(TEMP_DIR, filename);
  let chunks = await fs.readdir(chunkDir);
  // Chunk names look like `<hash><ext>-<index>`; sort by the numeric index
  chunks.sort((a, b) => Number(a.split('-')[1]) - Number(b.split('-')[1]));
  // Each chunk gets its own write stream positioned at index * size,
  // so all chunks can be written to the final file concurrently
  await Promise.all(
    chunks.map((chunkFile, index) => pipeStream(
      path.resolve(chunkDir, chunkFile),
      fs.createWriteStream(filePath, { start: index * size })
    ))
  );
  await fs.rmdir(chunkDir);
};
```
### Verify Endpoint

```ts
app.get('/verify/:filename', async (req: Request, res: Response, _next: NextFunction) => {
  let { filename } = req.params;
  let filePath = path.resolve(PUBLIC_DIR, filename);
  let existFile = await fs.pathExists(filePath);
  if (existFile) {
    // The merged file already exists: instant upload, nothing to transfer
    return res.json({ success: true, needUpload: false });
  }
  const tempDir = path.resolve(TEMP_DIR, filename);
  let exist = await fs.pathExists(tempDir);
  let uploadList: any[] = [];
  if (exist) {
    // Report every chunk we already have, with its current size,
    // so the client can resume partially uploaded chunks
    uploadList = await fs.readdir(tempDir);
    uploadList = await Promise.all(uploadList.map(async (filename: string) => {
      let stat = await fs.stat(path.resolve(tempDir, filename));
      return { filename, size: stat.size };
    }));
  }
  res.json({ success: true, needUpload: true, uploadList });
});
```