Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

nix,spindle: switch to tap from jetstream

spindle-tap will collect/stream record events from:
- users dynamically added by spindle (spindle members | collaborators of
repos using spindle)
- any users with `sh.tangled.repo.pull` collection

It might be bit inefficient considering it will also stream repo
creation events from PR authors due to second rule, but at least we now
have backfill logic and Sync 1.1 based syncing.

This inefficiency can be fixed later by modifying upstream tap cli or
embedding tap into spindle.

```
+--------- all tangled users --------+
| |
| +-- users known to spindle-tap --+ |
| | (PR author / manually added) | |
| | | |
| | +----------------------------+ | |
| | | users known to spindle | | |
| | | (members / collaborators) | | |
| | +----------------------------+ | |
| +--------------------------------+ |
+------------------------------------+
```

Close: <https://tangled.org/tangled.org/core/issues/341>

Signed-off-by: Seongmin Lee <git@boltless.me>

+479 -396
+2 -8
nix/modules/spindle.nix
··· 53 53 description = "atproto PLC directory"; 54 54 }; 55 55 56 - jetstreamEndpoint = mkOption { 57 - type = types.str; 58 - default = "wss://jetstream1.us-west.bsky.network/subscribe"; 59 - description = "Jetstream endpoint to subscribe to"; 60 - }; 61 - 62 56 dev = mkOption { 63 57 type = types.bool; 64 58 default = false; ··· 143 149 144 150 systemd.services.spindle = { 145 151 description = "spindle service"; 146 - after = ["network.target" "docker.service"]; 152 + after = ["network.target" "docker.service" "spindle-tap.service"]; 147 153 wantedBy = ["multi-user.target"]; 148 154 serviceConfig = { 149 155 LogsDirectory = "spindle"; ··· 153 159 "SPINDLE_SERVER_DB_PATH=${cfg.server.dbPath}" 154 160 "SPINDLE_SERVER_HOSTNAME=${cfg.server.hostname}" 155 161 "SPINDLE_SERVER_PLC_URL=${cfg.server.plcUrl}" 156 - "SPINDLE_SERVER_JETSTREAM_ENDPOINT=${cfg.server.jetstreamEndpoint}" 157 162 "SPINDLE_SERVER_DEV=${lib.boolToString cfg.server.dev}" 158 163 "SPINDLE_SERVER_OWNER=${cfg.server.owner}" 159 164 "SPINDLE_SERVER_MAX_JOB_COUNT=${toString cfg.server.maxJobCount}" ··· 160 167 "SPINDLE_SERVER_SECRETS_PROVIDER=${cfg.server.secrets.provider}" 161 168 "SPINDLE_SERVER_SECRETS_OPENBAO_PROXY_ADDR=${cfg.server.secrets.openbao.proxyAddr}" 162 169 "SPINDLE_SERVER_SECRETS_OPENBAO_MOUNT=${cfg.server.secrets.openbao.mount}" 170 + "SPINDLE_SERVER_TAP_URL=http://localhost:2480" 163 171 "SPINDLE_NIXERY_PIPELINES_NIXERY=${cfg.pipelines.nixery}" 164 172 "SPINDLE_NIXERY_PIPELINES_WORKFLOW_TIMEOUT=${cfg.pipelines.workflowTimeout}" 165 173 ];
+5 -1
nix/vm.nix
··· 58 58 host.port = 6555; 59 59 guest.port = 6555; 60 60 } 61 + { 62 + from = "host"; 63 + host.port = 6556; 64 + guest.port = 2480; 65 + } 61 66 ]; 62 67 sharedDirectories = { 63 68 # We can't use the 9p mounts directly for most of these ··· 106 101 owner = envVar "TANGLED_VM_SPINDLE_OWNER"; 107 102 hostname = envVarOr "TANGLED_VM_SPINDLE_HOST" "localhost:6555"; 108 103 plcUrl = plcUrl; 109 - jetstreamEndpoint = jetstream; 110 104 listenAddr = "0.0.0.0:6555"; 111 105 dev = true; 112 106 queueSize = 100;
+11 -11
spindle/config/config.go
··· 9 9 ) 10 10 11 11 type Server struct { 12 - ListenAddr string `env:"LISTEN_ADDR, default=0.0.0.0:6555"` 13 - DBPath string `env:"DB_PATH, default=spindle.db"` 14 - Hostname string `env:"HOSTNAME, required"` 15 - JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"` 16 - PlcUrl string `env:"PLC_URL, default=https://plc.directory"` 17 - Dev bool `env:"DEV, default=false"` 18 - Owner syntax.DID `env:"OWNER, required"` 19 - Secrets Secrets `env:",prefix=SECRETS_"` 20 - LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 21 - QueueSize int `env:"QUEUE_SIZE, default=100"` 22 - MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time 12 + ListenAddr string `env:"LISTEN_ADDR, default=0.0.0.0:6555"` 13 + DBPath string `env:"DB_PATH, default=spindle.db"` 14 + Hostname string `env:"HOSTNAME, required"` 15 + TapUrl string `env:"TAP_URL, required"` 16 + PlcUrl string `env:"PLC_URL, default=https://plc.directory"` 17 + Dev bool `env:"DEV, default=false"` 18 + Owner syntax.DID `env:"OWNER, required"` 19 + Secrets Secrets `env:",prefix=SECRETS_"` 20 + LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 21 + QueueSize int `env:"QUEUE_SIZE, default=100"` 22 + MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time 23 23 } 24 24 25 25 func (s Server) Did() syntax.DID {
+28 -14
spindle/db/db.go
··· 5 5 "database/sql" 6 6 "strings" 7 7 8 + "github.com/bluesky-social/indigo/atproto/syntax" 8 9 _ "github.com/mattn/go-sqlite3" 9 10 "tangled.org/core/log" 10 11 "tangled.org/core/orm" ··· 56 55 addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 57 56 58 57 unique(owner, name) 58 + ); 59 + 60 + create table if not exists repo_collaborators ( 61 + -- identifiers 62 + id integer primary key autoincrement, 63 + did text not null, 64 + rkey text not null, 65 + at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo.collaborator' || '/' || rkey) stored, 66 + 67 + repo text not null, 68 + subject text not null, 69 + 70 + addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 71 + unique(did, rkey) 59 72 ); 60 73 61 74 create table if not exists spindle_members ( ··· 135 120 return &DB{db}, nil 136 121 } 137 122 138 - func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 139 - _, err := d.Exec(` 140 - insert into _jetstream (id, last_time_us) 141 - values (1, ?) 142 - on conflict(id) do update set last_time_us = excluded.last_time_us 143 - `, lastTimeUs) 144 - return err 145 - } 146 - 147 - func (d *DB) GetLastTimeUs() (int64, error) { 148 - var lastTimeUs int64 149 - row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`) 150 - err := row.Scan(&lastTimeUs) 151 - return lastTimeUs, err 123 + func (d *DB) IsKnownDid(did syntax.DID) (bool, error) { 124 + // is spindle member / repo collaborator 125 + var exists bool 126 + err := d.QueryRow( 127 + `select exists ( 128 + select 1 from repo_collaborators where subject = ? 129 + union all 130 + select 1 from spindle_members where did = ? 131 + )`, 132 + did, 133 + did, 134 + ).Scan(&exists) 135 + return exists, err 152 136 }
-44
spindle/db/known_dids.go
··· 1 - package db 2 - 3 - func (d *DB) AddDid(did string) error { 4 - _, err := d.Exec(`insert or ignore into known_dids (did) values (?)`, did) 5 - return err 6 - } 7 - 8 - func (d *DB) RemoveDid(did string) error { 9 - _, err := d.Exec(`delete from known_dids where did = ?`, did) 10 - return err 11 - } 12 - 13 - func (d *DB) GetAllDids() ([]string, error) { 14 - var dids []string 15 - 16 - rows, err := d.Query(`select did from known_dids`) 17 - if err != nil { 18 - return nil, err 19 - } 20 - defer rows.Close() 21 - 22 - for rows.Next() { 23 - var did string 24 - if err := rows.Scan(&did); err != nil { 25 - return nil, err 26 - } 27 - dids = append(dids, did) 28 - } 29 - 30 - if err := rows.Err(); err != nil { 31 - return nil, err 32 - } 33 - 34 - return dids, nil 35 - } 36 - 37 - func (d *DB) HasKnownDids() bool { 38 - var count int 39 - err := d.QueryRow(`select count(*) from known_dids`).Scan(&count) 40 - if err != nil { 41 - return false 42 - } 43 - return count > 0 44 - }
+120 -12
spindle/db/repos.go
··· 1 1 package db 2 2 3 + import "github.com/bluesky-social/indigo/atproto/syntax" 4 + 3 5 type Repo struct { 4 - Knot string 5 - Owner string 6 - Name string 6 + Did syntax.DID 7 + Rkey syntax.RecordKey 8 + Name string 9 + Knot string 7 10 } 8 11 9 - func (d *DB) AddRepo(knot, owner, name string) error { 10 - _, err := d.Exec(`insert or ignore into repos (knot, owner, name) values (?, ?, ?)`, knot, owner, name) 12 + type RepoCollaborator struct { 13 + Did syntax.DID 14 + Rkey syntax.RecordKey 15 + Repo syntax.ATURI 16 + Subject syntax.DID 17 + } 18 + 19 + func (d *DB) PutRepo(repo *Repo) error { 20 + _, err := d.Exec( 21 + `insert or ignore into repos (did, rkey, name, knot) 22 + values (?, ?, ?, ?) 23 + on conflict(did, rkey) do update set 24 + name = excluded.name, 25 + knot = excluded.knot`, 26 + repo.Did, 27 + repo.Rkey, 28 + repo.Name, 29 + repo.Knot, 30 + ) 31 + return err 32 + } 33 + 34 + func (d *DB) DeleteRepo(did syntax.DID, rkey syntax.RecordKey) error { 35 + _, err := d.Exec( 36 + `delete from repos where did = ? and rkey = ?`, 37 + did, 38 + rkey, 39 + ) 11 40 return err 12 41 } 13 42 ··· 63 34 return knots, nil 64 35 } 65 36 66 - func (d *DB) GetRepo(knot, owner, name string) (*Repo, error) { 37 + func (d *DB) GetRepo(repoAt syntax.ATURI) (*Repo, error) { 67 38 var repo Repo 68 - 69 - query := "select knot, owner, name from repos where knot = ? and owner = ? and name = ?" 70 - err := d.DB.QueryRow(query, knot, owner, name). 71 - Scan(&repo.Knot, &repo.Owner, &repo.Name) 72 - 39 + err := d.DB.QueryRow( 40 + `select 41 + did, 42 + rkey, 43 + name, 44 + knot 45 + from repos where at_uri = ?`, 46 + repoAt, 47 + ).Scan( 48 + &repo.Did, 49 + &repo.Rkey, 50 + &repo.Name, 51 + &repo.Knot, 52 + ) 73 53 if err != nil { 74 54 return nil, err 75 55 } 76 - 77 56 return &repo, nil 57 + } 58 + 59 + func (d *DB) GetRepoWithName(did syntax.DID, name string) (*Repo, error) { 60 + var repo Repo 61 + err := d.DB.QueryRow( 62 + `select 63 + did, 64 + rkey, 65 + name, 66 + knot 67 + from repos where did = ? and name = ?`, 68 + did, 69 + name, 70 + ).Scan( 71 + &repo.Did, 72 + &repo.Rkey, 73 + &repo.Name, 74 + &repo.Knot, 75 + ) 76 + if err != nil { 77 + return nil, err 78 + } 79 + return &repo, nil 80 + } 81 + 82 + func (d *DB) PutRepoCollaborator(collaborator *RepoCollaborator) error { 83 + _, err := d.Exec( 84 + `insert into repo_collaborators (did, rkey, repo, subject) 85 + values (?, ?, ?, ?) 86 + on conflict(did, rkey) do update set 87 + repo = excluded.repo, 88 + subject = excluded.subject`, 89 + collaborator.Did, 90 + collaborator.Rkey, 91 + collaborator.Repo, 92 + collaborator.Subject, 93 + ) 94 + return err 95 + } 96 + 97 + func (d *DB) RemoveRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) error { 98 + _, err := d.Exec( 99 + `delete from repo_collaborators where did = ? and rkey = ?`, 100 + did, 101 + rkey, 102 + ) 103 + return err 104 + } 105 + 106 + func (d *DB) GetRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) (*RepoCollaborator, error) { 107 + var collaborator RepoCollaborator 108 + err := d.DB.QueryRow( 109 + `select 110 + did, 111 + rkey, 112 + repo, 113 + subject 114 + from repo_collaborators 115 + where did = ? and rkey = ?`, 116 + did, 117 + rkey, 118 + ).Scan( 119 + &collaborator.Did, 120 + &collaborator.Rkey, 121 + &collaborator.Repo, 122 + &collaborator.Subject, 123 + ) 124 + if err != nil { 125 + return nil, err 126 + } 127 + return &collaborator, nil 78 128 }
-276
spindle/ingester.go
··· 1 - package spindle 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "errors" 7 - "fmt" 8 - "time" 9 - 10 - "tangled.org/core/api/tangled" 11 - "tangled.org/core/eventconsumer" 12 - "tangled.org/core/spindle/db" 13 - 14 - comatproto "github.com/bluesky-social/indigo/api/atproto" 15 - "github.com/bluesky-social/indigo/atproto/syntax" 16 - "github.com/bluesky-social/indigo/xrpc" 17 - "github.com/bluesky-social/jetstream/pkg/models" 18 - ) 19 - 20 - type Ingester func(ctx context.Context, e *models.Event) error 21 - 22 - func (s *Spindle) ingest() Ingester { 23 - return func(ctx context.Context, e *models.Event) error { 24 - var err error 25 - defer func() { 26 - eventTime := e.TimeUS 27 - lastTimeUs := eventTime + 1 28 - if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 29 - err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 30 - } 31 - }() 32 - 33 - if e.Kind != models.EventKindCommit { 34 - return nil 35 - } 36 - 37 - switch e.Commit.Collection { 38 - case tangled.SpindleMemberNSID: 39 - err = s.ingestMember(ctx, e) 40 - case tangled.RepoNSID: 41 - err = s.ingestRepo(ctx, e) 42 - case tangled.RepoCollaboratorNSID: 43 - err = s.ingestCollaborator(ctx, e) 44 - } 45 - 46 - if err != nil { 47 - s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 48 - } 49 - 50 - return nil 51 - } 52 - } 53 - 54 - func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 55 - var err error 56 - did := e.Did 57 - rkey := e.Commit.RKey 58 - 59 - l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 60 - 61 - switch e.Commit.Operation { 62 - case models.CommitOperationCreate, models.CommitOperationUpdate: 63 - raw := e.Commit.Record 64 - record := tangled.SpindleMember{} 65 - err = json.Unmarshal(raw, &record) 66 - if err != nil { 67 - l.Error("invalid record", "error", err) 68 - return err 69 - } 70 - 71 - domain := s.cfg.Server.Hostname 72 - recordInstance := record.Instance 73 - 74 - if recordInstance != domain { 75 - l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 76 - return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 77 - } 78 - 79 - ok, err := s.e.IsSpindleMemberInviteAllowed(syntax.DID(did), s.cfg.Server.Did()) 80 - if err != nil || !ok { 81 - l.Error("failed to add member", "did", did, "error", err) 82 - return fmt.Errorf("failed to enforce permissions: %w", err) 83 - } 84 - 85 - if err := db.AddSpindleMember(s.db, db.SpindleMember{ 86 - Did: syntax.DID(did), 87 - Rkey: rkey, 88 - Instance: recordInstance, 89 - Subject: syntax.DID(record.Subject), 90 - Created: time.Now(), 91 - }); err != nil { 92 - l.Error("failed to add member", "error", err) 93 - return fmt.Errorf("failed to add member: %w", err) 94 - } 95 - 96 - if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 97 - l.Error("failed to add member", "error", err) 98 - return fmt.Errorf("failed to add member: %w", err) 99 - } 100 - l.Info("added member from firehose", "member", record.Subject) 101 - 102 - if err := s.db.AddDid(record.Subject); err != nil { 103 - l.Error("failed to add did", "error", err) 104 - return fmt.Errorf("failed to add did: %w", err) 105 - } 106 - s.jc.AddDid(record.Subject) 107 - 108 - return nil 109 - 110 - case models.CommitOperationDelete: 111 - record, err := db.GetSpindleMember(s.db, did, rkey) 112 - if err != nil { 113 - l.Error("failed to find member", "error", err) 114 - return fmt.Errorf("failed to find member: %w", err) 115 - } 116 - 117 - if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 118 - l.Error("failed to remove member", "error", err) 119 - return fmt.Errorf("failed to remove member: %w", err) 120 - } 121 - 122 - if err := s.e.RemoveSpindleMember(record.Subject, s.cfg.Server.Did()); err != nil { 123 - l.Error("failed to add member", "error", err) 124 - return fmt.Errorf("failed to add member: %w", err) 125 - } 126 - l.Info("added member from firehose", "member", record.Subject) 127 - 128 - if err := s.db.RemoveDid(record.Subject.String()); err != nil { 129 - l.Error("failed to add did", "error", err) 130 - return fmt.Errorf("failed to add did: %w", err) 131 - } 132 - s.jc.RemoveDid(record.Subject.String()) 133 - 134 - } 135 - return nil 136 - } 137 - 138 - func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 139 - var err error 140 - did := e.Did 141 - 142 - l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 143 - 144 - l.Info("ingesting repo record", "did", did) 145 - 146 - switch e.Commit.Operation { 147 - case models.CommitOperationCreate, models.CommitOperationUpdate: 148 - raw := e.Commit.Record 149 - record := tangled.Repo{} 150 - err = json.Unmarshal(raw, &record) 151 - if err != nil { 152 - l.Error("invalid record", "error", err) 153 - return err 154 - } 155 - 156 - domain := s.cfg.Server.Hostname 157 - 158 - // no spindle configured for this repo 159 - if record.Spindle == nil { 160 - l.Info("no spindle configured", "name", record.Name) 161 - return nil 162 - } 163 - 164 - // this repo did not want this spindle 165 - if *record.Spindle != domain { 166 - l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 167 - return nil 168 - } 169 - 170 - // add this repo to the watch list 171 - if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil { 172 - l.Error("failed to add repo", "error", err) 173 - return fmt.Errorf("failed to add repo: %w", err) 174 - } 175 - 176 - repoAt := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", did, e.Commit.Collection, e.Commit.RKey)) 177 - 178 - // add repo to rbac 179 - if err := s.e.AddRepo(repoAt); err != nil { 180 - l.Error("failed to add repo to enforcer", "error", err) 181 - return fmt.Errorf("failed to add repo: %w", err) 182 - } 183 - 184 - // add collaborators to rbac 185 - if err := s.fetchAndAddCollaborators(ctx, repoAt); err != nil { 186 - return err 187 - } 188 - 189 - // add this knot to the event consumer 190 - src := eventconsumer.NewKnotSource(record.Knot) 191 - s.ks.AddSource(context.Background(), src) 192 - 193 - return nil 194 - 195 - } 196 - return nil 197 - } 198 - 199 - func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 200 - var err error 201 - 202 - l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 203 - 204 - l.Info("ingesting collaborator record") 205 - 206 - switch e.Commit.Operation { 207 - case models.CommitOperationCreate, models.CommitOperationUpdate: 208 - raw := e.Commit.Record 209 - record := tangled.RepoCollaborator{} 210 - err = json.Unmarshal(raw, &record) 211 - if err != nil { 212 - l.Error("invalid record", "error", err) 213 - return err 214 - } 215 - 216 - subjectId, err := s.res.ResolveIdent(ctx, record.Subject) 217 - if err != nil || subjectId.Handle.IsInvalidHandle() { 218 - return err 219 - } 220 - 221 - repoAt, err := syntax.ParseATURI(record.Repo) 222 - if err != nil { 223 - l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 224 - return nil 225 - } 226 - 227 - // check perms for this user 228 - if ok, err := s.e.IsRepoCollaboratorInviteAllowed(syntax.DID(e.Did), repoAt); !ok || err != nil { 229 - return fmt.Errorf("insufficient permissions: %w", err) 230 - } 231 - 232 - // add collaborator to rbac 233 - if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), repoAt); err != nil { 234 - l.Error("failed to add repo to enforcer", "error", err) 235 - return fmt.Errorf("failed to add repo: %w", err) 236 - } 237 - 238 - return nil 239 - } 240 - return nil 241 - } 242 - 243 - func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, repo syntax.ATURI) error { 244 - l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 245 - 246 - l.Info("fetching and adding existing collaborators") 247 - 248 - ident, err := s.res.ResolveIdent(ctx, repo.Authority().String()) 249 - if err != nil || ident.Handle.IsInvalidHandle() { 250 - return fmt.Errorf("failed to resolve handle: %w", err) 251 - } 252 - 253 - xrpcc := xrpc.Client{ 254 - Host: ident.PDSEndpoint(), 255 - } 256 - 257 - resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, ident.DID.String(), false) 258 - if err != nil { 259 - return err 260 - } 261 - 262 - var errs error 263 - for _, r := range resp.Records { 264 - if r == nil { 265 - continue 266 - } 267 - record := r.Value.Val.(*tangled.RepoCollaborator) 268 - 269 - if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 270 - l.Error("failed to add repo to enforcer", "error", err) 271 - errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 272 - } 273 - } 274 - 275 - return errs 276 - }
+19 -30
spindle/server.go
··· 10 10 "net/http" 11 11 "sync" 12 12 13 + "github.com/bluesky-social/indigo/atproto/syntax" 13 14 "github.com/go-chi/chi/v5" 14 15 "tangled.org/core/api/tangled" 15 16 "tangled.org/core/eventconsumer" 16 17 "tangled.org/core/eventconsumer/cursor" 17 18 "tangled.org/core/idresolver" 18 - "tangled.org/core/jetstream" 19 19 "tangled.org/core/log" 20 20 "tangled.org/core/notifier" 21 21 "tangled.org/core/rbac2" ··· 27 27 "tangled.org/core/spindle/queue" 28 28 "tangled.org/core/spindle/secrets" 29 29 "tangled.org/core/spindle/xrpc" 30 + "tangled.org/core/tap" 30 31 "tangled.org/core/xrpc/serviceauth" 31 32 ) 32 33 ··· 35 34 var defaultMotd []byte 36 35 37 36 type Spindle struct { 38 - jc *jetstream.JetstreamClient 37 + tap *tap.Client 39 38 db *db.DB 40 39 e *rbac2.Enforcer 41 40 l *slog.Logger ··· 94 93 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 95 94 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 96 95 97 - collections := []string{ 98 - tangled.SpindleMemberNSID, 99 - tangled.RepoNSID, 100 - tangled.RepoCollaboratorNSID, 101 - } 102 - jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 103 - if err != nil { 104 - return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 105 - } 106 - jc.AddDid(cfg.Server.Owner.String()) 107 - 108 - // Check if the spindle knows about any Dids; 109 - dids, err := d.GetAllDids() 110 - if err != nil { 111 - return nil, fmt.Errorf("failed to get all dids: %w", err) 112 - } 113 - for _, d := range dids { 114 - jc.AddDid(d) 115 - } 96 + tap := tap.NewClient(cfg.Server.TapUrl, "") 116 97 117 98 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 118 99 119 100 spindle := &Spindle{ 120 - jc: jc, 101 + tap: &tap, 121 102 e: e, 122 103 db: d, 123 104 l: logger, ··· 121 138 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 122 139 if err != nil { 123 140 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 124 - } 125 - 126 - err = jc.StartJetstream(ctx, spindle.ingest()) 127 - if err != nil { 128 - return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 129 141 } 130 142 131 143 // for each incoming sh.tangled.pipeline, we execute ··· 204 226 s.ks.Start(ctx) 205 227 }() 206 228 229 + // ensure server owner is tracked 230 + if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 231 + return err 232 + } 233 + 234 + go func() { 235 + s.l.Info("starting tap stream consumer") 236 + s.tap.Connect(ctx, &tap.SimpleIndexer{ 237 + EventHandler: s.processEvent, 238 + }) 239 + }() 240 + 207 241 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 208 242 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 209 243 } ··· 296 306 } 297 307 298 308 // filter by repos 299 - _, err = s.db.GetRepo( 300 - tpl.TriggerMetadata.Repo.Knot, 301 - tpl.TriggerMetadata.Repo.Did, 309 + _, err = s.db.GetRepoWithName( 310 + syntax.DID(tpl.TriggerMetadata.Repo.Did), 302 311 tpl.TriggerMetadata.Repo.Repo, 303 312 ) 304 313 if err != nil {
+294
spindle/tap.go
··· 1 + package spindle 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.org/core/api/tangled" 11 + "tangled.org/core/eventconsumer" 12 + "tangled.org/core/spindle/db" 13 + "tangled.org/core/tap" 14 + ) 15 + 16 + func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { 17 + l := s.l.With("component", "tapIndexer") 18 + 19 + var err error 20 + switch evt.Type { 21 + case tap.EvtRecord: 22 + switch evt.Record.Collection.String() { 23 + case tangled.SpindleMemberNSID: 24 + err = s.processMember(ctx, evt) 25 + case tangled.RepoNSID: 26 + err = s.processRepo(ctx, evt) 27 + case tangled.RepoCollaboratorNSID: 28 + err = s.processCollaborator(ctx, evt) 29 + case tangled.RepoPullNSID: 30 + err = s.processPull(ctx, evt) 31 + } 32 + case tap.EvtIdentity: 33 + // no-op 34 + } 35 + 36 + if err != nil { 37 + l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 38 + return err 39 + } 40 + return nil 41 + } 42 + 43 + // NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 44 + 45 + func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { 46 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 47 + 48 + l.Info("processing spindle.member record") 49 + 50 + // only listen to members 51 + if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 52 + l.Warn("forbidden request: member invite not allowed", "did", evt.Record.Did, "error", err) 53 + return nil 54 + } 55 + 56 + switch evt.Record.Action { 57 + case tap.RecordCreateAction, tap.RecordUpdateAction: 58 + record := tangled.SpindleMember{} 59 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 60 + return fmt.Errorf("parsing record: %w", err) 61 + } 62 + 63 + domain := s.cfg.Server.Hostname 64 + if record.Instance != domain { 65 + l.Info("domain mismatch", "domain", record.Instance, "expected", domain) 66 + return nil 67 + } 68 + 69 + created, err := time.Parse(record.CreatedAt, time.RFC3339) 70 + if err != nil { 71 + created = time.Now() 72 + } 73 + if err := db.AddSpindleMember(s.db, db.SpindleMember{ 74 + Did: evt.Record.Did, 75 + Rkey: evt.Record.Rkey.String(), 76 + Instance: record.Instance, 77 + Subject: syntax.DID(record.Subject), 78 + Created: created, 79 + }); err != nil { 80 + l.Error("failed to add member", "error", err) 81 + return fmt.Errorf("adding member to db: %w", err) 82 + } 83 + if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 84 + return fmt.Errorf("adding member to rbac: %w", err) 85 + } 86 + if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 87 + return fmt.Errorf("adding did to tap: %w", err) 88 + } 89 + 90 + l.Info("added member", "member", record.Subject) 91 + return nil 92 + 93 + case tap.RecordDeleteAction: 94 + var ( 95 + did = evt.Record.Did.String() 96 + rkey = evt.Record.Rkey.String() 97 + ) 98 + member, err := db.GetSpindleMember(s.db, did, rkey) 99 + if err != nil { 100 + return fmt.Errorf("finding member: %w", err) 101 + } 102 + 103 + if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 104 + return fmt.Errorf("removing member from db: %w", err) 105 + } 106 + if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil { 107 + return fmt.Errorf("removing member from rbac: %w", err) 108 + } 109 + if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 110 + return fmt.Errorf("removing did from tap: %w", err) 111 + } 112 + 113 + l.Info("removed member", "member", member.Subject) 114 + return nil 115 + } 116 + return nil 117 + } 118 + 119 + func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { 120 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 121 + 122 + l.Info("processing repo.collaborator record") 123 + 124 + // only listen to members 125 + if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 126 + l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) 127 + return nil 128 + } 129 + 130 + switch evt.Record.Action { 131 + case tap.RecordCreateAction, tap.RecordUpdateAction: 132 + record := tangled.RepoCollaborator{} 133 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 134 + l.Error("invalid record", "err", err) 135 + return fmt.Errorf("parsing record: %w", err) 136 + } 137 + 138 + // retry later if target repo is not ingested yet 139 + if _, err := s.db.GetRepo(syntax.ATURI(record.Repo)); err != nil { 140 + l.Warn("target repo is not ingested yet", "repo", record.Repo, "err", err) 141 + return fmt.Errorf("target repo is unknown") 142 + } 143 + 144 + // check perms for this user 145 + if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil { 146 + l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) 147 + return nil 148 + } 149 + 150 + if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{ 151 + Did: evt.Record.Did, 152 + Rkey: evt.Record.Rkey, 153 + Repo: syntax.ATURI(record.Repo), 154 + Subject: syntax.DID(record.Subject), 155 + }); err != nil { 156 + return fmt.Errorf("adding collaborator to db: %w", err) 157 + } 158 + if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 159 + return fmt.Errorf("adding collaborator to rbac: %w", err) 160 + } 161 + if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 162 + return fmt.Errorf("adding did to tap: %w", err) 163 + } 164 + 165 + l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 166 + return nil 167 + 168 + case tap.RecordDeleteAction: 169 + // get existing collaborator 170 + collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 171 + if err != nil { 172 + return fmt.Errorf("failed to get existing collaborator info: %w", err) 173 + } 174 + 175 + // check perms for this user 176 + if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil { 177 + l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) 178 + return nil 179 + } 180 + 181 + if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil { 182 + return fmt.Errorf("removing collaborator from db: %w", err) 183 + } 184 + if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil { 185 + return fmt.Errorf("removing collaborator from rbac: %w", err) 186 + } 187 + if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 188 + return fmt.Errorf("removing did from tap: %w", err) 189 + } 190 + 191 + l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) 192 + return nil 193 + } 194 + return nil 195 + } 196 + 197 + func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { 198 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 199 + 200 + l.Info("processing repo record") 201 + 202 + // only listen to members 203 + if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 204 + l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) 205 + return nil 206 + } 207 + 208 + switch evt.Record.Action { 209 + case tap.RecordCreateAction, tap.RecordUpdateAction: 210 + record := tangled.Repo{} 211 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 212 + return fmt.Errorf("parsing record: %w", err) 213 + } 214 + 215 + domain := s.cfg.Server.Hostname 216 + if record.Spindle == nil || *record.Spindle != domain { 217 + if record.Spindle == nil { 218 + l.Info("spindle isn't configured", "name", record.Name) 219 + } else { 220 + l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 221 + } 222 + if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 223 + return fmt.Errorf("deleting repo from db: %w", err) 224 + } 225 + return nil 226 + } 227 + 228 + if err := s.db.PutRepo(&db.Repo{ 229 + Did: evt.Record.Did, 230 + Rkey: evt.Record.Rkey, 231 + Name: record.Name, 232 + Knot: record.Knot, 233 + }); err != nil { 234 + return fmt.Errorf("adding repo to db: %w", err) 235 + } 236 + 237 + if err := s.e.AddRepo(evt.Record.AtUri()); err != nil { 238 + return fmt.Errorf("adding repo to rbac") 239 + } 240 + 241 + // add this knot to the event consumer 242 + src := eventconsumer.NewKnotSource(record.Knot) 243 + s.ks.AddSource(context.Background(), src) 244 + 245 + l.Info("added repo", "repo", evt.Record.AtUri()) 246 + return nil 247 + 248 + case tap.RecordDeleteAction: 249 + // check perms for this user 250 + if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 251 + l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err) 252 + return nil 253 + } 254 + 255 + if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 256 + return fmt.Errorf("deleting repo from db: %w", err) 257 + } 258 + 259 + if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil { 260 + return fmt.Errorf("deleting repo from rbac: %w", err) 261 + } 262 + 263 + l.Info("deleted repo", "repo", evt.Record.AtUri()) 264 + return nil 265 + } 266 + return nil 267 + } 268 + 269 + func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { 270 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 271 + 272 + l.Info("processing pull record") 273 + 274 + switch evt.Record.Action { 275 + case tap.RecordCreateAction, tap.RecordUpdateAction: 276 + // TODO 277 + case tap.RecordDeleteAction: 278 + // TODO 279 + } 280 + return nil 281 + } 282 + 283 + func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error { 284 + known, err := s.db.IsKnownDid(syntax.DID(did)) 285 + if err != nil { 286 + return fmt.Errorf("ensuring did known state: %w", err) 287 + } 288 + if !known { 289 + if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 290 + return fmt.Errorf("removing did from tap: %w", err) 291 + } 292 + } 293 + return nil 294 + }