HTTP协议的局限性
HTTP协议的生命周期是通过Request和Response来界定的,而Response是被动的(服务端不能主动与客户端通信),收到 一次请求才会返回一次响应。而当服务端需要主动和客户端进行通信,或者需要建立全双工通信(保持在一个连接中)时,HTTP就力不从心了。
在Websocket出现之前,实现全双工通信的方式主要是ajax轮询和long poll,这样是非常消耗性能的。
Websocket
WebSocket是HTML5 新增加的特性之一,目前主流浏览器大都提供了对其的支持。其特点是可以在客户端和服务端之间建立全双工通信,一些特殊场景,例如实时通信、在线游戏、多人协作等,WebSocket都可以作为解决方案。
Spring自4.0版本后增加了WebSocket支持,本例就使用Spring WebSocket构建一个简单实时聊天的应用。
服务端配置
WebSocketHandler
Spring WebSocket提供了一个WebSocketHandler接口,这个接口提供了WebSocket连接建立后生命周期的处理方法。
public interface WebSocketHandler {
/**
* 成功连接WebSocket后执行
*
* @param session session
* @throws Exception Exception
*/
void afterConnectionEstablished(WebSocketSession session) throws Exception;
/**
* 处理收到的WebSocketMessage
* (参照org.springframework.web.socket.handler.AbstractWebSocketHandler)
*
* @param session session
* @param message message
* @throws Exception Exception
*/
void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
/**
* 处理传输错误
*
* @param session session
* @param exception exception
* @throws Exception Exception
*/
void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
/**
* 在两端WebSocket connection都关闭或transport error发生后执行
*
* @param session session
* @param closeStatus closeStatus
* @throws Exception Exception
*/
void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
/**
* Whether the WebSocketHandler handles partial messages. If this flag is set to
* {@code true} and the underlying WebSocket server supports partial messages,
* then a large WebSocket message, or one of an unknown size may be split and
* maybe received over multiple calls to
* {@link #handleMessage(WebSocketSession, WebSocketMessage)}. The flag
* {@link WebSocketMessage#isLast()} indicates if
* the message is partial and whether it is the last part.
*/
boolean supportsPartialMessages();
}
WebSocketSession
WebSocketSession不同于HttpSession,每次断开连接(正常断开或发生异常断开)都会重新起一个WebSocketSession。
这个抽象类提供了一系列对WebSocketSession及传输消息的处理方法:
/**
* WebSocketSession id
*/
String getId();
/**
* 获取该session属性的Map
*/
Map<String, Object> getAttributes();
/**
* 发送WebSocketMessage(TextMessage或BinaryMessage)
*/
void sendMessage(WebSocketMessage<?> message) throws IOException;
/**
* 判断是否在连接
*/
boolean isOpen();
/**
* 关闭连接
*/
void close() throws IOException;
WebSocketMessage<T>
spring WebSocket提供了四种WebSocketMessage的实现:TextMessage(文本类消息)、BinaryMessage(二进制消息)、PingMessage、PongMessage(后两者用于心跳检测,在一端收到了Ping消息的时候,该端点必须发送Pong消息给对方,以检测该连接是否存在和有效)。
// 通过getPayload();方法获取WebSocketMessage的有效信息
T getPayload();
HandshakeInterceptor
HandshakeInterceptor接口是WebSocket连接握手过程的拦截器,通过实现该接口可以对握手过程进行管理。值得注意的是,beforeHandshake中的attributes与WebSocketSession中通过getAttributes();返回的Map是同一个Map,我们可以在其中放入一些用户的特定信息。
public interface HandshakeInterceptor {
/**
* 握手前
*/
boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception;
/**
* 握手后
*/
void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception);
}
WebSocketConfigurer
通过实现WebSocketConfigurer接口,可以注册相应的WebSocket处理器、路径、允许域、SockJs支持。
public interface WebSocketConfigurer {
/**
* 注册WebSocketHandler
*/
void registerWebSocketHandlers(WebSocketHandlerRegistry registry);
}
客户端配置
核心API
url为指定的WebSocket注册路径,当协议为http时,使用ws://,当协议为https,使用wss://。
var path = window.location.hostname + ":****/" + window.location.pathname.split("/")[1];
var websocket = new WebSocket('ws://' + path + '/****Handler');
// 新建连接
websocket.onopen = function () {
// ...
};
// 收到消息
websocket.onmessage = function (event) {
// ...
};
// 传输错误
websocket.onerror = function () {
// ...
};
// 关闭
websocket.onclose = function () {
// ...
};
// onbeforeunload,窗口刷新、关闭事件前执行
window.onbeforeunload = function () {
// ...
};
// 发送消息
websocket.send();
onmessage的event对象:
可以看出,应使用event.data获取服务端发送的消息。
SockJs支持
有的浏览器不支持WebSocket,使用SockJs可以模拟WebSocket。
if (window.WebSocket) {
console.log('Support WebSocket.');
websocket = new WebSocket('ws://' + path + '/****Handler');
} else {
console.log('Not Support WebSocket!);
websocket = new SockJS('http://' + path + '/****Handler')
}
实现思路
以下使用WebSocket构建一个实时聊天应用。
1.客户端与服务端通信只使用TextMessage(文本类消息),客户端只能发送聊天文本,服务端可以单播和广播消息,包括聊天文本、上线、下线、掉线、用户列表信息、认证信息和服务器时间。
2.以HttpSession来唯一区别用户,而不是WebSocketSession。
3.核心思路是当新的WebSocketSession建立时,将其加入一个集合,当该session失效时(close、error)将其从集合中删除,当服务端需要单播或广播消息时,以这个集合为根据。
服务端实现
工程搭建
新建Spring Boot项目,添加必要依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<!-- 热部署工具 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
创建服务端响应对象
(其实在WebSocket中已经没有了请求、响应之分,但习惯上将客户端发送的消息称为请求,服务端发送的消息称为响应)
/**
* 服务端响应
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ChatResponse {
// 返回类型
private String type;
// 来源用户HttpSessionId
private String httpSessionId;
// 来源用户host
private String host;
// 来源用户昵称
private String username;
// 有效信息
private Object payload;
public ChatResponse() {
}
public ChatResponse(String httpSessionId, String host, String username) {
this.httpSessionId = httpSessionId;
this.host = host;
this.username = username;
}
// getter、setter...
}
响应对象枚举
/**
* 服务端响应类型枚举
*/
public enum ResponseTypeEnum {
ONLINE("online", "上线提示"),
OFFLINE("offline", "下线提示"),
AUTHENTICATE("authenticate", "认证信息"),
LIST("list", "用户列表"),
ERROR("error", "连接异常"),
CHAT("chat", "聊天文本"),
TIME("time", "服务器时间");
// 响应关键字
private String key;
// 类型说明
private String info;
ResponseTypeEnum(String key, String info) {
this.key = key;
this.info = info;
}
public String getKey() {
return key;
}
public String getInfo() {
return info;
}
}
从chrome的WS控制台,我们可以看到发送的信息
WebSocketHandler实现
/**
* WebSocket处理器
* 用于处理WebSocketSession的生命周期、单播消息、广播消息
*/
@Service
@EnableScheduling
public class ChatHandler implements WebSocketHandler {
// 用于存放所有连接的WebSocketSession
private static CopyOnWriteArraySet<WebSocketSession> webSocketSessions = new CopyOnWriteArraySet<>();
// 用户存放所有在线用户信息
private static CopyOnWriteArraySet<Map<String, Object>> sessionAttributes = new CopyOnWriteArraySet<>();
private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm");
private static final Logger log = LoggerFactory.getLogger(ChatHandler.class);
@Autowired
private ObjectMapper objectMapper;
/**
* 成功连接WebSocket后执行
*
* @param session session
* @throws Exception Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 成功连接后将该连接加入集合
webSocketSessions.add(session);
sessionAttributes.add(session.getAttributes());
log.info("session {} open, attributes: {}.", session.getId(), session.getAttributes());
// 单播消息返回给该用户认证信息,httpSessionId是用户认证唯一标准
this.unicast(session, ResponseTypeEnum.AUTHENTICATE.getKey());
// 广播通知该用户上线
this.broadcast(session, ResponseTypeEnum.ONLINE.getKey());
// 广播刷新在线列表
this.broadcast(ResponseTypeEnum.LIST.getKey(), sessionAttributes);
}
/**
* 处理收到的WebSocketMessage,根据需求只处理TextMessage
* (参照org.springframework.web.socket.handler.AbstractWebSocketHandler)
*
* @param session session
* @param message message
* @throws Exception Exception
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (message instanceof TextMessage) {
// 广播聊天信息
this.broadcast(session, ResponseTypeEnum.CHAT.getKey(), ((TextMessage) message).getPayload());
} else if (message instanceof BinaryMessage) {
// 对BinaryMessage不作处理
} else if (message instanceof PongMessage) {
// 对PongMessage不作处理
} else {
throw new IllegalStateException("Unexpected WebSocket message type: " + message);
}
}
/**
* 处理WebSocketMessage transport error
*
* @param session session
* @param exception exception
* @throws Exception Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
// 对于异常连接,关闭并从webSocket移除Sessions中
if (session.isOpen()) {
session.close();
}
webSocketSessions.remove(session);
sessionAttributes.remove(session.getAttributes());
log.error("session {} error, errorMessage: {}.", session.getId(), exception.getMessage());
// 广播异常掉线信息
this.broadcast(session, ResponseTypeEnum.ERROR.getKey());
// 广播刷新在线列表
this.broadcast(ResponseTypeEnum.LIST.getKey(), sessionAttributes);
}
/**
* 在两端WebSocket connection都关闭或transport error发生后执行
*
* @param session session
* @param closeStatus closeStatus
* @throws Exception Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
boolean removeNow = webSocketSessions.remove(session);
sessionAttributes.remove(session.getAttributes());
log.info("session {} close, closeStatus: {}.", session.getId(), closeStatus);
if (removeNow) {
// 广播下线信息
this.broadcast(session, ResponseTypeEnum.OFFLINE.getKey());
}
// 广播刷新在线列表
this.broadcast(ResponseTypeEnum.LIST.getKey(), sessionAttributes);
}
/**
* Whether the WebSocketHandler handles partial messages. If this flag is set to
* {@code true} and the underlying WebSocket server supports partial messages,
* then a large WebSocket message, or one of an unknown size may be split and
* maybe received over multiple calls to
* {@link #handleMessage(WebSocketSession, WebSocketMessage)}. The flag
* {@link WebSocketMessage#isLast()} indicates if
* the message is partial and whether it is the last part.
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 封装response并转为json字符串
*
* @param session session
* @param type type
* @param payload payload
* @return json response
* @throws Exception Exception
*/
private String getResponse(WebSocketSession session, String type, Object payload) throws Exception {
ChatResponse chatResponse;
if (null == session) {
chatResponse = new ChatResponse();
} else {
Map<String, Object> attributes = session.getAttributes();
String httpSessionId = (String) attributes.get("httpSessionId");
String host = (String) attributes.get("host");
String username = (String) attributes.get("username");
chatResponse = new ChatResponse(httpSessionId, host, username);
}
chatResponse.setType(type);
chatResponse.setPayload(payload);
// 转为json字符串
return objectMapper.writeValueAsString(chatResponse);
}
/**
* 向单个WebSocketSession单播消息
*
* @param session session
* @param type type
* @param payload payload
* @throws Exception Exception
*/
private void unicast(WebSocketSession session, String type, Object payload) throws Exception {
String response = this.getResponse(session, type, payload);
session.sendMessage(new TextMessage(response));
}
/**
* 单播系统消息
*
* @param session session
* @param type type
* @throws Exception Exception
*/
private void unicast(WebSocketSession session, String type) throws Exception {
this.unicast(session, type, null);
}
/**
* 因某个WebSocketSession变动,向所有连接的WebSocketSession广播消息
*
* @param session 变动的WebSocketSession
* @param type com.njfu.chat.enums.ResponseTypeEnum 消息类型
* @param payload 消息内容
* @throws Exception Exception
*/
private void broadcast(WebSocketSession session, String type, Object payload) throws Exception {
String response = this.getResponse(session, type, payload);
// 广播消息
for (WebSocketSession webSocketSession : webSocketSessions) {
webSocketSession.sendMessage(new TextMessage(response));
}
}
/**
* 用于多播系统消息
*
* @param session session
* @param type type
* @throws Exception Exception
*/
private void broadcast(WebSocketSession session, String type) throws Exception {
this.broadcast(session, type, null);
}
/**
* 用于无差别广播消息
*
* @param type type
* @param payload payload
* @throws Exception Exception
*/
private void broadcast(String type, Object payload) throws Exception {
this.broadcast(null, type, payload);
}
/**
* 定时任务,每5分钟发送一次服务器时间
* @throws Exception Exception
*/
@Scheduled(cron = "0 0-59/5 * * * ?")
private void sendServerTime() throws Exception {
this.broadcast(ResponseTypeEnum.TIME.getKey(), simpleDateFormat.format(new Date()));
}
}
HandshakeInterceptor实现
/**
* WebSocketHandshake拦截器
*/
@Service
public class ChatHandshakeInterceptor implements HandshakeInterceptor {
private static final Logger log = LoggerFactory.getLogger(ChatHandshakeInterceptor.class);
/**
* 握手前
* 为连接的WebsocketSession配置属性
*
* @param request the current request
* @param response the current response
* @param wsHandler the target WebSocket handler
* @param attributes attributes from the HTTP handshake to associate with the WebSocket
* session; the provided attributes are copied, the original map is not used.
* @return whether to proceed with the handshake ({@code true}) or abort ({@code false}) 通过true/false决定是否连接
*
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
// 获取HttpSession
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpSession session = servletRequest.getServletRequest().getSession();
// 在握手前验证是否存在用户信息,不存在时拒绝连接
String username = (String) session.getAttribute("username");
if (null == username) {
log.error("Invalid User!");
return false;
} else {
// 将用户信息放入WebSocketSession中
attributes.put("username", username);
// httpSessionId用于唯一确定连接客户端的身份
attributes.put("httpSessionId", session.getId());
attributes.put("host", request.getRemoteAddress().getHostString());
return true;
}
}
/**
* 握手后
*
* @param request the current request
* @param response the current response
* @param wsHandler the target WebSocket handler
* @param exception an exception raised during the handshake, or {@code null} if none
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
}
}
WebSocketConfigurer实现
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Value("${origin}")
private String origin;
@Autowired
private ChatHandler chatHandler;
@Autowired
private ChatHandshakeInterceptor chatHandshakeInterceptor;
/**
* 注册WebSocket处理器
* 配置处理器、拦截器、允许域、SockJs支持
*
* @param registry registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 设置允许域,当请求的RequestHeaders中的Origin不在允许范围内,禁止连接
String[] allowedOrigins = {origin};
registry.addHandler(chatHandler, "/chatHandler")
.addInterceptors(chatHandshakeInterceptor)
.setAllowedOrigins(allowedOrigins);
// 当浏览器不支持WebSocket,使用SockJs支持
registry.addHandler(chatHandler, "/sockjs-chatHandler")
.addInterceptors(chatHandshakeInterceptor)
.setAllowedOrigins(allowedOrigins)
.withSockJS();
}
}
通过setAllowedOrigins(String... origins);方法可以限制访问,查看WebSocket Request Headers的Origin属性:
这种限制与限制跨域是类似的,不同的是端口号不在其限制范围内。可以通过setAllowedOrigins("*");的方式设置允许所有域。
Controller
@Controller
public class ChatController {
/**
* index页
*
* @return page
*/
@RequestMapping("/")
public String index() {
return "chat";
}
/**
* 验证是否存在用户信息
* 根据HttpSession唯一确定用户身份
*
* @param session session
* @return json
*/
@RequestMapping("/verifyUser")
public @ResponseBody
String verifyUser(HttpSession session) {
return (String) session.getAttribute("username");
}
/**
* 新增用户信息
*
* @param session session
* @param username username
*/
@RequestMapping("/addUser")
public @ResponseBody
void addUser(HttpSession session, String username) {
session.setAttribute("username", username);
}
}
客户端实现
html
<div class="chat-body">
<div class="chat-area" id="area"></div>
<div class="chat-bar">
<div class="chat-bar-head">在线列表</div>
<div class="chat-bar-list"></div>
</div>
<!-- contenteditable and plaintext only -->
<div class="chat-input" contenteditable="plaintext-only"></div>
<div class="chat-control">
<span class="chat-size"></span>
<button class="btn btn-primary btn-sm" id="view-online">在线列表</button>
<button class="btn btn-primary btn-sm" id="send">发送</button>
</div>
</div>
js
// 验证session中是否有用户信息,若有,进行WebSocket连接,若无,新增用户信息
var websocket;
/**
* 建立WebSocket连接
*/
function getConnect() {
var path = window.location.hostname + ":7090/" + window.location.pathname.split("/")[1];
if (window.WebSocket) {
console.log('Support WebSocket.');
websocket = new WebSocket('ws://' + path + '/chatHandler');
} else {
console.log('Not Support WebSocket! It\'s recommended to use chrome!');
bootbox.alert({
title: '提示',
message: '您的浏览器不支持WebSocket,请切换到chrome获取最佳体验!'
});
websocket = new SockJS('http://' + path + '/sockjs-chatHandler')
}
// 配置WebSocket连接生命周期
websocket.onopen = function () {
console.log('WebSocket open!');
};
websocket.onmessage = function (event) {
handleMessage(event);
};
websocket.onerror = function () {
console.log('WebSocket error!');
bootbox.alert({
title: '提示',
message: 'WebSocket连接异常,请刷新页面!',
callback: function () {
window.location.reload();
}
});
};
websocket.onclose = function () {
console.log('WebSocket close!');
bootbox.alert({
title: '提示',
message: 'WebSocket连接断开,请刷新页面!',
callback: function () {
window.location.reload();
}
});
};
window.onbeforeunload = function () {
websocket.close();
};
}
// 本地httpSessionId
var localSessionId;
/**
* 处理收到的服务端响应,根据消息类型调用响应处理方法
*/
function handleMessage(event) {
var response = JSON.parse(event.data);
// 获取消息类型
var type = response.type;
// 获取httpSessionId
/** @namespace response.httpSessionId */
var httpSessionId = response.httpSessionId;
// 获取host
var host = response.host;
// 获取username
var username = response.username;
// 获取payload
/** @namespace response.payload */
var payload = response.payload;
switch (type) {
case 'chat':
handleChatMessage(httpSessionId, username, payload);
break;
case 'online':
console.log('online: ' + username);
handleSystemMessage(username, type);
break;
case 'offline':
console.log('offline: ' + username);
handleSystemMessage(username, type);
break;
case 'error':
console.log('error: ' + username);
handleSystemMessage(username, type);
break;
case 'time':
console.log('time: ' + payload);
handleSystemMessage(null, type, payload);
break;
case 'list':
handleUserList(payload);
break;
case 'authenticate':
console.log('authenticate: ' + httpSessionId);
localSessionId = httpSessionId;
break;
default:
bootbox.alert({
title: '提示',
message: 'Unexpected message type.'
});
handleSystemMessage(null, type);
}
}
/**
* 处理聊天文本信息
* 将本地用户消息与其它用户消息区分
*/
function handleChatMessage(httpSessionId, username, payload) {
// ...
}
/**
* 维护在线列表
* @param payload
*/
function handleUserList(payload) {
// ...
}
/**
* 处理系统消息
* @param username
* @param type
* @param payload
*/
function handleSystemMessage(username, type, payload) {
// ...
}
/**
* 发送消息
*/
// ...
效果展示
苦逼的IE同志说不出话来,只算到IE11可能不支持WebSocket,没想到他其实是不支持contenteditable="plaintext-only"(后来又发现火狐也不支持)。
心跳检测
WebSocket是一个长连接,需要心跳检测机制来判断服务端与客户端之间建立的WebSocket连接是否存在和有效。当服务端断开连接时,客户端会立马断开连接,并调用websocket.close,而当客户端出现中断网络连接的情况,服务端不会立马作出反应(Spring WebSocket不会),而是过一段时间(推测是几分钟)后才将这个断掉的WebSocketSession踢出。