V3.0.1
resource pod
apiVersion: v1
kind: Pod
metadata:
  annotations:
    workflows.argoproj.io/node-name: hobot-dag-20435.step-0
    workflows.argoproj.io/template: '{"name":"step-0","inputs":{},"outputs":{},"metadata":{},"resource":{"action":"create","manifest":"apiVersion:
      batch.volcano.sh/v1alpha1\nkind: Job\n","successCondition":"status.state.phase
      = Completed","failureCondition":"status.state.phase in (Failed, Terminated)"}}'
  labels:
    workflows.argoproj.io/completed: "false"
    workflows.argoproj.io/workflow: hobot-dag-20435
  name: hobot-dag-20435-3634668572
  namespace: argo
spec:
  containers:
  - command:
    - argoexec
    - resource
    - create
    env:
    - name: ARGO_POD_NAME
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.name
    - name: GODEBUG
      value: x509ignoreCN=0
    - name: ARGO_CONTAINER_NAME
      value: main
    image: xx/argoproj-argoexec:v3.0.1
    name: main
    resources: {}
    volumeMounts:
    - mountPath: /argo/podmetadata
      name: podmetadata
  restartPolicy: Never
  schedulerName: default-scheduler
  serviceAccount: argo
  serviceAccountName: argo
  volumes:
  - downwardAPI:
      defaultMode: 420
      items:
      - fieldRef:
          apiVersion: v1
          fieldPath: metadata.annotations
        path: annotations
    name: podmetadata
  - hostPath:
      path: /var/run/docker.sock
      type: Socket
    name: docker-sock
argoproj/argo-workflows/cmd/argoexec/commands/resource.go
// execResource runs the resource-template flow for the given kubectl action:
// it stages the template files, executes the action against the manifest at
// common.ExecutorResourceManifestPath and, for every action other than
// "delete", calls WaitResource followed by SaveResourceParameters on the
// resulting resource.
//
// It returns the first error reported by any of these steps, or nil when all
// of them succeed.
func execResource(ctx context.Context, action string) error {
	wfExecutor := initExecutor()

	if err := wfExecutor.StageFiles(); err != nil {
		return err
	}

	resourceNamespace, resourceName, err := wfExecutor.ExecResource(
		action, common.ExecutorResourceManifestPath, wfExecutor.Template.Resource.Flags,
	)
	if err != nil {
		return err
	}

	// Waiting and parameter saving are skipped for the delete action.
	if action == "delete" {
		return nil
	}

	if err := wfExecutor.WaitResource(ctx, resourceNamespace, resourceName); err != nil {
		return err
	}
	return wfExecutor.SaveResourceParameters(ctx, resourceNamespace, resourceName)
}
argoproj/argo-workflows/cmd/argoexec/commands/root.go
// initExecutor builds the WorkflowExecutor used by the argoexec commands.
// It creates a Kubernetes clientset from clientConfig, tagging requests with an
// "argo-workflows/<version> executor/<type>" user agent, reads the pod name
// from the environment, loads the template from podAnnotationsPath and hands
// all of them to executor.NewExecutor.
//
// NOTE(review): in this excerpt none of the returned errors (nor `ok`) are
// checked and `cre` is never defined — presumably that code was left out of
// the excerpt; confirm against the full root.go.
func initExecutor() *executor.WorkflowExecutor {
config, err := clientConfig.ClientConfig()
// The executor type is read from the environment; in this excerpt it is only
// used to build the user-agent string.
executorType := os.Getenv(common.EnvVarContainerRuntimeExecutor)
config = restclient.AddUserAgent(config, fmt.Sprintf("argo-workflows/%s executor/%s", version.Version, executorType))
namespace, _, err := clientConfig.Namespace()
clientset, err := kubernetes.NewForConfig(config)
podName, ok := os.LookupEnv(common.EnvVarPodName)
// Load the template from the file at podAnnotationsPath.
tmpl, err := executor.LoadTemplate(podAnnotationsPath)
// NOTE(review): tmpl is dereferenced here without a visible nil/error check.
wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)
return &wfExecutor
}
Template file
root@hobot-dag-20435-3634668572:/# cat /argo/podmetadata/annotations
kubernetes.io/config.seen="2021-06-04T14:43:11.934368503+08:00"
kubernetes.io/config.source="api"
workflows.argoproj.io/node-name="hobot-dag-20435.step-0"
workflows.argoproj.io/template="{\"name\":\"step-0\",\"inputs\":{},\"outputs\":{},\"metadata\":{},\"resource\":{\"action\":\"create\",\"manifest\":\"apiVersion: batch.volcano.sh/v1alpha1\\nkind: Job\\n\",\"successCondition\":\"status.state.phase = Completed\",\"failureCondition\":\"status.state.phase in (Failed, Terminated)\"}}"
argoproj/argo-workflows/workflow/executor/executor.go
// StageFiles will create any files required by script/resource templates.
//
// For a script template the script source is written to
// common.ExecutorScriptSourcePath; for a resource template the manifest is
// written to common.ExecutorResourceManifestPath. Any other template type
// needs no file and returns nil immediately.
//
// It returns the error from writing the file, if any.
func (we *WorkflowExecutor) StageFiles() error {
	var filePath string
	var body []byte
	switch we.Template.GetType() {
	case wfv1.TemplateTypeScript:
		log.Infof("Loading script source to %s", common.ExecutorScriptSourcePath)
		filePath = common.ExecutorScriptSourcePath
		body = []byte(we.Template.Script.Source)
	case wfv1.TemplateTypeResource:
		log.Infof("Loading manifest to %s", common.ExecutorResourceManifestPath)
		filePath = common.ExecutorResourceManifestPath
		body = []byte(we.Template.Resource.Manifest)
	default:
		return nil
	}
	// Surface write failures to the caller instead of reporting success for a
	// file that was never staged.
	return ioutil.WriteFile(filePath, body, 0644)
}
manifest.yaml
root@hobot-dag-20435-3634668572:/# cat /tmp/manifest.yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
...
status:
  state:
    lastTransitionTime: null
