参考链接
STOMP
定义
STOMP 中文为“面向消息的简单文本协议”,STOMP 提供了能够协作的报文格 式,以至于 STOMP 客户端可以与任何 STOMP 消息代理Brokers进行通信,从而为多语言,多平台和 Brokers 集群提供简单且普遍的消息协作。STOMP 协议可 以建立在 WebSocket 之上,也可以建立在其他应用层协议之上。通过 Websocket 建立 STOMP 连接,也就是说在 Websocket 连接的基础上再建立 STOMP 连接。
常见的 STOMP 的服务器/客户端的开源实现
- STOMP 服务器:ActiveMQ、RabbitMQ、StompServer、…
- STOMP 客户端库:stomp.js(javascript)、stomp.py(python)、Gozirra(java)、…
STOMP Over WebSocket
即 WebSocket 结合 Stomp 的实现。WebSocket 协议是基于 TCP 的一种新的网络协议,实现双工通讯,但是 websocket 没有规范payload (除控制信息外的有效载体)格式,可以是文本数据,也可以发送二进制数据,需要我们自己定义。而我们可以使用 stomp 协议去规范传输数据格式标准。
Stomp 帧格式示例
STOMP的客户端和服务器之间的通信是通过“帧”(Frame) 实现的,每个帧由多“行”(Line)组成。
- 第一行包含了命令,然后紧跟键值对形式的Header内容。
- 第二行必须是空行。
- 第三行开始就是Body内容,末尾都以空字符结尾。
MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM
{"ticker":"MMM","price":129.45}^@
STOMP Over WebSocket 优点
运行流程图
Stomp 本身
- channel:即客户端与服务端连接的通道
- /app:由程序配置定义的缀,这种前缀指该消息需要经过一些自定义处理(在Controller中)再发到 Stomp 代理(后续详解)
- /topic:同上,不过这类是不需要经过自定义处理的,直接发到 Stomp 代理(后续详解)
结合其他消息中间件
即相对于上述增加了一步 Stomp 代理与消息中间件之间的交互。
SocketJs 是什么
SockJS 是一个浏览器的 JavaScript 库,它提供了一个类似于网络的对象,SockJS 提供了一个连贯的,跨浏览器的 JavaScriptAPI,它在浏览器和 Web 服务器之间创建了一个低延迟、全双工、跨域 通信通道。SockJS 的一大好处在于提供了浏览器兼容性。即优先使用原生 WebSocket,如果浏览器不支持 WebSocket,会自动降为轮询的方式。
依赖引入
<dependencies>
<!-- web-socket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>compile</scope>
</dependency>
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- security -->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-security</artifactId>-->
<!-- </dependency>-->
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
群发实现
服务端代码
- 1、WebSocketCofig 配置类
/**
* @EnableWebSocketMessageBroker 开启 WebSocket Over Stomp
* @author 17697
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
/**
* 注册Stomp服务端点
* @param registry
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// addEndpoint 设置与客户端建立连接的url
registry.addEndpoint("/ws")
// 设置允许跨域
.setAllowedOriginPatterns("*")
// 允许SocketJs使用,是为了防止某些浏览器客户端不支持websocket协议的降级策略
.withSockJS();
}
}
- 2、实体类
/**
* 消息模型类
*/
@Data
public class ChatMessage {
/**
* 消息类型
*/
private MessageType type;
/**
* 消息正文
*/
private String content;
/**
* 消息发送者
*/
private String sender;
/**
* 消息接收者
*/
private String toUser;
public enum MessageType {
CHAT,
JOIN,
LEAVE
}
}
- 3、Controller层代码
@RestController
public class ChatController {
@Autowired
private SimpMessagingTemplate messagingTemplate;
/**
* 客户端发送消息入口,群发消息
* @param chatMessage
* @return
*/
@MessageMapping("/chat/sendMessage")
@SendTo({"/topic/public"})
public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
return chatMessage;
}
/**
* 客户端新增用户消息入口,用于群发显示:新进入xx用户
* @param chatMessage
* @param headerAccessor
* @return
*/
@MessageMapping("/chat/addUser")
@SendTo({"/topic/public"})
public ChatMessage addUser(@Payload ChatMessage chatMessage,
SimpMessageHeaderAccessor headerAccessor) {
// Add username in web socket session
headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
return chatMessage;
}
}
注解介绍
-
@MessageMapping:和 @RequestMapping 注解功能类似,不过该注解用于接收 Stomp 客户端向服务端发送的url地址
注意:- @MessageMapping 使用若未搭配 @SendTo 或 @SendToUser 则会默认发送同 @MessageMapping 中 url 的主题消息
- 使用该注解,则方法下尽量不要使用 SimpMessagingTemplate 的转发方法,如果非要使用,请把方法返回值改为 void ,否则会出现同时向主题发送两次消息。
@SendTo:定义方法~返回数据向其定义的 url 发送;
等同于 SimpMessagingTemplate.convertAndSendTo("/message", "新消息")
例: @SendTo({"/topic/public"}) 将消息发送到 /topic/public 主题下@SendToUser:同上,不过是向单一用户发送消息;
等同于 SimpMessagingTemplate.convertAndSendToUser(Key,"/message", "新消息")
SimpMessagingTemplate 主要方法介绍
推荐使用 SimpMessagingTemplate 处理消息,这种相对于上注解更易理解。
- void convertAndSend(D destination, Object payload)
群发消息:arg1: 目的地址,arg2: 消息内容 - void convertAndSendToUser(String user, String destination, Object payload)
单发消息,arg1: 向谁发送,arg2: 目的地址,arg3: 消息内容。
@Override
public void convertAndSendToUser(String user, String destination, Object payload,
@Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor) throws MessagingException {
Assert.notNull(user, "User must not be null");
Assert.isTrue(!user.contains("%2F"), "Invalid sequence \"%2F\" in user name: " + user);
user = StringUtils.replace(user, "/", "%2F");
destination = destination.startsWith("/") ? destination : "/" + destination;
// 实际还是调用 convertAndSend,destinationPrefix 默认值是 "/user/"(可配置修改)
super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
}
- 4、事件监听类编写
@Component
@Slf4j
public class WebSocketEventListener {
@Autowired
private SimpMessageSendingOperations messagingTemplate;
/**
* 连接建立事件
* @param event
*/
@EventListener
public void handleWebSocketConnectListener(SessionConnectEvent event) {
log.info("建立一个新的连接");
}
/**
* 连接断开事件
* @param event
*/
@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
String username = (String) headerAccessor.getSessionAttributes().get("username");
if(username != null) {
log.info("用户断开连接 : " + username);
ChatMessage chatMessage = new ChatMessage();
chatMessage.setType(ChatMessage.MessageType.LEAVE);
chatMessage.setSender(username);
messagingTemplate.convertAndSend("/topic/public", chatMessage);
}
}
}
- 5、前端代码
function connect(event) {
username = document.querySelector('#name').value.trim();
if(username) {
usernamePage.classList.add('hidden');
chatPage.classList.remove('hidden');
// 建立服务端 websocket 连接,/ws 是后端服务器配置端点路径
var socket = new SockJS('/ws');
stompClient = Stomp.over(socket);
stompClient.connect({}, onConnected, onError);
}
event.preventDefault();
}
function onConnected() {
// 订阅群发主题
stompClient.subscribe('/topic/public', onMessageReceived);
stompClient.send("/chat/addUser",
{},
JSON.stringify({sender: username, type: 'JOIN'})
)
connectingElement.classList.add('hidden');
}
function onError(error) {
connectingElement.textContent = 'Could not connect to WebSocket server. Please refresh this page to try again!';
connectingElement.style.color = 'red';
}
function sendMessage(event) {
var messageContent = messageInput.value.trim();
if(messageContent && stompClient) {
var chatMessage = {
sender: username,
content: messageInput.value,
type: 'CHAT'
};
// arg1: 消息发送url,arg2: 消息头信息(例:加入用户认证头信息),arg3: 消息体
stompClient.send("/chat/sendMessage", {}, JSON.stringify(chatMessage));
messageInput.value = '';
}
event.preventDefault();
}
单发消息(一对一)
服务端代码
其他不变,在 Controller 层新增一对一方法;
使用 @RequestMapping 的方式通常用于其他服务端发送消息的入口
这里由于本人不会前端,所以采用下述通过 Postman 发送消息的形式(菜狗),会前端的大佬可以按之前的群发自行改造
/**
* 一对一消息发送
* @param chatMessage
*/
@PostMapping("/chat/single")
public void sendSingleMessage(@RequestBody ChatMessage chatMessage) {
messagingTemplate.convertAndSendToUser(chatMessage.getToUser(),"/single",chatMessage);
}
前端代码
在连接方法中新增订阅个人主题
function onConnected() {
// 订阅群发主题
stompClient.subscribe('/topic/public', onMessageReceived);
// 新增订阅一对一主题,即通过用户名等唯一性标识拼接到订阅主题地址
stompClient.subscribe('/user/'+username+'/single', onMessageReceived);
stompClient.send("/chat/addUser",
{},
JSON.stringify({sender: username, type: 'JOIN'})
)
connectingElement.classList.add('hidden');
}
拦截器配置
- 1、编写自定义拦截器实现 ChannelInterceptor
/**
* Socket拦截器
* @author 17697
*/
@Component
public class SocketChannelInterceptor implements ChannelInterceptor {
@Autowired
private StringRedisTemplate redisTemplate;
private final static String SOCKET_TOKEN_PREFIX = "webSocket:";
private final static String SOCKET_AUTH = "socket_auth:";
/**
* 发送消息到通道前
* @param message
* @param channel
* @return
*/
@SneakyThrows
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
// 获取连接头信息
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
// 连接验证token合法性(简单模拟)
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
// 获取头中的token
String token = accessor.getFirstNativeHeader("token");
if (StringUtils.hasText(token)) {
String redisToken = redisTemplate.opsForValue().get(SOCKET_TOKEN_PREFIX);
if (token.equals(redisToken)) {
/* 这里可以结合 Security
UsernamePasswordAuthenticationToken authentication = new UsernamePasswordAuthenticationToken(userDetails, null, userDetails.getAuthorities());
SecurityContextHolder.getContext().setAuthentication(authentication);
accessor.setUser(authentication);
*/
// 简单处理设置对应权限。完整的应该根据用户的权限得出是否有发送/订阅到某个目的路径的权限
accessor.setUser(new UserPrincipal() {
@Override
public String getName() {
// 模拟权限类,仅有属性可发送/订阅
Permission permission = new Permission();
permission.setIsSend(true);
permission.setIsSubscribe(true);
String s = JSON.toJSONString(permission);
return s;
}
});
} else {
throw new IllegalAccessException("未授权!!!");
}
} else {
throw new IllegalAccessException("未授权!!!");
}
// 订阅权限认证
} else if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
UserPrincipal user = ((UserPrincipal) accessor.getUser());
String value = user.getName();
if (StringUtils.hasText(value)) {
JSONObject jsonObject = JSONObject.parseObject(value);
Boolean flag = ((Boolean) jsonObject.get("isSubscribe"));
if (!flag) {
throw new IllegalAccessException("无权限订阅!!!");
}
} else {
throw new IllegalAccessException("无权限订阅!!!");
}
// 发送权限验证
} else if (StompCommand.SEND.equals(accessor.getCommand())) {
UserPrincipal user = ((UserPrincipal) accessor.getUser());
String value = user.getName();
if (StringUtils.hasText(value)) {
JSONObject jsonObject = JSONObject.parseObject(value);
Boolean flag = ((Boolean) jsonObject.get("isSend"));
if (!flag) {
throw new IllegalAccessException("无权限发送!!!");
}
} else {
throw new IllegalAccessException("无权限发送!!!");
}
}
return message;
}
/**
* 发送消息到通道后
* @param message
* @param channel
* @return
*/
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
ChannelInterceptor.super.postSend(message, channel, sent);
}
/**
* 发送完成后
* @param message
* @param channel
* @return
*/
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
ChannelInterceptor.super.afterSendCompletion(message, channel, sent, ex);
}
@Override
public boolean preReceive(MessageChannel channel) {
return ChannelInterceptor.super.preReceive(channel);
}
@Override
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
return ChannelInterceptor.super.postReceive(message, channel);
}
@Override
public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
ChannelInterceptor.super.afterReceiveCompletion(message, channel, ex);
}
}
为什么保存认证信息使用 setUser 方法?
该方法表示会话的拥有者,即存储该会话拥有者信息。
每次建立连接都会创建一个 WebSocketSession 会话信息类,在该会话进行消息传递每次都会把 SessionId ,SessionAttributes 和 Principal(即我们setUser()保存的信息) 赋值到 Message 中,而 Principal 就是专门存储身份认证信息的。
- SessionId: 初始随机分配的,用于确定唯一的会话
- SessionAttributes: 用于给 WebSocketSession 设置一些额外记录属性,结构是 Map
- Principal: 用于设置 WebSocketSession 的身份认证信息
额外配置项
根据需求添加
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
private SocketChannelInterceptor socketChanelInterceptor;
/**
* 注册Stomp服务端点
* @param registry
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// addEndpoint 设置与客户端建立连接的url
registry.addEndpoint("/ws")
// 设置允许跨域
.setAllowedOriginPatterns("*")
// 允许SocketJs使用,是为了防止某些浏览器客户端不支持websocket协议的降级策略
.withSockJS();
}
/**
* 自定义拦截器配置
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(socketChanelInterceptor);
}
/**
* 配置消息代理的路由规则
* @param registry
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 定义服务端应用目标前缀;客户端只有以这个前缀才能进入服务端方法 @MessageMapping
registry.setApplicationDestinationPrefixes("/app/");
// 定义SimpleBroker处理的消息前缀;只有消息以这个为前缀才会被SimpleBroker处理转发
registry.enableSimpleBroker("/topic/","/user/");
// 设置一对一消息前缀,默认的是"/user/",可通过该方法修改
registry.setUserDestinationPrefix("/user/");
}
}
项目代码
最后附上所有的代码地址:https://github.com/jjhyb/websocket-master