Kafka-0.11.0.1鉴权机制解析

1、命令入口

经常使用Kafka的同学可能已经注意到了,赋予某个用户消费某个topic权限的命令是:

bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:* --operation read --topic test --group mygroup

在这个命令中,清晰得指出了鉴权使用的类是kafka.security.auth.SimpleAclAuthorizer;
当然你也可以不指定,查看kafka-acl脚本,只有一句话:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand "$@",

打开AclCommand类的代码,解析命令行参数的代码包含这么一段:

class AclCommandOptions(args: Array[String]) {
    val parser = new OptionParser(false)
    val authorizerOpt = parser.accepts("authorizer", "Fully qualified class name of the authorizer, defaults to kafka.security.auth.SimpleAclAuthorizer.")
      .withRequiredArg
      .describedAs("authorizer")
      .ofType(classOf[String])
      .defaultsTo(classOf[SimpleAclAuthorizer].getName)

不指定鉴权类时,默认就是kafka.security.auth.SimpleAclAuthorizer

2、 逐句解读kafka.security.auth.SimpleAclAuthorizer

在解读代码前,先解释下Acl的概念。
顾名思义,ACL是访问控制列表(access control list)。但是,大家经常会误认为权限控制是ACL实现的。其实,ACL只是提供了存储机制,具体的权限控制逻辑由权限控制模块实现。另外,ACL权限分为功能权限和数据权限。

联系到Kafka topic操作,创建、修改、删除topic的权限属于功能权限,用户能否在某个topic生产和消费的权限属于数据权限。

创建、修改、删除Topic的权限不是由kafka.security.auth.SimpleAclAuthorizer类控制的,而是由ZK控制。

org.I0Itec.zkclient.ZkClient类

/**
     * Add authentication information to the connection. This will be used to identify the user and check access to
     * nodes protected by ACLs
     * 
     * @param scheme
     * @param auth
     */
    public void addAuthInfo(final String scheme, final byte[] auth) {
        retryUntilConnected(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                _connection.addAuthInfo(scheme, auth);
                return null;
            }
        });
    }

功能权限不是本文分析的重点,不展开叙述。

SimpleAclAuthorizer类只有368行(包含注释),完成鉴权的核心方法只有不到30行。(Kafka的代码还是很简洁的!)

  override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
    val principal = session.principal
    val host = session.clientAddress.getHostAddress
    val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))

    // Check if there is any Deny acl match that would disallow this operation.
    val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)

    // Check if there are any Allow ACLs which would allow this operation.
    // Allowing read, write, delete, or alter implies allowing describe.
    // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
    val allowOps = operation match {
      case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
      case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
      case _ => Set[Operation](operation)
    }
    val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))

    //we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users
    //when no acls are found or if no deny acls are found and at least one allow acls matches.
    val authorized = isSuperUser(operation, resource, principal, host) ||
      isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
      (!denyMatch && allowMatch)

    logAuditMessage(principal, authorized, operation, resource, host)
    authorized
  }

方法只有三个参数:session保存了使用鉴权方法的客户端信息;operation就是具体鉴权的操作。比如:consumer、producer等;Resource就是本次鉴权的资源。比如:topic,group。

方法体内,首先得到本次鉴权的principal,pricinpal是KafkaPrincipal类的实例,类定义如下:

public class KafkaPrincipal implements Principal {
    public static final String SEPARATOR = ":";
    public static final String USER_TYPE = "User";
    public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");

    private String principalType;
    private String name;

    public KafkaPrincipal(String principalType, String name) {
        if (principalType == null || name == null) {
            throw new IllegalArgumentException("principalType and name can not be null");
        }
        this.principalType = principalType;
        this.name = name;
    }

    public static KafkaPrincipal fromString(String str) {
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
        }

        String[] split = str.split(SEPARATOR, 2);

        if (split == null || split.length != 2) {
            throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
        }

        return new KafkaPrincipal(split[0], split[1]);
    }

代码比较简单,有用的信息主要是这两句:

if (split == null || split.length != 2) {
 throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
        }

为何有用,暂且留个伏笔。

接着看下面的代码,是至关重要的一句:

val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))

这句代码会去ZK查询本资源的Acl列表,比如本次鉴权的资源对象是:new Resource(Topic, "myTopic"),有两个注意点:

1、ResourceType是个枚举类,定义了Kafka里的各种资源,比如topic,group等。

2、不是实时从ZK中读取,而是为了提高效率,从缓存中读取。那么缓存什么时候更新呢?

可能读者已经注意到了这个变量Resource.WildCardResource,这个变量是个字符串,就是*号。为何要搞个名称是*号的资源呢?

kafka可能包含成百上千的Topic,消费时候可能包含更多的group,我们不大可能对这些资源一一赋予权限,所以就需要通配机制。比如,文章开头那条命令稍作修改,将具体的资源名称改为*号:

bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=localhost:2181 --add \
  --allow-principal User:* --operation read --topic '*' --group '*'

表示对所有的topic和group赋予权限。这条语句为何能起作用,就是因为我们查询Acl时候构造了名称为*号的资源!
各资源的Acl都是以json格式存储在ZK路径上,

/kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}

/kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}

/kafka-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}

我们看下Acl类的代码:

object Acl {
  val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
  val WildCardHost: String = "*"
  val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All)
  val PrincipalKey = "principal"
  val PermissionTypeKey = "permissionType"
  val OperationKey = "operation"
  val HostsKey = "host"
  val VersionKey = "version"
  val CurrentVersion = 1
  val AclsKey = "acls"
  
  ..........
  
    def fromJson(aclJson: String): Set[Acl] = {
    if (aclJson == null || aclJson.isEmpty)
      return collection.immutable.Set.empty[Acl]

    var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]()
    Json.parseFull(aclJson) match {
      case Some(m) =>
        val aclMap = m.asInstanceOf[Map[String, Any]]
        //the acl json version.
        require(aclMap(VersionKey) == CurrentVersion)
        val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
        aclSet.foreach(item => {
          val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
          val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
          val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
          val host: String = item(HostsKey).asInstanceOf[String]
          acls += new Acl(principal, permissionType, host, operation)
        })
      case None =>
    }
    acls.toSet
  }
  

通过fromJson方法将ZK中存储的Acl json反序列化为Acl对象,KafkaPrincipal使用fromString方法反序列化,那么要求我们保存在ZK中的用户pricinpal字符串必须是类型:名称格式。(呼应上面伏笔)

PermissionType和Operation类都是通过fromString方法反序列化,这要求我们保存在ZK中的字符串必须在它们的枚举定义范围内。

接下来是真正的鉴权逻辑:

1、首先判断是否有不允许此次操作的Acl

2、接着判断是否有允许此操作的Acl(Describe操作比较特殊,允许read, write, delete, or alter都意味着允许Describe)

3、然后判断是否是超级用户

4、再判断是否没有Acl时候默认是有权限,这是Kafka的一个配置项allow.everyone.if.no.acl.found

最终判断本次鉴权是否通过的逻辑,代码的注释写得很清楚:

1、如果是超级用户,通过

2、如果配置项allow.everyone.if.no.acl.found设置为true,通过

3、如果没有Acl Deny,且至少有一个Acl Allow,通过

至此,这个方法就解读完了。是不是很简单?

上面遗留了一个问题,Acl的缓存什么时候更新呢?在初始化方法configure中可以看到,

  override def configure(javaConfigs: util.Map[String, _]) {
    val configs = javaConfigs.asScala
    val props = new java.util.Properties()
    configs.foreach { case (key, value) => props.put(key, value.toString) }

    superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
      case str: String if str.nonEmpty => str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet
    }.getOrElse(Set.empty[KafkaPrincipal])

    shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)

    // Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
    // means that `KafkaConfig.zkConnect` must always be set by the user (even if `SimpleAclAuthorizer.ZkUrlProp` is also
    // set).
    val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
    val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
    val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
    val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)

    zkUtils = ZkUtils(zkUrl,
                      sessionTimeout = zkSessionTimeOutMs,
                      connectionTimeout = zkConnectionTimeoutMs,
                      kafkaConfig.zkEnableSecureAcls)
    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)

    loadCache()

    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
    aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
    aclChangeListener.init()
  }

1、初始化的时候会loadCache

2、初始化时候注册了ZK的aclChange监听(/kafka-acl-changes/acl_changes_0000000000)(ZK的监听你还知道哪些?)

当某个资源的acl发生变化时,就是在ZK的/kafka-acl-changes路径下生成一个递增的流水号Node,Node的Data存储acl发生变化的资源信息。资源信息字符串参考Resource的定义:

case class Resource(resourceType: ResourceType, name: String) {

  override def toString: String = {
    resourceType.name + Resource.Separator + name
  }
}

3、 二次开发kafka.security.auth.SimpleAclAuthorizer

Kafka-0.11.0.1版本只支持对用户的鉴权,那么如果我们想支持对用户角色的鉴权,该如何做呢?

如果你读懂了上面的代码解读,很容易想到解决方案。

1、创建用户角色时候,在ZK相应的资源节点写入Acl数据(你只需构造一个表示角色的KafkaPrincipal)

2、鉴权时候,查找到用户绑定的角色,读取该角色的Acl进行鉴权(你只需构造一个表示角色的KafkaPrincipal)

3、当角色的权限发生变化时,记得在/kafka-acl-changes下生成流水记录,保证角色权限的变更实时更新。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,539评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,594评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,871评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,963评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,984评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,763评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,468评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,850评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,002评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,144评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,823评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,483评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,026评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,150评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,415评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,092评论 2 355

推荐阅读更多精彩内容