文本根据ActionCable 5.1.0版本的代码进行讲解。
ActionCable可以在Rails5中实现集成WebSocket通讯功能。其实都得益于它所依赖的三个第三方库:websocket-driver nio4r concurrent-ruby 其中websocket-driver 负责Websocket通讯,nio4r处理IO,concurrent-ruby 处理并发worker。
nio4r
nio4r是Java 领域中NIO的Ruby实现版本,最早它是作为celluloid的底层I/O库,后来被作为底层I/O库单独开源它的底层I/O是使用了libev(一种精简高效的C/C++ I/O库)。
nio4r实现了三大特性:
- selectors: 使用Monitors监听多个I/O对象的就绪状态。
- Monitors: 追踪注册在监听器上的特定I/O事件。
- ByteBuffers: 内建的堆外缓存区,支持零复制i/o操作。
ActionCable::Connection::StreamEventLoop 流数据的事件循环类,nio4r在其中被用在与Websocket监听socket的I/O控制。Ruby标准库其实是提供I/O多路复用的,但是它的功能比较弱,仅支持select/poll系统调用。而使用nio4r能够提供epoll/kqueue等在时间复杂度为O(1)的系统调用函数。
module ActionCable
module Connection
class StreamEventLoop
def attach(io, stream)
@todo << lambda do
@map[io] = @nio.register(io, :r)
@map[io].value = stream
end
wakeup
end
def run
loop do
next unless monitors = @nio.select
# ....
monitors.each do |monitor|
io = monitor.io
stream = monitor.value
incoming = io.read_nonblock(4096, exception: false)
case incoming
when :wait_readable
next
when nil
stream.close
else
# 读取后的数据就将交由 Stream
stream.receive incoming
end
end
end
end
end
end
end
上面的代码就是经过简化后的事件循环类的代码,在ActionCable中在每创建一个连接后就要持续保存I/O的监听状态,这一部分的就是交由nior4r来完成的,可以看到attach方法将Rack闯入的hijack_io对象注册到nio中,然后在run方法的循环中通过nio.select不断监听是否有新的数据流入,如果有的话,就会使用Ruby IO对象的read_nonblock方法读出缓冲区的数据。最后在将读好的数据交给 websocket-driver进行处理。
concurrent-ruby
在ActionCable中每一个Channel中的action或者是callback的执行都是异步的,这样就使得在主线程中执行这些操作就会中断和阻塞的情况,所以ActionCable的解决方案就是使用线程池,concurrent-ruby就是提供了这一部分的功能,在ActionCable的主类,Server::Base中有定义:
require "monitor"
module ActionCable
module Server
class Base
# ......
# 线程池的大小在默认的情况下是4个,如果需要定制可以通过worker_pool_size参数进行配置。
def worker_pool
@worker_pool || @mutex.synchronize do
@worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
end
end
# ......
end
end
上面代码中的使用的ActionCable::Server::Worker就是ActionCable中的线程池类,它的内部封装了concurrent-ruby的线程池实现。
module ActionCable
module Server
# 每一条ActionCable的消息都是使用Worker线程池中的一个线程单独运行。
class Worker
def initialize(max_size: 5)
@executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
)
end
end
end
end
websocket-driver
websocket-driver见名知意,就是为ActionCable提供websocket处理能力的驱动程序,要知道在ActionCable的5.0版本中是使用faye-websocket-ruby这个库进行处理websocket请求,后来因为EventMachine的I/O处理性能不如nio4r好,所以就将原本faye-websocket-ruby的工作拆分成了使用nio4r处理i/o,用websocket-driver专门处理websocket的相关细节,其实也就是websocket协议的处理和响应。
在ActionCable中使用ClientSocket类对websocket-driver进行了封装:
module ActionCable
module Connection
class ClientSocket
def initialize(env, event_target, event_loop, protocols)
@env = env
@event_target = event_target
@event_loop = event_loop
@url = ClientSocket.determine_url(@env)
@driver = @driver_started = nil
@close_params = ["", 1006]
@ready_state = CONNECTING
# The driver calls +env+, +url+, and +write+
@driver = ::WebSocket::Driver.rack(self, protocols: protocols)
@driver.on(:open) { |e| open }
@driver.on(:message) { |e| receive_message(e.data) }
@driver.on(:close) { |e| begin_close(e.reason, e.code) }
@driver.on(:error) { |e| emit_error(e.message) }
@stream = ActionCable::Connection::Stream.new(@event_loop, self)
end
end
end
end
上面的代码可以看到driver使用了 on message回调处理接受到的请求,其中receive_message方法是在ClientSocket类中将接受到的数据传送给MessageBuffer,数据缓冲完成后最好将交由Connection类调用线程池中的线程处理数据。
接受数据的方法其实不在websocket-driver中处理,是由我们上面讲到的nio4r读取i/o对象传送的流数据然后通过driver.parse方法传入websocket-driver进行websocket协议的相关处理最后转换数据。
最后
上面介绍了三个底层库分别被使用在ActionCable中的各个组件当中,下面的顺序图就是这些组件的调用流程。
- websocket请求启动流程:
- websocket接受数据处理流程: