1、前言
对于IM或实时消息推送技术来说,客户端的心跳算法几乎是必备品,尤其当前复杂的移动网络环境下,网络心跳保活算法的优劣更是决定了您的APP即时数据收发的实时性和用户体验,非常地关键。
本文将与大家一起探讨一种更加简单易行和实用的心跳算法,不一定适合所有人,但希望能需要的同行带来一些启发。
2、IM开发干货系列文章
本文是系列文章中的第6篇,总目录如下:
《IM消息送达保证机制实现(一):保证在线实时消息的可靠投递》
《IM消息送达保证机制实现(二):保证离线消息的可靠投递》
《如何保证IM实时消息的“时序性”与“一致性”?》
《IM单聊和群聊中的在线状态同步应该用“推”还是“拉”?》
《IM群聊消息如此复杂,如何保证不丢不重?》
《一种Android端IM智能心跳算法的设计与实现探讨(含样例代码)》(本文)
《移动端IM登录时拉取数据如何作到省流量?》
《通俗易懂:基于集群的移动端IM接入层负载均衡方案分享》
《浅谈移动端IM的多点登陆和消息漫游原理》
本系列由公号“编码前线”整理。
3、为什么TCP连接需要心跳?
因为运营商有一个NAT超时:因为IP v4的IP量有限,运营商分配给手机终端的IP是运营商内网的IP,手机要连接Internet,就需要通过运营商的网关做一个网络地址转换(Network Address Translation,NAT)。简单的说运营商的网关需要维护一个外网IP、端口到内网IP、端口的对应关系,以确保内网的手机可以跟Internet的服务器通讯,大部分移动无线网络运营商都在链路一段时间没有数据通讯时,会淘汰NAT表中的对应项,造成链路中断。
所以我们需要间隔一定的时间发送一个数据包来保证当前的TCP连接保持有效,这就是所谓的心跳包。更多详情请参见:《为什么说基于TCP的移动端IM仍然需要心跳保活?》。
4、什么是智能心跳?
智能心跳实际上就是动态的探测到最大的NAT超时时间,然后选定合适的心跳间隔区间去发送心跳包,同时在网络状况发生变化的时候能够动态的调整心跳间隔时间;如果心跳间隔不合适,例如心跳间隔过短,那么可能导致频繁的唤醒手机发送心跳包,增加耗电,心跳间隔过长,可能导致这条TCP连接已经无效但是无法及时的检测到,只能等待下一个心跳包发送的时候才能感知到,所以会导致消息接收延迟,所以探测到一个合适的心跳间隔是非常重要的,把耗电和消息接收及时性综合折中来取得一个最佳的体验。
微信开发团分享过微信Android版智能心跳算法的设计思路,有兴趣可以看看《移动端IM实践:实现Android版微信的智能心跳机制》、《微信团队原创分享:Android版微信后台保活实战分享(网络保活篇)》。
5、本文要探讨的二分法智能心跳策略
1心跳变量定义
探测心跳:程序采用不确定的时间间隔去发送心跳,目的是为了得到最大NAT超时时间
稳定心跳:当探测心跳探测到了NAT超时时间那么就会选定比这个时间点稍微小一点的时间来作为稳定心跳,以后就一直以这个稳定时间去发送心跳
minHeart:最小的心跳间隔
maxHeart:最大的心跳间隔
curMinHeart:初始值为minHeart,变换过程中的最小心跳
curMaxHeart:初始值为maxHeart,变换过程中的最大心跳
step:心跳探测步长
maxSuccessCount:稳定心跳成功次数的最大值,用来动态向上探测
maxFailedCount:心跳连续失败次数最大值,用来向下探测
curHeart:当前正在使用的心跳间隔,默认270秒,这个值可以根据不同地区的心跳区间大数据采集统计然后再设置
timeout:心跳超时时间,我们当前设置为20秒,这个其实可以调整的更小,5秒~10秒,之所以设置为20秒是考虑到网络很不好的情况下可能心跳返回的比较慢,所以间隔设的大一些
heartbeatStabledSuccessCount:稳定心跳连续成功的次数
heartbeatFailedCount:心跳连续失败的次数
networkTag:网络环境标识,对于数据网络来说分为电信,联通,移动;对于wifi来说是用wifi的名称来区分的,因为每个运营商的网络环境都可能有不同的NAT超时,所以在网络环境变换的时候要重新调整心跳。
2算法流程图
3上调curHeart(心跳成功的时候)
把当前的成功心跳区间保存到列表中
curMinHeart = heartbeat.curHeart;
如果当前心跳是稳定心跳,heartbeatStabledFailedCount = 0;heartbeatStabledSuccessCount++;如果当前心跳不是稳定心跳,curHeart = (curMinHeart + curMaxHeart) / 2,然后直接执行第6步
判断heartbeatStabledSuccessCount是否大于maxSuccessCount,如果大于的话就上调maxSuccessCount的上限,可以乘以2,或者递增固定值,这个可以自己决定,我们是maxSuccessCount默认为20,所以maxSuccessCount = maxSuccessCount + 20;
从成功心跳列表选择比当前稳定心跳更大一级心跳,如果有就把这个作为新的稳定心跳,如果没有:curMaxHeart = maxHeart;curHeart = (curMinHeart + curMaxHeart) / 2;然后再重新以curHeart开始向上探测心跳
判断curMaxHeart - curMinHeart < 10是否满足,如果满足并且当前心跳还不是稳定心跳:curHeart = curMinHeart;把二分法比较小的那个值作为稳定心跳,然后探测结束,进入稳定心跳,这里之所以这么做是因为二分法的一个特点,二分法的一个临界值就是curMaxHeart = curMinHeart,到最后curMinHeart和curMaxHeart很接近的时候其实(curMaxHeart+curMinHeart)== 2curMinHeart ==2curMaxHeart,所以会导致二分法计算出来的curHeart和curMinHeart,curMaxHeart相差就几秒,这是没什么意义的,设置一个10秒的区间来让心跳尽快进入稳定状态
4下调心跳(心跳失败的时候)
heartbeatStabledSuccessCount=0;
curMaxHeart = curHeart;
如果是稳定心跳失败了,heartbeatStabledFailedCount++;并且判断heartbeatStabledFailedCount>maxFailedCount,如果是则从成功心跳列表中选择比当前心跳略小一级的心跳并把这个心跳作为新的稳定心跳,要是不存在略小一级的成功心跳,那么curMinHeart = minHeart;curHeart = (curMinHeart + curMaxHeart) / 2;
如果是探测心跳失败了,curHeart = (curMinHeart + curMaxHeart) / 2;
判断curMaxHeart - curMinHeart < 10,如果满足并且当前心跳不是稳定心跳:curMinHeart = minHeart。
5本算法的样例实现代码(仅供参考)
public abstract class HeartbeatScheduler {
protected int timeout = 20000;
protected int minHeart = 60;
protected int maxHeart = 300;
protected int step = 30;
protected volatile boolean started = false;
protected volatile long heartbeatSuccessTime;
protected volatile int currentHeartType;
public static final String HEART_TYPE_TAG = "heart_type";
public static final int UNKNOWN_HEART = 0, SHORT_HEART = 1, PROBE_HEART = 2, STABLE_HEART = 3, REDUNDANCY_HEART = 4;
protected PendingIntent createPendingIntent(Context context, int requestCode, int heartType) {
Intent intent = new Intent();
intent.setPackage(context.getPackageName());
intent.setAction(SyncAction.HEARTBEAT_REQUEST);
intent.putExtra(HEART_TYPE_TAG, heartType);
PendingIntent pendingIntent = PendingIntent.getBroadcast(context, requestCode, intent, PendingIntent.FLAG_UPDATE_CURRENT);
return pendingIntent;
}
protected void set(int minHeart, int maxHeart, int step) {
this.minHeart = minHeart;
this.maxHeart = maxHeart;
this.step = step;
SyncLogUtil.i("set minMax:" + minHeart + ",maxHeart:" + maxHeart + ",step:" + step);
}
protected boolean isStarted() {
return started;
}
protected abstract boolean isStabled();
protected void setCurrentHeartType(int currentHeartType) {
this.currentHeartType = currentHeartType;
SyncLogUtil.i("set current heart type:" + currentHeartType);
}
protected int getTimeout() {
return timeout;
}
protected void setTimeout(int timeout) {
this.timeout = timeout;
}
protected long getHeartbeatSuccessTime() {
return heartbeatSuccessTime;
}
protected void setHeartbeatSuccessTime(long heartbeatSuccessTime) {
this.heartbeatSuccessTime = heartbeatSuccessTime;
}
protected abstract void start(Context context);
protected abstract void stop(Context context);
protected abstract void clear(Context context);
protected abstract void adjustHeart(Context context, boolean success);
protected abstract void startNextHeartbeat(Context context, int heartType);
protected abstract void resetScheduledHeart(Context context);
protected abstract void receiveHeartbeatFailed(Context context);
protected abstract void receiveHeartbeatSuccess(Context context);
protected abstract int getCurHeart();
}
public class WatchHearbeatScheduler extends HeartbeatScheduler {
private class Heartbeat {
AtomicInteger heartbeatStabledSuccessCount = new AtomicInteger(0); // 心跳连续成功次数
AtomicInteger heartbeatFailedCount = new AtomicInteger(0); // 心跳连续失败次数
int successHeart;
int failedHeart;
int curHeart = 270;
AtomicBoolean stabled = new AtomicBoolean(false);
}
private int curMaxHeart = maxHeart;
private int curMinHeart = minHeart;
private int maxFailedCount = 5;
private int maxSuccessCount = 10;
private volatile String networkTag;
private int requestCode = 700;
private Map<String, Heartbeat> heartbeatMap = new HashMap<>();
private List<Integer> successHeartList = new ArrayList<>();
protected WatchHearbeatScheduler() {
}
@Override
protected void start(Context context) {
started = true;
networkTag = NetUtil.getNetworkTag(context);
alarm(context);
SyncLogUtil.i("start heartbeat,networkTag:" + networkTag);
}
@Override
protected void stop(Context context) {
heartbeatSuccessTime = 0;
started = false;
currentHeartType = UNKNOWN_HEART;
for (Map.Entry<String, Heartbeat> entry : heartbeatMap.entrySet()) {
Heartbeat heartbeat = entry.getValue();
heartbeat.heartbeatStabledSuccessCount.set(0);
}
cancel(context);
SyncLogUtil.d("stop heartbeat...");
}
@Override
protected void setCurrentHeartType(int currentHeartType) {
this.currentHeartType = currentHeartType;
}
@Override
protected void set(int minHeart, int maxHeart, int step) {
super.set(minHeart, maxHeart, step);
curMaxHeart = maxHeart;
curMinHeart = minHeart;
}
@Override
protected boolean isStabled() {
Heartbeat heartbeat = getHeartbeat();
return heartbeat.stabled.get();
}
@TargetApi(Build.VERSION_CODES.KITKAT)
public void alarm(Context context) {
AlarmManager alarmManager = (AlarmManager) context.getSystemService(Context.ALARM_SERVICE);
Heartbeat heartbeat = getHeartbeat();
boolean stabled = heartbeat.stabled.get();
int heart;
if (stabled) {
heart = heartbeat.curHeart - 10;
if (heart < minHeart) {
heart = minHeart;
}
heart = heart * 1000;
} else {
heart = heartbeat.curHeart * 1000;
}
int heartType = stabled ? STABLE_HEART : PROBE_HEART;
PendingIntent pendingIntent = createPendingIntent(context, requestCode, heartType);
int sdk = Build.VERSION.SDK_INT;
if (sdk >= Build.VERSION_CODES.KITKAT) {
alarmManager.setExact(AlarmManager.RTC_WAKEUP, System.currentTimeMillis() + heart, pendingIntent);
} else {
alarmManager.set(AlarmManager.RTC_WAKEUP, System.currentTimeMillis() + heart, pendingIntent);
}
SyncLogUtil.i("start heartbeat,curHeart [" + heartbeat.curHeart + "],heart [" + heart + "],requestCode:" + requestCode + ",stabled:" + stabled);
}
private void cancel(Context context) {
Heartbeat heartbeat = getHeartbeat();
int heartType = heartbeat.stabled.get() ? STABLE_HEART : PROBE_HEART;
AlarmManager alarmManager = (AlarmManager) context.getSystemService(Context.ALARM_SERVICE);
PendingIntent pendingIntent = createPendingIntent(context, requestCode, heartType);
alarmManager.cancel(pendingIntent);
SyncLogUtil.d("cancel heartbeat,requestCode:" + requestCode);
}
@Override
public void startNextHeartbeat(Context context, int heartType) {
alarm(context);
}
@Override
public void resetScheduledHeart(Context context) {
alarm(context);
}
private void addSuccessHeart(Integer successHeart) {
if (!successHeartList.contains(successHeart)) {
if (successHeartList.size() > 10) {
successHeartList.remove(0);
}
successHeartList.add(successHeart);
SyncLogUtil.i("add successHeart:" + successHeart);
}
SyncLogUtil.i("successHeartList:" + successHeartList);
}
private void removeSuccessHeart(Integer successHeart) {
successHeartList.remove(Integer.valueOf(successHeart));
SyncLogUtil.i("successHeartList:" + successHeartList);
}
@Override
protected void adjustHeart(Context context, boolean success) {
if (currentHeartType == REDUNDANCY_HEART) {
SyncLogUtil.d("redundancy heart,do not adjustHeart...");
return;
}
Heartbeat heartbeat = getHeartbeat();
if (success) {
onSuccess(heartbeat);
} else {
onFailed(heartbeat);
}
SyncLogUtil.i("after success is [" + success + "] adjusted,heartbeat.curHeart:" + heartbeat.curHeart + ",networkTag:" + networkTag);
}
private void onSuccess(Heartbeat heartbeat) {
heartbeat.successHeart = heartbeat.curHeart;
curMinHeart = heartbeat.curHeart;
addSuccessHeart(heartbeat.successHeart);
heartbeat.heartbeatFailedCount.set(0);
if (heartbeat.stabled.get()) {
int count = heartbeat.heartbeatStabledSuccessCount.incrementAndGet();
SyncLogUtil.i("heartbeatStabledSuccessCount:" + heartbeat.heartbeatStabledSuccessCount.get());
if (count >= maxSuccessCount) {
maxSuccessCount += 10;
SyncLogUtil.i("maxSuccessCount:" + maxSuccessCount);
Integer successHeart = selectMinSuccessHeart(heartbeat.curHeart);
if (successHeart != null) {
heartbeat.curHeart = successHeart;
} else {
heartbeat.stabled.set(false);
curMaxHeart = maxHeart;
heartbeat.curHeart = (curMinHeart + curMaxHeart) / 2;
SyncLogUtil.i("curHeart = (" + curMinHeart + " + " + curMaxHeart + ") / 2 = " + heartbeat.curHeart);
}
}
} else {
heartbeat.curHeart = (curMinHeart + curMaxHeart) / 2;
SyncLogUtil.i("curHeart = (" + curMinHeart + " + " + curMaxHeart + ") / 2 = " + heartbeat.curHeart);
}
if (heartbeat.curHeart >= maxHeart) {
heartbeat.curHeart = maxHeart;
heartbeat.stabled.set(true);
SyncLogUtil.i("探测达到最大心跳adjust stabled:" + heartbeat.stabled.get());
} else if (curMaxHeart - curMinHeart < 10) {
if (!heartbeat.stabled.get()) {
heartbeat.curHeart = curMinHeart;
}
heartbeat.stabled.set(true);
SyncLogUtil.i("二分法探测尽头adjust stabled:" + heartbeat.stabled.get());
}
SyncLogUtil.i("curHeart:" + heartbeat.curHeart + ",curMinHeart:" + curMinHeart + ",curMaxHeart:" + curMaxHeart);
}
private void onFailed(Heartbeat heartbeat) {
removeSuccessHeart(heartbeat.curHeart);
heartbeat.failedHeart = heartbeat.curHeart;
heartbeat.heartbeatStabledSuccessCount.set(0);
int count = heartbeat.heartbeatFailedCount.incrementAndGet();
SyncLogUtil.i("heartbeatFailedCount:" + count);
if (maxSuccessCount > 10) {
maxSuccessCount -= 10;
}
if (count > maxFailedCount) {
curMaxHeart = heartbeat.curHeart;
}
if (heartbeat.stabled.get()) {
if (count > maxFailedCount) {
Integer successHeart = selectMaxSuccessHeart(heartbeat.curHeart);
if (successHeart != null) {
heartbeat.curHeart = successHeart;
} else {
heartbeat.stabled.set(false);
curMinHeart = minHeart;
heartbeat.curHeart = (curMinHeart + curMaxHeart) / 2;
SyncLogUtil.i("curHeart = (" + curMaxHeart + " + " + curMinHeart + ") / 2 = " + heartbeat.curHeart);
}
} else {
SyncLogUtil.i("continue retry heartbeat.curHeart:" + heartbeat.curHeart + ",stabled:" + heartbeat.stabled.get());
}
} else {
if (count > maxFailedCount) {
heartbeat.curHeart = (curMinHeart + curMaxHeart) / 2;
SyncLogUtil.i("curHeart = (" + curMaxHeart + " + " + curMinHeart + ") / 2 = " + heartbeat.curHeart);
} else {
SyncLogUtil.i("continue retry heartbeat.curHeart:" + heartbeat.curHeart + ",stabled:" + heartbeat.stabled.get());
}
}
if (curMaxHeart - curMinHeart < 10) {
if (!heartbeat.stabled.get()) {
curMinHeart = minHeart;
}
SyncLogUtil.i("二分法探测达到瓶颈" + ",curHeart:" + heartbeat.curHeart);
SyncLogUtil.i("curMinHeart:" + curMinHeart + ",curMaxHeart:" + curMaxHeart);
}
SyncLogUtil.i("curHeart:" + heartbeat.curHeart + ",curMinHeart:" + curMinHeart + ",curMaxHeart:" + curMaxHeart);
}
private Integer selectMaxSuccessHeart(int curHeart) {
Collections.sort(successHeartList, new Comparator<Integer>() {
@Override
public int compare(Integer lhs, Integer rhs) {
return rhs.compareTo(lhs);
}
});
SyncLogUtil.i("successHeartList:" + successHeartList);
for (Integer heart : successHeartList) {
if (curHeart >= heart) {
continue;
} else {
return heart;
}
}
return null;
}
private Integer selectMinSuccessHeart(int curHeart) {
Collections.sort(successHeartList, new Comparator<Integer>() {
@Override
public int compare(Integer lhs, Integer rhs) {
return lhs.compareTo(rhs);
}
});
SyncLogUtil.i("successHeartList:" + successHeartList);
for (Integer heart : successHeartList) {
if (curHeart >= heart) {
continue;
} else {
return heart;
}
}
return null;
}
private Heartbeat getHeartbeat() {
Heartbeat heartbeat = heartbeatMap.get(networkTag);
if (heartbeat == null) {
heartbeat = new Heartbeat();
heartbeatMap.put(networkTag, heartbeat);
}
return heartbeat;
}
@Override
protected void receiveHeartbeatFailed(Context context) {
adjustHeart(context, false);
}
@Override
protected void receiveHeartbeatSuccess(Context context) {
adjustHeart(context, true);
alarm(context);
}
@Override
protected void clear(Context context) {
stop(context);
heartbeatMap.clear();
successHeartList.clear();
curMinHeart = minHeart;
curMaxHeart = maxHeart;
networkTag = null;
SyncLogUtil.d("clear heartbeat...");
}
@Override
protected int getCurHeart() {
Heartbeat heartbeat = getHeartbeat();
return heartbeat.curHeart;
}
}
6、本算法的最终探测效果
以270秒作为curHeart开始探测,minHeart为60秒,maxHeart为300秒,在我们公司的wifi或者数据网络环境下:270,285,292就能够达到稳定心跳,最终稳定心跳会比292小10秒。也就是282秒作为稳定心跳,这里面大概在14分钟之内alarm了三次,如果把maxHeart上调的话探测到稳定心跳的时间会变长,不过平均alarm次数会降低,因为心跳周期在不断变长。
当达到稳定心跳后,在稳定心跳成功发送20次后会再次尝试上调心跳,如果由于网络环境不稳定导致当前的心跳可能失败次数超过了5次,那么就会下调心跳,总之做到一个原则,严格控制下调条件,能不下调就尽量不下调。
7、和微信智能心跳算法的对比
【1】更加省电:微信智能心跳是按照从最小还是逐渐递增的去探测的,所以在网络环境不好的条件下前期可能一直探测不上来,心跳周期一直维持在一个较小的范围,导致频繁的alarm,耗电,微信智能心跳探测过程:60秒短心跳,连续发3次后开始探测,90,120,150,180,210,240,270,这个过程中一共耗费24分钟,alarm了10次,在前14分钟之内alarm了8次,而二分法智能心跳前14分钟才唤醒3次。
【2】网络环境差的情况下不会频繁的唤醒:当网络环境很不好的情况下,心跳可能会经常失败,微信智能心跳由于是从下往上上调心跳,可能一直维持在一个间隔周期较小的心跳,会频繁alarm,二分法是从上往下下调心跳,因此心跳周期是逐渐缩小,一开始不会频繁的alarm,比较省电。
【3】探测周期短:微信智能心跳是逐渐的通过累加探测步长来上调心跳,上调的趋势比较稳定,但是如果step设置的比较小,那么会导致上调缓慢,探测到稳定心跳所需要的时间比较长(24分钟);二分法智能心跳的心跳调整波动比较大,成功了就上调一半,失败了就下调一半,所以探测到稳定心跳的时间会比较短(14分钟),但是其实这个都是相对的,如果NAT超时时间为2分钟,那么微信智能心跳一下子就能探测到了,而二分法智能心跳要调整好多次,反正是看NAT超时时间距离最初开始探测的curHeat比较接近,所以curHeart可以通过大数据搜集分析,针对各个地区给出不同的curHeart。
【4】探测期间不够稳定:微信智能心跳的探测过程很稳定,基本不会导致心跳失败,因为它是从最小的开始探测;二分法智能心跳就不一样了,以为curHeart的调整波动比较大,一开始探测一下子上调或者下调一半很容易就超出NAT超时时间,在探测前期会有比较频繁的失败心跳;当然,这个也是相对的,最终都要取决与curHeart的初始值,minHeart,maxHeart,如果这些值设置的合适,那么二分法智能心跳将会很快的探测到稳定心跳。
8、Android机子上存在的问题
也就是Android系统里alarm的对齐唤醒问题。国内的手机厂商例如华为,魅族,小米都是自定制的android系统,对于AlarmManager都有对齐唤醒策略,因此会导致心跳alarm的时间不准确,例如设置了270秒alarm一次,但是在这些手机上可能要推迟到300秒才能唤醒,那么问题来了,如果NAT超时时间是2分钟,而这些手机的alarm最小间隔是5分钟,那就坑了,永远无法探测到最佳心跳,你设置120秒的alarm,手机系统也给你延迟到5分钟才执行alarm,不过这种情况只有在手机休眠的时候才会对齐唤醒,在手机不休眠的时候,我侧过,alarm计时还是准确的。