okio是okhttp的底层io库,是一个我用起来比较方便io库。然而知其然不知其所以然,所以我决定研究一下okio的源码,这篇文件主要记录下我学习okio源码的心得。
okio的缓存类Segment.java
okio的原理是将要write、read的数据以byte[]的形式先缓存起来,然后再将缓存的数据write到目的地或者read成想要形式。而做到缓存数据的类就是Segment.java
我们直接看源码:
final class Segment {
/** 一个Segment可以缓存数据的大小、源码中定成8KB */
static final int SIZE = 8192;
/** 将Segment缓存的数据分享出去条件 */
static final int SHARE_MINIMUM = 1024;
/** 缓存的数据*/
final byte[] data;
/** 数据可以被读取的起点*/
int pos;
/** 数据可以被读取的终点*/
int limit;
/** 数据是分享出去、或者分享得到的*/
boolean shared;
/** 对数据拥有操作pos、limit的权限,分享得到的数据是没有操作权限的*/
boolean owner;
/** 下一个Segment节点*/
Segment next;
/** 上一个Segment节点*/
Segment prev;
/** 一个拥有操作数据权限的Segment构造方法*/
Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}
/** 一个分享得到的Segment构造方法*/
Segment(Segment shareFrom) {
this(shareFrom.data, shareFrom.pos, shareFrom.limit);
shareFrom.shared = true;
}
/** 一个分享得到的Segment构造方法*/
Segment(byte[] data, int pos, int limit) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.owner = false;
this.shared = true;
}
/**
* 双向链表pop操作
* 链表中删除自己,返回下一个节点操作
*/
public @Nullable Segment pop() {
Segment result = next != this ? next : null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
return result;
}
/**
* 双向链表push操作
* 自己后面加入segment节点
*/
public Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}
//=================================================
// 之后都是优化Segment缓存数据的函数
//=================================================
/**
* 分割操作
* 把自己数据分割byteCount个出去
*/
public Segment split(int byteCount) {
//如过byteCount<= 0 或者 自己并没有byteCount个数据,则抛出异常
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
if (byteCount >= SHARE_MINIMUM) {
//如果要分割的数据数达到分享条件,把自己分享出去
prefix = new Segment(this);
} else {
//如果没达到分享条件则从SegmentPool缓存池中获取一块可用的缓存空间
prefix = SegmentPool.take();
//把自己的数据复制给prefix
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
//分割操作的本质就是改变自己的pos、和prefix的limit达到分割的目的
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
/**
* 将分割的数据push到自己之前的节点
* 因为prefix的数据和在自己的数据顺序关系是prefix在自己之前
*/
prev.push(prefix);
//return分割出去的Segement
return prefix;
}
/**
* 写入操作
* 将自己的byteCount个数据写入另一个Segment
*/
public void writeTo(Segment sink, int byteCount) {
//如果另一个Segment没有操作权限,直接抛出异常
if (!sink.owner) throw new IllegalArgumentException();
//如果另一个Segment没有足够的连续空间写入,则尝试压缩data[]使其拥有足够的连续空间
if (sink.limit + byteCount > SIZE) {
//如果另一个Segment分享出去了,那么就不能压缩data[],抛出异常
if (sink.shared) throw new IllegalArgumentException();
//如果另一个Segment压缩data[]之后还是没有只够的连续空间,抛出异常
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
//压缩data[]操作即:将数据的pos移动到data[0]的位置
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
//经过或没压缩data[]操作后,将自己的byteCount个数据写入另一个Segment
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
/**
* 合并Segement操作
* 尝试将自己和前面一个节点合并,压缩data[]达到优化缓存的目的
*/
public void compact() {
//如果前面一个节点就是自己,抛出异常
if (prev == this) throw new IllegalStateException();
//如果前面一个节点没有操作权限则不能合并
if (!prev.owner) return;
//计算自己有多少数据
int byteCount = limit - pos;
//计算前面一个节点有多少剩余空间
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
//如果自己的数据个数大于前面节点的剩余空间则不能进行合并
if (byteCount > availableByteCount) return;
//将数据写入前一个节点
writeTo(prev, byteCount);
//双向链表中删除自己
pop();
//缓存池回收
SegmentPool.recycle(this);
}
}
从源码可以看出Segment是一个双向链表结构,源码中有一个SegmentPool(缓存池)。这个类是用来维护Segment的,作用是回收利用Segment。我们来看下源码:
final class SegmentPool {
/** 缓存池的最大SIZE为64KB
* 在Segment源码中我们知道1个Segment的大小为8KB
* 即缓存池可以回收利用的Segment最多为8个
*/
static final long MAX_SIZE = 64 * 1024; // 64 KiB.
/** 第一个可以回收再利用的Segment*/
static @Nullable Segment next;
/** 缓存池现在拥有可再利用的缓存大小,一定是8KB的倍数 */
static long byteCount;
private SegmentPool() {
}
/**
* 获取一个拥有操作权限的Segement
*/
static Segment take() {
synchronized (SegmentPool.class) {
//如果缓冲池中有就从缓存池中获取
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
//如果没有则直接new一个操作权限的Segement
return new Segment();
}
/**
* 回收Segment
*/
static void recycle(Segment segment) {
//如果segment还没从双向链表中脱离出则抛出异常
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
//如果segement是分享得来的或分享出去的、则不能被回收
if (segment.shared) return;
synchronized (SegmentPool.class) {
//如果缓存池已经满了,不能回收这个segment了
if (byteCount + Segment.SIZE > MAX_SIZE) return;
//将segment回收到缓存池链表
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
}
从static void recycle(Segment segment){...}
函数可以看出,SegmentPool的缓存池是用一个单向链表来维护的,与Segment用双向链表维护不同。Segment中使用双向链表是为了让数据的压缩、分割、合并操作,更加方便和高效。而SegmentPool没有这一需求,只要保证static Segment take(){...}
能得到Segement就好。
okio基本io结构
看完缓存我们来看看最重要的io操作
上面的类图描述了一个最基本的io操作需要用到的东西,之后会讲到。
Sink和Source是okio库中最基础io操作接口,定义了任何read、write操作都是从Buffer持有的Segment缓存中获取数据再进行read、write。那么如何把数据read到缓存中,以及如何将缓存中的数据write到目的地呢?以write为例我们看Okio类中的一段源码:
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
//获取Buffer的Segment缓存
Segment head = source.head;
//计算要写入的数据个数
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
//使用OutputStream将缓存数据写入目标
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
};
}
上面这段源码可以看出Sink实际上是OutputStream的包装,把缓存在Segment中的数据写入目的地还是由OutputStream进行。同理Source也是InputStream的包装,将数据读取到Segment缓存还是由InputStream进行。
接下来我们看BufferedSink和BufferedSouce,这2个接口定义了各种类型的数据写入Segment函数和把Segment数据以各种类型读出的函数,方便大家使用。具体的实现实在Buffer中进行的。
举个例子,BufferedSink接口中的定义了这么一个把数据源写入缓存的函数
//将source[]中的数据从offset位置写byteCount个到Segment缓存
BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
在Buffer中实现如下
@Override
public Buffer write(byte[] source, int offset, int byteCount) {
//如果你要写入目标的数据源source为空,抛出异常
if (source == null) throw new IllegalArgumentException("source == null");
//检查offset、byteCount、source.length是否有数据越界的关系,有则抛出异常
checkOffsetAndCount(source.length, offset, byteCount);
//计算要写入数据的终点
int limit = offset + byteCount;
//如果要写入数据的偏移位置小于要写入数据的终点,开始写入
while (offset < limit) {
//获取一个拥有操作权限的Segment
Segment tail = writableSegment(1);
//计算要写入这个Segment的字节数
int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
//写入
System.arraycopy(source, offset, tail.data, tail.limit, toCopy);
offset += toCopy;
tail.limit += toCopy;
}
size += byteCount;
return this;
}
/**
* 获取一个可用容量大等于minimumCapacity,且拥有权限的Segment
*
* @param minimumCapacity 最小可用容量
*/
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) {
//如果Buffer中没有Segment缓存,则直接从缓存池中获取一个Segment并将其作为head节点
head = SegmentPool.take();
return head.next = head.prev = head;
}
//获取双向链表的最后一个节点
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
/**
* 如果最后一个节点没有足够的容量,或者没有操作权限。
* 则从缓冲池中获取一个Segment,并push到双向链表的最后一个节点
*/
tail = tail.push(SegmentPool.take());
}
return tail;
}
同理,举一个BufferedSouce中的读取函数例子
//把Segement缓存中的数据读取一个byte,以byte形式返回
byte readByte() throws IOException;
在Buffer中实现如下
@Override
public byte readByte() {
//如果缓存数据size==0,抛出异常
if (size == 0) throw new IllegalStateException("size == 0");
//获取缓存的头节点
Segment segment = head;
int pos = segment.pos;
int limit = segment.limit;
byte[] data = segment.data;
//读取1字节
byte b = data[pos++];
size -= 1;
if (pos == limit) {
/**
*如果读取后head节点没有可以读取的数据了
*则pop掉head节点,并且把head节点的下一个节点作为head
*/
head = segment.pop();
//缓存池回收
SegmentPool.recycle(segment);
} else {
segment.pos = pos;
}
return b;
}
现在下来理一下我们知道了的write、read流程:
- write
- read
那么如何将Okio实现的Sink、Source与Buffer连接起来呢?
答案是RealBufferedSink和RealBufferedSource
我们先来看看RealBufferedSource的部分源码
final class RealBufferedSource implements BufferedSource {
/**读写Segment缓存的Buffer*/
public final Buffer buffer = new Buffer();
/**包装了InputStream的Source*/
public final Source source;
/**用来判断输入流是否关闭*/
boolean closed;
/**构造函数传入包装了InputStream的Source*/
RealBufferedSource(Source source) {
if (source == null) throw new NullPointerException("source == null");
this.source = source;
}
/**以字节形式读取1个字节*/
@Override
public byte readByte() throws IOException {
/**
* read前先请求说明需要从buffer的Segment中获取1个byte,
* 1.如果buffer的Segment中有1个byte,则不进行任何操作
* 2.如果buffer的Segment中没有1个byte
* 则使用Source包装的InputStream读取Segment.SIZE个数据到buffer的Segment中
* 如果InputStream读取不到,则抛出异常
*/
require(1);
//buffer的Segment中数据以byte
return buffer.readByte();
}
@Override
public void require(long byteCount) throws IOException {
if (!request(byteCount)) throw new EOFException();
}
/**读取到Segment*/
@Override
public boolean request(long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
while (buffer.size < byteCount) {
if (source.read(buffer, Segment.SIZE) == -1) return false;
}
return true;
}
}
可以看出Okio类创建的包装了InputStream的Source实例通过构造函数传入RealBufferedSource类,RealBufferedSource类自己持有一个Buffer,这样就将read流程的1、2步骤连接起来了
同理,我们再看看RealBufferedSink的部分源码
final class RealBufferedSink implements BufferedSink {
/**读写Segment缓存的Buffer*/
public final Buffer buffer = new Buffer();
/**包装了OutputStream的Sink*/
public final Sink sink;
/**用来判断输出流是否关闭*/
boolean closed;
/**构造函数传入包装了OutputStream的Sink*/
RealBufferedSink(Sink sink) {
if (sink == null) throw new NullPointerException("sink == null");
this.sink = sink;
}
/**写入Segment*/
@Override
public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
//如果输出流关闭了,抛出异常
if (closed) throw new IllegalStateException("closed");
//向buffer的Segment缓存写入数据
buffer.write(source, offset, byteCount);
//完成写入Segment缓存,提交给skin包装的OutputStream将缓存写到目的地
return emitCompleteSegments();
}
/**Segment写到目的地*/
@Override
public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
//先获得t缓存中有多少数据
long byteCount = buffer.completeSegmentByteCount();
//调用sink包装的OutputStream将缓存写到目的地
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
}
可以看出Okio类创建的包装了OutputStream的Sink实例通过构造函数传入RealBufferedSink类,RealBufferedSink类自己持有一个Buffer,这样就将write流程的1、2步骤连接起来了
到此Okio的read、write流程学习完毕。
其他无关的废话
刚走上开发的道路,各位大佬多多指教。另外,杭州3个月工作经验,4个月实习经验的Android开发有需要的吗?