argoproj/argo-workflows/workflow/executor/resource.go
// ExecResource will run kubectl action against a manifest.
//
// The kubectl arguments are built by getKubectlArguments from action,
// manifestPath and flags. On success it returns the namespace and name of the
// affected resource; if building the arguments or running kubectl fails, it
// returns empty strings together with that error.
func (we *WorkflowExecutor) ExecResource(action string, manifestPath string, flags []string) (string, string, error) {
	args, err := we.getKubectlArguments(action, manifestPath, flags)
	if err != nil {
		return "", "", err
	}

	cmd := exec.Command("kubectl", args...)
	out, err := cmd.Output()
	if err != nil {
		// Without this check a failed kubectl run would be reported as success.
		return "", "", err
	}

	// NOTE(review): the step that turns `out` into `obj` and `resourceName`
	// is not part of this excerpt — confirm against the full resource.go.
	return obj.GetNamespace(), resourceName, nil
}
argoproj/argo-workflows/workflow/executor/resource.go
// WaitResource blocks until the given resource satisfies either its success
// or its failure condition, and returns nil only when the wait ended
// successfully.
//
// NOTE(review): ctx is not referenced in this excerpt, and successReqs /
// failReqs are not defined in it — presumably they are built from the
// template's conditions in code that was left out; confirm.
func (we *WorkflowExecutor) WaitResource(ctx context.Context, resourceNamespace string, resourceName string) error {
	// One polling round: a nil error ends the wait, a retryable error asks for
	// another round, and anything else aborts the poll with that error.
	pollOnce := func() (bool, error) {
		retryable, stateErr := checkResourceState(resourceNamespace, resourceName, successReqs, failReqs)
		switch {
		case stateErr == nil:
			log.Infof("Returning from successful wait for resource %s", resourceName)
			return true, nil
		case retryable:
			log.Infof("Waiting for resource %s resulted in retryable error %v", resourceName, stateErr)
			return false, nil
		default:
			log.Warnf("Waiting for resource %s resulted in non-retryable error %v", resourceName, stateErr)
			return false, stateErr
		}
	}

	// The 5 second poll interval doubles as a backoff when a round fails
	// immediately.
	waitErr := wait.PollImmediateInfinite(5*time.Second, pollOnce)
	if waitErr == nil {
		return nil
	}

	if waitErr == wait.ErrWaitTimeout {
		log.Warnf("Waiting for resource %s resulted in timeout due to repeated errors", resourceName)
	} else {
		log.Warnf("Waiting for resource %s resulted in error %v", resourceName, waitErr)
	}
	return waitErr
}
// checkResourceState starts the kubectl watch command for the resource and
// then reads JSON documents from its output in a loop.
//
// The first return value is consumed by WaitResource as "the error is
// retryable". In the visible code it is false when the resource has been
// deleted (not-found error) and true when reading the next JSON document
// fails, in which case the kubectl process is killed first.
//
// NOTE(review): the evaluation of the failure/success requirements (`failed`,
// `matched`, `numMatched`) and the corresponding return statements are not
// part of this excerpt; the error from startKubectlWaitCmd is also not checked
// here — confirm against the full resource.go.
func checkResourceState(resourceNamespace string, resourceName string, successReqs labels.Requirements, failReqs labels.Requirements) (bool, error) {
// Runs "kubectl get xxx -w -o json"; its stdout is processed asynchronously.
cmd, reader, err := startKubectlWaitCmd(resourceNamespace, resourceName)
// Process stdout.
for {
// NOTE(review): cmd is not killed on this return path in the visible code —
// confirm whether the watch process is cleaned up elsewhere.
if checkIfResourceDeleted(resourceName, resourceNamespace) {
return false, errors.Errorf(errors.CodeNotFound, "Resource %s in namespace %s has been deleted somehow.", resourceName, resourceNamespace)
}
jsonBytes, err := readJSON(reader)
if err != nil {
// A read failure is reported as retryable; stop the kubectl process first.
resultErr := err
_ = cmd.Process.Kill()
return true, resultErr
}
log.Info(string(jsonBytes))
// Check whether the workload's status meets the expected conditions.
for _, req := range failReqs {
msg := fmt.Sprintf("failure condition '%s' evaluated %v", req, failed)
}
for _, req := range successReqs {
log.Infof("success condition '%s' evaluated %v", req, matched)
}
log.Infof("%d/%d success conditions matched", numMatched, len(successReqs))
}
}
v 3.1
https://hub.fastgit.org/argoproj/argo-workflows/issues/4467
https://hub.fastgit.org/argoproj/argo-workflows/pull/5364
变化点:
从 kubectl get -w (watch)
变成 client 轮询 (poll)