kubernetes (k8s) csi 插件开发简介 //www.greatytc.com/p/88ec8cba7507
kubernetes(k8s) csi 插件attach-detach流程 //www.greatytc.com/p/5c6e78b6b320
简介
因为k8s csi plugin的工作流程属于out-of-tree,所以k8s额外使用了辅助容器(sidecar)来与k8s组件通信,本篇主要分析卷在挂载/卸载时的代码调用流程。
   CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----+---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+
Figure 5: The lifecycle of a dynamically provisioned volume, from
creation to destruction.
以这个挂载图为例, CreateVolume方法对应创建卷, ControllerPublishVolume代表attach卷到对应kubelet节点, 而NodePublishVolume对应mount卷到对应目录, NodeUnpublishVolume 代表unmount方法, ControllerUnpublishVolume 代表detach流程. 需要注意的是, attach和detach都是以节点为单位, 并不能具体到pod上.
kubernetes源码
以下代码都在k8s源码中:
https://github.com/kubernetes/kubernetes.git
CSI Plugin
首先看kubernetes代码中csi相关的代码。
path | usage |
---|---|
pkg/volume/csi | manages CSI plugins |
csiPlugin源码
- csiPlugin结构体
type csiPlugin struct {
# host用来与kubelet对接
host volume.VolumeHost
blockEnabled bool
# 用来列出对应节点的CSIDriver
csiDriverLister csilister.CSIDriverLister
#用来调用infomer和CSIDriverLister
csiDriverInformer csiinformer.CSIDriverInformer
}
- 加载csiPlugin
ProbeVolumePlugins函数是k8s组件用来加载对应的csiPlugin的函数,调用函数会返回一个空的csiPlugin.
// ProbeVolumePlugins returns the list of volume plugins implemented by this
// package: a single csiPlugin whose host is left unset and whose blockEnabled
// flag reflects the CSIBlockVolume feature gate.
func ProbeVolumePlugins() []volume.VolumePlugin {
	blockVolumes := utilfeature.DefaultFeatureGate.Enabled(features.CSIBlockVolume)
	plugin := &csiPlugin{host: nil, blockEnabled: blockVolumes}
	return []volume.VolumePlugin{plugin}
}
- 初始化csiPlugin
# 传入对应的VolumeHost
func (p *csiPlugin) Init(host volume.VolumeHost) error {
p.host = host
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
csiClient := host.GetCSIClient()
if csiClient == nil {
klog.Warning("The client for CSI Custom Resources is not available, skipping informer initialization")
} else {
// Start informer for CSIDrivers.
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
p.csiDriverLister = p.csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)
}
}
// Initializing csiDrivers map and label management channels
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)
// TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver
// objects for migrated drivers.
return nil
}
- NewAttacher函数
NewAttacher函数返回csiAttacher实例
// NewAttacher builds a csiAttacher bound to this plugin and to the host's
// Kubernetes client. It fails when the host cannot supply a client.
func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
	kubeClient := p.host.GetKubeClient()
	if kubeClient == nil {
		klog.Error(log("unable to get kubernetes client from host"))
		return nil, errors.New("unable to get Kubernetes client")
	}

	attacher := &csiAttacher{
		plugin:        p,
		k8s:           kubeClient,
		waitSleepTime: time.Second,
	}
	return attacher, nil
}
- NewDetacher函数
同NewAttacher函数,也返回csiAttacher实例
- NewMounter函数
返回csiMountMgr实例
// NewMounter returns a csiMountMgr for the given volume spec and pod.
//
// NOTE: this is an excerpt; each "..." line stands for code elided in this
// article. The names k8s, pvSource, csi and readOnly used below are not
// declared in the visible part and are presumably set up in the elided code.
func (p *csiPlugin) NewMounter(
spec *volume.Spec,
pod *api.Pod,
_ volume.VolumeOptions) (volume.Mounter, error) {
...
// The mounter carries everything SetUpAt later needs: the CSI client, the
// driver name and volume handle from the PV source, and the pod identity.
mounter := &csiMountMgr{
plugin: p,
k8s: k8s,
spec: spec,
pod: pod,
podUID: pod.UID,
driverName: csiDriverName(pvSource.Driver),
volumeID: pvSource.VolumeHandle,
specVolumeID: spec.Name(),
csiClient: csi,
readOnly: readOnly,
}
...
return mounter, nil
}
- NewUnmounter方法
同上,也返回csiMountMgr实例
csiAttacher
- Attach
Attach方法负责创建VolumeAttachment
func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
...
# 创建VolumeAttachment
node := string(nodeName)
pvName := spec.PersistentVolume.GetName()
# 需要注意的是attachID只跟卷名称, driver名称, node名称相关, 所以attach只能以节点
# 为单位, 不能以pod为单位.
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, node)
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
},
Spec: storage.VolumeAttachmentSpec{
NodeName: node,
Attacher: csiSource.Driver,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
}
...
# 判断attach成功的条件是attachment.Status.Attached为True
if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil {
return "", err
}
klog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment object [%s]", attachID))
// TODO(71164): In 1.15, return empty devicePath
return attachID, nil
}
- Detach
Detach方法负责删除VolumeAttachment
func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
if volumeName == "" {
klog.Error(log("detacher.Detach missing value for parameter volumeName"))
return errors.New("missing expected parameter volumeName")
}
parts := strings.Split(volumeName, volNameSep)
if len(parts) != 2 {
klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
return errors.New("volumeName missing expected data")
}
driverName := parts[0]
volID := parts[1]
attachID := getAttachmentName(volID, driverName, string(nodeName))
# 删除VolumeAttachment
if err := c.k8s.StorageV1beta1().VolumeAttachments().Delete(attachID, nil); err != nil {
if apierrs.IsNotFound(err) {
// object deleted or never existed, done
klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volID))
return nil
}
klog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))
return err
}
klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
# 等待VolumeAttachment被删除
return c.waitForVolumeDetachment(volID, attachID)
}
VolumeAttachment的名称只与卷的名称, driver的名称, 节点的名称有关系. 所以我们在实现csi driver的时候, 切记ControllerPublishVolume和ControllerUnpublishVolume的实现只与node有关, 与pod无关.
// getAttachmentName derives the VolumeAttachment object name for a
// volume/driver/node triple: "csi-" followed by the hex-encoded SHA-256 digest
// of the three strings concatenated in that order with no separator.
func getAttachmentName(volName, csiDriverName, nodeName string) string {
	key := volName + csiDriverName + nodeName
	digest := sha256.Sum256([]byte(key))
	return fmt.Sprintf("csi-%x", digest)
}
csiMountMgr
- SetUp
SetUp方法调用SetUpAt方法, 后者再调用csi driver的NodePublishVolume方法.
// SetUp mounts the volume at the mounter's own path by delegating to SetUpAt.
func (c *csiMountMgr) SetUp(fsGroup *int64) error {
	targetDir := c.GetPath()
	return c.SetUpAt(targetDir, fsGroup)
}
SetUpAt中进行一些预处理, 然后传递参数到NodePublishVolume
func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
klog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
# 判断是否挂载
mounted, err := isDirMounted(c.plugin, dir)
if err != nil {
klog.Error(log("mounter.SetUpAt failed while checking mount status for dir [%s]", dir))
return err
}
# 如果挂载, 则返回
if mounted {
klog.V(4).Info(log("mounter.SetUpAt skipping mount, dir already mounted [%s]", dir))
return nil
}
# 获取卷中的spec.PersistentVolume.Spec.CSI
csiSource, err := getCSISourceFromSpec(c.spec)
if err != nil {
klog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
return err
}
csi := c.csiClient
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
// Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so
...
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
# 如果csi客户端没有获取到csi相关的上下文, 那么获取卷对应的VolumeAttachMent,
# 并从中获取attachment.Status.AttachmentMetadata
if c.publishContext == nil {
nodeName := string(c.plugin.host.GetNodeName())
c.publishContext, err = c.plugin.getPublishContext(c.k8s, c.volumeID, string(c.driverName), nodeName)
if err != nil {
return err
}
}
# csiSource.VolumeAttributes对应的是CreateVolume方法返回的csi.CreateVolumeResponse中
# 的VolumeContext
attribs := csiSource.VolumeAttributes
nodePublishSecrets := map[string]string{}
if csiSource.NodePublishSecretRef != nil {
nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodePublishSecretRef)
if err != nil {
return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v",
csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err)
}
}
// create target_dir before call to NodePublish
# 创建挂载目标目录
if err := os.MkdirAll(dir, 0750); err != nil {
klog.Error(log("mouter.SetUpAt failed to create dir %#v: %v", dir, err))
return err
}
klog.V(4).Info(log("created target path successfully [%s]", dir))
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
# 此处的accessMode是在卷定义中获取
accessMode := api.ReadWriteOnce
if c.spec.PersistentVolume.Spec.AccessModes != nil {
accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
}
// Inject pod information into volume_attributes
podAttrs, err := c.podAttributes()
...
fsType := csiSource.FSType
err = csi.NodePublishVolume(
ctx,
c.volumeID,
c.readOnly,
deviceMountPath,
dir,
accessMode,
c.publishContext,
attribs,
nodePublishSecrets,
fsType,
c.spec.PersistentVolume.Spec.MountOptions,
)
if err != nil {
klog.Errorf(log("mounter.SetupAt failed: %v", err))
if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
}
return err
}
// apply volume ownership
// The following logic is derived from https://github.com/kubernetes/kubernetes/issues/66323
// if fstype is "", then skip fsgroup (could be indication of non-block filesystem)
// if fstype is provided and pv.AccessMode == ReadWriteOnly, then apply fsgroup
err = c.applyFSGroup(fsType, fsGroup)
if err != nil {
// attempt to rollback mount.
fsGrpErr := fmt.Errorf("applyFSGroup failed for vol %s: %v", c.volumeID, err)
if unpubErr := csi.NodeUnpublishVolume(ctx, c.volumeID, dir); unpubErr != nil {
klog.Error(log("NodeUnpublishVolume failed for [%s]: %v", c.volumeID, unpubErr))
return fsGrpErr
}
if unmountErr := removeMountDir(c.plugin, dir); unmountErr != nil {
klog.Error(log("removeMountDir failed for [%s]: %v", dir, unmountErr))
return fsGrpErr
}
return fsGrpErr
}
klog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir))
return nil
}
目前来看k8s并没有提供卷的AccessMode与csi driver支持的VolumeCapabilityAccessModes之间的逻辑关系, 只是做了一个简单的转化, 把卷定义中的AccessMode转成对应的类型, 所以具体的逻辑还需要我们在csi driver中进行实现.
- TearDown
TearDown方法调用TearDownAt方法, 后者再调用csi driver的NodeUnpublishVolume方法.
// TearDown unmounts the volume from the mounter's own path by delegating to
// TearDownAt.
func (c *csiMountMgr) TearDown() error {
	targetDir := c.GetPath()
	return c.TearDownAt(targetDir)
}
kube-controller-manager源码
path | usage |
---|---|
pkg/controller | contains code for controllers |
- attachDetachController
kube-controller-manager用attachDetachController管理卷的attach和detach,主要逻辑在以下函数中:
path | usage |
---|---|
pkg/controller/volume/attachdetach/reconciler/reconciler.go | manages volume attach/detach |
func (rc *reconciler) reconcile() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
// Ensure volumes that should be detached are detached.
# 遍历已经attach到节点上的卷
# actualStateOfWorld代表实际的卷与节点的对应关系
# desiredStateOfWorld代表定义的卷与节点及pod的对应关系
for _, attachedVolume := range
rc.actualStateOfWorld.GetAttachedVolumes() {
# 如果该卷不再需要,则进行detach
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
...
# DetachVolume方法最终调用plugin的NewDetacher函数,最后调用返回
# 的csiAttacher的Detach方法
err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
...
}
}
}
# attach对应的卷
rc.attachDesiredVolumes()
// Update Node Status
err := rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
klog.Warningf("UpdateNodeStatuses failed with: %v", err)
}
}
# attach卷
func (rc *reconciler) attachDesiredVolumes() {
// Ensure volumes that should be attached are attached.
# GetVolumesToAttach获取需要attach到节点上的卷
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
# 判断需求中的卷是否实际中已经存在
if rc.actualStateOfWorld.VolumeNodeExists(volumeToAttach.VolumeName, volumeToAttach.NodeName) {
// Volume/Node exists, touch it to reset detachRequestedTime
if klog.V(5) {
klog.Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
}
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
continue
}
// Don't even try to start an operation if there is already one running
# 如果对应的卷处在pending状态, 说明对应卷的操作正在执行, 跳过本次处理.
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
if klog.V(10) {
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
continue
}
# 根据卷的属性中的accessModes判断是否可以attach到多个节点上,
# 比如ReadWriteOnce的卷已经attach到一个节点, 这时再想挂载到其他节点则会失败.
if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
if len(nodes) > 0 {
if !volumeToAttach.MultiAttachErrorReported {
rc.reportMultiAttachError(volumeToAttach, nodes)
rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
continue
}
}
// Volume/Node doesn't exist, spawn a goroutine to attach it
if klog.V(5) {
klog.Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
}
# AttachVolume方法最终调用plugin的NewAttacher函数,最后调用返回
# 的csiAttacher的Attach方法
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
if err == nil {
klog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
}
if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
}
}
}
kubelet源码
path | usage |
---|---|
pkg/kubelet | contains the libraries that drive the Kubelet binary |
- volumemanager
kubelet对卷的处理也在reconcile函数中,注意kubelet和kube-controller-manager都有各自的reconciler,actualStateOfWorld,desiredStateOfWorld定义,不要混淆。
path | usage |
---|---|
pkg/kubelet/volumemanager/reconciler/reconciler.go | Mainly used to manage the mounting and unmounting of volumes |
func (rc *reconciler) reconcile() {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
// Ensure volumes that should be unmounted are unmounted.
# GetMountedVolumes返回的是成功mount到pod上的卷
for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
# 如果实际挂载的卷不需要被挂载,卸载卷
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
// Volume is mounted, unmount it
klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
# UnmountVolume调用plugin的NewUnmounter创建实例,并调用TearDown方法
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
...
}
}
# GetVolumesToMount返回需要attach到节点并挂载到pod上的卷
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
volumeToMount.DevicePath = devicePath
# 如果需要挂载的卷还没有被挂载
if cache.IsVolumeNotAttachedError(err) {
# controllerAttachDetachEnabled为true的时候,一般为true
#
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
# 判断卷的状态是否是attach到node上
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
...
} else {
# 不使用controller的attach/detach controller,kubelet 直接调用plugin去attach
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.nodeName,
}
klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
...
}
} else if !volMounted || cache.IsRemountRequiredError(err) {
# attach到node上的卷下次循环中会进入这个分支重新挂载
# 重新挂载
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
isRemount := cache.IsRemountRequiredError(err)
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
#
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
...
} else if cache.IsFSResizeRequiredError(err) &&
# volume need resize
utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandVolumeFSWithoutUnmounting", ""))
err := rc.operationExecutor.ExpandVolumeFSWithoutUnmounting(
volumeToMount.VolumeToMount,
rc.actualStateOfWorld)
...
}
}
# GetUnmountedVolumes返回的attach但是没有挂载到任何pod上的卷
// Ensure devices that should be detached/unmounted are detached/unmounted.
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
if attachedVolume.GloballyMounted {
// Volume is globally mounted to device, unmount it
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
...
} else {
// Volume is attached to node, detach it
// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
# 等待controller detach
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
} else {
// Only detach if kubelet detach is enabled
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
...
}
}
}
}
}
external-attacher源码
https://github.com/kubernetes-csi/external-attacher.git
在attach/detach的时候external-attacher主要用来监控VolumeAttachment并调用csi driver中的相关方法
- CSIAttachController
path | usage |
---|---|
pkg/controller/controller.go | attaches / detaches CSI volumes using provided Handler interface |
这个controller负责监控VolumeAttachment并调用csi driver的相关方法。
// CSIAttachController attaches and detaches CSI volumes by delegating the
// actual work to the Handler it was given.
type CSIAttachController struct {
	client        kubernetes.Interface
	attacherName  string
	handler       Handler
	eventRecorder record.EventRecorder

	// Rate-limited work queues; judging by the listers below, "va" stands for
	// VolumeAttachment and "pv" for PersistentVolume.
	vaQueue, pvQueue workqueue.RateLimitingInterface

	// Lister for VolumeAttachments and its informer-synced check.
	vaLister       storagelisters.VolumeAttachmentLister
	vaListerSynced cache.InformerSynced

	// Lister for PersistentVolumes and its informer-synced check.
	pvLister       corelisters.PersistentVolumeLister
	pvListerSynced cache.InformerSynced
}
- SyncNewOrUpdatedVolumeAttachment
根据VolumeAttachment的状态来调用csi driver,并更新VolumeAttachment的状态。
// SyncNewOrUpdatedVolumeAttachment processes one VolumeAttachment: it runs
// syncAttach when the object has no deletion timestamp and syncDetach when it
// is being deleted.
//
// NOTE: this is an excerpt; the "..." line stands for code elided in this
// article (presumably the handling of err).
func (h *csiHandler) SyncNewOrUpdatedVolumeAttachment(va *storage.VolumeAttachment) {
glog.V(4).Infof("CSIHandler: processing VA %q", va.Name)
var err error
// A nil DeletionTimestamp means the object is not being deleted.
if va.DeletionTimestamp == nil {
err = h.syncAttach(va)
} else {
err = h.syncDetach(va)
}
...
}
- syncAttach
attach操作会调用csi driver的ControllerPublishVolume方法
func (h *csiHandler) syncAttach(va *storage.VolumeAttachment) error {
if va.Status.Attached {
// Volume is attached, there is nothing to be done.
glog.V(4).Infof("%q is already attached", va.Name)
return nil
}
// Attach and report any error
glog.V(2).Infof("Attaching %q", va.Name)
# csiAttach最终调用csi driver的ControllerPublishVolume方法
va, metadata, err := h.csiAttach(va)
...
glog.V(2).Infof("Attached %q", va.Name)
// Mark as attached
if _, err := markAsAttached(h.client, va, metadata); err != nil {
return fmt.Errorf("failed to mark as attached: %s", err)
}
glog.V(4).Infof("Fully attached %q", va.Name)
return nil
}
- syncDetach
syncDetach会调用csi driver的ControllerUnpublishVolume,然后把VolumeAttachment的状态置为detach
结论
以这个卷的挂载流程来说明下各部分都是如何工作的:
   CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----+---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+
