forked from
tangled.org/core
Monorepo for Tangled
1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "slices"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.org/core/appview/models"
13 "tangled.org/core/orm"
14)
15
16func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) {
17 var pipelines []models.Pipeline
18
19 var conditions []string
20 var args []any
21 for _, filter := range filters {
22 conditions = append(conditions, filter.Condition())
23 args = append(args, filter.Arg()...)
24 }
25
26 whereClause := ""
27 if conditions != nil {
28 whereClause = " where " + strings.Join(conditions, " and ")
29 }
30
31 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created, repo_did from pipelines %s`, whereClause)
32
33 rows, err := e.Query(query, args...)
34
35 if err != nil {
36 return nil, err
37 }
38 defer rows.Close()
39
40 for rows.Next() {
41 var pipeline models.Pipeline
42 var createdAt string
43 var repoDid sql.NullString
44 err = rows.Scan(
45 &pipeline.Id,
46 &pipeline.Rkey,
47 &pipeline.Knot,
48 &pipeline.RepoOwner,
49 &pipeline.RepoName,
50 &pipeline.Sha,
51 &createdAt,
52 &repoDid,
53 )
54 if err != nil {
55 return nil, err
56 }
57
58 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
59 pipeline.Created = t
60 }
61 if repoDid.Valid {
62 pipeline.RepoDid = repoDid.String
63 }
64
65 pipelines = append(pipelines, pipeline)
66 }
67
68 if err = rows.Err(); err != nil {
69 return nil, err
70 }
71
72 return pipelines, nil
73}
74
75func AddPipeline(e Execer, pipeline models.Pipeline) error {
76 var repoDid *string
77 if pipeline.RepoDid != "" {
78 repoDid = &pipeline.RepoDid
79 }
80
81 args := []any{
82 pipeline.Rkey,
83 pipeline.Knot,
84 pipeline.RepoOwner,
85 pipeline.RepoName,
86 pipeline.TriggerId,
87 pipeline.Sha,
88 repoDid,
89 }
90
91 placeholders := make([]string, len(args))
92 for i := range placeholders {
93 placeholders[i] = "?"
94 }
95
96 query := fmt.Sprintf(`
97 insert or ignore into pipelines (
98 rkey,
99 knot,
100 repo_owner,
101 repo_name,
102 trigger_id,
103 sha,
104 repo_did
105 ) values (%s)
106 `, strings.Join(placeholders, ","))
107
108 _, err := e.Exec(query, args...)
109
110 return err
111}
112
113func AddTrigger(e Execer, trigger models.Trigger) (int64, error) {
114 args := []any{
115 trigger.Kind,
116 trigger.PushRef,
117 trigger.PushNewSha,
118 trigger.PushOldSha,
119 trigger.PRSourceBranch,
120 trigger.PRTargetBranch,
121 trigger.PRSourceSha,
122 trigger.PRAction,
123 }
124
125 placeholders := make([]string, len(args))
126 for i := range placeholders {
127 placeholders[i] = "?"
128 }
129
130 query := fmt.Sprintf(`insert or ignore into triggers (
131 kind,
132 push_ref,
133 push_new_sha,
134 push_old_sha,
135 pr_source_branch,
136 pr_target_branch,
137 pr_source_sha,
138 pr_action
139 ) values (%s)`, strings.Join(placeholders, ","))
140
141 res, err := e.Exec(query, args...)
142 if err != nil {
143 return 0, err
144 }
145
146 return res.LastInsertId()
147}
148
149func AddPipelineStatus(ctx context.Context, e Execer, status models.PipelineStatus) error {
150 args := []any{
151 status.Spindle,
152 status.Rkey,
153 status.PipelineKnot,
154 status.PipelineRkey,
155 status.Workflow,
156 status.Status,
157 status.Error,
158 status.ExitCode,
159 status.Created.Format(time.RFC3339),
160 }
161
162 placeholders := make([]string, len(args))
163 for i := range placeholders {
164 placeholders[i] = "?"
165 }
166
167 query := fmt.Sprintf(`
168 insert or ignore into pipeline_statuses (
169 spindle,
170 rkey,
171 pipeline_knot,
172 pipeline_rkey,
173 workflow,
174 status,
175 error,
176 exit_code,
177 created
178 ) values (%s)
179 `, strings.Join(placeholders, ","))
180
181 _, err := e.ExecContext(ctx, query, args...)
182 return err
183}
184
185// this is a mega query, but the most useful one:
186// get N pipelines, for each one get the latest status of its N workflows
187//
188// the pipelines table is aliased to `p`
189// the triggers table is aliased to `t`
190func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) {
191 var conditions []string
192 var args []any
193 for _, filter := range filters {
194 conditions = append(conditions, filter.Condition())
195 args = append(args, filter.Arg()...)
196 }
197
198 whereClause := ""
199 if conditions != nil {
200 whereClause = " where " + strings.Join(conditions, " and ")
201 }
202
203 query := fmt.Sprintf(`
204 select
205 p.id,
206 p.knot,
207 p.rkey,
208 p.repo_owner,
209 p.repo_name,
210 p.sha,
211 p.created,
212 p.repo_did,
213 t.id,
214 t.kind,
215 t.push_ref,
216 t.push_new_sha,
217 t.push_old_sha,
218 t.pr_source_branch,
219 t.pr_target_branch,
220 t.pr_source_sha,
221 t.pr_action
222 from
223 pipelines p
224 join
225 triggers t ON p.trigger_id = t.id
226 %s
227 order by p.created desc
228 limit %d
229 `, whereClause, limit)
230
231 rows, err := e.Query(query, args...)
232 if err != nil {
233 return nil, err
234 }
235 defer rows.Close()
236
237 pipelines := make(map[syntax.ATURI]models.Pipeline)
238 for rows.Next() {
239 var p models.Pipeline
240 var t models.Trigger
241 var created string
242 var repoDid sql.NullString
243
244 err := rows.Scan(
245 &p.Id,
246 &p.Knot,
247 &p.Rkey,
248 &p.RepoOwner,
249 &p.RepoName,
250 &p.Sha,
251 &created,
252 &repoDid,
253 &p.TriggerId,
254 &t.Kind,
255 &t.PushRef,
256 &t.PushNewSha,
257 &t.PushOldSha,
258 &t.PRSourceBranch,
259 &t.PRTargetBranch,
260 &t.PRSourceSha,
261 &t.PRAction,
262 )
263 if err != nil {
264 return nil, err
265 }
266
267 p.Created, err = time.Parse(time.RFC3339, created)
268 if err != nil {
269 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
270 }
271 if repoDid.Valid {
272 p.RepoDid = repoDid.String
273 }
274
275 t.Id = p.TriggerId
276 p.Trigger = &t
277 p.Statuses = make(map[string]models.WorkflowStatus)
278
279 pipelines[p.AtUri()] = p
280 }
281
282 // get all statuses
283 // the where clause here is of the form:
284 //
285 // where (pipeline_knot = k1 and pipeline_rkey = r1)
286 // or (pipeline_knot = k2 and pipeline_rkey = r2)
287 conditions = nil
288 args = nil
289 for _, p := range pipelines {
290 knotFilter := orm.FilterEq("pipeline_knot", p.Knot)
291 rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey)
292 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
293 args = append(args, p.Knot)
294 args = append(args, p.Rkey)
295 }
296 whereClause = ""
297 if conditions != nil {
298 whereClause = "where " + strings.Join(conditions, " or ")
299 }
300 query = fmt.Sprintf(`
301 select
302 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
303 from
304 pipeline_statuses
305 %s
306 `, whereClause)
307
308 rows, err = e.Query(query, args...)
309 if err != nil {
310 return nil, err
311 }
312 defer rows.Close()
313
314 for rows.Next() {
315 var ps models.PipelineStatus
316 var created string
317
318 err := rows.Scan(
319 &ps.ID,
320 &ps.Spindle,
321 &ps.Rkey,
322 &ps.PipelineKnot,
323 &ps.PipelineRkey,
324 &created,
325 &ps.Workflow,
326 &ps.Status,
327 &ps.Error,
328 &ps.ExitCode,
329 )
330 if err != nil {
331 return nil, err
332 }
333
334 ps.Created, err = time.Parse(time.RFC3339, created)
335 if err != nil {
336 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
337 }
338
339 pipelineAt := ps.PipelineAt()
340
341 // extract
342 pipeline, ok := pipelines[pipelineAt]
343 if !ok {
344 continue
345 }
346 statuses, _ := pipeline.Statuses[ps.Workflow]
347 if !ok {
348 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{}
349 }
350
351 // append
352 statuses.Data = append(statuses.Data, ps)
353
354 // reassign
355 pipeline.Statuses[ps.Workflow] = statuses
356 pipelines[pipelineAt] = pipeline
357 }
358
359 var all []models.Pipeline
360 for _, p := range pipelines {
361 for _, s := range p.Statuses {
362 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int {
363 if a.Created.After(b.Created) {
364 return 1
365 }
366 if a.Created.Before(b.Created) {
367 return -1
368 }
369 if a.ID > b.ID {
370 return 1
371 }
372 if a.ID < b.ID {
373 return -1
374 }
375 return 0
376 })
377 }
378 all = append(all, p)
379 }
380
381 // sort pipelines by date
382 slices.SortFunc(all, func(a, b models.Pipeline) int {
383 if a.Created.After(b.Created) {
384 return -1
385 }
386 return 1
387 })
388
389 return all, nil
390}
391
392// the pipelines table is aliased to `p`
393// the triggers table is aliased to `t`
394func GetTotalPipelineStatuses(e Execer, filters ...orm.Filter) (int64, error) {
395 var conditions []string
396 var args []any
397 for _, filter := range filters {
398 conditions = append(conditions, filter.Condition())
399 args = append(args, filter.Arg()...)
400 }
401
402 whereClause := ""
403 if conditions != nil {
404 whereClause = " where " + strings.Join(conditions, " and ")
405 }
406
407 query := fmt.Sprintf(`
408 select
409 count(1)
410 from
411 pipelines p
412 join
413 triggers t ON p.trigger_id = t.id
414 %s
415 `, whereClause)
416
417 rows, err := e.Query(query, args...)
418 if err != nil {
419 return 0, err
420 }
421 defer rows.Close()
422
423 for rows.Next() {
424 var count int64
425 err := rows.Scan(&count)
426 if err != nil {
427 return 0, err
428 }
429
430 return count, nil
431 }
432
433 // unreachable
434 return 0, nil
435}