本文主要讲解SpringBoot 如何基于WebSocket 实现主动推送消息给用户
消息推送的业务逻辑为服务端开启WebSocket 服务,客户端通过建立长连接进入等待状态,服务器在合适的时候推送消息给客户端,最后客户端接受消息自行处理。话不多说,上关键代码。
服务端
- Maven 项目在pom.xml 里引入websocket 依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
- Boot 启动类
//开启WebSocket
@EnableWebSocket
@SpringBootApplication
public class Application implements WebSocketConfigurer {
public static void main(String[] args) {
new SpringApplicationBuilder(Application.class).bannerMode(Banner.Mode.OFF).run(args);
}
//用请求的方式模拟推送消息的时候
@GetMapping("notice")
public String notice(String count) {
counterHandler.sendMessageToUser(count, "当前时间是:" + new Date());
return "已发送";
}
/**
* 注册WebSocket处理类
*
* @param webSocketHandlerRegistry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
//支持websocket 的 connection,指定counterHandler处理路径为/counter 的长连接请求
webSocketHandlerRegistry.addHandler(counterHandler(), "/counter")
//添加WebSocket握手请求的拦截器.
.addInterceptors(new CounterHandler.CountHandshakeInterceptor());
//不支持websocket的connenction,采用sockjs
webSocketHandlerRegistry.addHandler(counterHandler(), "/sockjs/counter")
//添加WebSocket握手请求的拦截器.
.addInterceptors(new CounterHandler.CountHandshakeInterceptor()).withSockJS();
}
@Bean
public CounterHandler counterHandler() {
return new CounterHandler();
}
}
- Socket处理类
public class CounterHandler extends TextWebSocketHandler {
public static final String COLLECTOR = "collector";
private static final List<WebSocketSession> COUNTS = new ArrayList<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
System.out.println("Connection established");
COUNTS.add(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message)
throws Exception {
System.out.println("Received:" + message.getPayload());
}
/**
* 给某个用户发送消息
*
* @param count
* @param message
*/
public void sendMessageToUser(String count, String message) {
//遍历记录的session,取出符合条件的session发送消息
for (WebSocketSession socketSession : COUNTS) {
if (socketSession.getAttributes().get(COLLECTOR).equals(count)) {
try {
if (socketSession.isOpen()) {
//最关键的一句,给客户端推送消息
socketSession.sendMessage(new TextMessage(message));
}
} catch (IOException e) {
e.printStackTrace();
}
COUNTS.remove(socketSession);
break;
}
}
}
/**
* 检查握手请求和响应, 对WebSocketHandler传递属性
*/
public static class CountHandshakeInterceptor implements HandshakeInterceptor {
/**
* 在握手之前执行该方法, 继续握手返回true, 中断握手返回false.
* 通过attributes参数设置WebSocketSession的属性
*
* @param request
* @param response
* @param wsHandler
* @param attributes
* @return
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String collector = ((ServletServerHttpRequest) request).getServletRequest().getParameter(COLLECTOR);
if (Strings.isNullOrEmpty(collector)) {
return false;
} else {
attributes.put(COLLECTOR, collector);
return true;
}
}
/**
* 在握手之后执行该方法. 无论是否握手成功都指明了响应状态码和相应头.
*
* @param request
* @param response
* @param wsHandler
* @param exception
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {
}
}
}
客户端
public class WebSocketTest {
//服务器WebSocket 连接地址
private static final String WS_URI = "ws://localhost:8080/counter?collector=1";
public static void main(String[] args) throws IOException, InterruptedException {
StandardWebSocketClient client = new StandardWebSocketClient();
WebSocketConnectionManager manager = new WebSocketConnectionManager(client, new MyHandler(), WS_URI);
manager.start();
Thread.sleep(100000);
}
private static class MyHandler extends TextWebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println("connected...........");
session.sendMessage(new TextMessage("hello, web socket"));
super.afterConnectionEstablished(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message)
throws Exception {
System.out.println("receive: " + message.getPayload());
super.handleTextMessage(session, message);
}
}
}
至此便完成了主要代码逻辑。先启动服务端,然后运行客户端建立WebSocket连接,接着在浏览器地址栏输入localhost:8080/notice?count=1
,服务器便会找到对应的socketSession
对其进行推送消息。