- 挂载卷的过程:
- kube-controller-manager调用csi driver plugin来创建VolumeAttachment
- external-attacher 监控VolumeAttachment并调用csi driver的ControllerPublishVolume方法,根据返回值将VolumeAttachment的状态更改为attached
- kubelet 判断卷的状态是否为attach, 如果是则调用csi driver plugin的NodePublishVolume来进行挂载.
- 卸载卷:
- kubelet 对不再被pod使用(pod被删除)的卷调用csi driver plugin的NodeUnpublishVolume方法进行卸载
- kube-controller-manager 对attach到node但是没有使用的卷进行detach
- external-attacher调用csi driver plugin的ControllerUnpublishVolume进行detach
ps:
1.一个VolumeAttachment对应一个绑定关系,所以如果是ReadWriteOnce,那其他VolumeAttachment创建不成功,
这部分逻辑是由kube-controller-manager在attach的时候获取持久卷的accessModes和卷已挂载的节点个数来判断.
2. VolumeAttachment的ID只与卷名称, csi driver名称, node名称相关. 所以确保attach和detach只与node有关,与pod无关.
3. 对于卷的accessModes, k8s在处理的时候都是以卷的accessModes为主, 并没有根据csi driver支持的accessModes进行判断,
所以需要我们在代码中进行判断并处理.
4. 当kubelet服务出现问题的时候,此时k8s会调度删除此节点上的pod,但是删除的时候会因为kubelet出问题而卡住。
此时kube-controller-manager的desiredStateOfWorld中依然存在这个卷,对应的VolumeAttachment就不会被删除,卷也不会被重新挂载。
当kubelet重启的时候,pod会被彻底删除,kube-controller-manager会调用卷的detach,也就是detach首先完成。
而kubelet重新启动的时候会重新加载actualStateOfWorld和desiredStateOfWorld,
因为pod被删除,所以kubelet的actualStateOfWorld没有pod的挂载信息,后续不会对这个卷进行操作,导致上次的挂载信息依然存在。