1、RocketMQ ACL使用
ACL全称access control list,俗称访问控制列表。主要包括如下角色
- 用户(用户密码)
- 资源(topic、消费)
- 权限(是否可以发送或者消费消息)
- 角色(是否为管理员,并可以配置是否可以进行更新或删除主题和订阅组)
1.1 Broker端开启ACL验证
首先Broker.conf文件配置 aclEnable=true ,然后需要将 plain_acl.yml 放在 ${ROCKETMQ_HOME}/conf目录, plain_acl.yml
globalWhiteRemoteAddresses: // 设置IP白名单
- 10.10.103.*
- 192.168.0.*
accounts: // 配置用户信息
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: // 用户级别的IP地址白名单
admin: false // 当为 true 可以执行更新、删除主题或者订阅组
defaultTopicPerm: DENY // DENY拒绝、SUB 订阅权限、PUB 发送权限
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
1.2 服务端验证
服务端当配置好plain_acl.yml后并在 broker.conf中开启 aclEnable=true ,服务端则会进行下面逻辑验证
- 客户端请求Ip和全局白名单匹配
- 请求是否包含用户名并判断用户是否匹配
- 用户级别的白名单
- 签名验证
- 该用户是否具有admin权限
- 判断admin配置了需要验证的权限并进行验证
2、源码实现
2、1客户端层面
在构造函数添加 RPCHook ,进行创建ACL对象实例。
AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(new SessionCredentials("rocketmq", "123456"))
DefaultMQProducer producer = new DefaultMQProducer("GID_test_class", aclClientRPCHook);
发送消息前置执行钩子函数并验证ACL权限,若抛异常后则无法发送消息。
DefaultMQProducerImpl#executeSendMessageHookBefore
public void executeSendMessageHookBefore(final SendMessageContext context) {
if (!this.sendMessageHookList.isEmpty()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
hook.sendMessageBefore(context);
} catch (Throwable e) {
log.warn("failed to executeSendMessageHookBefore", e);
}
}
}
}
2、2服务端层面
Broker端初始化ALC配置, 加载 AccessValidator配置
1. 核心是基于SPI机制,读取META-INF/service/org.apache.rocketmq.acl.AccessValidator 访问验证器
2. 遍历访问验证器,向Broker注册钩子函数。RPCHook在接受请求前进行处理请求
3. 调用AccessValidator#validate,验证acl信息,如果拥有该执行权限则通过,否则报AclException
private void initialAcl() {
if (!this.brokerConfig.isAclEnable()) {
return;
}
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class){
for (AccessValidator accessValidator: accessValidators) {
final AccessValidator validator = accessValidator;
accessValidatorMap.put(validator.getClass(),validator);
this.registerServerRPCHook(new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
validator.validate(validator.parse(request, remoteAddr));
}
});
}
}
AccessValidator 是访问验证器接口,PlainAccessValidator是该接口的具体实现。
AccessResource parse(RemotingCommand request, String remoteAddr);
从远端请求中解析本次请求对应的访问资源
void validate(AccessResource accessResource);
根据本次需要访问的权限,与请求用户拥有的权限进行对比验证,判断是否拥有,如果没有则ACLException
当远端请求过来后,触发钩子函数RPCHook,调用 PlainAccessValidator#parse ,并根据 client 端创建 PlainAccessResource实例对象
PlainAccessResource
private String accessKey; // 访问Key,用户名
private String secretKey; // 用户密码
private String whiteRemoteAddress; // 远程IP地址白名单
private boolean admin; // 是否是管理员角色
private byte defaultTopicPerm = 1; //默认topic访问权限,如果没有配置topic的权限,则Topic默认的访问权限为1,表示为DENY
private byte defaultGroupPerm = 1; // 默认的消费组访问权限,默认为DENY
private Map<String, Byte> resourcePermMap; // 资源需要的访问权限映射表
private RemoteAddressStrategy remoteAddressStrategy; //远程IP地址验证
private int requestCode; //请求类型code
private byte[] content; // 请求内容
private String signature; // 签名字符串,client端进行将请求参数排序,使用secretKey生成签名字符串。服务端则验证签名
private String secretToken;
private String recognition;
- vPlainAccessValidator#parse,解析远端请求过程 进行验证并转化为PlainAccessResource实例。
- 封装PlainAccessResource对象实例,包括远程访问IP地址、requestCode、accessKey(请求用户名)、签名字符串(signature)、secretToken
根据请求命令,设置本次请求需要拥有的权限。 - 验证签名,根据扩展字段进行排序,便于生成签名字符串,然后将扩展字段与请求体(body)写入content字段。完成从请求头中解析出本次请求需要验证的权限。
PlainAccessValidator#parse
public AccessResource parse(RemotingCommand request, String remoteAddr) {
PlainAccessResource accessResource = new PlainAccessResource();
accessResource.setXXX(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
try {
switch (request.getCode()) {
case RequestCode.SEND_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
break;
case RequestCode.SEND_MESSAGE_V2:
accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
break;
case RequestCode.CONSUMER_SEND_MSG_BACK:
accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
break;
case RequestCode.PULL_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
break;
case RequestCode.QUERY_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
break;
case RequestCode.HEART_BEAT:
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
case RequestCode.UNREGISTER_CLIENT:
final UnregisterClientRequestHeader unregisterClientRequestHeader =
(UnregisterClientRequestHeader) request
.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
break;
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
break;
case RequestCode.UPDATE_CONSUMER_OFFSET:
final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
break;
default:
break;
}
}
SortedMap<String, String> map = new TreeMap<String, String>();
for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey()) && !MixAll.UNIQUE_MSG_QUERY_FLAG.equals(entry.getKey())) {
map.put(entry.getKey(), entry.getValue());
}
}
accessResource.setContent(AclUtils.combineRequestContent(request, map));
return accessResource;
}
- 加载配置acl配置文件,可以运行时动态修改,最后加载到内存中
- PlainAccessValidator#validate -> PlainPermissionManager#validate
根据访问的权限与Broker端配置的权限(plain_acl.yml)进行对比验证,并验证.
public PlainPermissionManager() {
load();
watch();
}
// 解析用户配置的访问资源,全局白名单,并加载到内存中
public void load() {
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class);
if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
}
}
JSONArray accounts = plainAclConfData.getJSONArray(AclConstants.CONFIG_ACCOUNTS);
if (accounts != null && !accounts.isEmpty()) {
List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
}
}
}
// 监听器,默认以500ms的频率判断文件的内容是否变化(根据文件md5签名进行对比),并重新加载配置文件。该方法启动一个守护线程处理
private void watch() {
try {
String watchFilePath = fileHome + fileName;
FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
@Override
public void onChanged(String path) {
load();
}
});
fileWatchService.start();
this.isWatchStart = true;
}
}
1、如果当前的请求命令属于必须是Admin用户才能访问的权限,并且当前用户并不是管理员角色,则抛出异常
2、遍历需要权限与拥有的权限进行对比,如果配置对应的权限,则判断是否匹配;如果未配置权限,则判断默认权限时是否允许
public void validate(PlainAccessResource plainAccessResource) {
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
if (remoteAddressStrategy.match(plainAccessResource)) {
return;
}
}
PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
return;
}
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
checkPerm(plainAccessResource, ownedAccess);
}
void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
}
Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();
if (needCheckedPermMap == null) {
return;
}
if (ownedPermMap == null && ownedAccess.isAdmin()) {
return;
}
for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
String resource = needCheckedEntry.getKey();
Byte neededPerm = needCheckedEntry.getValue();
boolean isGroup = PlainAccessResource.isRetryTopic(resource);
if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
// Check the default perm
byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() :
ownedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
continue;
}
if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
}
}