因为网关是请求的出入口,防止各调用方及服务方相互之间扯皮,响应报文也需要打出来,而这里有一个问题,如果响应过大的话,Flux会进行截断,这样有2个问题,一个是每次处理都会打一次部分报文,不过这个可以通过doOnComplete()来解决,第二就是达到一定长度出现乱码,后来查看api,有一个合并的方法,问题解决,代码如下
/**
* 处理响应的 的filter
* @author huangting
*/
@Component
public class ResponseHandlerFilter implements GlobalFilter, Ordered {
private static final Logger logger = LoggerFactory.getLogger(ResponseHandlerFilter.class);
private static final String START_TIME = "startTime";
@Autowired
private MetricService metricService;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String ip = IpUtil.getRemoteHost(request);
//执行完成后 进行调用耗时埋点
exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
//原始响应类
ServerHttpResponse originalResponse = exchange.getResponse();
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
//重新包装的响应类
ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffer -> {
//如果响应过大,会进行截断,出现乱码,然后看api DefaultDataBufferFactory有个join方法可以合并所有的流,乱码的问题解决
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffer);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
//释放掉内存
DataBufferUtils.release(join);
//打印响应日志
logResponse(exchange, new String(content, StandardCharsets.UTF_8));
return bufferFactory.wrap(content);
}));
}
};
return chain.filter(exchange.mutate().response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
Long startTime = exchange.getAttribute(START_TIME);
if (startTime != null) {
Long executeTime = (System.currentTimeMillis() - startTime);
//influxDB埋点
metricService.pointRequestLatency(ip, request.getURI().getPath(), executeTime);
}
}));
}
/**
* 打印响应报文
*
* @param exchange
*/
public void logResponse(ServerWebExchange exchange, String response) {
ServerHttpRequest request = exchange.getRequest();
logger.info("响应报文 URL:{},Method:{},headers:{},response:{}", request.getURI().getPath(), request.getMethod(), exchange.getResponse().getHeaders(), response);
}
@Override
public int getOrder() {
// -1 is response write filter, must be called before that
return -3;
}
}
最近发现一个问题,当接口的调用返回值为空的时候,并不会进入 writeWith 里边的map方法,所以当Flux 进行订阅时,map里的 logResponse 没有执行,所以先定义一个默认的 AtomicReference<String> responseBody 变量,当返回值不为空时,对它进行更新;然后把 logResponse 方法下移,放到 return语句中,这样就能保证打印方法总会被执行,修改后的 filter 方法代码如下
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String ip = IpUtil.getRemoteHost(request);
//执行完成后 进行调用耗时埋点
exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
//原始响应类
ServerHttpResponse originalResponse = exchange.getResponse();
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
//初始化一个 默认的 responseBody
AtomicReference<String> responseBody= new AtomicReference<>("no-responseBody");
//重新包装的响应类
ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffer -> {
//如果响应过大,会进行截断,出现乱码,然后看api DefaultDataBufferFactory有个join方法可以合并所有的流,乱码的问题解决
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffer);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
//释放掉内存
DataBufferUtils.release(join);
//如果有返回值,将 responseBody 覆盖
responseBody.set(new String(content, StandardCharsets.UTF_8));
return bufferFactory.wrap(content);
}));
}
};
return chain.filter(exchange.mutate().response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
//打印响应日志
logResponse(exchange, responseBody.get());
Long startTime = exchange.getAttribute(START_TIME);
if (startTime != null) {
Long executeTime = (System.currentTimeMillis() - startTime);
//influxDB埋点
metricService.pointRequestLatency(ip, request.getURI().getPath(), executeTime);
}
}));
}