前端大文件上传 - 总结(Vue3 + hook + Web Worker实现,通过多个Worker线程大大提高Hash计算的速度)

流程图

根据流程图,下面详细说一下具体的实现

选择文件

基于 el-upload 实现;用户选择文件之后,监听 el-upload 的Change事件,将 File 对象保存;原生的实现也是同理只要保存 File 对象就行。
当用户点击提交之后才进行分片操作,不然等待分片完成才提交需要一定时间(770MB的文件耗时是16s)

文件分片&计算文件hash值

为了判断文件是否上传过,必须使用根据文件内容生成的hash值判断。而这个过程可以在分片的时候计算。通过 SparkMD5 进行增量更新,这里的增量更新是指:在计算第一个分块的哈希值后,对接下来的分块只计算新增内容的哈希值,而不是对整个分块重新计算。
其他方案:之前前后两个完整的分块,其他分块分别只取前中后几个字节计算。这种方案可以大大减少时间,但是无法确保hash值的唯一性。
这里我们使用第一种方案,同时文件切片的执行交由 Web Worker 处理。

代码实现:

// 大文件切片上传,worker.js
import SparkMD5 from 'spark-md5';
const DefaultChunkSize = 1024 * 1024 * 50; // 50MB
// const DefaultChunkSize = 1024 * 1024 * 1; // 1MB

self.onmessage = (e) => {
	const { file, chunkSize = DefaultChunkSize } = e.data;

	let blobSlice = File.prototype.slice || File.prototype.mozSlice || File.prototype.webkitSlice,
		chunks = Math.ceil(file.size / chunkSize),
		currentChunk = 0,
		spark = new SparkMD5.ArrayBuffer(),
		fileChunkHashList = [],
		fileChunkList = [],
		fileReader = new FileReader();

	loadNext();

	function loadNext() {
		let start = currentChunk * chunkSize,
			end = ((start + chunkSize) >= file.size) ? file.size : start + chunkSize;

		let chunk = blobSlice.call(file, start, end);
		fileChunkList.push(chunk);
		fileReader.readAsArrayBuffer(chunk);
	}

	function getChunkHash(e) {
		const chunkSpark = new SparkMD5.ArrayBuffer();
		chunkSpark.append(e.target.result);
		fileChunkHashList.push(chunkSpark.end());
	}

	// 处理每一块的分片
	fileReader.onload = function (e) {
		spark.append(e.target.result); // Append array buffer
		currentChunk++;

		getChunkHash(e)

		if (currentChunk < chunks) {
			loadNext();
		} else {
			// 计算完成后,返回结果
			self.postMessage({
				fileMd5: spark.end(),
				fileChunkList,
				fileChunkHashList,
			});
			fileReader.abort();
			fileReader = null;
		}
	}

	// 读取失败
	fileReader.onerror = function () {
		self.postMessage({
			error: 'oops, something went wrong.'
		});
	}
};

当用户点击提交时执行 handleCutFile 函数,该函数会创建一个 worker ,并执行对应的文件切片脚本,当文件切片完成会返回对应的 fileMd5fileChunkListfileChunkHashList。然后执行 handleCutSu***ess 函数,就是对应流程图后面的操作

async function handleCutFile() {
	// Vite中使用worker.js脚本
	const worker = new Worker(new URL('@/workers/cutFile.js', import.meta.url), {
		type: 'module',
	})
	
	worker.postMessage({ file: file.value.raw })
	worker.onmessage = (e) => {
		handleCutSu***ess(e.data)
		worker.terminate()
	}
}
文件上传

检查文件是否上传过

async function checkFile() {
	// 这个接口要配置防响应拦截
	const params = {
		filename: fileName.value,
		file_hash: fileMd5.value,
		total_chunks: chunkTotal.value,
	}
	const data = await checkFileFn(params)
	// 已经上传过返回已上传的分块IDs:chunk_upload[]
	if (data.code === 0) {
		return data.data
	}

	if (data.code === 1) {
		// 空间不足
		modal.msgError(t('sample.notEnoughSpace'))
		return false
	}

	modal.msgError(data.msg)
	return false
}

根据返回的分片ID(索引 + 1)列表处理对应逻辑

