```go
func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
	}()
	// The key normally has the form namespace/name.
	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return false, err
	}
	if len(ns) == 0 || len(name) == 0 {
		return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
	}
	// It is called sharedJob because it is read from the local informer cache
	// (through the Indexer), not fetched from the API server.
	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
	if err != nil {
		// If the Job is not found, it has already been deleted; clean up the
		// expectations recorded for this key and stop processing it.
		if apierrors.IsNotFound(err) {
			klog.V(4).Infof("Job has been deleted: %v", key)
			jm.expectations.DeleteExpectations(key)
			jm.finalizerExpectations.deleteExpectations(key)
			return true, nil
		}
		return false, err
	}
	// Work on a deep copy so the cached object is never mutated.
	job := *sharedJob.DeepCopy()
	// IsJobFinished checks whether the Job already has a JobCondition of type
	// "Complete" or "Failed"; a finished Job needs no further work.
	if IsJobFinished(&job) {
		return true, nil
	}
	// The IndexedJob feature went beta in 1.22; isIndexedJob boils down to checking
	// .Spec.CompletionMode == "Indexed". If the feature gate is disabled but the Job
	// is Indexed, the controller cannot process it and skips the sync.
	if !feature.DefaultFeatureGate.Enabled(features.IndexedJob) && isIndexedJob(&job) {
		jm.recorder.Event(&job, v1.EventTypeWarning, "IndexedJobDisabled", "Skipped Indexed Job sync because feature is disabled.")
		return false, nil
	}
	// Only "NonIndexed" and "Indexed" are recognized CompletionMode values; anything
	// else is unknown and the sync is skipped.
	if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode != batch.NonIndexedCompletion && *job.Spec.CompletionMode != batch.IndexedCompletion {
		jm.recorder.Event(&job, v1.EventTypeWarning, "UnknownCompletionMode", "Skipped Job sync because completion mode is unknown")
		return false, nil
	}
	// Record the effective completion mode; the default is "NonIndexed".
	completionMode := string(batch.NonIndexedCompletion)
	if isIndexedJob(&job) {
		completionMode = string(batch.IndexedCompletion)
	}
	// The default sync action reported to the metrics below is "reconciling".
	action := metrics.JobSyncActionReconciling
	// Record sync duration and sync count metrics when the function returns.
	defer func() {
		result := "success"
		if rErr != nil {
			result = "error"
		}
		metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(time.Since(startTime).Seconds())
		metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
	}()
	var expectedRmFinalizers sets.String
	var uncounted *uncountedTerminatedPods
	// Tracking terminated Pods with Pod finalizers is an alpha feature in 1.22.
	if trackingUncountedPods(&job) {
		klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job))
		if job.Status.UncountedTerminatedPods == nil {
			job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
		}
		uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
		expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
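		// expectedRmFinalizers holds the UIDs of Pods whose finalizer removal has
		// already been requested but not yet observed, so that getStatus below does
		// not count those Pods again.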
	} else if patch := removeTrackingAnnotationPatch(&job); patch != nil {
		// Tracking with finalizers is disabled: remove the
		// "batch.kubernetes.io/job-tracking" annotation from the Job if it is present.
		if err := jm.patchJobHandler(&job, patch); err != nil {
			return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err)
		}
	}
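	// SatisfiedExpectations reports whether the Pod creations/deletions recorded for
	// this key in a previous sync have all been observed via the informers (or have
	// expired); only then is it safe to compute a new diff and create or delete Pods.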
	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
	// Fetch the Pods that belong to this Job (adopting matching orphans along the way).
	pods, err := jm.getPodsForJob(&job, uncounted != nil)
	if err != nil {
		return false, err
	}
	// A Pod counts as active when its PodPhase is neither "Succeeded" nor "Failed"
	// and it is not already being deleted.
	activePods := controller.FilterActivePods(pods)
	active := int32(len(activePods))
	// Count the Pods that terminated in "Succeeded" and "Failed".
	succeeded, failed := getStatus(&job, pods, uncounted, expectedRmFinalizers)
	// StartTime is still unset and the Job is not suspended: the Job is starting to
	// run, so record .Status.StartTime now.
	if job.Status.StartTime == nil && !jobSuspended(&job) {
		now := metav1.Now()
		job.Status.StartTime = &now
		// If ActiveDeadlineSeconds is set, requeue the key so the Job is reconciled
		// again once the deadline expires.
		if job.Spec.ActiveDeadlineSeconds != nil {
			klog.V(4).Infof("Job %s has ActiveDeadlineSeconds will sync after %d seconds",
				key, *job.Spec.ActiveDeadlineSeconds)
			jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
		}
	}
	var manageJobErr error
	var finishedCondition *batch.JobCondition
	// There are newly failed Pods compared with the last recorded status.
	jobHasNewFailure := failed > job.Status.Failed
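	// Note: counting whole failed Pods against BackoffLimit mainly matters for
	// restartPolicy "Never"; with "OnFailure" a failure usually shows up as a
	// container restart, which pastBackoffLimitOnFailure accounts for below.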
	// New failures appeared, the number of active Pods differs from the requested
	// parallelism, and the number of failed Pods already exceeds the retry limit.
	exceedsBackoffLimit := jobHasNewFailure && (active != *job.Spec.Parallelism) &&
		(failed > *job.Spec.BackoffLimit)
	// pastBackoffLimitOnFailure checks, for restartPolicy "OnFailure", whether the
	// accumulated container restarts have exceeded the limit.
	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
		// The retry budget is exhausted: the Job condition becomes "Failed".
		finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit")
	} else if pastActiveDeadline(&job) {
		// The Job has been active longer than ActiveDeadlineSeconds.
		finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline")
	}
	// For Indexed Jobs, compute which completion indexes have succeeded.
	var prevSucceededIndexes, succeededIndexes orderedIntervals
	if isIndexedJob(&job) {
		prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(&job, pods)
		succeeded = int32(succeededIndexes.total())
	}
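	// Each Pod of an Indexed Job is assigned a completion index in [0, Completions);
	// succeededIndexes is kept as ordered intervals, the same compressed form that is
	// written to .Status.CompletedIndexes later (for example "1,3-5").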
	suspendCondChanged := false
	// If the Job has failed, the Pods that are still active must be deleted directly.
	if finishedCondition != nil {
		deleted, err := jm.deleteActivePods(&job, activePods)
		if uncounted == nil {
			deleted = active
		} else if deleted != active {
			finishedCondition = nil
		}
		active -= deleted
		failed += deleted
		manageJobErr = err
	} else {
		manageJobCalled := false
		if jobNeedsSync && job.DeletionTimestamp == nil {
			// manageJob is the core method that creates or deletes Pods so that the
			// number of running Pods matches what the Spec asks for.
			active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, succeededIndexes)
			manageJobCalled = true
		}
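		// At this point active already reflects the creations and deletions that
		// manageJob just issued, not only the Pods currently observed in the cache.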
		// Decide whether the Job has completed.
		complete := false
		if job.Spec.Completions == nil {
			complete = succeeded > 0 && active == 0
		} else {
			complete = succeeded >= *job.Spec.Completions && active == 0
		}
		if complete {
			finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "")
		} else if feature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {
			// Job suspension is a feature that went beta in 1.22.
			// The Job asks to be suspended.
			if job.Spec.Suspend != nil && *job.Spec.Suspend {
				// Only a Job that has not finished can be suspended.
				var isUpdated bool
				job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended")
				if isUpdated {
					suspendCondChanged = true
					jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
				}
			} else {
				// The Job is being resumed from suspension.
				var isUpdated bool
				job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed")
				if isUpdated {
					suspendCondChanged = true
					jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")
					now := metav1.Now()
					// Reset StartTime so that ActiveDeadlineSeconds is measured from
					// the moment of resumption rather than from the original start.
					job.Status.StartTime = &now
				}
			}
		}
	}
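	// forget tells the caller whether to reset the rate-limiter backoff for this key;
	// see the worker-loop sketch after this listing for how it is consumed.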
	forget = false
	// forget is set when more Pods have succeeded than last recorded, so that the
	// backoff for this key can be reset.
	if job.Status.Succeeded < succeeded {
		forget = true
	}
	if uncounted != nil {
		// The status needs an update when the suspended condition changed or the
		// number of active Pods changed.
		needsStatusUpdate := suspendCondChanged || active != job.Status.Active
		job.Status.Active = active
		// Finalizer-tracking path: update the status and remove the tracking
		// finalizers from terminated Pods.
		err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
		if err != nil {
			return false, fmt.Errorf("tracking status: %w", err)
		}
		jobFinished := IsJobFinished(&job)
		if jobHasNewFailure && !jobFinished {
			return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
		}
		forget = true
		return forget, manageJobErr
	}
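	// From here on is the legacy path without finalizer-based tracking: the counters
	// written below come directly from the Pods currently visible in the cache.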
	// Tracking with finalizers is disabled, so strip the tracking finalizer from any
	// of the Job's Pods that still carry it.
	if err := jm.removeTrackingFinalizersFromAllPods(pods); err != nil {
		return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err)
	}
	// Decide whether the status needs to be written back.
	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || suspendCondChanged || finishedCondition != nil {
		job.Status.Active = active
		job.Status.Succeeded = succeeded
		job.Status.Failed = failed
		if isIndexedJob(&job) {
			job.Status.CompletedIndexes = succeededIndexes.String()
		}
		job.Status.UncountedTerminatedPods = nil
		jm.enactJobFinished(&job, finishedCondition)
		if _, err := jm.updateStatusHandler(&job); err != nil {
			return forget, err
		}
		if jobHasNewFailure && !IsJobFinished(&job) {
			// returning an error will re-enqueue Job after the backoff period
			return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
		}
		forget = true
	}
	return forget, manageJobErr
}
```
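The `forget` return value only makes sense together with the worker loop that dequeues keys and calls `syncJob`. The sketch below is a simplified illustration, not the verbatim upstream code: it shows the usual pattern in which a successful sync with `forget == true` clears the key's rate-limiter backoff via `Forget`, while a returned error re-enqueues the key with `AddRateLimited`. This is exactly why `syncJob` deliberately returns an error instead of swallowing it when new Pod failures are detected.

```go
// Simplified sketch of the worker loop around syncJob (illustrative; method and
// field names follow the controller above, but details may differ from upstream).
func (jm *Controller) processNextWorkItem() bool {
	key, quit := jm.queue.Get()
	if quit {
		return false
	}
	defer jm.queue.Done(key)

	forget, err := jm.syncJob(key.(string))
	if err == nil {
		if forget {
			// Clear the rate-limiter backoff accumulated for this key.
			jm.queue.Forget(key)
		}
		return true
	}

	// On error, put the key back with exponential backoff so the Job is retried.
	utilruntime.HandleError(fmt.Errorf("syncing job %q: %v", key, err))
	jm.queue.AddRateLimited(key)
	return true
}
```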