Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 435 lines 8.5 kB view raw
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "slices" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "tangled.org/core/appview/models" 13 "tangled.org/core/orm" 14) 15 16func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) { 17 var pipelines []models.Pipeline 18 19 var conditions []string 20 var args []any 21 for _, filter := range filters { 22 conditions = append(conditions, filter.Condition()) 23 args = append(args, filter.Arg()...) 24 } 25 26 whereClause := "" 27 if conditions != nil { 28 whereClause = " where " + strings.Join(conditions, " and ") 29 } 30 31 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created, repo_did from pipelines %s`, whereClause) 32 33 rows, err := e.Query(query, args...) 34 35 if err != nil { 36 return nil, err 37 } 38 defer rows.Close() 39 40 for rows.Next() { 41 var pipeline models.Pipeline 42 var createdAt string 43 var repoDid sql.NullString 44 err = rows.Scan( 45 &pipeline.Id, 46 &pipeline.Rkey, 47 &pipeline.Knot, 48 &pipeline.RepoOwner, 49 &pipeline.RepoName, 50 &pipeline.Sha, 51 &createdAt, 52 &repoDid, 53 ) 54 if err != nil { 55 return nil, err 56 } 57 58 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 59 pipeline.Created = t 60 } 61 if repoDid.Valid { 62 pipeline.RepoDid = repoDid.String 63 } 64 65 pipelines = append(pipelines, pipeline) 66 } 67 68 if err = rows.Err(); err != nil { 69 return nil, err 70 } 71 72 return pipelines, nil 73} 74 75func AddPipeline(e Execer, pipeline models.Pipeline) error { 76 var repoDid *string 77 if pipeline.RepoDid != "" { 78 repoDid = &pipeline.RepoDid 79 } 80 81 args := []any{ 82 pipeline.Rkey, 83 pipeline.Knot, 84 pipeline.RepoOwner, 85 pipeline.RepoName, 86 pipeline.TriggerId, 87 pipeline.Sha, 88 repoDid, 89 } 90 91 placeholders := make([]string, len(args)) 92 for i := range placeholders { 93 placeholders[i] = "?" 94 } 95 96 query := fmt.Sprintf(` 97 insert or ignore into pipelines ( 98 rkey, 99 knot, 100 repo_owner, 101 repo_name, 102 trigger_id, 103 sha, 104 repo_did 105 ) values (%s) 106 `, strings.Join(placeholders, ",")) 107 108 _, err := e.Exec(query, args...) 109 110 return err 111} 112 113func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 114 args := []any{ 115 trigger.Kind, 116 trigger.PushRef, 117 trigger.PushNewSha, 118 trigger.PushOldSha, 119 trigger.PRSourceBranch, 120 trigger.PRTargetBranch, 121 trigger.PRSourceSha, 122 trigger.PRAction, 123 } 124 125 placeholders := make([]string, len(args)) 126 for i := range placeholders { 127 placeholders[i] = "?" 128 } 129 130 query := fmt.Sprintf(`insert or ignore into triggers ( 131 kind, 132 push_ref, 133 push_new_sha, 134 push_old_sha, 135 pr_source_branch, 136 pr_target_branch, 137 pr_source_sha, 138 pr_action 139 ) values (%s)`, strings.Join(placeholders, ",")) 140 141 res, err := e.Exec(query, args...) 142 if err != nil { 143 return 0, err 144 } 145 146 return res.LastInsertId() 147} 148 149func AddPipelineStatus(ctx context.Context, e Execer, status models.PipelineStatus) error { 150 args := []any{ 151 status.Spindle, 152 status.Rkey, 153 status.PipelineKnot, 154 status.PipelineRkey, 155 status.Workflow, 156 status.Status, 157 status.Error, 158 status.ExitCode, 159 status.Created.Format(time.RFC3339), 160 } 161 162 placeholders := make([]string, len(args)) 163 for i := range placeholders { 164 placeholders[i] = "?" 165 } 166 167 query := fmt.Sprintf(` 168 insert or ignore into pipeline_statuses ( 169 spindle, 170 rkey, 171 pipeline_knot, 172 pipeline_rkey, 173 workflow, 174 status, 175 error, 176 exit_code, 177 created 178 ) values (%s) 179 `, strings.Join(placeholders, ",")) 180 181 _, err := e.ExecContext(ctx, query, args...) 182 return err 183} 184 185// this is a mega query, but the most useful one: 186// get N pipelines, for each one get the latest status of its N workflows 187// 188// the pipelines table is aliased to `p` 189// the triggers table is aliased to `t` 190func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) { 191 var conditions []string 192 var args []any 193 for _, filter := range filters { 194 conditions = append(conditions, filter.Condition()) 195 args = append(args, filter.Arg()...) 196 } 197 198 whereClause := "" 199 if conditions != nil { 200 whereClause = " where " + strings.Join(conditions, " and ") 201 } 202 203 query := fmt.Sprintf(` 204 select 205 p.id, 206 p.knot, 207 p.rkey, 208 p.repo_owner, 209 p.repo_name, 210 p.sha, 211 p.created, 212 p.repo_did, 213 t.id, 214 t.kind, 215 t.push_ref, 216 t.push_new_sha, 217 t.push_old_sha, 218 t.pr_source_branch, 219 t.pr_target_branch, 220 t.pr_source_sha, 221 t.pr_action 222 from 223 pipelines p 224 join 225 triggers t ON p.trigger_id = t.id 226 %s 227 order by p.created desc 228 limit %d 229 `, whereClause, limit) 230 231 rows, err := e.Query(query, args...) 232 if err != nil { 233 return nil, err 234 } 235 defer rows.Close() 236 237 pipelines := make(map[syntax.ATURI]models.Pipeline) 238 for rows.Next() { 239 var p models.Pipeline 240 var t models.Trigger 241 var created string 242 var repoDid sql.NullString 243 244 err := rows.Scan( 245 &p.Id, 246 &p.Knot, 247 &p.Rkey, 248 &p.RepoOwner, 249 &p.RepoName, 250 &p.Sha, 251 &created, 252 &repoDid, 253 &p.TriggerId, 254 &t.Kind, 255 &t.PushRef, 256 &t.PushNewSha, 257 &t.PushOldSha, 258 &t.PRSourceBranch, 259 &t.PRTargetBranch, 260 &t.PRSourceSha, 261 &t.PRAction, 262 ) 263 if err != nil { 264 return nil, err 265 } 266 267 p.Created, err = time.Parse(time.RFC3339, created) 268 if err != nil { 269 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 270 } 271 if repoDid.Valid { 272 p.RepoDid = repoDid.String 273 } 274 275 t.Id = p.TriggerId 276 p.Trigger = &t 277 p.Statuses = make(map[string]models.WorkflowStatus) 278 279 pipelines[p.AtUri()] = p 280 } 281 282 // get all statuses 283 // the where clause here is of the form: 284 // 285 // where (pipeline_knot = k1 and pipeline_rkey = r1) 286 // or (pipeline_knot = k2 and pipeline_rkey = r2) 287 conditions = nil 288 args = nil 289 for _, p := range pipelines { 290 knotFilter := orm.FilterEq("pipeline_knot", p.Knot) 291 rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey) 292 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 293 args = append(args, p.Knot) 294 args = append(args, p.Rkey) 295 } 296 whereClause = "" 297 if conditions != nil { 298 whereClause = "where " + strings.Join(conditions, " or ") 299 } 300 query = fmt.Sprintf(` 301 select 302 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 303 from 304 pipeline_statuses 305 %s 306 `, whereClause) 307 308 rows, err = e.Query(query, args...) 309 if err != nil { 310 return nil, err 311 } 312 defer rows.Close() 313 314 for rows.Next() { 315 var ps models.PipelineStatus 316 var created string 317 318 err := rows.Scan( 319 &ps.ID, 320 &ps.Spindle, 321 &ps.Rkey, 322 &ps.PipelineKnot, 323 &ps.PipelineRkey, 324 &created, 325 &ps.Workflow, 326 &ps.Status, 327 &ps.Error, 328 &ps.ExitCode, 329 ) 330 if err != nil { 331 return nil, err 332 } 333 334 ps.Created, err = time.Parse(time.RFC3339, created) 335 if err != nil { 336 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 337 } 338 339 pipelineAt := ps.PipelineAt() 340 341 // extract 342 pipeline, ok := pipelines[pipelineAt] 343 if !ok { 344 continue 345 } 346 statuses, _ := pipeline.Statuses[ps.Workflow] 347 if !ok { 348 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{} 349 } 350 351 // append 352 statuses.Data = append(statuses.Data, ps) 353 354 // reassign 355 pipeline.Statuses[ps.Workflow] = statuses 356 pipelines[pipelineAt] = pipeline 357 } 358 359 var all []models.Pipeline 360 for _, p := range pipelines { 361 for _, s := range p.Statuses { 362 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int { 363 if a.Created.After(b.Created) { 364 return 1 365 } 366 if a.Created.Before(b.Created) { 367 return -1 368 } 369 if a.ID > b.ID { 370 return 1 371 } 372 if a.ID < b.ID { 373 return -1 374 } 375 return 0 376 }) 377 } 378 all = append(all, p) 379 } 380 381 // sort pipelines by date 382 slices.SortFunc(all, func(a, b models.Pipeline) int { 383 if a.Created.After(b.Created) { 384 return -1 385 } 386 return 1 387 }) 388 389 return all, nil 390} 391 392// the pipelines table is aliased to `p` 393// the triggers table is aliased to `t` 394func GetTotalPipelineStatuses(e Execer, filters ...orm.Filter) (int64, error) { 395 var conditions []string 396 var args []any 397 for _, filter := range filters { 398 conditions = append(conditions, filter.Condition()) 399 args = append(args, filter.Arg()...) 400 } 401 402 whereClause := "" 403 if conditions != nil { 404 whereClause = " where " + strings.Join(conditions, " and ") 405 } 406 407 query := fmt.Sprintf(` 408 select 409 count(1) 410 from 411 pipelines p 412 join 413 triggers t ON p.trigger_id = t.id 414 %s 415 `, whereClause) 416 417 rows, err := e.Query(query, args...) 418 if err != nil { 419 return 0, err 420 } 421 defer rows.Close() 422 423 for rows.Next() { 424 var count int64 425 err := rows.Scan(&count) 426 if err != nil { 427 return 0, err 428 } 429 430 return count, nil 431 } 432 433 // unreachable 434 return 0, nil 435}