async function uploadFile() {
	const data = await checkFile()
	if (!data) return

	const { chunk_upload, upload_id } = data
	uploadId.value = upload_id

	if (chunk_upload.length === 0) {
		// 上传整个文件
	}
	
	if (chunk_upload.length !== chunkTotal.value) {
		// 上传未上传的分片 - 断点续传
	}
	// 上传完成 - 秒传(可能需要发起合并请求)
}

处理所有分片的请求 & 进度的获取

// 通过请求池的方式上传文件
async function handleUploadRequest(uploadedChunks = []) {
	const requestList = []
	for (let i = 0; i < fileChunkList.value.length; i++) {
		if (uploadedChunks.indexOf(i + 1) === -1) {
			requestList.push(uploadChunk(fileChunkList.value[i], i, fileMd5.value))
		}
	}

	// 这里控制同时请求的接口数量
	await processRequests(requestList)
}

// 处理分片文件的上传
function uploadChunk(chunk, index, fileMd5) {
	const params = {
		chunk_id: index + 1,
		file_hash: fileMd5,
		upload_id: uploadId.value,
		chunk_hash: fileChunkHashList.value[index],
	}
	const formData = new FormData()
	formData.append('file_chunk', chunk)
	return {
		url: uploadUrl,
		method: 'post',
		timeout: 5 * 60 * 1000,
		data: formData,
		params,
		skipInterceptor: true,
		headers: {
			// 以前的人写了避免重复提交
			repeatSubmit: false,
		},
		onUploadProgress: (progressEvent) => {
			chunkUploadedSize[index] = progressEvent.loaded

			// 计算uploadedSize的总和
			const size = Object.values(chunkUploadedSize).reduce((total, item) => {
				return total + item
			}, 0)

			// 计算总的上传进度
			uploadProgress = ((size + uploadedSize) / fileSize.value) * 100

			store.dispatch('upload/setProgress', uploadProgress)
		}
	}
}

通过 async 控制并发请求的数量,npm i -D async;当全部发送完成时发起合并请求。

async function processRequests(requestList) {
	// 同时发起三个请求
	mapLimit(requestList, 3, async (req) => {
		await request(req)
	}, async (err) => {
		if (err) {
			console.log('err: ', err)
			return false
		}
		// 全部发送成功应该发起合并请求
		await mergeFileFn({ upload_id: uploadId.value })
		return true
	})
}
完整代码

这里通过 hook 封装了这一块代码。代码中并没有处理文件上传完成,但没有发起合并请求的情况,因为后端没返回这种情况,所以这里没写;建议与后端进行沟通要求考虑全部情况。

import request from '@/utils/request'
import modal from '@/plugins/modal'
import { mapLimit } from 'async'
import { useI18n } from "vue-i18n"

const DefaultChunkSize = 1024 * 1024 * 50; // 50MB

