Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

knotserver,spindle: remove all pipeline logics from knotserver

`sh.tangled.pipeline` events are now completely generated & streamed
from spindle

Signed-off-by: Seongmin Lee <git@boltless.me>

+1 -286
-136
knotserver/ingester.go
··· 7 7 "io" 8 8 "net/http" 9 9 "net/url" 10 - "path/filepath" 11 10 "strings" 12 11 13 12 comatproto "github.com/bluesky-social/indigo/api/atproto" ··· 16 17 securejoin "github.com/cyphar/filepath-securejoin" 17 18 "tangled.org/core/api/tangled" 18 19 "tangled.org/core/knotserver/db" 19 - "tangled.org/core/knotserver/git" 20 20 "tangled.org/core/log" 21 21 "tangled.org/core/rbac" 22 - "tangled.org/core/workflow" 23 22 ) 24 23 25 24 func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error { ··· 80 83 } 81 84 82 85 return nil 83 - } 84 - 85 - func (h *Knot) processPull(ctx context.Context, event *models.Event) error { 86 - raw := json.RawMessage(event.Commit.Record) 87 - did := event.Did 88 - 89 - var record tangled.RepoPull 90 - if err := json.Unmarshal(raw, &record); err != nil { 91 - return fmt.Errorf("failed to unmarshal record: %w", err) 92 - } 93 - 94 - l := log.FromContext(ctx) 95 - l = l.With("handler", "processPull") 96 - l = l.With("did", did) 97 - 98 - if record.Target == nil { 99 - return fmt.Errorf("ignoring pull record: target repo is nil") 100 - } 101 - 102 - l = l.With("target_repo", record.Target.Repo) 103 - l = l.With("target_branch", record.Target.Branch) 104 - 105 - if record.Source == nil { 106 - return fmt.Errorf("ignoring pull record: not a branch-based pull request") 107 - } 108 - 109 - if record.Source.Repo != nil { 110 - return fmt.Errorf("ignoring pull record: fork based pull") 111 - } 112 - 113 - repoAt, err := syntax.ParseATURI(record.Target.Repo) 114 - if err != nil { 115 - return fmt.Errorf("failed to parse ATURI: %w", err) 116 - } 117 - 118 - // resolve this aturi to extract the repo record 119 - ident, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 120 - if err != nil || ident.Handle.IsInvalidHandle() { 121 - return fmt.Errorf("failed to resolve handle: %w", err) 122 - } 123 - 124 - xrpcc := xrpc.Client{ 125 - Host: ident.PDSEndpoint(), 126 - } 127 - 128 - resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 129 - if err != nil { 130 - return fmt.Errorf("failed to resolver repo: %w", err) 131 - } 132 - 133 - repo := resp.Value.Val.(*tangled.Repo) 134 - 135 - if repo.Knot != h.c.Server.Hostname { 136 - return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 137 - } 138 - 139 - didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 140 - if err != nil { 141 - return fmt.Errorf("failed to construct relative repo path: %w", err) 142 - } 143 - 144 - repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 145 - if err != nil { 146 - return fmt.Errorf("failed to construct absolute repo path: %w", err) 147 - } 148 - 149 - gr, err := git.Open(repoPath, record.Source.Sha) 150 - if err != nil { 151 - return fmt.Errorf("failed to open git repository: %w", err) 152 - } 153 - 154 - workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 155 - if err != nil { 156 - return fmt.Errorf("failed to open workflow directory: %w", err) 157 - } 158 - 159 - var pipeline workflow.RawPipeline 160 - for _, e := range workflowDir { 161 - if !e.IsFile() { 162 - continue 163 - } 164 - 165 - fpath := filepath.Join(workflow.WorkflowDir, e.Name) 166 - contents, err := gr.RawContent(fpath) 167 - if err != nil { 168 - continue 169 - } 170 - 171 - pipeline = append(pipeline, workflow.RawWorkflow{ 172 - Name: e.Name, 173 - Contents: contents, 174 - }) 175 - } 176 - 177 - trigger := tangled.Pipeline_PullRequestTriggerData{ 178 - Action: "create", 179 - SourceBranch: record.Source.Branch, 180 - SourceSha: record.Source.Sha, 181 - TargetBranch: record.Target.Branch, 182 - } 183 - 184 - compiler := workflow.Compiler{ 185 - Trigger: tangled.Pipeline_TriggerMetadata{ 186 - Kind: string(workflow.TriggerKindPullRequest), 187 - PullRequest: &trigger, 188 - Repo: &tangled.Pipeline_TriggerRepo{ 189 - Did: ident.DID.String(), 190 - Knot: repo.Knot, 191 - Repo: repo.Name, 192 - }, 193 - }, 194 - } 195 - 196 - cp := compiler.Compile(compiler.Parse(pipeline)) 197 - eventJson, err := json.Marshal(cp) 198 - if err != nil { 199 - return fmt.Errorf("failed to marshal pipeline event: %w", err) 200 - } 201 - 202 - // do not run empty pipelines 203 - if cp.Workflows == nil { 204 - return nil 205 - } 206 - 207 - ev := db.Event{ 208 - Rkey: TID(), 209 - Nsid: tangled.PipelineNSID, 210 - EventJson: string(eventJson), 211 - } 212 - 213 - return h.db.InsertEvent(ev, h.n) 214 86 } 215 87 216 88 // duplicated from add collaborator ··· 204 338 err = h.processPublicKey(ctx, event) 205 339 case tangled.KnotMemberNSID: 206 340 err = h.processKnotMember(ctx, event) 207 - case tangled.RepoPullNSID: 208 - err = h.processPull(ctx, event) 209 341 case tangled.RepoCollaboratorNSID: 210 342 err = h.processCollaborator(ctx, event) 211 343 }
-109
knotserver/internal.go
··· 23 23 "tangled.org/core/log" 24 24 "tangled.org/core/notifier" 25 25 "tangled.org/core/rbac" 26 - "tangled.org/core/workflow" 27 26 ) 28 27 29 28 type InternalHandle struct { ··· 187 188 l.Error("failed to reply with compare link", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir) 188 189 // non-fatal 189 190 } 190 - 191 - err = h.triggerPipeline(&resp.Messages, line, gitUserDid, repoDid, repoName, pushOptions) 192 - if err != nil { 193 - l.Error("failed to trigger pipeline", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir) 194 - // non-fatal 195 - } 196 191 } 197 192 198 193 writeJSON(w, resp) ··· 235 242 } 236 243 237 244 return errors.Join(errs, h.db.InsertEvent(event, h.n)) 238 - } 239 - 240 - func (h *InternalHandle) triggerPipeline( 241 - clientMsgs *[]string, 242 - line git.PostReceiveLine, 243 - gitUserDid string, 244 - repoDid string, 245 - repoName string, 246 - pushOptions PushOptions, 247 - ) error { 248 - if pushOptions.skipCi { 249 - return nil 250 - } 251 - 252 - didSlashRepo, err := securejoin.SecureJoin(repoDid, repoName) 253 - if err != nil { 254 - return err 255 - } 256 - 257 - repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 258 - if err != nil { 259 - return err 260 - } 261 - 262 - gr, err := git.Open(repoPath, line.Ref) 263 - if err != nil { 264 - return err 265 - } 266 - 267 - workflowDir, err := gr.FileTree(context.Background(), workflow.WorkflowDir) 268 - if err != nil { 269 - return err 270 - } 271 - 272 - var pipeline workflow.RawPipeline 273 - for _, e := range workflowDir { 274 - if !e.IsFile() { 275 - continue 276 - } 277 - 278 - fpath := filepath.Join(workflow.WorkflowDir, e.Name) 279 - contents, err := gr.RawContent(fpath) 280 - if err != nil { 281 - continue 282 - } 283 - 284 - pipeline = append(pipeline, workflow.RawWorkflow{ 285 - Name: e.Name, 286 - Contents: contents, 287 - }) 288 - } 289 - 290 - trigger := tangled.Pipeline_PushTriggerData{ 291 - Ref: line.Ref, 292 - OldSha: line.OldSha.String(), 293 - NewSha: line.NewSha.String(), 294 - } 295 - 296 - compiler := workflow.Compiler{ 297 - Trigger: tangled.Pipeline_TriggerMetadata{ 298 - Kind: string(workflow.TriggerKindPush), 299 - Push: &trigger, 300 - Repo: &tangled.Pipeline_TriggerRepo{ 301 - Did: repoDid, 302 - Knot: h.c.Server.Hostname, 303 - Repo: repoName, 304 - }, 305 - }, 306 - } 307 - 308 - cp := compiler.Compile(compiler.Parse(pipeline)) 309 - eventJson, err := json.Marshal(cp) 310 - if err != nil { 311 - return err 312 - } 313 - 314 - for _, e := range compiler.Diagnostics.Errors { 315 - *clientMsgs = append(*clientMsgs, e.String()) 316 - } 317 - 318 - if pushOptions.verboseCi { 319 - if compiler.Diagnostics.IsEmpty() { 320 - *clientMsgs = append(*clientMsgs, "success: pipeline compiled with no diagnostics") 321 - } 322 - 323 - for _, w := range compiler.Diagnostics.Warnings { 324 - *clientMsgs = append(*clientMsgs, w.String()) 325 - } 326 - } 327 - 328 - // do not run empty pipelines 329 - if cp.Workflows == nil { 330 - return nil 331 - } 332 - 333 - event := db.Event{ 334 - Rkey: TID(), 335 - Nsid: tangled.PipelineNSID, 336 - EventJson: string(eventJson), 337 - } 338 - 339 - return h.db.InsertEvent(event, h.n) 340 245 } 341 246 342 247 func (h *InternalHandle) emitCompareLink(
-1
knotserver/server.go
··· 79 79 jc, err := jetstream.NewJetstreamClient(c.Server.JetstreamEndpoint, "knotserver", []string{ 80 80 tangled.PublicKeyNSID, 81 81 tangled.KnotMemberNSID, 82 - tangled.RepoPullNSID, 83 82 tangled.RepoCollaboratorNSID, 84 83 }, nil, log.SubLogger(logger, "jetstream"), db, true, c.Server.LogDids) 85 84 if err != nil {
+1 -40
spindle/server.go
··· 289 289 func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 290 290 l := log.FromContext(ctx).With("handler", "processKnotStream") 291 291 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 292 - if msg.Nsid == tangled.PipelineNSID { 293 - return nil 294 - tpl := tangled.Pipeline{} 295 - err := json.Unmarshal(msg.EventJson, &tpl) 296 - if err != nil { 297 - fmt.Println("error unmarshalling", err) 298 - return err 299 - } 300 - 301 - if tpl.TriggerMetadata == nil { 302 - return fmt.Errorf("no trigger metadata found") 303 - } 304 - 305 - if tpl.TriggerMetadata.Repo == nil { 306 - return fmt.Errorf("no repo data found") 307 - } 308 - 309 - if src.Key() != tpl.TriggerMetadata.Repo.Knot { 310 - return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 311 - } 312 - 313 - // filter by repos 314 - _, err = s.db.GetRepoWithName( 315 - syntax.DID(tpl.TriggerMetadata.Repo.Did), 316 - tpl.TriggerMetadata.Repo.Repo, 317 - ) 318 - if err != nil { 319 - return fmt.Errorf("failed to get repo: %w", err) 320 - } 321 - 322 - pipelineId := models.PipelineId{ 323 - Knot: src.Key(), 324 - Rkey: msg.Rkey, 325 - } 326 - 327 - err = s.processPipeline(ctx, tpl, pipelineId) 328 - if err != nil { 329 - return err 330 - } 331 - } else if msg.Nsid == tangled.GitRefUpdateNSID { 292 + if msg.Nsid == tangled.GitRefUpdateNSID { 332 293 event := tangled.GitRefUpdate{} 333 294 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 334 295 l.Error("error unmarshalling", "err", err)