Java Nio
在 jdk 1.4
之后提供新的io
方式,称之为非阻塞io
。
有三个重要部分:Channels
通道、Buffers
缓冲区 和 Selectors
选择器。
缓冲区 Buffer
public abstract class Buffer {
......
}
Buffer
是缓存区的总父类,这是一个抽样类。因为Channels
通道只与缓存区进行数据交互,所以首先我们弄懂缓存区Buffer
的操作。
成员属性
// 牢记这一点: mark <= position <= limit <= capacity
// 标记值,使用 mark() 方法之后,记录position的值,再使用reset()方法,将记录的mark值重新赋值给position
private int mark = -1;
// 在写的模式下,position值等于已经写入数据最大位置值,在读的模式下,position值等于已经读过数据最大位置值,
private int position = 0;
// 在写的模式下,limit值等于capacity;在读的模式下,limit值等于已经写入数据最大位置值。
private int limit;
// 总容量值
private int capacity;
// 这个值只有在 direct buffer 中使用。表示这个 direct buffer 的内存地址。
long address;
Buffer
是即可以读,又可以写的。要实现这个特性,就要使用成员变量了。
模拟实现 Buffer 功能
我们来模拟实现这个功能,假设我们现在有一个整数数组arr
,数组大小是10
,看我们如何在这个数组上实现即能读又能写的功能。
- 先使用
capacity
记录一下数组长度值。因为当前数组中没有数据,那就只能先写。 - 当使用
arr[0]=0
后,我们已经添加了元素了,这个时候需要记录一下,添加过元素的位置,下次再添加元素时,就在下一个位置添加,这里使用position
记录。添加下一个元素就是arr[++position]=1
,即arr[1]=1
。最多能添加capacity
元素,position
值必须小于等于capacity
。 - 当写入元素之后,我们想要读取写入的元素。注意写入数据是从
arr[0]
到arr[position]
,这时使用limit
记录position
的值,再将position
重新设置为0
,那么写入的数据就是arr[position]
到arr[limit]
,就可以通过arr[position++]
不断读取写入数据。
Buffer
与上面介绍的有一点区别,刚开始的时候,会将capacity
赋值给 limit
。要保证一点就是 position <= limit <= capacity
。
所以我们可以看成:
- 在写的模式下,
arr[0]
到arr[position]
的值表示写入数据的区域,arr[position]
到arr[limit]
表示待写入的区域。 - 在读的模式下,
arr[position]
到arr[limit]
表示读取数据的区域,其他区域即使有数据,但是对我们来说都是无效区域。
注意:写和读模式在
Buffer
是没有严格区分的,即在Buffer
中没有任何字段表示当前Buffer
是在读模式还是写模式。这个是Buffer
最致命的问题。
举个例子就明白了:
- 一个新的
Buffer
,它的容量是10
,也就是说capacity
,因为是刚开始的时候,它的limit
等于capacity
,也是10
,position
就是0
。这个时候,我们添加两个元素,那么position
就变成了2
。 - 此时我们想读取这个
Buffer
已写入的内容,我们需要将limit = position; position = 0
,然后开始读取position
到limit
内容了,就是刚写入的内容。但是如果忘了limit = position; position = 0
这转换位置的操作,程序还是读取position
到limit
内容,这时这些内容并不是我们刚写入的内容,而是不确定的内容。
各个属性详细解释
-
capacity
表示总容量值,是一个固定值。注意当使用clear()
方法之后,会将capacity
值赋值给limit
。 -
position
和limit
:Buffer
的读写功能全靠这两个参数。当要向Buffer
中写数据时,待写入的区域就是position -> limit
,已经写入的数据就是0 -> position
。 当想从Buffer
中读取数据时,就是读取position -> limit
局域的数据,如果我们没有置换position
和limit
位置,那么我们将读取错误的数据。 -
mark
标记值,用于记录position
的值。主要实现标记重读和重写的功能。即我们已经向Buffer
中写入一些数据了,接下了写入数据时,可能会发生错误,想重新从该位置再次写入,这个时候就需要使用mark()
方法,将mark = position
, 当发生错误时,再使用reset()
方法,position = mark
将位置重置,继续写入。重新读也是如此。
重要方法
标记重读和重写
/**
* 打标记
* @return
*/
public final Buffer mark() {
mark = position;
return this;
}
/**
* 恢复标记
* @return
*/
public final Buffer reset() {
int m = mark;
// 在此之间,如果 mark值发生变化,小于0,那么就报错。
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
这是配套方法,实现标记重读和重写。
在
reset()
方法中,当mark < 0
的时候,会抛出异常。什么时候,mark
值会小于0
呢,当我们主动改变position
的时候,即原来的标记值没用了,会将mark
值重新设置成-1
。
清楚Buffer数据
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
注意
Buffer
并不是真正删除数据,而是改变了position
和limit
,那就找不到已经写入数据的正确局域,即使数据还保存在数组中,但是我们没法正确读取或者接着写入数据了。只能重新写了。
切换成读模式
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
只有调用这个方法之后,我们才能正确读取 Buffer
数据,否则将读取错误的数据。
重读或者重写
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
将 position
重写设置成 0
,表示即将重新读或者重新写。
剩余空间
// 是否有剩余空间
public final boolean hasRemaining() {
return position < limit;
}
// 还有多少剩余空间
public final int remaining() {
return limit - position;
}
在读模式下,表示还剩余多少读取空间,在写模式下,表示还有多少写入空间。
得到位置
final int nextGetIndex() {
if (position >= limit)
throw new BufferUnderflowException();
return position++;
}
final int nextGetIndex(int nb) {
if (limit - position < nb)
throw new BufferUnderflowException();
int p = position;
position += nb;
return p;
}
final int nextPutIndex() {
if (position >= limit)
throw new BufferOverflowException();
return position++;
}
final int nextPutIndex(int nb) {
if (limit - position < nb)
throw new BufferOverflowException();
int p = position;
position += nb;
return p;
}
不管是读取还是写入,获取位置的代码逻辑都是一样的,只不过抛出的异常不一样。就是增加position
的值,表示位置的改变。
ByteBuffer
ByteBuffer
是 Buffer
最重要的子类之一,通过它我们来了解 Buffer
是如何使用的。
成员属性
final byte[] hb;
final int offset;
boolean isReadOnly;
-
hb
: 储存数据的byte
数组。 -
offset
: 偏移量。即从offset
位置之后才是有效位置,之前的区域即不能写,也不能读。 -
isReadOnly
: 表示这个Buffer
只能读,不能写。
构造函数
ByteBuffer(int mark, int pos, int lim, int cap,
byte[] hb, int offset)
{
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}
这个构造函数是给子类堆内内存 HeapByteBuffer
调用的。
ByteBuffer(int mark, int pos, int lim, int cap) {
this(mark, pos, lim, cap, null, 0);
}
这个构造函数是给子类堆外内存 DirectByteBuffer
调用的。
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
通过静态方法创建堆内内存 HeapByteBuffer
对象。
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
通过静态方法创建堆外内存 DirectByteBuffer
对象。
堆内内存
HeapByteBuffer
就借助byte[]
数组来储存数据,因为byte[]
数组是在jvm
管理的内存之中,所以称为堆内内存。
堆外内存DirectByteBuffer
不使用数组这些东西,而是直接使用jvm
管控之外的一块内存区域, 来储存数据。
重要方法
get 系列方法
public abstract byte get();
获取当前位置(即position + offset
)的数据,并将 position
的值增加1
public abstract byte get(int index);
获取给与位置(即position + offset + index
)的数据。但是千万注意不会改变 position
的值。
例子:
public static void main(String[] args) throws Exception {
FileChannel fileChannel = aFile.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
for (int index = 0; index < 10; index++) {
byteBuffer.put((byte) ('0' + index));
}
// 一定要调用这个方法。
byteBuffer.flip();
System.out.println((char)byteBuffer.get());
System.out.println((char)byteBuffer.get(5));
System.out.println((char)byteBuffer.get());
}
运行结果:
0
5
1
可以看出 get(int index)
方法不会改变 position
的值。
public ByteBuffer get(byte[] dst) {
return get(dst, 0, dst.length);
}
public ByteBuffer get(byte[] dst, int offset, int length) {
checkBounds(offset, length, dst.length);
if (length > remaining())
throw new BufferUnderflowException();
int end = offset + length;
for (int i = offset; i < end; i++)
dst[i] = get();
return this;
}
将Buffer
中的一些数据写入到给与的 dst
数组中。如果想将Buffer
中写入数据全部读取到一个byte[]
,可以这么做:
public static void main(String[] args) throws Exception {
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
for (int index = 0; index < 10; index++) {
byteBuffer.put((byte) ('0' + index));
}
// 一定要调用这个方法。
byteBuffer.flip();
// byteBuffer.remaining() 可以得到读取数据的大小
byte[] dst = new byte[byteBuffer.remaining()];
byteBuffer.get(dst);
System.out.println(new String(dst));
}
put 系列方法
public abstract ByteBuffer put(byte b);
在当前位置(即position + offset
)写入数据b
,并将 position
的值增加1
。
public abstract ByteBuffer put(int index, byte b);
在给与位置(即position + offset + index
)写入数据b
。但是千万注意不会改变 position
的值。
public final ByteBuffer put(byte[] src) {
return put(src, 0, src.length);
}
public ByteBuffer put(byte[] src, int offset, int length) {
checkBounds(offset, length, src.length);
if (length > remaining())
throw new BufferOverflowException();
int end = offset + length;
for (int i = offset; i < end; i++)
this.put(src[i]);
return this;
}
将给与的byte[]
数组src
中的数据写入到本Buffer
中。
public ByteBuffer put(ByteBuffer src) {
if (src == this)
throw new IllegalArgumentException();
if (isReadOnly())
throw new ReadOnlyBufferException();
int n = src.remaining();
if (n > remaining())
throw new BufferOverflowException();
for (int i = 0; i < n; i++)
put(src.get());
return this;
}
将给与缓冲区src
中的数据写入到本Buffer
中。
public static void main(String[] args) throws Exception {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 将数据写入 byteBuffer 中
byteBuffer.put("你好世界".getBytes());
// 切换成 读模式
byteBuffer.flip();
// 将byteBuffer中的数据读取到 dst 中
byte[] dst = new byte[byteBuffer.remaining()];
byteBuffer.get(dst);
System.out.println(new String(dst));
}
通过 wrap()
方法创建 HeapByteBuffer
对象
public static ByteBuffer wrap(byte[] array) {
return wrap(array, 0, array.length);
}
public static ByteBuffer wrap(byte[] array,
int offset, int length)
{
try {
return new HeapByteBuffer(array, offset, length);
} catch (IllegalArgumentException x) {
throw new IndexOutOfBoundsException();
}
}
HeapByteBuffer(byte[] buf, int off, int len) {
super(-1, off, off + len, buf.length, buf, 0);
}
ByteBuffer(int mark, int pos, int lim, int cap,
byte[] hb, int offset)
{
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}
从上面的调用方法,可以看出通过 wrap
方法创建的 HeapByteBuffer
对象,它的 mark = -1, position = offset, limit = offset + length, capacity = array.length
。
这里有两个
offset
,一个是和array
相关,表示从offset
之后的array
数据才有用,所以将它赋值给offset
. 而ByteBuffer
中的offset
表示整个ByteBuffer
的数据,一般使用slice()
方法进行分片之后,才会有值。
注意,此时Buffer
的position -> limit
区域就是代表写入数据。也可以认为wrap
方法创建的HeapByteBuffer
对象就是已经处于读模式了,不需要在调用 flip() 方法了。
public static void main(String[] args) throws Exception {
ByteBuffer byteBuffer = ByteBuffer.wrap("你好世界".getBytes());
// 将byteBuffer中的数据读取到 dst 中
byte[] dst = new byte[byteBuffer.remaining()];
byteBuffer.get(dst);
System.out.println(new String(dst));
}
HeapByteBuffer
构造方法
HeapByteBuffer(int cap, int lim) {
super(-1, 0, lim, cap, new byte[cap], 0);
}
这个构造方法一般是由ByteBuffer
的allocate()
方法调用,创建一个指定大小的 Buffer
。
HeapByteBuffer(byte[] buf, int off, int len) {
super(-1, off, off + len, buf.length, buf, 0);
}
这个构造方法一般是由ByteBuffer
的wrap()
方法调用,根据外部的byte[]
数组创建一个 Buffer
.
protected HeapByteBuffer(byte[] buf,
int mark, int pos, int lim, int cap,
int off)
{
super(mark, pos, lim, cap, buf, off);
}
这个构造方法一般是由 slice()
或duplicate()
方法调用,用于分片或者复制Buffer
。
重要的方法
slice 方法
public ByteBuffer slice() {
return new HeapByteBuffer(hb,
-1,
0,
this.remaining(),
this.remaining(),
this.position() + offset);
}
这个方法只对读模式下的Buffer
有意义。即将剩余还未读取的内容,切成一个新的 Buffer
,可以在这个 Buffer
中进去读取。但是因为共享同一个 byte[]
数组hb
,所以对这个 Buffer
修改,也会影响到原来的 Buffer
。
duplicate 方法
public ByteBuffer duplicate() {
return new HeapByteBuffer(hb,
this.markValue(),
this.position(),
this.limit(),
this.capacity(),
offset);
}
创建新的 HeapByteBuffer
对象,但是因为共享同一个 byte[]
数组hb
,写的操作会相互影响。
asReadOnlyBuffer 方法
public ByteBuffer asReadOnlyBuffer() {
return new HeapByteBufferR(hb,
this.markValue(),
this.position(),
this.limit(),
this.capacity(),
offset);
}
创建只读的 HeapByteBufferR
对象。
compact 方法
public ByteBuffer compact() {
System.arraycopy(hb, ix(position()), hb, ix(0), remaining());
position(remaining());
limit(capacity());
discardMark();
return this;
}
如果已经从Buffer
中读取了一些数据,但是还有一些数据没有读完,这个时候想要向Buffer
中写入数据。
如果直接使用clear()
方法,那未读完的数据将会永远获取不到了,因为正确读取的位置没有了。
要保留未读完的数据,那么先将未读完position -> limit
区域的数据移动到开头,即变成0-> limit - position
区域,再将position = limit - position
, 然后设置limit = capacity
,因为主动改变了position
的值,我们要将mark
的值设置成-1
。这样就可以保留未读完的数据再重新读取了。注:remaining()
返回值就是limit - position
。
get系列方法
// 在 Buffer 中方法
final int nextGetIndex() {
if (position >= limit)
throw new BufferUnderflowException();
return position++;
}
final int checkIndex(int i) {
if ((i < 0) || (i >= limit))
throw new IndexOutOfBoundsException();
return i;
}
// 在 HeapByteBuffer 方法
// 加上偏移量的值
protected int ix(int i) {
return i + offset;
}
public byte get() {
return hb[ix(nextGetIndex())];
}
public byte get(int i) {
return hb[ix(checkIndex(i))];
}
可以看出get()
方法,即获取当前位置的值,也增加了position
的值了。
而get(int i)
方法,只是获取指定位置的值。
put系列方法
// 在 Buffer 中方法
final int nextGetIndex() {
if (position >= limit)
throw new BufferUnderflowException();
return position++;
}
final int checkIndex(int i) {
if ((i < 0) || (i >= limit))
throw new IndexOutOfBoundsException();
return i;
}
// 在 HeapByteBuffer 方法
// 加上偏移量的值
protected int ix(int i) {
return i + offset;
}
public ByteBuffer put(byte x) {
hb[ix(nextPutIndex())] = x;
return this;
}
public ByteBuffer put(int i, byte x) {
hb[ix(checkIndex(i))] = x;
return this;
}
可以看出put(byte x)
方法,即在当前位置写入byte
值,也增加了position
的值了。
而put(int i, byte x)
方法,只是在指定位置写入byte
值。
只读子类DirectByteBufferR
这个类继承 DirectByteBuffer
,主要是对 DirectByteBuffer
的所有写方法进行控制,使这个缓冲区Buffer
只能读,不能写。