const useCutFile = ({ uploadUrl, checkFileFn, mergeFileFn, callback }) => {
	const store = useStore();
	const { t } = useI18n()

	const file = ref(null)
	const fileName = ***puted(() => file.value?.name || '')
	const fileSize = ***puted(() => file.value?.size || 0)

	const fileMd5 = ref('')
	const fileChunkList = ref([])
	const fileChunkHashList = ref([])
	const chunkTotal = ref(0)
	// 上传id
	const uploadId = ref('')
	// 上传进度
	let uploadProgress = 0
	// 每一个分片已上传的大小
	let chunkUploadedSize = {}
	// 断点续传时已上传的大小
	let uploadedSize = 0

	// 监听上传文件弹框的文件改变
	async function handleUploadChange(fileObj) {
		file.value = fileObj
	}

	// 开始处理文件分块
	async function handleCutFile() {
		const worker = new Worker(new URL('@/workers/cutFile.js', import.meta.url), {
			type: 'module',
		})

		// 文件切块的过程不可点击
		worker.postMessage({ file: file.value.raw })
		worker.onmessage = (e) => {
			handleCutSu***ess(e.data)
			worker.terminate()
		}
	}

	// 切片文件成功
	async function handleCutSu***ess(data) {
		fileMd5.value = data.fileMd5
		fileChunkList.value = data.fileChunkList
		fileChunkHashList.value = data.fileChunkHashList
		chunkTotal.value = fileChunkList.value.length

		uploadFile()
	}

	// 上传文件
	async function uploadFile() {
		const data = await checkFile()
		if (!data) return

		const { chunk_upload, upload_id } = data
		uploadId.value = upload_id

		if (chunk_upload.length === 0) {
			// 上传整个文件
			return await handleUploadRequest()
		}
		// 上传未上传的分片,过滤已上传的分片 - 断点续传
		if (chunk_upload.length !== chunkTotal.value) {
			uploadedSize = chunk_upload.length * DefaultChunkSize

			return await handleUploadRequest(chunk_upload)
		}
		// 上传完成 - 秒传
		store.dispatch('upload/setSampleUploading', false)
		modal.msgSu***ess(t('upload.uploadedTip'))
		resetData()
		return true
	}

	// 检查文件是否已经上传过
	async function checkFile() {
		// 这个接口要配置防响应拦截
		const params = {
			filename: fileName.value,
			file_hash: fileMd5.value,
			total_chunks: chunkTotal.value,
		}
		const { code, msg, data } = await checkFileFn(params)
		// 已经上传过返回对应hash值
		if (code === 0) {
			return data
		}

		modal.msgError(msg)
		store.dispatch('upload/setSampleUploading', false)
		return false
	}

	// 处理分片文件的上传
	function uploadChunk(chunk, index, fileMd5) {
		const params = {
			chunk_id: index + 1,
			file_hash: fileMd5,
			upload_id: uploadId.value,
			chunk_hash: fileChunkHashList.value[index],
		}

		const formData = new FormData()
		formData.append('file_chunk', chunk)

		return {
			url: uploadUrl,
			method: 'post',
			timeout: 5 * 60 * 1000,
			data: formData,
			params,
			skipInterceptor: true,
			headers: {
				// 以前的人写了避免重复提交
				repeatSubmit: false,
			},
			onUploadProgress: (progressEvent) => {
				chunkUploadedSize[index] = progressEvent.loaded

				// 计算uploadedSize的总和
				const size = Object.values(chunkUploadedSize).reduce((total, item) => {
					return total + item
				}, 0)

				// 计算总的上传进度
				uploadProgress = ((size + uploadedSize) / fileSize.value) * 100

				store.dispatch('upload/setProgress', uploadProgress)
			}
		}
	}

	// 通过请求池的方式上传文件
	async function handleUploadRequest(uploadedChunks = []) {
		const requestList = []
		for (let i = 0; i < fileChunkList.value.length; i++) {
			if (uploadedChunks.indexOf(i + 1) === -1) {
				requestList.push(uploadChunk(fileChunkList.value[i], i, fileMd5.value))
			}
		}

		// 方法一:使用请求池的方式发送请求
		await processRequests(requestList)

		// 方法二:使用Promise.all一次性发送全部请求,uploadChunk需要返回request({})
		// await Promise.all(requestList)
		// // 上传完成后,合并文件
		// await mergeFileFn({ upload_id: uploadId.value })
		// return true
	}

	async function processRequests(requestList) {
		mapLimit(requestList, 3, async (reqItem) => {
			await request(reqItem)
		}, async (err) => {
			if (err) {
				console.log('err: ', err)
				modal.msgError(t('upload.error'))
				store.dispatch('upload/setSampleUploading', false)
				resetData()
				return false
			}
			await mergeFileFn({ upload_id: uploadId.value })
			modal.msgSu***ess(t('upload.su***ess'))
			callback && callback()
			store.dispatch('upload/setSampleUploading', false)
			resetData()
			return true
		})
	}

	// 上传成功,还原数据
	function resetData() {
		fileMd5.value = ''
		fileChunkList.value = []
		fileChunkHashList.value = []
		chunkTotal.value = 0
		uploadId.value = ''
		uploadProgress = 0
		chunkUploadedSize = {}
		uploadedSize = 0
	}

	return {
		file,
		handleUploadChange,
		handleCutFile,
		handleCutSu***ess,
		uploadFile,
		resetData,
	}
}

export default useCutFile

