···185185 return err
186186}
187187188188-// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
189189-// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
190190-func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
191191- // Find all runs in the specified repository, reference, and workflow with non-final status
192192- runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
193193- RepoID: repoID,
194194- Ref: ref,
195195- WorkflowID: workflowID,
196196- TriggerEvent: event,
197197- Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
198198- })
199199- if err != nil {
200200- return err
201201- }
202202-203203- // If there are no runs found, there's no need to proceed with cancellation, so return nil.
204204- if total == 0 {
205205- return nil
206206- }
207207-208208- // Iterate over each found run and cancel its associated jobs.
209209- for _, run := range runs {
210210- // Find all jobs associated with the current run.
211211- jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
212212- RunID: run.ID,
213213- })
214214- if err != nil {
215215- return err
216216- }
217217-218218- // Iterate over each job and attempt to cancel it.
219219- for _, job := range jobs {
220220- // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
221221- status := job.Status
222222- if status.IsDone() {
223223- continue
224224- }
225225-226226- // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
227227- if job.TaskID == 0 {
228228- job.Status = StatusCancelled
229229- job.Stopped = timeutil.TimeStampNow()
230230-231231- // Update the job's status and stopped time in the database.
232232- n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
233233- if err != nil {
234234- return err
235235- }
236236-237237- // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
238238- if n == 0 {
239239- return fmt.Errorf("job has changed, try again")
240240- }
241241-242242- // Continue with the next job.
243243- continue
244244- }
245245-246246- // If the job has an associated task, try to stop the task, effectively cancelling the job.
247247- if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
248248- return err
249249- }
250250- }
251251- }
252252-253253- // Return nil to indicate successful cancellation of all running and waiting jobs.
254254- return nil
255255-}
256256-257188// InsertRun inserts a run
258189// The title will be cut off at 255 characters if it's longer than 255 characters.
259190func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
-22
models/actions/schedule.go
···5566import (
77 "context"
88- "fmt"
98 "time"
1091110 "forgejo.org/models/db"
···117116 }
118117119118 return committer.Commit()
120120-}
121121-122122-func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error {
123123- // If actions disabled when there is schedule task, this will remove the outdated schedule tasks
124124- // There is no other place we can do this because the app.ini will be changed manually
125125- if err := DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
126126- return fmt.Errorf("DeleteCronTaskByRepo: %v", err)
127127- }
128128- if cancelPreviousJobs {
129129- // cancel running cron jobs of this repository and delete old schedules
130130- if err := CancelPreviousJobs(
131131- ctx,
132132- repo.ID,
133133- repo.DefaultBranch,
134134- "",
135135- webhook_module.HookEventSchedule,
136136- ); err != nil {
137137- return fmt.Errorf("CancelPreviousJobs: %v", err)
138138- }
139139- }
140140- return nil
141119}
142120143121type FindScheduleOptions struct {
-143
models/actions/task.go
···1717 "forgejo.org/modules/timeutil"
1818 "forgejo.org/modules/util"
19192020- runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
2120 lru "github.com/hashicorp/golang-lru/v2"
2221 "github.com/nektos/act/pkg/jobparser"
2323- "google.golang.org/protobuf/types/known/timestamppb"
2422 "xorm.io/builder"
2523)
2624···337335 return err
338336}
339337340340-// UpdateTaskByState updates the task by the state.
341341-// It will always update the task if the state is not final, even there is no change.
342342-// So it will update ActionTask.Updated to avoid the task being judged as a zombie task.
343343-func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.TaskState) (*ActionTask, error) {
344344- stepStates := map[int64]*runnerv1.StepState{}
345345- for _, v := range state.Steps {
346346- stepStates[v.Id] = v
347347- }
348348-349349- ctx, commiter, err := db.TxContext(ctx)
350350- if err != nil {
351351- return nil, err
352352- }
353353- defer commiter.Close()
354354-355355- e := db.GetEngine(ctx)
356356-357357- task := &ActionTask{}
358358- if has, err := e.ID(state.Id).Get(task); err != nil {
359359- return nil, err
360360- } else if !has {
361361- return nil, util.ErrNotExist
362362- } else if runnerID != task.RunnerID {
363363- return nil, fmt.Errorf("invalid runner for task")
364364- }
365365-366366- if task.Status.IsDone() {
367367- // the state is final, do nothing
368368- return task, nil
369369- }
370370-371371- // state.Result is not unspecified means the task is finished
372372- if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
373373- task.Status = Status(state.Result)
374374- task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
375375- if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
376376- return nil, err
377377- }
378378- if _, err := UpdateRunJob(ctx, &ActionRunJob{
379379- ID: task.JobID,
380380- Status: task.Status,
381381- Stopped: task.Stopped,
382382- }, nil); err != nil {
383383- return nil, err
384384- }
385385- } else {
386386- // Force update ActionTask.Updated to avoid the task being judged as a zombie task
387387- task.Updated = timeutil.TimeStampNow()
388388- if err := UpdateTask(ctx, task, "updated"); err != nil {
389389- return nil, err
390390- }
391391- }
392392-393393- if err := task.LoadAttributes(ctx); err != nil {
394394- return nil, err
395395- }
396396-397397- for _, step := range task.Steps {
398398- var result runnerv1.Result
399399- if v, ok := stepStates[step.Index]; ok {
400400- result = v.Result
401401- step.LogIndex = v.LogIndex
402402- step.LogLength = v.LogLength
403403- step.Started = convertTimestamp(v.StartedAt)
404404- step.Stopped = convertTimestamp(v.StoppedAt)
405405- }
406406- if result != runnerv1.Result_RESULT_UNSPECIFIED {
407407- step.Status = Status(result)
408408- } else if step.Started != 0 {
409409- step.Status = StatusRunning
410410- }
411411- if _, err := e.ID(step.ID).Update(step); err != nil {
412412- return nil, err
413413- }
414414- }
415415-416416- if err := commiter.Commit(); err != nil {
417417- return nil, err
418418- }
419419-420420- return task, nil
421421-}
422422-423423-func StopTask(ctx context.Context, taskID int64, status Status) error {
424424- if !status.IsDone() {
425425- return fmt.Errorf("cannot stop task with status %v", status)
426426- }
427427- e := db.GetEngine(ctx)
428428-429429- task := &ActionTask{}
430430- if has, err := e.ID(taskID).Get(task); err != nil {
431431- return err
432432- } else if !has {
433433- return util.ErrNotExist
434434- }
435435- if task.Status.IsDone() {
436436- return nil
437437- }
438438-439439- now := timeutil.TimeStampNow()
440440- task.Status = status
441441- task.Stopped = now
442442- if _, err := UpdateRunJob(ctx, &ActionRunJob{
443443- ID: task.JobID,
444444- Status: task.Status,
445445- Stopped: task.Stopped,
446446- }, nil); err != nil {
447447- return err
448448- }
449449-450450- if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
451451- return err
452452- }
453453-454454- if err := task.LoadAttributes(ctx); err != nil {
455455- return err
456456- }
457457-458458- for _, step := range task.Steps {
459459- if !step.Status.IsDone() {
460460- step.Status = status
461461- if step.Started == 0 {
462462- step.Started = now
463463- }
464464- step.Stopped = now
465465- }
466466- if _, err := e.ID(step.ID).Update(step); err != nil {
467467- return err
468468- }
469469- }
470470-471471- return nil
472472-}
473473-474338func FindOldTasksToExpire(ctx context.Context, olderThan timeutil.TimeStamp, limit int) ([]*ActionTask, error) {
475339 e := db.GetEngine(ctx)
476340···479343 return tasks, e.Where("stopped > 0 AND stopped < ? AND log_expired = ?", olderThan, false).
480344 Limit(limit).
481345 Find(&tasks)
482482-}
483483-484484-func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp {
485485- if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 {
486486- return timeutil.TimeStamp(0)
487487- }
488488- return timeutil.TimeStamp(timestamp.AsTime().Unix())
489346}
490347491348func logFileName(repoFullName string, taskID int64) string {
···4141 jobs := make([]*actions_model.ActionRunJob, 0, len(tasks))
4242 for _, task := range tasks {
4343 if err := db.WithTx(ctx, func(ctx context.Context) error {
4444- if err := actions_model.StopTask(ctx, task.ID, actions_model.StatusFailure); err != nil {
4444+ if err := StopTask(ctx, task.ID, actions_model.StatusFailure); err != nil {
4545 return err
4646 }
4747 if err := task.LoadJob(ctx); err != nil {
+3-3
services/actions/notifier_helper.go
···139139 return nil
140140 }
141141 if unit_model.TypeActions.UnitGlobalDisabled() {
142142- if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo, true); err != nil {
142142+ if err := CleanRepoScheduleTasks(ctx, input.Repo, true); err != nil {
143143 log.Error("CleanRepoScheduleTasks: %v", err)
144144 }
145145 return nil
···373373 // cancel running jobs if the event is push or pull_request_sync
374374 if run.Event == webhook_module.HookEventPush ||
375375 run.Event == webhook_module.HookEventPullRequestSync {
376376- if err := actions_model.CancelPreviousJobs(
376376+ if err := CancelPreviousJobs(
377377 ctx,
378378 run.RepoID,
379379 run.Ref,
···504504 log.Error("CountSchedules: %v", err)
505505 return err
506506 } else if count > 0 {
507507- if err := actions_model.CleanRepoScheduleTasks(ctx, input.Repo, false); err != nil {
507507+ if err := CleanRepoScheduleTasks(ctx, input.Repo, false); err != nil {
508508 log.Error("CleanRepoScheduleTasks: %v", err)
509509 }
510510 }
+92-1
services/actions/schedule_tasks.go
···1717 webhook_module "forgejo.org/modules/webhook"
18181919 "github.com/nektos/act/pkg/jobparser"
2020+ "xorm.io/builder"
2021)
21222223// StartScheduleTasks start the task
···5556 // cancel running jobs if the event is push
5657 if row.Schedule.Event == webhook_module.HookEventPush {
5758 // cancel running jobs of the same workflow
5858- if err := actions_model.CancelPreviousJobs(
5959+ if err := CancelPreviousJobs(
5960 ctx,
6061 row.RepoID,
6162 row.Schedule.Ref,
···152153 // Return nil if no errors occurred
153154 return nil
154155}
156156+157157+// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
158158+// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
159159+func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
160160+ // Find all runs in the specified repository, reference, and workflow with non-final status
161161+ runs, total, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
162162+ RepoID: repoID,
163163+ Ref: ref,
164164+ WorkflowID: workflowID,
165165+ TriggerEvent: event,
166166+ Status: []actions_model.Status{actions_model.StatusRunning, actions_model.StatusWaiting, actions_model.StatusBlocked},
167167+ })
168168+ if err != nil {
169169+ return err
170170+ }
171171+172172+ // If there are no runs found, there's no need to proceed with cancellation, so return nil.
173173+ if total == 0 {
174174+ return nil
175175+ }
176176+177177+ // Iterate over each found run and cancel its associated jobs.
178178+ for _, run := range runs {
179179+ // Find all jobs associated with the current run.
180180+ jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
181181+ RunID: run.ID,
182182+ })
183183+ if err != nil {
184184+ return err
185185+ }
186186+187187+ // Iterate over each job and attempt to cancel it.
188188+ for _, job := range jobs {
189189+ // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
190190+ status := job.Status
191191+ if status.IsDone() {
192192+ continue
193193+ }
194194+195195+ // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
196196+ if job.TaskID == 0 {
197197+ job.Status = actions_model.StatusCancelled
198198+ job.Stopped = timeutil.TimeStampNow()
199199+200200+ // Update the job's status and stopped time in the database.
201201+ n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
202202+ if err != nil {
203203+ return err
204204+ }
205205+206206+ // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
207207+ if n == 0 {
208208+ return fmt.Errorf("job has changed, try again")
209209+ }
210210+211211+ // Continue with the next job.
212212+ continue
213213+ }
214214+215215+ // If the job has an associated task, try to stop the task, effectively cancelling the job.
216216+ if err := StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
217217+ return err
218218+ }
219219+ }
220220+ }
221221+222222+ // Return nil to indicate successful cancellation of all running and waiting jobs.
223223+ return nil
224224+}
225225+226226+func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository, cancelPreviousJobs bool) error {
227227+ // If actions disabled when there is schedule task, this will remove the outdated schedule tasks
228228+ // There is no other place we can do this because the app.ini will be changed manually
229229+ if err := actions_model.DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
230230+ return fmt.Errorf("DeleteCronTaskByRepo: %v", err)
231231+ }
232232+ if cancelPreviousJobs {
233233+ // cancel running cron jobs of this repository and delete old schedules
234234+ if err := CancelPreviousJobs(
235235+ ctx,
236236+ repo.ID,
237237+ repo.DefaultBranch,
238238+ "",
239239+ webhook_module.HookEventSchedule,
240240+ ); err != nil {
241241+ return fmt.Errorf("CancelPreviousJobs: %v", err)
242242+ }
243243+ }
244244+ return nil
245245+}
+144
services/actions/task.go
···1010 actions_model "forgejo.org/models/actions"
1111 "forgejo.org/models/db"
1212 secret_model "forgejo.org/models/secret"
1313+ "forgejo.org/modules/timeutil"
1414+ "forgejo.org/modules/util"
13151416 runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
1517 "google.golang.org/protobuf/types/known/structpb"
1818+ "google.golang.org/protobuf/types/known/timestamppb"
1619)
17201821func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv1.Task, bool, error) {
···105108 }
106109 return ret, nil
107110}
111111+112112+func StopTask(ctx context.Context, taskID int64, status actions_model.Status) error {
113113+ if !status.IsDone() {
114114+ return fmt.Errorf("cannot stop task with status %v", status)
115115+ }
116116+ e := db.GetEngine(ctx)
117117+118118+ task := &actions_model.ActionTask{}
119119+ if has, err := e.ID(taskID).Get(task); err != nil {
120120+ return err
121121+ } else if !has {
122122+ return util.ErrNotExist
123123+ }
124124+ if task.Status.IsDone() {
125125+ return nil
126126+ }
127127+128128+ now := timeutil.TimeStampNow()
129129+ task.Status = status
130130+ task.Stopped = now
131131+ if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{
132132+ ID: task.JobID,
133133+ Status: task.Status,
134134+ Stopped: task.Stopped,
135135+ }, nil); err != nil {
136136+ return err
137137+ }
138138+139139+ if err := actions_model.UpdateTask(ctx, task, "status", "stopped"); err != nil {
140140+ return err
141141+ }
142142+143143+ if err := task.LoadAttributes(ctx); err != nil {
144144+ return err
145145+ }
146146+147147+ for _, step := range task.Steps {
148148+ if !step.Status.IsDone() {
149149+ step.Status = status
150150+ if step.Started == 0 {
151151+ step.Started = now
152152+ }
153153+ step.Stopped = now
154154+ }
155155+ if _, err := e.ID(step.ID).Update(step); err != nil {
156156+ return err
157157+ }
158158+ }
159159+160160+ return nil
161161+}
162162+163163+// UpdateTaskByState updates the task by the state.
164164+// It will always update the task if the state is not final, even there is no change.
165165+// So it will update ActionTask.Updated to avoid the task being judged as a zombie task.
166166+func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.TaskState) (*actions_model.ActionTask, error) {
167167+ stepStates := map[int64]*runnerv1.StepState{}
168168+ for _, v := range state.Steps {
169169+ stepStates[v.Id] = v
170170+ }
171171+172172+ ctx, commiter, err := db.TxContext(ctx)
173173+ if err != nil {
174174+ return nil, err
175175+ }
176176+ defer commiter.Close()
177177+178178+ e := db.GetEngine(ctx)
179179+180180+ task := &actions_model.ActionTask{}
181181+ if has, err := e.ID(state.Id).Get(task); err != nil {
182182+ return nil, err
183183+ } else if !has {
184184+ return nil, util.ErrNotExist
185185+ } else if runnerID != task.RunnerID {
186186+ return nil, fmt.Errorf("invalid runner for task")
187187+ }
188188+189189+ if task.Status.IsDone() {
190190+ // the state is final, do nothing
191191+ return task, nil
192192+ }
193193+194194+ // state.Result is not unspecified means the task is finished
195195+ if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
196196+ task.Status = actions_model.Status(state.Result)
197197+ task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
198198+ if err := actions_model.UpdateTask(ctx, task, "status", "stopped"); err != nil {
199199+ return nil, err
200200+ }
201201+ if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{
202202+ ID: task.JobID,
203203+ Status: task.Status,
204204+ Stopped: task.Stopped,
205205+ }, nil); err != nil {
206206+ return nil, err
207207+ }
208208+ } else {
209209+ // Force update ActionTask.Updated to avoid the task being judged as a zombie task
210210+ task.Updated = timeutil.TimeStampNow()
211211+ if err := actions_model.UpdateTask(ctx, task, "updated"); err != nil {
212212+ return nil, err
213213+ }
214214+ }
215215+216216+ if err := task.LoadAttributes(ctx); err != nil {
217217+ return nil, err
218218+ }
219219+220220+ for _, step := range task.Steps {
221221+ var result runnerv1.Result
222222+ if v, ok := stepStates[step.Index]; ok {
223223+ result = v.Result
224224+ step.LogIndex = v.LogIndex
225225+ step.LogLength = v.LogLength
226226+ step.Started = convertTimestamp(v.StartedAt)
227227+ step.Stopped = convertTimestamp(v.StoppedAt)
228228+ }
229229+ if result != runnerv1.Result_RESULT_UNSPECIFIED {
230230+ step.Status = actions_model.Status(result)
231231+ } else if step.Started != 0 {
232232+ step.Status = actions_model.StatusRunning
233233+ }
234234+ if _, err := e.ID(step.ID).Update(step); err != nil {
235235+ return nil, err
236236+ }
237237+ }
238238+239239+ if err := commiter.Commit(); err != nil {
240240+ return nil, err
241241+ }
242242+243243+ return task, nil
244244+}
245245+246246+func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp {
247247+ if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 {
248248+ return timeutil.TimeStamp(0)
249249+ }
250250+ return timeutil.TimeStamp(timestamp.AsTime().Unix())
251251+}
+3-2
services/repository/branch.go
···2828 "forgejo.org/modules/timeutil"
2929 "forgejo.org/modules/util"
3030 webhook_module "forgejo.org/modules/webhook"
3131+ actions_service "forgejo.org/services/actions"
3132 notify_service "forgejo.org/services/notify"
3233 pull_service "forgejo.org/services/pull"
3334 files_service "forgejo.org/services/repository/files"
···377378 log.Error("DeleteCronTaskByRepo: %v", err)
378379 }
379380 // cancel running cron jobs of this repository and delete old schedules
380380- if err := actions_model.CancelPreviousJobs(
381381+ if err := actions_service.CancelPreviousJobs(
381382 ctx,
382383 repo.ID,
383384 from,
···578579 log.Error("DeleteCronTaskByRepo: %v", err)
579580 }
580581 // cancel running cron jobs of this repository and delete old schedules
581581- if err := actions_model.CancelPreviousJobs(
582582+ if err := actions_service.CancelPreviousJobs(
582583 ctx,
583584 repo.ID,
584585 oldDefaultBranchName,