背景
近期项目中需要运用到及时消息通知,就采用了websocket+Stomp方式来实现,项目框架是微服务架构,需要考虑网关(SpringCloud Gateway)的转发,多服务节点的消息通知。本文章主要讲解websocket在微服务架构下的运用。
文章目录
接下来从下面5个方面进行讲解。
1.环境
2.后端接入websocket
3.客服端接入注意点
4.网关如何进行转发
5.采用Redis的订阅发布
1. 环境
SpringCloud 相关
SpringCloud Gateway
SpringBoot
Redis
2. 后端接入websocket
后端主要采用Websocket+Stomp的方式。
2.1 导入依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.2.10.RELEASE</version>
</dependency>
2.2 新增websocket配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
* 功能描述:
* websocket配置类
* @Author: nickel
* @Date: 2021/4/2 14:30
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket") //开启websocket端点
.setAllowedOrigins("*") //允许跨域访问
.withSockJS(); //设置sockJs
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry)
{
//表明在topic、queue、users这三个域上可以向客户端发消息。
registry.enableSimpleBroker("/topic","/queue","/user");
//客户端向服务端发起请求时,需要以/app为前缀。
registry.setApplicationDestinationPrefixes("/app");
//给指定用户发送一对一的消息前缀是/user/。
registry.setUserDestinationPrefix("/user/");
}
/**
* 定义用户入端通道拦截器
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(createUserInterceptor());
}
/**
* 将自定义的客户端渠道拦截器加入IOC容器中
* @return
*/
@Bean
public UserChannelInterceptor createUserInterceptor(){
return new UserChannelInterceptor();
}
}
2.3 自定义渠道拦截器
配置类如上,项目中需要对websocket用户进行校验,校验token的有效性,就需要自定义渠道连接器,需要实现接口类ChannelInterceptor,这里为什么要用渠道拦截,是因为比较好获取参数,进行校验。如下:
/**
* 功能描述:
* websocket用户相关渠道拦截
* @Author: nickel
* @Date: 2021/4/2 14:30
*/
@Slf4j
public class UserChannelInterceptor implements ChannelInterceptor {
@Autowired
private SimpUserRegistry simpUserRegistry;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())){
String username=null;
try {
String token = accessor.getNativeHeader("Authorization").get(0);
//校验token
JwtBean jwtBean = ApplicationContextUtils.getBean(JwtBean.class);
username = jwtBean.getUsername(token);
} catch (Exception e) {
e.printStackTrace();
log.error("token is error");
throw new IllegalStateException("The token is illegal");
}
if(StringUtils.isEmpty(username)){
log.error("token is overtime");
throw new IllegalStateException("The token is illegal");
}
accessor.setUser(new MyPrincipal(username));
log.info("【{}】用户上线了",username);
}else if(StompCommand.DISCONNECT.equals(accessor.getCommand())){
log.info("【{}】用户下线了",accessor.getUser().getName());
}
return message;
}
}
注:这里用SimpUserRegistry 可以获取到当前节点在线人数;
2.4 自定义用户
从上面的拦截器中可以看到用到了自定义用户MyPrincipal,需要实现Principal;如下:
import java.security.Principal;
/**
* 功能描述:
* websocket-自定义用户
* @Author: nickel
* @Date: 2021/4/2 14:37
*/
public class MyPrincipal implements Principal {
private String name;
public MyPrincipal(String name){
this.name=name;
}
@Override
public String getName() {
return name;
}
}
自定义用户方便在发送消息时,通过用户名来找到当前节点的在线用户数据;
2.5 如何运用websocket进行消息发送
上面相关配置都写好后,咱们看看具体发送如何实现,新增WebSocketService类如下:
/**
* 功能描述:
* websocket发送信息入口
* @Author: nickel
* @Date: 2021/4/2 14:37
*/
@Service
@Slf4j
public class WebSocketService {
private static final String HANDLER_NAME = "socketHandler";
@Resource
private MyRedisClient myRedisClient;
@Autowired
private SimpMessageSendingOperations simpMessageSendingOperations;
@Autowired
private SimpUserRegistry simpUserRegistry;
/**
* 服务端推送消息--一对一
* 单体服务
* 客服端 订阅地址为/users/{username}/message
*
* @param username
* @param message
*/
public void pushMessage(String username, String message,String id) {
try {
//根据用户名查询当前节点在线用户
SimpUser simpUser = simpUserRegistry.getUser(username);
if (null == simpUser) {
return;
}
log.info("--服务端指定用户发送消息,to【{}】", simpUser.getName());
String nowTime = GaeaUtils.formatDate(new Date(), GaeaConstant.TIME_PATTERN);
GaeaWsMessage gaeaWsMessage = new GaeaWsMessage(id,message, nowTime);
simpMessageSendingOperations.convertAndSendToUser(username, "/message", JSON.toJSONString(gaeaWsMessage));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 服务器端推送消息--广播
* 客服端 订阅地址为/topic/message
* 单体服务
*/
public void pushMessage(String message,String id) {
try {
String nowTime = GaeaUtils.formatDate(new Date(), GaeaConstant.TIME_PATTERN);
GaeaWsMessage gaeaWsMessage = new GaeaWsMessage(id,message, nowTime);
simpMessageSendingOperations.convertAndSend("/topic/message", JSON.toJSONString(gaeaWsMessage));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 后台发送消息到redis
* 支持微服务
* @param commonMsgDto
*/
public void sendMessage(CommonMsgDto commonMsgDto) {
log.info("【websocket消息】广播消息:" + JSON.toJSONString(commonMsgDto));
Map<String, String> msgMap = new HashMap<>();
msgMap.put("message", commonMsgDto.getMessage());
msgMap.put("id", commonMsgDto.getId());
myRedisClient.sendMessage(HANDLER_NAME, msgMap);
}
/**
* 此为单点消息--发送到redis
*
* @param userMsgDto
*/
public void sendMessage(UserMsgDto userMsgDto) {
Map<String, String> msgMap = new HashMap<>();
msgMap.put("username", userMsgDto.getUsername());
msgMap.put("message", userMsgDto.getMessage());
msgMap.put("id", userMsgDto.getId());
myRedisClient.sendMessage(HANDLER_NAME, msgMap);
}
/**
* 此为单点消息(多人)
* 支持微服务
* @param userMsgDtos
*/
public void sendMessage(List<UserMsgDto> userMsgDtos) {
if(CollectionUtils.isEmpty(userMsgDtos)){
return;
}
for (UserMsgDto userMsgDto : userMsgDtos) {
sendMessage(userMsgDto);
}
}
}
从上面的代码可以看出
项目中任何地方都可以注入SimpMessageSendingOperations ,发送消息很方便;
一对一发送
使用SimpMessageSendingOperations.convertAndSendToUser()方法群发
使用SimpMessageSendingOperations.convertAndSend()方法;支持微服务
调用了MyRedisClient中方法发布消息,后面会有详细说明;
3.客服端如何接入Websocket
客服端需要加入如下js:
<script src="/webjars/sockjs-client/sockjs.min.js"></script>
<script src="/webjars/stomp-websocket/stomp.min.js"></script>
客服端连接重要代码如下,以官网提供的例子为主:
function connect() {
//websocket端点地址,如是网关,写成对应网关地址
var socket = new SockJS('http://ip:端口/websocket');
stompClient = Stomp.over(socket);
stompClient.connect({
//传输token参数 进行用户鉴权
Authorization:$("#token").val()
}, function (frame) {
setConnected(true);
console.log('Connected: ' + frame);
//订阅公共频道
stompClient.subscribe('/topic/message', function (greeting) {
showGreeting(greeting.body);
})
//订阅个人频道
stompClient.subscribe('/user/" + $("#userName").val() +"/message', function (greeting) {
showGreeting(greeting.body);
})
});
}
3.1 连接websocket
服务端启动后,用错误的token请求后,在浏览器按F12可以看到如下的信息:
说明用户鉴权成功;
正常情况,连接成功后,浏览器控制台会有如下信息:
Connected: CONNECTED
user-name:nickel
heart-beat:0,0
version:1.1
>>> SUBSCRIBE
id:sub-0
destination:/topic/message
>>> SUBSCRIBE
id:sub-1
destination:/user/nickel/message
服务端会有如下信息:
04-13 14:20:23.485 |-INFO c.a.t.g.a.c.UserChannelInterceptor:49 - 【nickel】用户上线了
4. 网关如何转发
如果项目接入了网关(SpringCloud Gateway),如何转发websocket请求?
4.1 服务端改动点
- 网关新增websocket配置,如下
spring:
cloud:
gateway:
routes:
#表示authservice的正常http请求
- id: authservice
uri: http://127.0.0.1:9091
predicates:
- Path= /auth/**
filters:
- StripPrefix=1
#表示websocket的转发
- id: authservice-websocket
uri: ws://127.0.0.1:9091
predicates:
- Path= /auth/websocket/**
filters:
- StripPrefix=1
注意:这里的path使用了前缀【auth】,为了和authservice保持一致,这样Sockjs发送的get类型的/info请求就会被正常http请求拦截。
- 跨域问题
如果接入网关地址后出现如下问题:
Access to XMLHttpRequest at 'http://ip:9090/auth/websocket/info?t=1618297871740' from origin 'http://localhost:8080' has been blocked by CORS policy: The value of the 'Access-Control-Allow-Origin' header in the response must not be the wildcard '*' when the request's credentials mode is 'include'. The credentials mode of requests initiated by the XMLHttpRequest is controlled by the withCredentials attribute.
在网关层的配置类中,由“*”改为request.getHeaders().getOrigin() ;
@Bean
public WebFilter webFilter() {
return (exchange, chain) -> {
ServerHttpRequest request = exchange.getRequest();
if (CorsUtils.isCorsRequest(request)) {
ServerHttpResponse response = exchange.getResponse();
HttpHeaders headers = response.getHeaders();
//改动点由“*”改为request.getHeaders().getOrigin()
headers.add("Access-Control-Allow-Origin", request.getHeaders().getOrigin());
headers.add("Access-Control-Allow-Methods", ALLOWED_METHODS);
headers.add("Access-Control-Max-Age", MAX_AGE);
headers.add("Access-Control-Allow-Headers", ALLOWED_HEADERS);
headers.add("Access-Control-Expose-Headers", ALLOWED_EXPOSE);
headers.add("Access-Control-Allow-Credentials", "true");
if (request.getMethod() == HttpMethod.OPTIONS) {
response.setStatusCode(HttpStatus.OK);
return Mono.empty();
}
}
return chain.filter(exchange);
};
}
- 解决请求头重复的问题
如果接入网关地址后出现如下问题:
Opening Web Socket...
localhost/:1 Access to XMLHttpRequest at 'http://ip:9090/auth/websocket/info?t=1618297173825' from origin 'http://localhost:8080' has been blocked by CORS policy: The 'Access-Control-Allow-Origin' header contains multiple values 'http://localhost:8080, http://localhost:8080', but only one is allowed.
stomp.min.js:8 Whoops! Lost connection to http://ip:9090/auth/websocket
需要在网关层新增如下配置类,参考文章:
/**
* 功能描述:
* websocket
* 防止请求头中有重复的
* @Author: nickel
* @Date: 2021/4/1 11:14
*/
public class CorsResponseHeaderFilter implements GlobalFilter, Ordered {
@Override
public int getOrder() {
// 指定此过滤器位于NettyWriteResponseFilter之后
// 即待处理完响应体后接着处理响应头
return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).then(Mono.defer(() -> {
exchange.getResponse().getHeaders().entrySet().stream()
.filter(kv -> (kv.getValue() != null && kv.getValue().size() > 1))
.filter(kv -> (kv.getKey().equals(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN)
|| kv.getKey().equals(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS)))
.forEach(kv -> {
kv.setValue(new ArrayList<String>() {{
add(kv.getValue().get(0));
}});
});
return chain.filter(exchange);
}));
}
}
并在网关配置类中纳入Spring容器进行管理:
@Configuration
public class MyGatewayConfig {
@Bean
public CorsResponseHeaderFilter getCorsResponseHeaderFilter(){
return new CorsResponseHeaderFilter();
}
}
4.2 客服端改动点
只需要把地址变为网关地址即可:
如下:
//var socket = new SockJS('http://服务ip:9091/websocket');
//写成对应网关地址
var socket = new SockJS('http://网关ip:9090/auth/websocket');
stompClient = Stomp.over(socket);
注:网关转发地址有配置前缀auth
4.3 nginx 配置更改
如果项目用了nginx代理,则需要更改对应的配置,支持转发websocket请求;
主要在location配置端增加以下配置项:
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
5. Redis的订阅与发布
上面网关转发的功能已完成,还存在一个问题就是微服务架构下,多个服务节点,如何给在线用户发送及时消息;这里我们主要用的redis的订阅与发布,大家也可以使用Mq消息队列处理;
- Redis配置类修改
这里直接看配置类代码,为了实现订阅与发布,需要新增配置RedisMessageListenerContainer 和MessageListenerAdapter:
/**
* <pre>
* RedisCacheConfig
* </pre>
*
* @author nickel
* @version RedisCacheConfig.java
*/
@Configuration
@ConditionalOnClass({RedisOperations.class})
public class RedisCacheConfig {
@Bean
public RedisSerializer fastJson2JsonRedisSerializer() {
return new FastJson2JsonRedisSerializer<>(Object.class);
}
@Bean
public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory, RedisSerializer fastJson2JsonRedisSerializer) {
RedisTemplate<String, Serializable> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(fastJson2JsonRedisSerializer);
template.setHashValueSerializer(fastJson2JsonRedisSerializer);
return template;
}
/**
* redis 监听配置
* 配置一个REDIS_TOPIC_NAME channel
* @param redisConnectionFactory redis 配置
* @return
*/
@Bean
public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter commonListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(commonListenerAdapter, new ChannelTopic(MagicValueConstants.REDIS_TOPIC_NAME));
return container;
}
/**
* 配置redis监听类MyRedisReceiver
* @param redisReceiver
* @return
*/
@Bean
MessageListenerAdapter commonListenerAdapter(MyRedisReceiver redisReceiver) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisReceiver, "onMessage");
messageListenerAdapter.setSerializer(fastJson2JsonRedisSerializer());
return messageListenerAdapter;
}
}
- MyRedisReceiver 消息监听者创建
/**
* 功能描述:
* redis 订阅者
*
* @Author: nickel
* @Date: 2021/4/2 16:20
*/
@Configuration
@Slf4j
public class MyRedisReceiver {
/**
* 接受消息并调用业务逻辑处理器
*
* @param map
*/
public void onMessage(Map<String, String> map) {
String handlerName = map.get(MagicValueConstants.HANDLER_NAME);
MyRedisListener myRedisListener = null;
try {
if (StringUtils.isNotBlank(handlerName)) {
myRedisListener = ApplicationContextUtils.getBean(handlerName, MyRedisListener .class);
} else {
myRedisListener = ApplicationContextUtils.getBean(MyRedisListener .class);
}
if (null != myRedisListener ) {
myRedisListener .onMessage(map);
}
}catch (Exception e){
log.info("redis listener getbean not foud");
}
}
}
- 创建redis消息消费者
/**
* 功能描述:
* 定义redis监听接口
* @Author: nickel
* @Date: 2021/4/2 16:31
*/
public interface MyRedisListener {
void onMessage(Map<String, String> message);
}
/**
* 监听消息实现类(采用redis发布订阅方式发送消息)
*/
@Slf4j
@Component
public class SocketHandler implements MyRedisListener {
@Autowired
private WebSocketService webSocket;
@Override
public void onMessage(Map<String, String> map) {
log.info("【MySocketHandler消息】Redis Listerer:" + map.toString());
String username = map.get("username");
String message = map.get("message");
String id = map.get("id");
if (StringUtils.isNotEmpty(username)) {
webSocket.pushMessage(username, message,id);
} else {
webSocket.pushMessage(message,id);
}
}
}
- 如何发布消息
配置信息和消费者配置完成后,咱们看下redis如何发布消息:
**
* redis客户端
* 发布消息
*/
@Configuration
public class MyRedisClient {
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 发送消息
*
* @param params
*/
public void sendMessage(String handerName, Map<String,String> params) {
params.put(MagicValueConstants.HANDLER_NAME,handerName);
redisTemplate.convertAndSend(MagicValueConstants.REDIS_TOPIC_NAME, params);
}
}
直接通过redisTemplate向我们定义的REDIS_TOPIC_NAME通道放入消息;
在需要发布消息的地方注入MyRedisClient 对象即可,可以参考2.5 WebSocketService 中的代码;
介绍完毕,感谢大家能看到这里!
欢迎大家留言,提出建议,谢谢!