kubernetes源码阅读——client-go 客户端

client-go

  • client-go是Kubernetes的Go语言的官方编程式交互客户端库,提供对Kubernetes API Server服务的交互访问。
  • client-go支持3种Client客户端:
  1. RESTClient是最基础的客户端。它对HTTP Request进行了封装。ClientSet、DynamicClient及DiscoveryClient客户端都是基于RESTClient实现的。
  2. ClientSet基于RESTClient,是kubernetes所有内置资源客户端的集合。每一个 Resource和Version都对应一个函数。ClientSet开发者对Kubernetes进行二次开发时最常使用的。
  3. DynamicClient是一种动态的 client,它能访问kubernetes 所有的资源,即内置资源和CRD自定义资源。

clientset

  • clientset示例:
// staging\src\k8s.io\client-go\examples\create-update-delete-deployment\main.go
func main() {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        // kubeconfig默认存放在$HOME/.kube/config路径下
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()
    
    // 读取kubeconfig配置信息并实例化rest.Config对象。配置信息包括集群、用户、命名空间和身份验证等,
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }
    // 通过kubeconfig配置信息实例化clientset对象
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    // 请求v1版本下的deployment资源。
    deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)

    deployment := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name: "demo-deployment",
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: int32Ptr(2),
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app": "demo",
                },
            },
            Template: apiv1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        "app": "demo",
                    },
                },
                Spec: apiv1.PodSpec{
                    Containers: []apiv1.Container{
                        {
                            Name:  "web",
                            Image: "nginx:1.12",
                            Ports: []apiv1.ContainerPort{
                                {
                                    Name:          "http",
                                    Protocol:      apiv1.ProtocolTCP,
                                    ContainerPort: 80,
                                },
                            },
                        },
                    },
                },
            },
        },
    }

    // Create Deployment
    fmt.Println("Creating deployment...")
    result, err := deploymentsClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())

    // Update Deployment
    prompt()
    fmt.Println("Updating deployment...")
    //    You have two options to Update() this Deployment:
    //
    //    1. Modify the "deployment" variable and call: Update(deployment).
    //       This works like the "kubectl replace" command and it overwrites/loses changes
    //       made by other clients between you Create() and Update() the object.
    //    2. Modify the "result" returned by Get() and retry Update(result) until
    //       you no longer get a conflict error. This way, you can preserve changes made
    //       by other clients between Create() and Update(). This is implemented below
    //           using the retry utility package included with client-go. (RECOMMENDED)
    //
    // More Info:
    // https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency

    retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
        // Retrieve the latest version of Deployment before attempting update
        // RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
        result, getErr := deploymentsClient.Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
        if getErr != nil {
            panic(fmt.Errorf("Failed to get latest version of Deployment: %v", getErr))
        }

        result.Spec.Replicas = int32Ptr(1)                           // reduce replica count
        result.Spec.Template.Spec.Containers[0].Image = "nginx:1.13" // change nginx version
        _, updateErr := deploymentsClient.Update(context.TODO(), result, metav1.UpdateOptions{})
        return updateErr
    })
    if retryErr != nil {
        panic(fmt.Errorf("Update failed: %v", retryErr))
    }
    fmt.Println("Updated deployment...")

    // List Deployments
    prompt()
    fmt.Printf("Listing deployments in namespace %q:\n", apiv1.NamespaceDefault)
    list, err := deploymentsClient.List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, d := range list.Items {
        fmt.Printf(" * %s (%d replicas)\n", d.Name, *d.Spec.Replicas)
    }

    // Delete Deployment
    prompt()
    fmt.Println("Deleting deployment...")
    deletePolicy := metav1.DeletePropagationForeground
    if err := deploymentsClient.Delete(context.TODO(), "demo-deployment", metav1.DeleteOptions{
        PropagationPolicy: &deletePolicy,
    }); err != nil {
        panic(err)
    }
    fmt.Println("Deleted deployment.")
}
  • Deployments函数是一个资源接口对象,用于Deployment资源对象的管理,对包含Create、Update、Delete、Get、List、Watch、Patch等方法
