Monorepo for Tangled tangled.org
761
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 303 lines 8.6 kB view raw
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "time" 9 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/eventconsumer" 12 "tangled.org/core/rbac" 13 "tangled.org/core/spindle/db" 14 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 "github.com/bluesky-social/indigo/atproto/identity" 17 "github.com/bluesky-social/indigo/atproto/syntax" 18 "github.com/bluesky-social/indigo/xrpc" 19 "github.com/bluesky-social/jetstream/pkg/models" 20 securejoin "github.com/cyphar/filepath-securejoin" 21) 22 23type Ingester func(ctx context.Context, e *models.Event) error 24 25func (s *Spindle) ingest() Ingester { 26 return func(ctx context.Context, e *models.Event) error { 27 if e.Kind != models.EventKindCommit { 28 return nil 29 } 30 31 var err error 32 switch e.Commit.Collection { 33 case tangled.SpindleMemberNSID: 34 err = s.ingestMember(ctx, e) 35 case tangled.RepoNSID: 36 err = s.ingestRepo(ctx, e) 37 case tangled.RepoCollaboratorNSID: 38 err = s.ingestCollaborator(ctx, e) 39 } 40 41 if err != nil { 42 s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "err", err) 43 } 44 45 lastTimeUs := e.TimeUS + 1 46 if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 47 s.l.Error("failed to save cursor", "err", saveErr) 48 } 49 50 return nil 51 } 52} 53 54func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 55 var err error 56 did := e.Did 57 rkey := e.Commit.RKey 58 59 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 60 61 switch e.Commit.Operation { 62 case models.CommitOperationCreate, models.CommitOperationUpdate: 63 raw := e.Commit.Record 64 record := tangled.SpindleMember{} 65 err = json.Unmarshal(raw, &record) 66 if err != nil { 67 l.Error("invalid record", "error", err) 68 return err 69 } 70 71 domain := s.cfg.Server.Hostname 72 recordInstance := record.Instance 73 74 if recordInstance != domain { 75 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 76 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 77 } 78 79 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 80 if err != nil || !ok { 81 l.Error("failed to add member", "did", did, "error", err) 82 return fmt.Errorf("failed to enforce permissions: %w", err) 83 } 84 85 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 86 Did: syntax.DID(did), 87 Rkey: rkey, 88 Instance: recordInstance, 89 Subject: syntax.DID(record.Subject), 90 Created: time.Now(), 91 }); err != nil { 92 l.Error("failed to add member", "error", err) 93 return fmt.Errorf("failed to add member: %w", err) 94 } 95 96 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 97 l.Error("failed to add member", "error", err) 98 return fmt.Errorf("failed to add member: %w", err) 99 } 100 l.Info("added member from firehose", "member", record.Subject) 101 102 if err := s.db.AddDid(record.Subject); err != nil { 103 l.Error("failed to add did", "error", err) 104 return fmt.Errorf("failed to add did: %w", err) 105 } 106 s.jc.AddDid(record.Subject) 107 108 return nil 109 110 case models.CommitOperationDelete: 111 record, err := db.GetSpindleMember(s.db, did, rkey) 112 if err != nil { 113 l.Error("failed to find member", "error", err) 114 return fmt.Errorf("failed to find member: %w", err) 115 } 116 117 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 118 l.Error("failed to remove member", "error", err) 119 return fmt.Errorf("failed to remove member: %w", err) 120 } 121 122 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil { 123 l.Error("failed to add member", "error", err) 124 return fmt.Errorf("failed to add member: %w", err) 125 } 126 l.Info("added member from firehose", "member", record.Subject) 127 128 if err := s.db.RemoveDid(record.Subject.String()); err != nil { 129 l.Error("failed to add did", "error", err) 130 return fmt.Errorf("failed to add did: %w", err) 131 } 132 s.jc.RemoveDid(record.Subject.String()) 133 134 } 135 return nil 136} 137 138func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 139 var err error 140 did := e.Did 141 142 l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 143 144 l.Info("ingesting repo record", "did", did) 145 146 switch e.Commit.Operation { 147 case models.CommitOperationCreate, models.CommitOperationUpdate: 148 raw := e.Commit.Record 149 record := tangled.Repo{} 150 err = json.Unmarshal(raw, &record) 151 if err != nil { 152 l.Error("invalid record", "error", err) 153 return err 154 } 155 156 domain := s.cfg.Server.Hostname 157 158 // no spindle configured for this repo 159 if record.Spindle == nil { 160 l.Info("no spindle configured", "name", record.Name) 161 return nil 162 } 163 164 // this repo did not want this spindle 165 if *record.Spindle != domain { 166 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 167 return nil 168 } 169 170 // add this repo to the watch list 171 if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil { 172 l.Error("failed to add repo", "error", err) 173 return fmt.Errorf("failed to add repo: %w", err) 174 } 175 176 didSlashRepo, err := securejoin.SecureJoin(did, record.Name) 177 if err != nil { 178 return err 179 } 180 181 // add repo to rbac 182 if err := s.e.AddRepo(did, rbac.ThisServer, didSlashRepo); err != nil { 183 l.Error("failed to add repo to enforcer", "error", err) 184 return fmt.Errorf("failed to add repo: %w", err) 185 } 186 187 // add collaborators to rbac 188 owner, err := s.res.ResolveIdent(ctx, did) 189 if err != nil || owner.Handle.IsInvalidHandle() { 190 return err 191 } 192 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil { 193 return err 194 } 195 196 // add this knot to the event consumer 197 src := eventconsumer.NewKnotSource(record.Knot) 198 s.ks.AddSource(context.Background(), src) 199 200 return nil 201 202 } 203 return nil 204} 205 206func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 207 var err error 208 209 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 210 211 l.Info("ingesting collaborator record") 212 213 switch e.Commit.Operation { 214 case models.CommitOperationCreate, models.CommitOperationUpdate: 215 raw := e.Commit.Record 216 record := tangled.RepoCollaborator{} 217 err = json.Unmarshal(raw, &record) 218 if err != nil { 219 l.Error("invalid record", "error", err) 220 return err 221 } 222 223 subjectId, err := s.res.ResolveIdent(ctx, record.Subject) 224 if err != nil || subjectId.Handle.IsInvalidHandle() { 225 return err 226 } 227 228 var rbacResource string 229 var ownerDid string 230 switch { 231 case record.Repo != nil: 232 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 233 if parseErr != nil { 234 l.Info("rejecting record, invalid repoAt", "repoAt", *record.Repo) 235 return nil 236 } 237 238 owner, resolveErr := s.res.ResolveIdent(ctx, repoAt.Authority().String()) 239 if resolveErr != nil || owner.Handle.IsInvalidHandle() { 240 return fmt.Errorf("failed to resolve handle: %w", resolveErr) 241 } 242 243 xrpcc := xrpc.Client{ 244 Host: owner.PDSEndpoint(), 245 } 246 247 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 248 if getErr != nil { 249 return getErr 250 } 251 252 repo := resp.Value.Val.(*tangled.Repo) 253 rbacResource, _ = securejoin.SecureJoin(owner.DID.String(), repo.Name) 254 ownerDid = owner.DID.String() 255 256 default: 257 l.Info("rejecting collaborator record without repo at-uri (spindle RBAC keyed by owner/name)") 258 return nil 259 } 260 261 if ok, err := s.e.IsCollaboratorInviteAllowed(ownerDid, rbac.ThisServer, rbacResource); !ok || err != nil { 262 return fmt.Errorf("insufficient permissions: %w", err) 263 } 264 265 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, rbacResource); err != nil { 266 l.Error("failed to add collaborator to enforcer", "error", err) 267 return fmt.Errorf("failed to add collaborator: %w", err) 268 } 269 270 return nil 271 } 272 return nil 273} 274 275func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error { 276 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 277 278 l.Info("fetching and adding existing collaborators") 279 280 xrpcc := xrpc.Client{ 281 Host: owner.PDSEndpoint(), 282 } 283 284 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false) 285 if err != nil { 286 return err 287 } 288 289 var errs error 290 for _, r := range resp.Records { 291 if r == nil { 292 continue 293 } 294 record := r.Value.Val.(*tangled.RepoCollaborator) 295 296 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 297 l.Error("failed to add repo to enforcer", "error", err) 298 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 299 } 300 } 301 302 return errs 303}