websocket是什么?网上已经有太多文章写过了,这里就不介绍了。这篇文章主要分析Java-WebSocket实现原理
先简单介绍下使用方法吧
1.首先实现一个websocket服务类
public class MyWebSocketServer extends WebSocketServer {
public MyWebSocketServer(int port) {
super(new InetSocketAddress(port));
}
public MyWebSocketServer(InetSocketAddress address) {
super(address);
}
/**
* 当服务器成功启动时调用
*/
@Override
public void onStart() {
System.out.println("onStart---------->Server Started!");
}
/**
* 在握手完成后调用
*
* @param conn WebSocket实例
* @param handshake websocket握手实例
*/
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
//当接入新连接时调用
System.out.println("onOpen------->当前连接数" + getConnections().size());
}
/**
* 在websocket连接关闭后调用。
*
* @param conn
* @param code
* @param reason
* @param remote
*/
@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
//当已接入客户端断开时调用
System.out.println("onClose------->当前连接数" + getConnections().size());
}
/**
* 回调从远程主机接收的字符串消息
*
* @param conn websocket实例(可以利用该接口回调一些消息给客户端)
* @param message String类型数据
*/
@Override
public void onMessage(WebSocket conn, String message) {
//broadcast(message);//发消息给所有已连接客户端
System.out.println("onMessage: " + message);
}
/**
* 回调从远程主机接收的二进制消息
*
* @param conn
* @param message
*/
@Override
public void onMessage(WebSocket conn, ByteBuffer message) {
}
/**
* 这个方法将被调用,主要是因为IO或协议错误
*
* @param conn 可以是空,如果有错误不属于一个特定的websocket。例如,服务器端口无法绑定
* @param ex 异常会导致此错误
*/
@Override
public void onError(WebSocket conn, Exception ex) {
if (conn != null)
ex.printStackTrace();
// some errors like port binding failed may not be assignable to a specific websocket
}
}
}
2.启动MyWebsocket服务
public class ServerMain {
public static void main(String[] args) throws InterruptedException, IOException {
int port = 8887;
MyWebSocketServer mWebSocketServer = new MyWebSocketServer(port);
mWebSocketServer.start();//开启服务
System.out.println("MyWebSocketServer started on port: " + mWebSocketServer.getPort());
}
3.客户端连接websocket服务
public static void main(String[] args) throws URISyntaxException {
final WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://192.168.1.120:8887")) {
@Override
public void onOpen(ServerHandshake handshakedata) {
}
@Override
public void onMessage(String message) {
}
@Override
public void onClose(int code, String reason, boolean remote) {
}
@Override
public void onError(Exception ex) {
}
};
webSocketClient.connect();
new Thread(new Runnable() {
@Override
public void run() {
int mb=300;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
byte[] bytes = new byte[1024 * 1024 * mb];//300mb
for (int i = 0; i < 1024 * 1024 * mb; i++) {
bytes[i] = 1;
}
System.out.println("发送中...");
webSocketClient.send(bytes);//发送字节数据
System.out.println("发送成功...");
}
}).start();
}
接下来从源码分析下原理
1.先看看服务类WebSocketServer
从刚刚服务使用中我们可以看到先调用了一个 mWebSocketServer.start();
public abstract class WebSocketServer extends AbstractWebSocket implements Runnable {
public void start() {
if( selectorthread != null )
throw new IllegalStateException( getClass().getName() + " can only be started once." );
new Thread( this ).start();
}
public void run() {
if (!doEnsureSingleThread()) {
return;
}
if (!doSetupSelectorAndServerThread()) {//这里做一些信道的配置并启动socket服务
return;
}
try {
int iShutdownCount = 5;
int selectTimeout = 0;
// 这里是个死循环
while ( !selectorthread.isInterrupted() && iShutdownCount != 0) {
SelectionKey key = null;
try {
if (isclosed.get()) {
selectTimeout = 5;
}
int keyCount = selector.select( selectTimeout );
if (keyCount == 0 && isclosed.get()) {
iShutdownCount--;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> i = keys.iterator();
while ( i.hasNext() ) {
key = i.next();
if( !key.isValid() ) {
continue;
}
if( key.isAcceptable() ) {
//当有新的连接请求时,doAccept里面会产生一个新的socket信道,并在selector中注册OP_READ事件
doAccept(key, i);
continue;
}
// doRead这里主要会把收到的数据添加到阻塞队列中,然后会分配一个WebSocketWorker工作线程处理队列中的消息,再经过一系列处理最终会回调到onMessage,onError这些方法(WebSocketWorker这个线程是在doSetupSelectorAndServerThread启动的)
if( key.isReadable() && !doRead(key, i)) {
continue;
}
//这里主要处理写数据请求
if( key.isWritable() ) {
doWrite(key);
}
}
doAdditionalRead();
} catch ( CancelledKeyException e ) {
// an other thread may cancel the key
} catch ( ClosedByInterruptException e ) {
return; // do the same stuff as when InterruptedException is thrown
} catch ( WrappedIOException ex) {
handleIOException( key, ex.getConnection(), ex.getIOException());
} catch ( IOException ex ) {
handleIOException( key, null, ex );
} catch ( InterruptedException e ) {
// FIXME controlled shutdown (e.g. take care of buffermanagement)
Thread.currentThread().interrupt();
}
}
} catch ( RuntimeException e ) {
// should hopefully never occur
handleFatal( null, e );
} finally {
doServerShutdown();
}
}
private boolean doSetupSelectorAndServerThread() {
selectorthread.setName( "WebSocketSelector-" + selectorthread.getId() );
try {
server = ServerSocketChannel.open();// 打开监听信道
server.configureBlocking( false );// 设置为非阻塞模式
ServerSocket socket = server.socket();
socket.setReceiveBufferSize( WebSocketImpl.RCVBUF );
socket.setReuseAddress( isReuseAddr() );
socket.bind( address, getMaxPendingConnections() );//与本地端口绑定
selector = Selector.open();//创建选择器
/**
* 将通道(Channel)注册到通道管理器(Selector),并为该通道注册selectionKey.OP_ACCEPT事件
* 注册该事件后,当事件到达的时候,selector.select()会返回,
* 如果事件没有到达selector.select()会一直阻塞。
*/
server.register( selector, server.validOps() );
startConnectionLostTimer();
for( WebSocketWorker ex : decoders ){
ex.start();
}
onStart();
} catch ( IOException ex ) {
handleFatal( null, ex );
return false;
}
return true;
}
}
总结:WebSocketServer利用了selector信道选择器处理了读,写,连接请求,同时启用了多个WebSocketWorker工作线程,让每个线程负责维护多个客户端(所有客户端平均分配),处理各自所维护的客户端们发来的请求。
2.再看看客户端WebSocketClient
public abstract class WebSocketClient extends AbstractWebSocket implements Runnable, WebSocket {
//刚刚WebSocketClient实例化之后首先调用了connect()
public void connect() {
if( connectReadThread != null )
throw new IllegalStateException( "WebSocketClient objects are not reuseable" );
connectReadThread = new Thread( this );
connectReadThread.setName( "WebSocketConnectReadThread-" + connectReadThread.getId() );
connectReadThread.start();
}
public void run() {
InputStream istream;
try {
boolean isNewSocket = false;
if (socketFactory != null) {
socket = socketFactory.createSocket();//创建socket
} else if( socket == null ) {
socket = new Socket( proxy );
isNewSocket = true;
} else if( socket.isClosed() ) {
throw new IOException();
}
socket.setTcpNoDelay( isTcpNoDelay() );
socket.setReuseAddress( isReuseAddr() );
if (!socket.isConnected()) {
InetSocketAddress addr = new InetSocketAddress(dnsResolver.resolve(uri), this.getPort());
socket.connect(addr, connectTimeout);//socket连接服务端
}
// if the socket is set by others we don't apply any TLS wrapper
if (isNewSocket && "wss".equals( uri.getScheme())) {
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, null, null);
SSLSocketFactory factory = sslContext.getSocketFactory();
socket = factory.createSocket(socket, uri.getHost(), getPort(), true);
}
if (socket instanceof SSLSocket) {
SSLSocket sslSocket = (SSLSocket)socket;
SSLParameters sslParameters = sslSocket.getSSLParameters();
onSetSSLParameters(sslParameters);
sslSocket.setSSLParameters(sslParameters);
}
istream = socket.getInputStream();
ostream = socket.getOutputStream();
sendHandshake();
} catch ( /*IOException | SecurityException | UnresolvedAddressException | InvalidHandshakeException | ClosedByInterruptException | SocketTimeoutException */Exception e ) {
onWebsocketError( engine, e );
engine.closeConnection( CloseFrame.NEVER_CONNECTED, e.getMessage() );
return;
} catch (InternalError e) {
// https://bugs.openjdk.java.net/browse/JDK-8173620
if (e.getCause() instanceof InvocationTargetException && e.getCause().getCause() instanceof IOException) {
IOException cause = (IOException) e.getCause().getCause();
onWebsocketError(engine, cause);
engine.closeConnection(CloseFrame.NEVER_CONNECTED, cause.getMessage());
return;
}
throw e;
}
writeThread = new Thread( new WebsocketWriteThread(this) );
writeThread.start();//启动一个写数据的线程(该线程做了啥,下方写了)
byte[] rawbuffer = new byte[ WebSocketImpl.RCVBUF ];
int readBytes;
try {//这里通过一个死循环,不断读数据
while ( !isClosing() && !isClosed() && ( readBytes = istream.read( rawbuffer ) ) != -1 ) {
engine.decode( ByteBuffer.wrap( rawbuffer, 0, readBytes ) );//这里经过一系列处理,最终还是会调用onMessage(),onError()这些方法
}
engine.eot();
} catch ( IOException e ) {
handleIOException(e);
} catch ( RuntimeException e ) {
onError( e );
engine.closeConnection( CloseFrame.ABNORMAL_CLOSE, e.getMessage() );
}
connectReadThread = null;
}
private class WebsocketWriteThread implements Runnable {
private final WebSocketClient webSocketClient;
WebsocketWriteThread(WebSocketClient webSocketClient) {
this.webSocketClient = webSocketClient;
}
@Override
public void run() {
Thread.currentThread().setName( "WebSocketWriteThread-" + Thread.currentThread().getId() );
try {
runWriteData();//调用了个写数据的方法
} catch ( IOException e ) {
handleIOException( e );
} finally {
closeSocket();
writeThread = null;
}
}
private void runWriteData() throws IOException {
try {
while( !Thread.interrupted() ) {//这用了个死循环
ByteBuffer buffer = engine.outQueue.take();//再这里利用阻塞队列取队列数据,添加新数据并唤醒,他才会继续执行
ostream.write( buffer.array(), 0, buffer.limit() );//这里写数据给服务端
ostream.flush();
}
} catch ( InterruptedException e ) {
for (ByteBuffer buffer : engine.outQueue) {
ostream.write( buffer.array(), 0, buffer.limit() );
ostream.flush();
}
Thread.currentThread().interrupt();
}
}
private void closeSocket() {
try {
if( socket != null ) {
socket.close();
}
} catch ( IOException ex ) {
onWebsocketError( webSocketClient, ex );
}
}
}
}
总结:WebSocketClient直接用的socket,写数据利用了阻塞队列和一个死循环,不断读取队列中有没有新消息要处理。读数据(istream.read)还是直接用输入流和死循环读的。
关于socket心跳包的一些疑问
socket连接时到底有没有必要通过一个心跳包来维护长连接?
答案是有必要,长时间不交互数据tcp长连接会断开(防火墙,路由器记录消失等因素)。
使用应用层socket心跳包有哪些作用?
当TCP连接的一方突然发生机器断电关机、系统死机、网线松动等情况下,连接的对端进程可能检测不到任何异常,并最后等待“超时”才断开TCP连接。所以此时心跳包的作用就体现出来了,不仅可以检测对方是否在线,还能及时回收资源。
当然socket在传输层也自带心跳探测,不过由于时效性,不方便检测连接是否可用等因素,我们还是实现自己的心跳包比较好。上文的websocket当然也实现了心跳机制不需要我们再实现了。