问题备忘:
编号 | 问题 | 解决 |
---|---|---|
0 | 用apiserver的url方式请求的时候具备了哪些权限?以及用kubeconfig或者是incluster的方式的话权限是怎么控制的? | master url的方式的权限和kube-admin的权限是一样的,incluster的方式就是部署的时候pod使用的rbac的权限,而kubeconfig的方式就是kubeconfig使用的用户的权限 |
1 | 桶令牌的限流逻辑走向 | |
2 | 把结构体赋给k8s.io/client-go/kubernetes中的kubernetes.Interface优势 |
正文:
本文来分析从初始化一个client-go客户端,到调用k8s的apiserver,然后k8s进行资源的创建的过程中,都发生了些什么,以及中间的代码分析。
主要分为几个过程:
1.client-go客户端的初始化
2.如何去调用apiserver接口
3.apiserver中是如何定义接口请求的
4.apiserver收到资源请求的时候,如何去处理资源的
第四点涉及到deploymentController的资源循环控制,scheduler资源的调度,etcd数据的存储更新等模块,需要看的代码比较多,需要后面拆分成多个模块来分析。
本文先对1,2两点进行分析。
一、KubeClientset的客户端初始化
初始化一个k8s客户端有多种方式
(1)outOfCluster(masterUrl的方式,kubeconfig配置的方式)
(2)InClusterConfig
关于配置的权限分析先备忘。
// 1.outOfCluster
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
clientset, err := kubernetes.NewForConfig(config)
var i kubernetes.Interface = clientset // 把结构体赋给k8s.io/client-go/kubernetes中的kubernetes.Interface
kubeClientset = &i
// 2.inCluster
config, err := rest.InClusterConfig()
clientset, err := kubernetes.NewForConfig(config)
首先来分析outOfCluster的方式,看到clientcmd.BuildConfigFromFlags
// BuildConfigFromFlags is a helper function that builds configs from a master
// url or a kubeconfig filepath. These are passed in as command line flags for cluster
// components. Warnings should reflect this usage. If neither masterUrl or kubeconfigPath
// are passed in we fallback to inClusterConfig. If inClusterConfig fails, we fallback
// to the default config.
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
if kubeconfigPath == "" && masterUrl == "" {
klog.Warningf("Neither --kubeconfig nor --master was specified. Using the inClusterConfig. This might not work.")
kubeconfig, err := restclient.InClusterConfig()
if err == nil {
return kubeconfig, nil
}
klog.Warning("error creating inClusterConfig, falling back to default config: ", err)
}
return NewNonInteractiveDeferredLoadingClientConfig(
&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
}
通过注释可以看到BuildConfigFromFlags通过masterUrl或者是kubeconfigPath生成了restClient的config,如果两个都没有的话,就默认是inCluster的方式。
生成了用于初始化的config之后,调用kubernetes.NewForConfig(config)生成clientSet。
抛出问题(1). clientSet是什么样的结构形式?
下面来看到kubernetes.NewForConfig(config)的代码:
// NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var cs Clientset
var err error
cs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.appsV1, err = appsv1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.appsV1beta1, err = appsv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
cs.appsV1beta2, err = appsv1beta2.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
......
return &cs, nil
}
首先来看到代码
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
看源码的解释可以知道是为了限制从这个客户端到master的qps限制的,用了桶令牌的原理来限流,具体的逻辑有时间再深入研究。
然后是一些不同版本的client的初始化,先看到clientSet的结构体
type Clientset struct {
*discovery.DiscoveryClient
admissionregistrationV1beta1 *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
appsV1 *appsv1.AppsV1Client
appsV1beta1 *appsv1beta1.AppsV1beta1Client
appsV1beta2 *appsv1beta2.AppsV1beta2Client
auditregistrationV1alpha1 *auditregistrationv1alpha1.AuditregistrationV1alpha1Client
authenticationV1 *authenticationv1.AuthenticationV1Client
authenticationV1beta1 *authenticationv1beta1.AuthenticationV1beta1Client
authorizationV1 *authorizationv1.AuthorizationV1Client
authorizationV1beta1 *authorizationv1beta1.AuthorizationV1beta1Client
autoscalingV1 *autoscalingv1.AutoscalingV1Client
......
storageV1beta1 *storagev1beta1.StorageV1beta1Client
storageV1 *storagev1.StorageV1Client
storageV1alpha1 *storagev1alpha1.StorageV1alpha1Client
}
可以看到clientSet其实很多不同的版本或者是类型的client的集合,后面比如要对pv/pvc进行资源的操作,那么就可以调用clientSet里的storageV1...几个版本的api进行资源的创建。
类似的,对应到kubectl用yaml文件进行创建的时候,yaml文件上的apiVersion指明了我创建这个资源需要用的api版本。
而某一个版本的k8s集群,apiserver中自然是会兼容上面的这些不同版本的api请求的。
下面来看clientSet里的每一个client的区别:
首先看到准入控制的admissionregistrationV1beta1:
cs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfig(&configShallowCopy)
if err != nil {
return nil, err
}
每个不同的客户端都会有自己的一套初始化config方法(NewForConfig),看到admissionregistrationv1beta1的NewConfig逻辑:
// NewForConfig creates a new AdmissionregistrationV1beta1Client for the given config.
func NewForConfig(c *rest.Config) (*AdmissionregistrationV1beta1Client, error) {
config := *c
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, err
}
return &AdmissionregistrationV1beta1Client{client}, nil
}
func setConfigDefaults(config *rest.Config) error {
gv := v1beta1.SchemeGroupVersion // 给config添加各种属性,包括组别,版本,apiPath等,然后返回。
config.GroupVersion = &gv
config.APIPath = "/apis"
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
return nil
}
const GroupName = "admissionregistration.k8s.io"
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1beta1"}
可以看到NewForConfig先调用到了setConfigDefaults,setConfigDefaults就是给config添加各种属性,然后NewForConfig对config封装成AdmissionregistrationV1beta1Client进行返回。
然后看到核心资源创建用到的appsV1:
// NewForConfig creates a new AppsV1Client for the given config.
func NewForConfig(c *rest.Config) (*AppsV1Client, error) {
config := *c
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, err
}
return &AppsV1Client{client}, nil
}
// GroupName is the group name use in this package
const GroupName = "apps"
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
可以看到除了schema.GroupVersion不同之外,其他都差不多。
可以看到AdmissionregistrationV1beta1Client 其实是一个rest.Interface接口。
这个接口具体的实现是什么呢?因为放入AdmissionregistrationV1beta1Client的是client, err := rest.RESTClientFor(&config)的返回,所以看到rest.RESTClientFor方法。
// RESTClientFor returns a RESTClient that satisfies the requested attributes on a client Config
// object. Note that a RESTClient may require fields that are optional when initializing a Client.
// A RESTClient created by this method is generic - it expects to operate on an API that follows
// the Kubernetes conventions, but may not be the Kubernetes API.
func RESTClientFor(config *Config) (*RESTClient, error) {
if config.GroupVersion == nil {
return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
}
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
// 获取baseURL(*url.URL类型,包括对http/https的判断等等), versionedAPIPath(加上api版本的api路径)
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
// 通过config生成自己的transport
transport, err := TransportFor(config)
var httpClient *http.Client
if transport != http.DefaultTransport {
httpClient = &http.Client{Transport: transport}
if config.Timeout > 0 {
httpClient.Timeout = config.Timeout
}
}
return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient)
}
首先是GroupVersion,NegotiatedSerializer是否为空的校验,
然后是QPS和Burst的设置,这两个值用于对进一步的flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)使用。这一块原理等待后续深入研究。
最终转入到NewRESTClient方法:
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
}
base.RawQuery = ""
base.Fragment = ""
if config.GroupVersion == nil {
config.GroupVersion = &schema.GroupVersion{}
}
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
serializers, err := createSerializers(config)
if err != nil {
return nil, err
}
var throttle flowcontrol.RateLimiter
if maxQPS > 0 && rateLimiter == nil {
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
} else if rateLimiter != nil {
throttle = rateLimiter
}
return &RESTClient{
base: &base,
versionedAPIPath: versionedAPIPath,
contentConfig: config,
serializers: *serializers,
createBackoffMgr: readExpBackoffConfig,
Throttle: throttle,
Client: client,
}, nil
}
这个方法也没干啥,也是一些参数的配置和路由的转换,以及对qps和burst的进行限流的配置。
最终得到的结构体是:
&RESTClient{
base: &base,
versionedAPIPath: versionedAPIPath,
contentConfig: config,
serializers: *serializers,
createBackoffMgr: readExpBackoffConfig,
Throttle: throttle,
Client: client,
}
所以前面的rest.Interface是由RESTClient结构体来实现的,最终的实现接口调用的时候,也是走的这个结构体的方法来实现的。
接下来转到第二部分:
二、如何去调用apiserver接口
先来看rest.Interface接口有哪些方法由RESTClient来实现:
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}
可以看到就是一些基本的增删改查方法,具体干了些啥呢?
比如post方法:
func (c *RESTClient) Verb(verb string) *Request {
backoff := c.createBackoffMgr()
if c.Client == nil {
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, 0)
}
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, c.Client.Timeout)
}
// Post begins a POST request. Short for c.Verb("POST").
func (c *RESTClient) Post() *Request {
return c.Verb("POST")
}
其上层的封装
// Create takes the representation of a deployment and creates it. Returns the server's representation of the deployment, and an error, if there is any.
func (c *deployments) Create(deployment *v1.Deployment) (result *v1.Deployment, err error) {
result = &v1.Deployment{}
err = c.client.Post().
Namespace(c.ns).
Resource("deployments").
Body(deployment).
Do().
Into(result)
return
}
所以后面调用Post方法的时候,会调用Verb方法,Verb方法其实就是封装出一个Request结构体,最终请求的时候会用到。下面是Request的结构体。具体的每个参数暂时不整理。
type Request struct {
// required
client HTTPClient
verb string
baseURL *url.URL
content ContentConfig
serializers Serializers
// generic components accessible via method setters
pathPrefix string
subpath string
params url.Values
headers http.Header
// structural elements of the request that are part of the Kubernetes API conventions
namespace string
namespaceSet bool
resource string
resourceName string
subresource string
timeout time.Duration
// output
err error
body io.Reader
// This is only used for per-request timeouts, deadlines, and cancellations.
ctx context.Context
backoffMgr BackoffManager
throttle flowcontrol.RateLimiter
}
调完Post之后再调Namespace,Resource,Body,都是对Request进一步配置。
最终执行在Do方法中。
Into将结果输出到result。
抛出问题(##):
这里的权限校验和准入控制是怎么做的???(校验的逻辑应该在apiserver中)。但是请求的时候证书这些是怎么带入的呢?
我们用kube_config的方式来找寻逻辑:
kube-config是用的rbac的权限的,可以看到字段:
client-certificate-data:证书
client-key-data:证书私钥
全局搜索可以看到type AuthInfo结构体中用到了这个字段:
type AuthInfo struct {
// LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
LocationOfOrigin string
// ClientCertificate is the path to a client cert file for TLS.
// +optional
ClientCertificate string `json:"client-certificate,omitempty"`
// ClientCertificateData contains PEM-encoded data from a client cert file for TLS. Overrides ClientCertificate
// +optional
ClientCertificateData []byte `json:"client-certificate-data,omitempty"`
// ClientKey is the path to a client key file for TLS.
// +optional
ClientKey string `json:"client-key,omitempty"`
// ClientKeyData contains PEM-encoded data from a client key file for TLS. Overrides ClientKey
查找字段ClientCertificateData的被使用情况,可以看到tool.clientcmd.client_config.go文件中用到了这个字段:
func (config *DirectClientConfig) getUserIdentificationPartialConfig(configAuthInfo clientcmdapi.AuthInfo, fallbackReader io.Reader, persistAuthConfig restclient.AuthProviderConfigPersister) (*restclient.Config, error) {
....
mergedConfig.CertFile = configAuthInfo.ClientCertificate
mergedConfig.CertData = configAuthInfo.ClientCertificateData
mergedConfig.KeyFile = configAuthInfo.ClientKey
mergedConfig.KeyData = configAuthInfo.ClientKeyData
}
.....
return mergedConfig, nil
那么传入的configAuthInfo是从哪里来的呢,回到最开始的时候的
clientcmd.BuildConfigFromFlags("", *kubeconfig),看的深入一些可以看到下面的kubeconfig文件的加载:
return NewNonInteractiveDeferredLoadingClientConfig(
&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error){
......
config, err := LoadFromFile(filename)
....
}
接下来再看到getUserIdentificationPartialConfig方法,getUserIdentificationPartialConfig是生成一个*restclient.Config,
注意其实这个结构体就是一开始clientcmd.BuildConfigFromFlags("", kubeconfig)返回的结构体类型,只是一开始没有深入的去分析。
restclient.Config包含了后面初始化原生http的时候的很多字段。
这里我们来看restclient.Config里面的TLSClientConfig字段
/ +k8s:deepcopy-gen=true
// TLSClientConfig contains settings to enable transport layer security
type TLSClientConfig struct {
// Server should be accessed without verifying the TLS certificate. For testing only.
Insecure bool
// ServerName is passed to the server for SNI and is used in the client to check server
// ceritificates against. If ServerName is empty, the hostname used to contact the
// server is used.
ServerName string
// Server requires TLS client certificate authentication
CertFile string
// Server requires TLS client certificate authentication
KeyFile string
// Trusted root certificates for server
CAFile string
// CertData holds PEM-encoded bytes (typically read from a client certificate file).
// CertData takes precedence over CertFile
CertData []byte
// KeyData holds PEM-encoded bytes (typically read from a client certificate key file).
// KeyData takes precedence over KeyFile
KeyData []byte
// CAData holds PEM-encoded bytes (typically read from a root certificates bundle).
// CAData takes precedence over CAFile
CAData []byte
}
这个字段其实就是用于https请求的时候用来做证书认证的。
回到最开始的问题,关于权限的认证过程:
1.加载kubeconfig配置,生成证书等配置的结构体(上面已经分析过)
2.调用getUserIdentificationPartialConfig,传入证书相关信息,最后得到kubernetes.NewForConfig(config)需要的restclient.Config结构体。
这个过程的调用顺序:
(1)
cfg, err := clientcmd.BuildConfigFromFlags(apiServerAddr, "")
(2)
func BuildConfigFromFlags {
return NewNonInteractiveDeferredLoadingClientConfig(
&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
}
这里调用了ClientConfig()方法
(3)
ClientConfigLoadingRules指定了kube_config的加载解析规则
(4)
func NewNonInteractiveDeferredLoadingClientConfig(loader ClientConfigLoader, overrides *ConfigOverrides) ClientConfig {
return &DeferredLoadingClientConfig{loader: loader, overrides: overrides, icc: &inClusterClientConfig{overrides: overrides}}
}
返回DeferredLoadingClientConfig结构体
所以是由DeferredLoadingClientConfig结构体来实现ClientConfig()方法的
(5)
func (config *DeferredLoadingClientConfig) ClientConfig() (*restclient.Config, error) {
mergedClientConfig, err := config.createClientConfig()
mergedConfig, err := mergedClientConfig.ClientConfig()
return mergedConfig, err
}
(6)
func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
if config.fallbackReader != nil {
mergedClientConfig = NewInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.fallbackReader, config.loader)
} else {
mergedClientConfig = NewNonInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.loader)
}
}
(7)
func NewInteractiveClientConfig(config clientcmdapi.Config, contextName string, overrides *ConfigOverrides, fallbackReader io.Reader, configAccess ConfigAccess) ClientConfig {
return &DirectClientConfig{config, contextName, overrides, fallbackReader, configAccess, promptedCredentials{}}
}
(8)
func (config *DirectClientConfig) ClientConfig() (*restclient.Config, error){
...
serverAuthPartialConfig, err := getServerIdentificationPartialConfig(configAuthInfo, configClusterInfo)
if err != nil {
return nil, err
}
...
}
(9)
func getServerIdentificationPartialConfig(configAuthInfo clientcmdapi.AuthInfo, configClusterInfo clientcmdapi.Cluster) (*restclient.Config, error) {
mergedConfig := &restclient.Config{}
// configClusterInfo holds the information identify the server provided by .kubeconfig
configClientConfig := &restclient.Config{}
configClientConfig.CAFile = configClusterInfo.CertificateAuthority
configClientConfig.CAData = configClusterInfo.CertificateAuthorityData
configClientConfig.Insecure = configClusterInfo.InsecureSkipTLSVerify
mergo.MergeWithOverwrite(mergedConfig, configClientConfig)
return mergedConfig, nil
}
// 天道好轮回,最终找到了调用getServerIdentificationPartialConfig的调用链
证书的加载到restclient.Config过程分析完了,下面问题来了:
问题(##):具体的rest请求的时候,证书是怎么带的,然后怎么使用到restclient.Config这里面的证书的??
其实这个过程就在前面的RESTClientFor过程中:
transport, err := TransportFor(config)
if err != nil {
return nil, err
}
func TransportFor(config *Config) (http.RoundTripper, error) {
cfg, err := config.TransportConfig()
if err != nil {
return nil, err
}
return transport.New(cfg)
}
// TransportConfig converts a client config to an appropriate transport config.
func (c *Config) TransportConfig() (*transport.Config, error) {
conf := &transport.Config{
UserAgent: c.UserAgent,
Transport: c.Transport,
WrapTransport: c.WrapTransport,
TLS: transport.TLSConfig{
Insecure: c.Insecure,
ServerName: c.ServerName,
CAFile: c.CAFile,
CAData: c.CAData,
CertFile: c.CertFile,
CertData: c.CertData,
KeyFile: c.KeyFile,
KeyData: c.KeyData,
},
Username: c.Username,
Password: c.Password,
BearerToken: c.BearerToken,
Impersonate: transport.ImpersonationConfig{
UserName: c.Impersonate.UserName,
Groups: c.Impersonate.Groups,
Extra: c.Impersonate.Extra,
},
Dial: c.Dial,
}
....
}
可以看到TransportConfig是http请求的时候带的,很多参数CAFile,CAData,Username等,都是kube_config中的,请求apiserver的时候apiserver会来拿的。
最终发现Transport才是带货王呀!
抛出问题(##):Request的这些参数,Do的时候怎么用的??
具体Transport对各路参数都加了之后,
其他的参数要么就是放在路由里,要么就是放在header里,
然后加一些限流呀什么的
其实就是包装好了http.Client的http请求,
最终执行原生的http的Do方法而已。