Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview: listen for pipeline events from spindlestream

Signed-off-by: Seongmin Lee <git@boltless.me>

+89 -86
-86
appview/state/knotstream.go
··· 18 18 "tangled.org/core/log" 19 19 "tangled.org/core/orm" 20 20 "tangled.org/core/rbac" 21 - "tangled.org/core/workflow" 22 21 23 - "github.com/bluesky-social/indigo/atproto/syntax" 24 22 "github.com/go-git/go-git/v5/plumbing" 25 23 "github.com/posthog/posthog-go" 26 24 ) ··· 65 67 switch msg.Nsid { 66 68 case tangled.GitRefUpdateNSID: 67 69 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 68 - case tangled.PipelineNSID: 69 - return ingestPipeline(d, source, msg) 70 70 } 71 71 72 72 return nil ··· 185 189 } 186 190 187 191 return tx.Commit() 188 - } 189 - 190 - func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 191 - var record tangled.Pipeline 192 - err := json.Unmarshal(msg.EventJson, &record) 193 - if err != nil { 194 - return err 195 - } 196 - 197 - if record.TriggerMetadata == nil { 198 - return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 199 - } 200 - 201 - if record.TriggerMetadata.Repo == nil { 202 - return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 203 - } 204 - 205 - // does this repo have a spindle configured? 206 - repos, err := db.GetRepos( 207 - d, 208 - 0, 209 - orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 210 - orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 211 - ) 212 - if err != nil { 213 - return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 214 - } 215 - if len(repos) != 1 { 216 - return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 217 - } 218 - if repos[0].Spindle == "" { 219 - return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 220 - } 221 - 222 - // trigger info 223 - var trigger models.Trigger 224 - var sha string 225 - trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 226 - switch trigger.Kind { 227 - case workflow.TriggerKindPush: 228 - trigger.PushRef = &record.TriggerMetadata.Push.Ref 229 - trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 230 - trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 231 - sha = *trigger.PushNewSha 232 - case workflow.TriggerKindPullRequest: 233 - trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 234 - trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 235 - trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 236 - trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 237 - sha = *trigger.PRSourceSha 238 - } 239 - 240 - tx, err := d.Begin() 241 - if err != nil { 242 - return fmt.Errorf("failed to start txn: %w", err) 243 - } 244 - 245 - triggerId, err := db.AddTrigger(tx, trigger) 246 - if err != nil { 247 - return fmt.Errorf("failed to add trigger entry: %w", err) 248 - } 249 - 250 - pipeline := models.Pipeline{ 251 - Rkey: msg.Rkey, 252 - Knot: source.Key(), 253 - RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 254 - RepoName: record.TriggerMetadata.Repo.Repo, 255 - TriggerId: int(triggerId), 256 - Sha: sha, 257 - } 258 - 259 - err = db.AddPipeline(tx, pipeline) 260 - if err != nil { 261 - return fmt.Errorf("failed to add pipeline: %w", err) 262 - } 263 - 264 - err = tx.Commit() 265 - if err != nil { 266 - return fmt.Errorf("failed to commit txn: %w", err) 267 - } 268 - 269 - return nil 270 192 }
+89
appview/state/spindlestream.go
··· 20 20 "tangled.org/core/orm" 21 21 "tangled.org/core/rbac" 22 22 spindle "tangled.org/core/spindle/models" 23 + "tangled.org/core/workflow" 23 24 ) 24 25 25 26 func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { ··· 63 62 func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 64 63 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 65 64 switch msg.Nsid { 65 + case tangled.PipelineNSID: 66 + return ingestPipeline(logger, d, source, msg) 66 67 case tangled.PipelineStatusNSID: 67 68 return ingestPipelineStatus(ctx, logger, d, source, msg) 68 69 } 69 70 70 71 return nil 71 72 } 73 + } 74 + 75 + func ingestPipeline(l *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 76 + var record tangled.Pipeline 77 + err := json.Unmarshal(msg.EventJson, &record) 78 + if err != nil { 79 + return err 80 + } 81 + 82 + if record.TriggerMetadata == nil { 83 + return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 84 + } 85 + 86 + if record.TriggerMetadata.Repo == nil { 87 + return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 88 + } 89 + 90 + // does this repo have a spindle configured? 91 + repos, err := db.GetRepos( 92 + d, 93 + 0, 94 + orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 95 + orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 96 + ) 97 + if err != nil { 98 + return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 99 + } 100 + if len(repos) != 1 { 101 + return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 102 + } 103 + if repos[0].Spindle == "" { 104 + return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 105 + } 106 + 107 + // trigger info 108 + var trigger models.Trigger 109 + var sha string 110 + trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 111 + switch trigger.Kind { 112 + case workflow.TriggerKindPush: 113 + trigger.PushRef = &record.TriggerMetadata.Push.Ref 114 + trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 115 + trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 116 + sha = *trigger.PushNewSha 117 + case workflow.TriggerKindPullRequest: 118 + trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 119 + trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 120 + trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 121 + trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 122 + sha = *trigger.PRSourceSha 123 + } 124 + 125 + tx, err := d.Begin() 126 + if err != nil { 127 + return fmt.Errorf("failed to start txn: %w", err) 128 + } 129 + 130 + triggerId, err := db.AddTrigger(tx, trigger) 131 + if err != nil { 132 + return fmt.Errorf("failed to add trigger entry: %w", err) 133 + } 134 + 135 + // TODO: we shouldn't even use knot to identify pipelines 136 + knot := record.TriggerMetadata.Repo.Knot 137 + pipeline := models.Pipeline{ 138 + Rkey: msg.Rkey, 139 + Knot: knot, 140 + RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 141 + RepoName: record.TriggerMetadata.Repo.Repo, 142 + TriggerId: int(triggerId), 143 + Sha: sha, 144 + } 145 + 146 + err = db.AddPipeline(tx, pipeline) 147 + if err != nil { 148 + return fmt.Errorf("failed to add pipeline: %w", err) 149 + } 150 + 151 + err = tx.Commit() 152 + if err != nil { 153 + return fmt.Errorf("failed to commit txn: %w", err) 154 + } 155 + 156 + l.Info("added pipeline", "pipeline", pipeline) 157 + 158 + return nil 72 159 } 73 160 74 161 func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {