前提netty常用方法
通过学习RocketMQ源码发现其中涉及到很多netty编程,下面先简单梳理一下Netty 常用类和方法,本次重点说一下ByteBuffer和SocketChannel的API
1、ByteBuffer
netty涉及到所有网络通信字节流都是通过ByteBuffer来完成的,ByteBuffer特性包括:动态扩容、读写操作采用不同指针不需要随意切换、支持缓冲池等特性。
netty创建内存空间是堆外内存,最终调用的则是Unsafe#allocateMemory完成内存创建
ByteBuffer reportOffset = ByteBuffer.allocate(8);
ByteBuffer有4个属性,mark<=position<=limit<=capacity
- capacity:容纳的最大数据量
- limit:缓冲区的当前终点
- position:下一个被读或写的元素的索引,每次读写都会改变值
- mark:标记位置,数据读取关键位置,reset方法方便回退
基于这几个属性值,可以实现以下方法
limit() // 设定ByteBuffer空间容量大小
reset() // position设置为mark位置,回退到已读取数据的位置
clear() //初始化,但不影响byte数据内容
flip() // 翻转就是将存数据状态转变为获取数据状态
hasRemaining() //返回是否还是未读的数据内容 limit - position>0
get() //从position位置读取Byte数据,position加1
get(int index) //读取底层下标值
put(byte b) //写数据,向position指向的地址写字节
2、SocketChannel
- 创建服务端Channel:本质通过反射技术调用JDK底层Channel
- 初始化Channel:设置Socket参数和用户自定义属性,并在pipeline上添加两个特殊handler。设置是否使用阻塞模式:true/false。configureBlocking(false)
- 注册服务端Channel:调用底层JDK将channel注册到Selector上。register(Selector,int) 第一个参数可以传入一个选择器对象,第二个可传入SelectionKey代表操作类型的四个静态整型常量中的一个,表示该选择器关心的操作类型。包括OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE
- 端口绑定:调用底层JDK将端口绑定在channel,同时将OP_ACCEPT事件添加到Channel事件中。
主从同步流程
1、master启动建立监听
HAService#start 启动流程主要包括 acceptSocketService用于服务端接受连接线程实现类、groupTransferService 判断主从是否同步复制完成、HAClient HA客户端实现。
public void start() throws Exception {
// 建立HA服务端监听服务,并启动监听逻辑
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
// HA客户端线程
this.haClient.start();
}
首先看一下 AcceptSocketService 具体实现, beginAccept()核心是创建ServerSocketChannel、创建selector、端口绑定、设置为非阻塞、并在Select上注册连接事件。
2、处理连接请求
AcceptSocketService# run() 则每隔1s进行处理连接事件,这里只处理OP_ACCEPT事件, 当有SocketChannel 返回说明有一个slave和master建立连接请求,Master则为每一个连接创建一个HAConnetction。
// AcceptSocketService#beginAccept
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open(); //
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while (!this.isStopped()) {
// select进行轮训获取监听数据
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
try {
// 创建 HAConnection对象,主要复制M-S数据同步
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
HAService.this.addConnection(conn);
}
}
}
}
selected.clear();
}
}
}
}
slave端启动并同步偏移量
1、启动 HAClient线程HAService #start :HAClient执行run方法 这里是while循环,主要做了三件事情。
- 连接master节点,没有连接成功等待5s。
- 判断是否需要向master上报当前节点commitlog的消息偏移量。this.isTimeToReportOffset() 主要进行判断了拉取间隔是否大于haSendHeartbeatInterval默认是5s。
- 上报currentReportedOffset偏移量。
public void run() {
while (!this.isStopped()) {
try {
if (this.connectMaster()) {
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
} else {
this.waitForRunning(1000 * 5);
}
}
}
}
2、HAClient#connectMaster 连接master:过程是创建socketChannel注册到selector上并注册OP_READ事件。对于slave怎么知道master地址的?这里BrokerController启动的时候 主要通过向NameServer 上报元数据调用registerBrokerAll时返回的RegisterBrokerResult值,其中就包含了masterAddr。
开始初始化当前节点 commitlog的最大偏移量和当前时间戳,开始进行数据同步操作。
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
3、上报偏移量HAClient#reportSlaveMaxOffset(this.currentReportedOffset):则是socketChannel.write 发送网络请求将slave偏移量上报给master节点。该方法 其实就是ByteBuffer操作,可以结合开篇ByteBuffer API来看。
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
this.socketChannel.write(this.reportOffset);
}
}
return !this.reportOffset.hasRemaining();
}
master节点处理请求
master端更新slave最大偏移量并同步消息
HAConnection 按照固定端口监听客户端连接,当客户端构建好channel后封装HAConnection,开始执行任务,主要包括两个核心内部类进行网络操作ReadSocketService读操作和WriteSocketService写操作
1、HAConnection#ReadSocketService: 创建选择器后注册读事件。run 主要从从服务器拉取请求,每隔1s循环一次,获取从服务器最大偏移量是多少并更新。读取3次后数据大小为0则结束本次读取
public void run() {
while (!this.isStopped()) {
try {
this.selector.select(1000);
boolean ok = this.processReadEvent();
}
}
}
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPosition = pos;
HAConnection.this.slaveAckOffset = readOffset;
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
}
}
}
return true;
}
2、HAConnection$WriteSocketService:根据ReadSocketService获取从服务器更新的最大偏移量,然后将主服务器增量的数据同步。
当nextTransferFromWhere为-1 则表示初次进行数据传输。否则按照当前master的下一个消息的偏移量减去slave最大偏移量区间。从commitLog中获取消息 按照socketChannel将消息写给slave。同时在写操作时并不是一次就将数据写完,会分多次网络写操作。
if (this.lastWriteOver) {
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
} else {
this.lastWriteOver = this.transferData();
}
// this.transferData
// Write Body
if (!this.byteBufferHeader.hasRemaining()) {
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
}
}
}
Slave处理master返回的数据
HAService$HAClient#processReadEvent
Slave端开始进行处理从master传回的消息数据,当读取的数据大于0,调用dispatchReadRequest()转发处理请求。否则连续3次从网络读取的数据为0结束本次读操作。
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
}
}
}
return true;
}
HAService$HAClient# dispatchReadRequest主要从byteBufferRead中解析消息,并将消息存储在commitlog文件中。整个过程主要包括三件事情
1. msgHeaderSize = 8+4,表示消息的物理偏移量和消息的长度,根据这个判断是否读到一个完整的消息。否则将本次数据先备份一下,等待下一个读取网络数据
2. 获取当前消息文件的最大物理偏移量,如果slave的最大物理偏移量与master给的偏移量不相等,则返回false
3. DefaultMessageStore#appendToCommitLog方法将消息内容追加到消息内存映射文件中,然后唤醒ReputMessageService实时将消息转发给消息消费队列与索引文件,更新dispatchPosition,并向服务端及时反馈当前已存储进度。
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
if (diff >= msgHeaderSize) {
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
this.byteBufferRead.get(bodyData);
// 进行消息同步,将日志消息写入byteBuffer中
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
this.dispatchPosition += msgHeaderSize + bodySize;
continue;
}
}
}
return true;
}
常见问题
brokerIP1和BrokerIP2区别
BrokerIP1: 当前Broker监听的IP
BrokerIP2: broker主从时,在Broker主节点配置,broker从节点会连主节点IP2进行同步。
主从服务器都在运行过程中,消息消费者是从主拉取消息还是从从拉取?
答:默认情况下,RocketMQ消息消费者从主服务器拉取,当主服务器积压的消息超过了物理内存的40%,则建议从从服务器拉取。但如果slaveReadEnable为false,表示从服务器不可读,从服务器也不会接管消息拉取。