在说断点续传之前,先说一下文件上传的原理,因为断点续传的应用场景就是文件上传。
文件上传
- 客户端发送上传文件请求
- 服务器收到请求之后解析请求头和请求体,获取上传的数据并进行验证(已经上传的文件不重复上传)
- 服务器端将分片上传的文件存在临时目录中,所有分片上传后再进行合并和重命名
- 最后将文件存储在指定的目录
断点续传
文件传输过程中,如果出现传输中断或者失败的情况,可以从中断的地方继续传输,不用重新开始传输。
断点续传有很多种实现方式,这里推荐b站一个用Java实现多线程断点续传的视频<4.java多线程-实现断点续传_哔哩哔哩_bilibili>
在实际应用时我选择用Redis实现断点续传
大致的思路:前端计算上传文件的MD5值,如果Redis中由对应的MD5值则说明秒传成功
后端通过Redis记录文件分片的情况(Redis中使用Set记录哪些分片已经上传,已经上传过的分片会被跳过),前端开始上传之后,每次上传成功都要记录在Redis中,续传之后前端重新计算MD5值,重新开始上传Redis中记录缺失的分片。当所有分片都上传完成并且合并成文件之后,删除Redis中对应的分片记录。
以上思路只适用于单线程,如果要使用多线程方式实现,需要考虑更多的问题~
代码实现
@Transactional(rollbackFor = Exception.class)
public boolean uploadFile(MultipartFile file, String fileName, String fileMD5,
String fileId, HttpServletRequest request, String redisKey) {
try {
// 参数校验
if (file == null || file.isEmpty()) {
throw new IllegalArgumentException("文件不能为空");
}
// 如果fileId为空,则生成一个随机的fileId
if (StringUtils.isEmpty(fileId)) {
fileId = RandomStringUtils.random(10);
}
// 从请求头中获取分片信息
int chunkNumber = 0;
int totalChunks = 1;
long chunkSize = file.getSize();
long totalSize = file.getSize();
String chunkMD5 = null;
try {
chunkNumber = Integer.parseInt(request.getHeader("Chunk-Number"));
totalChunks = Integer.parseInt(request.getHeader("Total-Chunks"));
chunkSize = Long.parseLong(request.getHeader("Chunk-Size"));
totalSize = Long.parseLong(request.getHeader("Total-Size"));
chunkMD5 = request.getHeader("Chunk-MD5");
} catch (Exception e) {
log.info("未获取到分片信息");
}
// 创建临时目录
String tempDir = System.getProperty("java.io.tmpdir") + "/upload/" + fileMD5 + "/";
File tempFileDir = new File(tempDir);
if (!tempFileDir.exists()) {
tempFileDir.mkdirs();
}
// 秒传检查
File finalFile = new File("/final/path/" + fileId + "_" + fileName);
if (finalFile.exists() && finalFile.length() == totalSize) {
log.info("文件已存在,秒传成功");
return true;
}
// 处理分片上传
File chunkFile = new File(tempDir + "chunk_" + chunkNumber);
// 断点续传检查 - 结合Redis
Boolean chunkExistsInRedis = redisTemplate.opsForSet().isMember(
String.format(CHUNK_UPLOAD_KEY, fileMD5), String.valueOf(chunkNumber));
if (chunkFile.exists() && chunkFile.length() == chunkSize &&
Boolean.TRUE.equals(chunkExistsInRedis)) {
log.info("分片 {} 已存在,跳过上传", chunkNumber);
} else {
// 保存分片文件
File tempChunkFile = new File(tempDir + "chunk_" + chunkNumber + ".tmp");
file.transferTo(tempChunkFile);
// 分片MD5校验
if (chunkMD5 != null && !chunkMD5.isEmpty()) {
try (FileInputStream fis = new FileInputStream(tempChunkFile)) {
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = fis.read(buffer)) != -1) {
digest.update(buffer, 0, bytesRead);
}
byte[] md5Bytes = digest.digest();
StringBuilder sb = new StringBuilder();
for (byte b : md5Bytes) {
sb.append(String.format("%02x", b));
}
String actualChunkMD5 = sb.toString();
if (!chunkMD5.equals(actualChunkMD5)) {
tempChunkFile.delete();
throw new RuntimeException("分片MD5校验失败");
}
}
}
// 重命名临时文件
if (!tempChunkFile.renameTo(chunkFile)) {
// 重命名失败时复制文件
try (FileInputStream fis = new FileInputStream(tempChunkFile);
FileOutputStream fos = new FileOutputStream(chunkFile)) {
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = fis.read(buffer)) != -1) {
fos.write(buffer, 0, bytesRead);
}
}
tempChunkFile.delete();
}
// 将已上传的分片信息记录到Redis
redisTemplate.opsForSet().add(
String.format(CHUNK_UPLOAD_KEY, fileMD5), String.valueOf(chunkNumber));
// 设置过期时间(例如24小时)
redisTemplate.expire(
String.format(CHUNK_UPLOAD_KEY, fileMD5), 24, TimeUnit.HOURS);
}
// 检查是否所有分片都已上传完成 - 使用Redis记录的信息
long uploadedChunksCount = redisTemplate.opsForSet().size(
String.format(CHUNK_UPLOAD_KEY, fileMD5));
boolean allChunksUploaded = uploadedChunksCount == totalChunks;
// 如果所有分片都已上传,合并文件
if (allChunksUploaded) {
// 创建合并后的文件
File mergedFile = new File(tempDir + "merged_" + fileMD5 + "_" + fileName);
try (FileOutputStream fos = new FileOutputStream(mergedFile, true)) {
for (int i = 0; i < totalChunks; i++) {
File currentChunk = new File(tempDir + "chunk_" + i);
try (FileInputStream fis = new FileInputStream(currentChunk)) {
byte[] buffer = new byte[1024 * 8];
int bytesRead;
while ((bytesRead = fis.read(buffer)) != -1) {
fos.write(buffer, 0, bytesRead);
}
}
currentChunk.delete();
}
}
// 验证合并文件的MD5
try (FileInputStream fis = new FileInputStream(mergedFile)) {
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = fis.read(buffer)) != -1) {
digest.update(buffer, 0, bytesRead);
}
byte[] md5Bytes = digest.digest();
StringBuilder sb = new StringBuilder();
for (byte b : md5Bytes) {
sb.append(String.format("%02x", b));
}
String actualFileMD5 = sb.toString();
if (!fileMD5.equals(actualFileMD5)) {
throw new RuntimeException("文件MD5校验失败");
}
}
// 验证文件大小
if (mergedFile.length() != totalSize) {
throw new RuntimeException("文件大小不匹配");
}
// 保存到最终位置
File finalDir = new File("/final/path/");
if (!finalDir.exists()) {
finalDir.mkdirs();
}
if (finalFile.exists()) {
finalFile.delete();
}
Files.move(mergedFile.toPath(), finalFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
// 清理Redis中的分片记录
redisTemplate.delete(String.format(CHUNK_UPLOAD_KEY, fileMD5));
// 清理临时目录
File[] tempFiles = tempFileDir.listFiles();
if (tempFiles != null) {
for (File tempFile : tempFiles) {
tempFile.delete();
}
}
tempFileDir.delete();
// TODO 更新数据库记录
log.info("文件上传完成: {}", fileName);
return true;
}
return true;
} catch (Exception e) {
log.error("文件上传失败", e);
return false;
}
}
1 条评论
码住