argo resource

V3.0.1

resource pod

apiVersion: v1
kind: Pod
metadata:
  annotations:
    workflows.argoproj.io/node-name: hobot-dag-20435.step-0
    workflows.argoproj.io/template: '{"name":"step-0","inputs":{},"outputs":{},"metadata":{},"resource":{"action":"create","manifest":"apiVersion:
      batch.volcano.sh/v1alpha1\nkind: Job\n","successCondition":"status.state.phase
      = Completed","failureCondition":"status.state.phase in (Failed, Terminated)"}}'
  labels:
    workflows.argoproj.io/completed: "false"
    workflows.argoproj.io/workflow: hobot-dag-20435
  name: hobot-dag-20435-3634668572
  namespace: argo
spec:
  containers:
  - command:
    - argoexec
    - resource
    - create
    env:
    - name: ARGO_POD_NAME
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.name
    - name: GODEBUG
      value: x509ignoreCN=0
    - name: ARGO_CONTAINER_NAME
      value: main
    image: xx/argoproj-argoexec:v3.0.1
    name: main
    resources: {}
    volumeMounts:
    - mountPath: /argo/podmetadata
      name: podmetadata
  restartPolicy: Never
  schedulerName: default-scheduler
  serviceAccount: argo
  serviceAccountName: argo
  volumes:
  - downwardAPI:
      defaultMode: 420
      items:
      - fieldRef:
          apiVersion: v1
          fieldPath: metadata.annotations
        path: annotations
    name: podmetadata
  - hostPath:
      path: /var/run/docker.sock
      type: Socket
    name: docker-sock

argoproj/argo-workflows/cmd/argoexec/commands/resource.go

func execResource(ctx context.Context, action string) error {
   wfExecutor := initExecutor()

   err := wfExecutor.StageFiles()

   isDelete := action == "delete"

   resourceNamespace, resourceName, err := wfExecutor.ExecResource(
      action, common.ExecutorResourceManifestPath, wfExecutor.Template.Resource.Flags,
   )

   if !isDelete {
      err = wfExecutor.WaitResource(ctx, resourceNamespace, resourceName)
 
      err = wfExecutor.SaveResourceParameters(ctx, resourceNamespace, resourceName)
   }
   return nil
}

argoproj/argo-workflows/cmd/argoexec/commands/root.go

func initExecutor() *executor.WorkflowExecutor {
   config, err := clientConfig.ClientConfig()
   executorType := os.Getenv(common.EnvVarContainerRuntimeExecutor)
   config = restclient.AddUserAgent(config, fmt.Sprintf("argo-workflows/%s executor/%s", version.Version, executorType))

   namespace, _, err := clientConfig.Namespace()

   clientset, err := kubernetes.NewForConfig(config)

   podName, ok := os.LookupEnv(common.EnvVarPodName)
   
   // load resource template from file
   tmpl, err := executor.LoadTemplate(podAnnotationsPath)

   wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)
   
   return &wfExecutor
}

Template file

root@hobot-dag-20435-3634668572:/# cat /argo/podmetadata/annotations
kubernetes.io/config.seen="2021-06-04T14:43:11.934368503+08:00"
kubernetes.io/config.source="api"
workflows.argoproj.io/node-name="hobot-dag-20435.step-0"
workflows.argoproj.io/template="{\"name\":\"step-0\",\"inputs\":{},\"outputs\":{},\"metadata\":{},\"resource\":{\"action\":\"create\",\"manifest\":\"apiVersion: batch.volcano.sh/v1alpha1\\nkind: Job\"successCondition\":\"status.state.phase = Completed\",\"failureCondition\":\"status.state.phase in (Failed, Terminated)\"}}"

argoproj/argo-workflows/workflow/executor/executor.go

// StageFiles will create any files required by script/resource templates
func (we *WorkflowExecutor) StageFiles() error {
   var filePath string
   var body []byte
   switch we.Template.GetType() {
   case wfv1.TemplateTypeScript:
      log.Infof("Loading script source to %s", common.ExecutorScriptSourcePath)
      filePath = common.ExecutorScriptSourcePath
      body = []byte(we.Template.Script.Source)
   case wfv1.TemplateTypeResource:
      log.Infof("Loading manifest to %s", common.ExecutorResourceManifestPath)
      filePath = common.ExecutorResourceManifestPath
      body = []byte(we.Template.Resource.Manifest)
   default:
      return nil
   }
   err := ioutil.WriteFile(filePath, body, 0644)
   return nil
}

manifest.yaml

root@hobot-dag-20435-3634668572:/# cat /tmp/manifest.yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
...
status:
  state:
    lastTransitionTime: null

argoproj/argo-workflows/workflow/executor/resource.go

// ExecResource will run kubectl action against a manifest
func (we *WorkflowExecutor) ExecResource(action string, manifestPath string, flags []string) (string, string, error) {
   args, err := we.getKubectlArguments(action, manifestPath, flags)

   cmd := exec.Command("kubectl", args...)

   out, err := cmd.Output()
   
   return obj.GetNamespace(), resourceName, nil
}

argoproj/argo-workflows/workflow/executor/resource.go

// WaitResource waits for a specific resource to satisfy either the success or failure condition
func (we *WorkflowExecutor) WaitResource(ctx context.Context, resourceNamespace string, resourceName string) error {
   // Start the condition result reader using PollImmediateInfinite
   // Poll intervall of 5 seconds serves as a backoff intervall in case of immediate result reader failure
   err := wait.PollImmediateInfinite(time.Second*5,
      func() (bool, error) {
         isErrRetry, err := checkResourceState(resourceNamespace, resourceName, successReqs, failReqs)

         if err == nil {
            log.Infof("Returning from successful wait for resource %s", resourceName)
            return true, nil
         }

         if isErrRetry {
            log.Infof("Waiting for resource %s resulted in retryable error %v", resourceName, err)
            return false, nil
         }

         log.Warnf("Waiting for resource %s resulted in non-retryable error %v", resourceName, err)
         return false, err
      })
   if err != nil {
      if err == wait.ErrWaitTimeout {
         log.Warnf("Waiting for resource %s resulted in timeout due to repeated errors", resourceName)
      } else {
         log.Warnf("Waiting for resource %s resulted in error %v", resourceName, err)
      }
      return err
   }

   return nil
}
// Function to do the kubectl get -w command and then waiting on json reading.
func checkResourceState(resourceNamespace string, resourceName string, successReqs labels.Requirements, failReqs labels.Requirements) (bool, error) {
   // kubectl get xxx -w -o json, 异步处理stdout
   cmd, reader, err := startKubectlWaitCmd(resourceNamespace, resourceName)
    
   // 处理stdout
   for {
      if checkIfResourceDeleted(resourceName, resourceNamespace) {
         return false, errors.Errorf(errors.CodeNotFound, "Resource %s in namespace %s has been deleted somehow.", resourceName, resourceNamespace)
      }

      jsonBytes, err := readJSON(reader)
      if err != nil {
         resultErr := err
         _ = cmd.Process.Kill()
         return true, resultErr
      }

      log.Info(string(jsonBytes))
      // 检查workload状态是否满足预期
      for _, req := range failReqs {
         msg := fmt.Sprintf("failure condition '%s' evaluated %v", req, failed)
      }

      for _, req := range successReqs {
         log.Infof("success condition '%s' evaluated %v", req, matched)
      }
      log.Infof("%d/%d success conditions matched", numMatched, len(successReqs))
   }
}

v 3.1

https://hub.fastgit.org/argoproj/argo-workflows/issues/4467
https://hub.fastgit.org/argoproj/argo-workflows/pull/5364
变化点:
kubectl -w变成client poll

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容