一. 概述
在后端开发中, 会经常与其他系统对接进行数据交换, 而在这过程经常会遇到一个问题就是推送方说已经把数据推送了, 而接收方咬死说我没收到, 这就有点尴尬了, 一般最后都是只能推送方重推解决, 这就会引出重推机制, 本文介绍的就是重推Demo的实现, 仅限学习
二. 重推Demo
本Demo分为2个步骤实现: 1. 记录重推参数 2. 定制重推
2.1 结构
2.2 参数实体类
RePush,java
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RePush implements Serializable {
/**
* ID
*/
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
public Long rePushId;
/**
* 接口名称
*/
public String interfaceName;
/**
* 类名
*/
public String className;
/**
* 方法名称
*/
public String methodName;
/**
* 回调方法
*/
public String callbackObj;
/**
* 参数列表
*/
public String params;
/**
* 推送状态 0:未推送 1:已推送
*/
public Integer status;
/**
* 推送次数
*/
public Integer pushNum;
/**
* 创建时间
*/
public LocalDateTime createTime;
/**
* 更新时间
*/
public LocalDateTime updateTime;
}
2.3 工具类
ObjectUtil.java,用于对象的序列化
/**
* 把对象转成字符串
*/
public static String objectToString(Object obj) {
// 对象转字节数组
AtomicReference<String> str = new AtomicReference<>();
Optional.ofNullable(obj).ifPresent(o -> {
try {
byte[] bytes = writeObj(o);
str.set(DatatypeConverter.printBase64Binary(bytes));
} catch (Exception e) {
e.printStackTrace();
}
});
return str.get();
}
/**
* 解析字符串为对象
*/
public static Object stringToObject(String str) {
AtomicReference<Object> obj = new AtomicReference<>();
Optional.ofNullable(str).ifPresent(s -> {
try {
byte[] bytes = DatatypeConverter.parseBase64Binary(str);
obj.set(readObj(bytes));
} catch (Exception e) {
e.printStackTrace();
}
});
return obj.get();
}
/**
把对象转为字节数组
*/
public static byte[] writeObj(Object obj) throws Exception {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
outputStream.writeObject(obj);
outputStream.close();
return byteArrayOutputStream.toByteArray();
}
/**
把字节数组转为对象
*/
public static Object readObj(byte[] bytes) throws Exception {
ObjectInputStream inputStream = null;
try {
inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
return inputStream.readObject();
} finally {
inputStream.close();
}
}
2.4 service层逻辑实现
2.4.1 接口 IRePushService.java
public interface IRePushService {
/**
* 记录重推参数
* @param interfaceName 接口名称
* @param status 推送状态 0:未推送 1:已推送
* @param callbackMethod 回调方法
* @param className 类名 Class.getName()
* @param methodName 方法名
* @param params 方法参数
*/
void saveReRushLog(String interfaceName,Integer status ,Predicate<Object> callbackMethod,String className, String methodName,Object... params) throws Exception;
/**
* 重推 (后期建立定时任务,定时执行)
* @param rePushList 为重推的数据集合
*/
void rePush() throws Exception;
/**
* 查询要重推的数据
* @return
*/
List<RePush> queryRePushData();
}
2.4.2 实现类 RePushServiceImpl.java
/**
* 重推实现类
*
* @author 王培任
* @date 2021/9/9
*/
@Service
@Slf4j
public class RePushServiceImpl implements IRePushService {
@Resource
private RePushMapper rePushMapper;
/**
* 推送状态(已推送)
*/
public static final Integer STATUS_SUCCESS = 1;
/**
* 推送状态(未推送)
*/
public static final Integer STATUS_FAILURE = 0;
/**
* 分隔符
*/
public static final String DELIMITER = ",";
/**
* 最大重推次数
*/
public static final Integer MAX_RE_PUSH_NUM = 3;
@Override
public List<RePush> queryRePushData() {
Example example = Example.builder(RePush.class).build();
example.createCriteria().andEqualTo("status",STATUS_FAILURE).orLessThanOrEqualTo("pushNum",MAX_RE_PUSH_NUM);
return rePushMapper.selectByExample(example);
}
/**
* 记录重推参数
* @param interfaceName 接口名称
* @param status 推送状态 0:未推送 1:已推送 (默认成功)
* @param callbackMethod 回调方法
* @param className 类名 Class.getName()
* @param methodName 方法名
* @param params 方法参数
*/
@Override
public void saveReRushLog(String interfaceName,Integer status ,Predicate<Object> callbackMethod,String className, String methodName,Object... params) throws Exception{
// 参数校验
Assert.notNull(interfaceName,"接口名称不能为空!");
Assert.notNull(className,"类名不能为空!");
Assert.notNull(methodName,"方法名不能为空!");
status = Optional.ofNullable(status).orElse(STATUS_SUCCESS);
// 校验类名
Class<?> aClass ;
try {
aClass = Class.forName(className);
} catch (Exception e) {
throw new RuntimeException("未找到类名:"+className);
}
// 校验方法名+参数
try {
if(null != params){
List<Class<?>> classList = Arrays.stream(params).map(Object::getClass).collect(Collectors.toList());
int size = classList.size();
Class<?> [] aClassList = new Class[size];
for(int i=0;i<size;i++){
aClassList[i] = classList.get(i);
}
Method method = aClass.getMethod(methodName, aClassList);
Assert.notNull(method,String.format("方法名[%s]+参数[%s]:没有匹配的方法",methodName, JSON.toJSON(params)));
}else {
Method method = aClass.getMethod(methodName);
Assert.notNull(method,String.format("方法名[%s]:没有匹配的方法",methodName));
}
} catch (Exception e) {
throw new RuntimeException(String.format("方法名[]+参数[]:没有匹配的方法",methodName,JSON.toJSON(params)));
}
// 记录回调方法
String callbackObj = null != callbackMethod ? ObjectUtil.objectToString(callbackMethod) : null;
// 参数保存
StringBuilder builder = new StringBuilder();
if(null != params){
int length = params.length;
for(int i = 0;i<length;i++){
builder.append(ObjectUtil.objectToString(params[i]));
if(i != length-1){
builder.append(DELIMITER);
}
}
}
// 记录重推参数
LocalDateTime date = LocalDateTime.now();
RePush rePush = RePush.builder()
.interfaceName(interfaceName)
.className(className)
.methodName(methodName)
.callbackObj(callbackObj)
.params(builder.toString())
.status(status)
.createTime(date)
.updateTime(date)
.build();
// 保存重推的参数
rePushMapper.insert(rePush);
}
/**
* 重推 (后期建立定时任务,定时执行)
*/
@Override
public void rePush() throws Exception {
// 查询要重推的数据
List<RePush> rePushes = queryRePushData();
if(CollectionUtils.isEmpty(rePushes)) return;
for(RePush rePush : rePushes){
if(Objects.equals(STATUS_SUCCESS,rePush.getStatus())) return;
try {
rePushHandler(rePush);
} catch (Exception e) {
log.error("重推记录id[{}]异常:{}=={}",rePush.getRePushId(),e.getMessage(),Arrays.toString(e.getStackTrace()),e);
}
}
}
/**
* 执行重推
* @param rePush
* @throws ClassNotFoundException
* @throws InstantiationException
* @throws IllegalAccessException
* @throws InvocationTargetException
*/
public void rePushHandler(RePush rePush) throws Exception {
// 待重推的类
Class<?> aClass = Class.forName(rePush.getClassName());
// 重推方法
Method method = Arrays.stream(aClass.getDeclaredMethods())
.filter(methodTemp -> Objects.equals(methodTemp.getName(), rePush.getMethodName()))
.findAny()
.orElseThrow(() -> new RuntimeException("获取方法失败"));
method.setAccessible(true);
// 创建实例
Object instance = aClass.newInstance();
// 方法执行结果
Object result;
// 重推参数
String params = rePush.getParams();
if(StringUtils.isEmpty(params)){
result = method.invoke(instance);
}else {
// 解析参数执行方法
String[] objStrs = params.split(DELIMITER);
int length = objStrs.length;
Object[] objParams = new Object[length];
for(int i=0;i<length;i++){
objParams[i] = ObjectUtil.stringToObject(objStrs[i]);
}
result = method.invoke(instance,objParams);
}
// 请求回调方法
String callbackObj = rePush.getCallbackObj();
// 回调方法执行状态
boolean resultStatus = true;
if(StringUtils.isNotEmpty(callbackObj)){
// 回调方法执行
Predicate predicate = (Predicate) ObjectUtil.stringToObject(callbackObj);
resultStatus = predicate.test(result);
}
// 更新重推记录
rePush.setPushNum(rePush.getPushNum()+1); // 重推次数+1
rePush.setUpdateTime(LocalDateTime.now());
if(resultStatus){
rePush.setStatus(STATUS_SUCCESS);
}
rePushMapper.updateByPrimaryKey(rePush);
}
}
2.5 记录重推参数工具类
RePushUtil.java
@Slf4j
public class RePushUtil {
private static IRePushService rePushService;
/**
* 记录重推参数
* @param interfaceName 接口名称 (必传)
* @param status 推送状态 0:未推送 1:已推送 (默认成功)
* @param callbackMethod 回调方法
* @param className 类名 Class.getName() (必传)
* @param methodName 方法名 (必传)
* @param params 方法参数
*/
public static void saveReRushLog(String interfaceName, Integer status , Predicate<Object> callbackMethod, String className, String methodName, Object... params){
CompletableFuture.runAsync(() -> {
IRePushService iRePushService = getIRePushService();
try {
iRePushService.saveReRushLog(interfaceName,status,callbackMethod,className,methodName,params);
} catch (Exception e) {
log.error("记录重推参数异常:{}--{}",e.getMessage(), Arrays.toString(e.getStackTrace()));
}
});
}
public static IRePushService getIRePushService(){
if(null == rePushService){
// 从Spring容器中获取Bean
rePushService = SpringContextUtil.getBean(IRePushService.class);
}
return rePushService;
}
}
三. 使用Demo步骤
创建重推定时任务->在对外接口方法中记录重推日志
步骤一: 创建重推定时任务
建立定时任务定时执行重推方法IRePushService. rePush
步骤二:在业务中记录对外接口方法接口日志
在业务中调方法RePushUtil. saveReRushLog
记录重推参数