额外–上传组件封装
<template>
	<el-dialog
		v-model="visible"
		:title="title"
		:width="width"
		append-to-body
		class="***mon-center-dialog"
		@close="emit('update:visible', false)"
	>
		<el-upload
			ref="uploadRef"
			:headers="getHeaders"
			:limit="1"
			:a***ept="a***ept"
			:action="actionUrl"
			:show-file-list="showFileList"
			:before-upload="handleBeforeUpload"
			:on-change="handleChange"
			:on-su***ess="handleSu***ess"
			:on-error="handleError"
			:on-exceed="handleExceed"
			:on-remove="handleRemove"
			:auto-upload="autoUpload"
			:disabled="loading"
			drag
		>
			<el-icon class="el-icon--upload">
				<upload-filled />
			</el-icon>
			<div class="el-upload__text">
				{{ t('upload.drag') }}
				<em>{{ t('upload.upload') }}</em>
			</div>
			<template #tip>
				<div class="el-upload__tip text-center">
					<span>{{ tipText || t('upload.onlyCsv') }}</span>
					<span v-if="templateUrl || customDownload">
						(<el-link
							type="primary"
							:underline="false"
							style="font-size: 12px; vertical-align: baseline"
							@click="handleDownload"
							>{{ t('upload.downloadTemplate2') }}
						</el-link>)
					</span>
				</div>
			</template>
		</el-upload>

		<div class="content">
			<slot />
		</div>

		<template #footer>
			<div class="dialog-footer">
				<el-button @click="emit('update:visible', false)">
					{{ t('pub.cancel') }}
				</el-button>
				<el-button type="primary" :disabled="disabled" @click="handleConfirm">
					{{ t('pub.sure') }}
				</el-button>
			</div>
		</template>
	</el-dialog>
</template>

<script setup>
import modal from '@/plugins/modal'
import { genFileId } from 'element-plus'
import { download } from '@/utils/request'
import { useI18n } from 'vue-i18n'
const { t } = useI18n()

const props = defineProps({
	// 弹框参数
	visible: {
		type: Boolean,
		default: false,
	},
	title: {
		type: String,
		default: 'Upload File',
	},
	width: {
		type: String,
		default: '450px',
	},
	// 上传参数
	hasAuthorization: {
		type: Boolean,
		default: true,
	},
	// * 任意文件
	a***ept: {
		type: String,
		default: '.csv',
	},
	action: {
		type: String,
		default: '',
	},
	showFileList: {
		type: Boolean,
		default: true,
	},
	autoUpload: {
		type: Boolean,
		default: false,
	},
	// 500MB
	size: {
		type: Number,
		default: 500,
	},
	tipText: {
		type: String,
		default: '',
	},
	templateUrl: {
		type: String,
		default: '',
	},
	templateName: {
		type: String,
		default: 'template',
	},
	customDownload: {
		type: Boolean,
		default: false,
	},
	downloadMethod: {
		type: String,
		default: 'post',
	},
	autoSubmit: {
		type: Boolean,
		default: true,
	},
})
const emit = defineEmits(['change', 'remove', 'su***ess', 'error', 'submit', 'cut-su***ess'])

const store = useStore()
const getHeaders = ***puted(() => {
	if (props.hasAuthorization) {
		return { Authorization: 'Bearer ' + store.getters.token }
	}
	return {}
})

const actionUrl = ***puted(() => {
	return props.action ? `${ import.meta.env.VITE_APP_BASE_API }${props.action}` : ''
})

const loading = ref(false)
const uploadRef = ref()
const isAbort = ref(false)
const disabled = ref(true)

const fileObj = ref(null)

const handleBeforeUpload = (file) => {
	if(isAbort.value) {
		abort(file)
		return
	}
	loading.value = true
}

const handleChange = (file) => {
	const isLt = file.size / 1024 / 1024 < props.size
	const allowedExtensions = (props.a***ept && props.a***ept !== '*') ? props.a***ept.split(',').map(item => item.substring(1)) : []
	// 以第一个.后面的所有作为文件后缀
	const tmp = file.name.split('.')
	tmp.shift()
	const fileExtension = tmp.join('.').toLowerCase()

	if (!isLt) {
		modal.msgError(`${t('upload.sizeLimit')}${props.size}MB!`)
		isAbort.value = true
		return false
	}

	if (allowedExtensions.length && !allowedExtensions.includes(fileExtension)) {
		modal.msgError(`${t('upload.fileType')} ${allowedExtensions.join(', ')}`)
		isAbort.value = true
		return false
	}

	disabled.value = false
	fileObj.value = file
	emit('change', file)

	return true
}

const handleRemove = () => {
	disabled.value = true
	emit('remove', null, null)
}

const handleSu***ess = (res, file) => {
	if (res.code && res.code !== 0) {
		emit('change', null, null)
		uploadRef.value.clearFiles()
		modal.msgError(res.msg, 10000)
		loading.value = false
		return
	}
	modal.msgSu***ess(t('upload.su***ess'))
	// 不知道为什么,这里触发的就算多级嵌套也能在父级接收到
	emit('su***ess', res, file)
	emit('update:visible', false)
	loading.value = false
}

const handleError = (err) => {
	modal.msgError(t('upload.error'))
	emit('error', err)
	loading.value = false
}

const handleExceed = (files) => {
  uploadRef.value.clearFiles()
  const file = files[0]
  file.uid = genFileId()
  uploadRef.value.handleStart(file)
}

const handleDownload = () => {
	if (props.customDownload) {
		emit('download')
		return
	}

	if (props.templateName.includes('.')) {
		download(props.templateUrl, {}, props.templateName)
		return
	}
	download(props.templateUrl, {}, `${props.templateName}${props.a***ept}`, props.downloadMethod)
}

const handleReset = () => {
	uploadRef.value.clearFiles()
}

const handleConfirm = () => {
	if (props.autoSubmit) {
		uploadRef.value.submit()
	} else {
		emit('submit', uploadRef.value, fileObj.value)
	}
}

defineExpose({
	handleReset,
})
</script>

<style scoped lang="scss">
.content {
	margin-top: 18px;
}
</style>

文件Hash计算优化

思路:就是通过多个Web Worker处理一个文件,将这个文件分成对应数量的Chunk给不同的Web Worker处理;这里只做粗略的计算,就是按照200M分一个Web Worker计算,最多5个Web Worker。因为调研发现Web Worker不是越多越好,至于多少个最合适我也不知道;因为目前公司业务需求上传的文件最多只接受5G,平均算下来1G一个Web Worker计算Hash的时间也只是20s;现在1G以内的文件都是5s算完。

问题: 实际上这样计算跟单个Web Worker计算的Hash值是不同的

代码实现,下面是改变的部分
	// Web Worker的数量
	let workerNum = 1
	// 存储多个Web Worker
	const workers = []

	// 开始处理文件分块
	async function handleCutFile() {
		// 缓存worker返回的数据
		const workerArr = new Array(workerNum).fill(null)
		let count = 0

		workerNum = Math.min(Math.ceil(file.value.size / DefaultWorkSize), MaxWorker)

		for (let i = 0; i < workerNum; i++) {
			const worker = new Worker(new URL('@/workers/cutFile.js', import.meta.url), {
				type: 'module',
			})
			workers.push(worker)

			// 监听从 Worker 接收到的消息
			worker.onmessage = function(e) {
				workerArr[e.data.index] = e.data
				count++
				// 处理从 Worker 接收到的结果
				worker.terminate()

				if (count === workerNum) {
					handleCutSu***ess(workerArr)
				}
			};
		}

		// 将文件数据分配给每个 Web Worker 进行计算
		const chunkSize = Math.ceil(file.value.size / workerNum)

		for (let i = 0; i < workerNum; i++) {
			const start = i * chunkSize
			const end = start + chunkSize
			const chunk = file.value.raw.slice(start, end)

			// 向 Worker 发送消息,并分配文件数据块
			workers[i].postMessage({ file: chunk, index: i })
		}
	}

	// 合并文件hash值
	function calculateFileHash() {
		const spark = new SparkMD5()

		for (let i = 0; i < fileChunkList.value.length; i++) {
			spark.append(fileChunkList.value[i])
		}

		return spark.end()
	}

	// 切片文件成功
	async function handleCutSu***ess(data) {
		data.forEach((item) => {
			fileHashes.value.push(item.fileMd5)
			fileChunkList.value.push(...item.fileChunkList)
			fileChunkHashList.value.push(...item.fileChunkHashList)
		})
		fileMd5.value = data.length === 1 ? data[0].fileMd5 : calculateFileHash()
		chunkTotal.value = fileChunkList.value.length

		uploadFile()
	}
展望

后面更改上传功能为选择多个文件,但是同时只能上传一个(带宽有限,参考了一下其他的线上上传也是同时只能有一个在上传);但是多个文件又涉及多个Web Worker计算问题所以需要解决的问题挺多,到时候再另外起一篇文章总结。

转载请说明出处内容投诉
CSS教程_站长资源网 » 前端大文件上传 - 总结(Vue3 + hook + Web Worker实现,通过多个Worker线程大大提高Hash计算的速度)

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买