// staging\src\k8s.io\client-go\kubernetes\typed\apps\v1\deployment.go
// DeploymentInterface has methods to work with Deployment resources.
type DeploymentInterface interface {
    Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error)
    Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
    UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
    Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
    DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
    Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error)
    List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error)
    Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
    Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error)
    Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
    ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
    GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
    UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)

    DeploymentExpansion
}

// deployments implements DeploymentInterface
type deployments struct {
    client rest.Interface
    ns     string
}

// Create takes the representation of a deployment and creates it.  Returns the server's representation of the deployment, and an error, if there is any.
func (c *deployments) Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (result *v1.Deployment, err error) {
    result = &v1.Deployment{}
    err = c.client.Post().
        Namespace(c.ns).
        Resource("deployments").
        VersionedParams(&opts, scheme.ParameterCodec).
        Body(deployment).
        Do(ctx).
        Into(result)
    return
}

dynamicClient

  • ClientSet需要预先实现每种Resource和Version的操作,其内部的数据都是结构化数据。而DynamicClient的内部数据是通过map[string]interface{}转换的非结构化数据,因此DynamicClient能够处理CRD自定义资源。
  • dynamicClient示例:
// staging\src\k8s.io\client-go\examples\dynamic-create-update-delete-deployment\main.go
func main() {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    namespace := "default"

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }
    // 通过kubeconfig配置信息实例化dynamicClient对象
    client, err := dynamic.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}

    deployment := &unstructured.Unstructured{
        Object: map[string]interface{}{
            "apiVersion": "apps/v1",
            "kind":       "Deployment",
            "metadata": map[string]interface{}{
                "name": "demo-deployment",
            },
            "spec": map[string]interface{}{
                "replicas": 2,
                "selector": map[string]interface{}{
                    "matchLabels": map[string]interface{}{
                        "app": "demo",
                    },
                },
                "template": map[string]interface{}{
                    "metadata": map[string]interface{}{
                        "labels": map[string]interface{}{
                            "app": "demo",
                        },
                    },

                    "spec": map[string]interface{}{
                        "containers": []map[string]interface{}{
                            {
                                "name":  "web",
                                "image": "nginx:1.12",
                                "ports": []map[string]interface{}{
                                    {
                                        "name":          "http",
                                        "protocol":      "TCP",
                                        "containerPort": 80,
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
    }

    // Create Deployment
    fmt.Println("Creating deployment...")
    result, err := client.Resource(deploymentRes).Namespace(namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Printf("Created deployment %q.\n", result.GetName())

    // Update Deployment
    prompt()
    fmt.Println("Updating deployment...")
    //    You have two options to Update() this Deployment:
    //
    //    1. Modify the "deployment" variable and call: Update(deployment).
    //       This works like the "kubectl replace" command and it overwrites/loses changes
    //       made by other clients between you Create() and Update() the object.
    //    2. Modify the "result" returned by Get() and retry Update(result) until
    //       you no longer get a conflict error. This way, you can preserve changes made
    //       by other clients between Create() and Update(). This is implemented below
    //           using the retry utility package included with client-go. (RECOMMENDED)
    //
    // More Info:
    // https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency

    retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
        // Retrieve the latest version of Deployment before attempting update
        // RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
        result, getErr := client.Resource(deploymentRes).Namespace(namespace).Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
        if getErr != nil {
            panic(fmt.Errorf("failed to get latest version of Deployment: %v", getErr))
        }

        // update replicas to 1
        if err := unstructured.SetNestedField(result.Object, int64(1), "spec", "replicas"); err != nil {
            panic(fmt.Errorf("failed to set replica value: %v", err))
        }

        // extract spec containers
        containers, found, err := unstructured.NestedSlice(result.Object, "spec", "template", "spec", "containers")
        if err != nil || !found || containers == nil {
            panic(fmt.Errorf("deployment containers not found or error in spec: %v", err))
        }

        // update container[0] image
        if err := unstructured.SetNestedField(containers[0].(map[string]interface{}), "nginx:1.13", "image"); err != nil {
            panic(err)
        }
        if err := unstructured.SetNestedField(result.Object, containers, "spec", "template", "spec", "containers"); err != nil {
            panic(err)
        }

        _, updateErr := client.Resource(deploymentRes).Namespace(namespace).Update(context.TODO(), result, metav1.UpdateOptions{})
        return updateErr
    })
    if retryErr != nil {
        panic(fmt.Errorf("update failed: %v", retryErr))
    }
    fmt.Println("Updated deployment...")

    // List Deployments
    prompt()
    fmt.Printf("Listing deployments in namespace %q:\n", apiv1.NamespaceDefault)
    list, err := client.Resource(deploymentRes).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, d := range list.Items {
        replicas, found, err := unstructured.NestedInt64(d.Object, "spec", "replicas")
        if err != nil || !found {
            fmt.Printf("Replicas not found for deployment %s: error=%s", d.GetName(), err)
            continue
        }
        fmt.Printf(" * %s (%d replicas)\n", d.GetName(), replicas)
    }

    // Delete Deployment
    prompt()
    fmt.Println("Deleting deployment...")
    deletePolicy := metav1.DeletePropagationForeground
    deleteOptions := metav1.DeleteOptions{
        PropagationPolicy: &deletePolicy,
    }
    if err := client.Resource(deploymentRes).Namespace(namespace).Delete(context.TODO(), "demo-deployment", deleteOptions); err != nil {
        panic(err)
    }

    fmt.Println("Deleted deployment.")
}

func prompt() {
    fmt.Printf("-> Press Return key to continue.")
    scanner := bufio.NewScanner(os.Stdin)
    for scanner.Scan() {
        break
    }
    if err := scanner.Err(); err != nil {
        panic(err)
    }
    fmt.Println()
}
// staging\src\k8s.io\apimachinery\pkg\apis\meta\v1\unstructured\unstructured.go
// Unstructured allows objects that do not have Golang structs registered to be manipulated
// generically. This can be used to deal with the API objects from a plug-in. Unstructured
// objects still have functioning TypeMeta features-- kind, version, etc.
//
// WARNING: This object has accessors for the v1 standard metadata. You *MUST NOT* use this
// type if you are dealing with objects that are not in the server meta v1 schema.
//
// TODO: make the serialization part of this type distinct from the field accessors.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:deepcopy-gen=true
type Unstructured struct {
    // Object is a JSON compatible map with string, float, int, bool, []interface{}, or
    // map[string]interface{}
    // children.
    Object map[string]interface{}
}
  • dynamicResourceClient管理对Resource的Create、Update、Delete、Get、List、Watch、Patch等操作。
// staging\src\k8s.io\client-go\dynamic\simple.go
type dynamicClient struct {
    client *rest.RESTClient
}

var _ Interface = &dynamicClient{}

type dynamicResourceClient struct {
    client    *dynamicClient
    namespace string
    resource  schema.GroupVersionResource
}

func (c *dynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
    return &dynamicResourceClient{client: c, resource: resource}
}

func (c *dynamicResourceClient) Namespace(ns string) ResourceInterface {
    ret := *c
    ret.namespace = ns
    return &ret
}

func (c *dynamicResourceClient) Create(ctx context.Context, obj *unstructured.Unstructured, opts metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) {
    outBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
    if err != nil {
        return nil, err
    }
    name := ""
    if len(subresources) > 0 {
        accessor, err := meta.Accessor(obj)
        if err != nil {
            return nil, err
        }
        name = accessor.GetName()
        if len(name) == 0 {
            return nil, fmt.Errorf("name is required")
        }
    }

    result := c.client.client.
        Post().
        AbsPath(append(c.makeURLSegments(name), subresources...)...).
        Body(outBytes).
        SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
        Do(ctx)
    if err := result.Error(); err != nil {
        return nil, err
    }

    retBytes, err := result.Raw()
    if err != nil {
        return nil, err
    }
    uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
    if err != nil {
        return nil, err
    }
    return uncastObj.(*unstructured.Unstructured), nil
}

func (c *dynamicResourceClient) makeURLSegments(name string) []string {
    url := []string{}
    if len(c.resource.Group) == 0 {
        url = append(url, "api")
    } else {
        url = append(url, "apis", c.resource.Group)
    }
    url = append(url, c.resource.Version)

    if len(c.namespace) > 0 {
        url = append(url, "namespaces", c.namespace)
    }
    url = append(url, c.resource.Resource)

    if len(name) > 0 {
        url = append(url, name)
    }

    return url
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容