上次写了单线程文件上传的思路,为了让这个功能显得更加高大上,多线程方式也得实现一下。本次多线程的实现参考了B站教程java多线程-实现断点续传_哔哩哔哩_bilibili,这个教程解决了我的很多疑问,我的代码也是基于教程中的代码实现的。
初步构想
多线程实现会增加很多困难,只是简单一想就有很多要解决的问题,在开始写代码之前,我自己构想了一下:
- 使用线程池的方式实现多线程,分片使用ffmpeg,每一个线程用于上传一个分片,分片上传成功之后在Redis中记录已经上传的分片编号
- 使用RandomAccessFile对文件进行读取,这个方案似乎比第一个方案更好(ffmpeg处理音视频似乎更好?),同样使用线程池,多个线程分别读取分片
涉及多线程安全的部分,我没有过多的预想,只能在代码实现的时候一步一步修改,目前想到的方法是使用锁和线程安全的集合类来保证线程安全。
核心
整个文件上传逻辑的核心是RandomAccessFile类,RandomAccessFile这个类的特殊之处在于可以从任意位置读取文件。
在读取文件中,RandomAccessFile类比较重要的方法有:
- seek(long pos) 源码如下所示,这个方法用于设置指针偏移量,这个偏移量是“绝对”的,是相对于文件开头的偏移量。
public void seek(long pos) throws IOException {
if (pos < 0) {
throw new IOException("Negative seek offset");
}
long comp = Blocker.begin();
try {
seek0(pos);
} finally {
Blocker.end(comp);
}
}- skipeBytes(int n) 源码如下所示,这个方法用于跳过指定字节数,这个指定的数量是“相对”的,基准是当前读取到的地方。
public int skipBytes(int n) throws IOException {
long pos;
long len;
long newpos;
if (n <= 0) {
return 0;
}
pos = getFilePointer();
len = length();
newpos = pos + n;
if (newpos > len) {
newpos = len;
}
seek(newpos);
/* return the actual number of bytes skipped */
return (int) (newpos - pos);
}接下来用一小段代码展示以上两个方法的区别
假设现在有一个名为hello.txt的文件,里面的内容是一行文本“Hello World”
RandomAccessFile raf = new RandomAccessFile("hello.txt", "r");
// 指针移动到下标 3
raf.seek(3);
System.out.println("当前指针位置: " + raf.getFilePointer());
System.out.println("当前位置字符: " + (char)raf.read());
// 再跳过 2 个字节
raf.seek(3);
raf.skipBytes(2);
System.out.println("skip后指针位置: " + raf.getFilePointer());
System.out.println("当前位置字符: " + (char)raf.read());
raf.close();
得到的结果是:
当前指针位置: 3
当前位置字符: lskip后指针位置: 5
当前位置字符:
一句话总结就是:seek()是绝对定位(无论当前指针指向哪里),skipBytes()是相对定位(相对于当前指针指向的位置)。
使用这个类之后,似乎不用手动进行分片了,分片的过程变成了计算各线程读取的范围。具体的计算也很简单,每个线程需要读取的文件大小直接用文件大小/线程数再取整即可。
断点续传的部分我本来是想利用Redis记录已上传分片的索引,但是转念一想,用Redis又要考虑一下数据一致性的问题了(前面那篇文章里面似乎并没有考虑这个问题,疏忽了)。我想偷个懒😫,简单实现一下就好了!所以我采用B站教程里面使用的方法,用一个日志文件记录每个线程读取的进度,文件上传完毕后删除日志文件。因为RandomAccessFile可以从任意位置读取文件,只需要在线程读取之前先查找日志文件,如果有日志文件,则从日志文件记录的断点开始读取文件,这样就能实现断点续传。
扩展
如果把以上思路扩展到实际应用中,基本思路如下,这一部分我没写代码实现,如有错误请指正
客户端
首先获取待上传文件的大小,并创建本地文件
对文件进行分片,对每一个分片创建一个上传任务
如果某个分片上传失败则记录并重试(这里会设置一个阈值,超过后不再重试)
服务端
接受上传文件的请求并解析
每个线程读取一个分片的数据上传到服务器(这里涉及到发送请求),并在日志文件中记录上传进度
根据每个上传任务分片序号写到文件相应的位置,每个分片有固定的位置所以可以同时写入
实现
想清楚实现思路之后就开始写代码,这里只写了一个用于本地测试的demo,工作量比较小
鉴于如果要写出客户端和服务端两边的逻辑,工作量比较大,我就偷懒用只写了个本地的demo简单测试一下
完整代码地址:RubyYao0520/FileUpload-demo: 多线程文件上传demo
里面有两个类,分别是手动创建线程和线程池实现。其中手动创建线程的代码直接是B站视频教程中提供的代码。
测试
测试思路
这个方法略显粗糙,不过我觉得还挺直观的,建议选择比较大的文件进行测试(200MB左右)
测试的时候发现使用线程池方式耗时会长很多,判断是线程池实现的时候用了锁进行同步,性能下降了,所以使用“朴素”的手动创建线程的方式即可。
首先运行一遍代码,待文件被复制到目标路径之后,记下此次复制消耗的时间。不要删除复制后的文件,第二次运行代码,对比两次的运行时间,如果第二次的运行时间明显缩短,说明断点续传机制起作用了。
测试之后发现可以实现断点续传!大功告成🎉!
后记
每次在写有关技术的文章时,我都有些担心自己写错了,或是写得不够好。从构想到最后的实现,我参考了大量别人的代码,尽管这样,最后实现的效果还是没有达到我的预期。其实这篇文章早在去年十二月就动笔了,一直拖到现在才算勉强完成,每次产生新的想法或是发现新的问题,我几乎都是重新去看一遍相关的资料,这个过程不算轻松。本来的预想是写完整的客户端/服务器端的上传思路,但是现实永远是骨感的,于是只能退而求其次实现b站教程里面的思路。如果文章或代码存在问题请指正。