this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

one round of cleanup

+18 -208
+9 -101
archiver/archiver.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "database/sql" 6 5 "errors" 7 6 "fmt" 8 7 "log/slog" ··· 13 12 "sync" 14 13 "time" 15 14 16 - "github.com/bluesky-social/indigo/api/atproto" 17 15 "github.com/bluesky-social/indigo/bgs" 18 16 "github.com/bluesky-social/indigo/carstore" 19 17 "github.com/bluesky-social/indigo/did" 20 18 "github.com/bluesky-social/indigo/events" 21 - "github.com/bluesky-social/indigo/handles" 22 19 "github.com/bluesky-social/indigo/models" 23 20 "github.com/bluesky-social/indigo/repomgr" 24 21 "github.com/bluesky-social/indigo/util/svcutil" ··· 41 38 db *gorm.DB 42 39 slurper *bgs.Slurper 43 40 didr did.Resolver 44 - 45 - hr handles.HandleResolver 46 41 47 42 // TODO: work on doing away with this flag in favor of more pluggable 48 43 // pieces that abstract the need for explicit ssl checks ··· 101 96 } 102 97 } 103 98 104 - func NewArchiver(db *gorm.DB, ix *Indexer, repoman *repomgr.RepoManager, didr did.Resolver, rf *RepoFetcher, hr handles.HandleResolver, config *ArchiverConfig) (*Archiver, error) { 99 + func NewArchiver(db *gorm.DB, ix *Indexer, repoman *repomgr.RepoManager, didr did.Resolver, rf *RepoFetcher, config *ArchiverConfig) (*Archiver, error) { 105 100 if config == nil { 106 101 config = DefaultArchiverConfig() 107 102 } ··· 116 111 Index: ix, 117 112 db: db, 118 113 119 - hr: hr, 120 114 repoman: repoman, 121 115 didr: didr, 122 116 ssl: config.SSL, ··· 165 159 166 160 type User struct { 167 161 gorm.Model 168 - ID models.Uid `gorm:"primarykey;index:idx_user_id_active,where:taken_down = false AND tombstoned = false"` 169 - Handle sql.NullString `gorm:"index"` 170 - DisplayName string 171 - Did string `gorm:"uniqueindex"` 172 - Following int64 173 - Followers int64 174 - Posts int64 175 - Type string 176 - PDS uint 177 - ValidHandle bool `gorm:"default:true"` 162 + ID models.Uid `gorm:"primarykey;index:idx_user_id_active,where:taken_down = false AND tombstoned = false"` 163 + Did string `gorm:"uniqueindex"` 164 + PDS uint 178 165 179 166 // TakenDown is set to true if the user in question has been taken down. 180 167 // A user in this state will have all future events related to it dropped ··· 257 244 return nil, err 258 245 } 259 246 260 - /* 261 - // TODO: ignore this because we're just gonna get the stream from the relay anyways 262 - ban, err := s.domainIsBanned(ctx, durl.Host) 263 - if err != nil { 264 - return nil, fmt.Errorf("failed to check pds ban status: %w", err) 265 - } 266 - 267 - if ban { 268 - return nil, fmt.Errorf("cannot create user on pds with banned domain") 269 - } 270 - */ 271 - 272 247 c := &xrpc.Client{Host: durl.String()} 273 248 s.Index.ApplyPDSClientSettings(c) 274 249 275 250 if peering.ID == 0 { 276 - // TODO: the case of handling a new user on a new PDS probably requires more thought 277 - cfg, err := atproto.ServerDescribeServer(ctx, c) 278 - if err != nil { 279 - // TODO: failing this shouldn't halt our indexing 280 - return nil, fmt.Errorf("failed to check unrecognized pds: %w", err) 281 - } 282 - 283 - // since handles can be anything, checking against this list doesn't matter... 284 - _ = cfg 285 - 286 - // TODO: could check other things, a valid response is good enough for now 287 251 peering.Host = durl.Host 288 252 peering.SSL = (durl.Scheme == "https") 289 253 peering.CrawlRateLimit = float64(s.slurper.DefaultCrawlLimit) ··· 338 302 return nil, fmt.Errorf("user has no 'known as' field in their DID document") 339 303 } 340 304 341 - hurl, err := url.Parse(doc.AlsoKnownAs[0]) 342 - if err != nil { 343 - return nil, err 344 - } 345 - 346 - s.log.Debug("creating external user", "did", did, "handle", hurl.Host, "pds", peering.ID) 347 - 348 - handle := hurl.Host 349 - 350 - validHandle := true 351 - 352 - resdid, err := s.hr.ResolveHandleToDid(ctx, handle) 353 - if err != nil { 354 - s.log.Error("failed to resolve users claimed handle on pds", "handle", handle, "err", err) 355 - validHandle = false 356 - } 357 - 358 - if resdid != did { 359 - s.log.Error("claimed handle did not match servers response", "resdid", resdid, "did", did) 360 - validHandle = false 361 - } 305 + s.log.Debug("creating external user", "did", did, "pds", peering.ID) 362 306 363 307 s.extUserLk.Lock() 364 308 defer s.extUserLk.Unlock() 365 309 366 310 exu, err := s.Index.LookupUserByDid(ctx, did) 367 311 if err == nil { 368 - s.log.Debug("lost the race to create a new user", "did", did, "handle", handle, "existing_hand", exu.Handle) 312 + s.log.Debug("lost the race to create a new user", "did", did) 369 313 if exu.PDS != peering.ID { 370 314 // User is now on a different PDS, update 371 315 if err := s.db.Model(User{}).Where("id = ?", exu.ID).Update("pds", peering.ID).Error; err != nil { ··· 375 319 exu.PDS = peering.ID 376 320 } 377 321 378 - if exu.Handle.String != handle { 379 - // Users handle has changed, update 380 - if err := s.db.Model(User{}).Where("id = ?", exu.ID).Update("handle", handle).Error; err != nil { 381 - return nil, fmt.Errorf("failed to update users handle: %w", err) 382 - } 383 - 384 - exu.Handle = sql.NullString{String: handle, Valid: true} 385 - } 386 322 return exu, nil 387 323 } 388 324 ··· 390 326 return nil, err 391 327 } 392 328 393 - // TODO: request this users info from their server to fill out our data... 394 329 u := User{ 395 - Did: did, 396 - PDS: peering.ID, 397 - ValidHandle: validHandle, 398 - } 399 - if validHandle { 400 - u.Handle = sql.NullString{String: handle, Valid: true} 330 + Did: did, 331 + PDS: peering.ID, 401 332 } 402 333 403 334 if err := s.db.Create(&u).Error; err != nil { 404 - // If the new user's handle conflicts with an existing user, 405 - // since we just validated the handle for this user, we'll assume 406 - // the existing user no longer has control of the handle 407 - if errors.Is(err, gorm.ErrDuplicatedKey) { 408 - // Get the UID of the existing user 409 - var existingUser User 410 - if err := s.db.Find(&existingUser, "handle = ?", handle).Error; err != nil { 411 - return nil, fmt.Errorf("failed to find existing user: %w", err) 412 - } 413 - 414 - // Set the existing user's handle to NULL and set the valid_handle flag to false 415 - if err := s.db.Model(User{}).Where("id = ?", existingUser.ID).Update("handle", nil).Update("valid_handle", false).Error; err != nil { 416 - return nil, fmt.Errorf("failed to update outdated user's handle: %w", err) 417 - } 418 - 419 - // Create the new user 420 - if err := s.db.Create(&u).Error; err != nil { 421 - return nil, fmt.Errorf("failed to create user after handle conflict: %w", err) 422 - } 423 - 424 - s.userCache.Remove(did) 425 - } else { 426 - return nil, fmt.Errorf("failed to create other pds user: %w", err) 427 - } 335 + return nil, fmt.Errorf("failed to create other pds user: %w", err) 428 336 } 429 337 430 338 successfullyCreated = true
+4 -64
archiver/loader.go
··· 5 5 "errors" 6 6 "fmt" 7 7 "log/slog" 8 - "time" 9 8 10 - comatproto "github.com/bluesky-social/indigo/api/atproto" 11 9 "github.com/bluesky-social/indigo/did" 12 10 "github.com/bluesky-social/indigo/events" 13 - lexutil "github.com/bluesky-social/indigo/lex/util" 14 11 "github.com/bluesky-social/indigo/models" 15 - "github.com/bluesky-social/indigo/repomgr" 16 - "github.com/bluesky-social/indigo/util" 17 12 "github.com/bluesky-social/indigo/xrpc" 18 13 19 14 "go.opentelemetry.io/otel" ··· 26 21 type Indexer struct { 27 22 db *gorm.DB 28 23 29 - addEvent AddEventFunc 30 - didr did.Resolver 24 + didr did.Resolver 31 25 32 26 Crawler *CrawlDispatcher 33 27 34 - SendRemoteFollow func(context.Context, string, uint) error 35 28 CreateExternalUser func(context.Context, string) (*User, error) 36 29 ApplyPDSClientSettings func(*xrpc.Client) 37 30 ··· 40 33 41 34 type AddEventFunc func(ctx context.Context, ev *events.XRPCStreamEvent) error 42 35 43 - func NewIndexer(db *gorm.DB, addEvent AddEventFunc, didr did.Resolver, fetcher *RepoFetcher, crawl bool) (*Indexer, error) { 36 + func NewIndexer(db *gorm.DB, didr did.Resolver, fetcher *RepoFetcher, crawl bool) (*Indexer, error) { 44 37 ix := &Indexer{ 45 - db: db, 46 - addEvent: addEvent, 47 - didr: didr, 48 - SendRemoteFollow: func(context.Context, string, uint) error { 49 - return nil 50 - }, 38 + db: db, 39 + didr: didr, 51 40 ApplyPDSClientSettings: func(*xrpc.Client) {}, 52 41 log: slog.Default().With("system", "indexer"), 53 42 } ··· 69 58 if ix.Crawler != nil { 70 59 ix.Crawler.Shutdown() 71 60 } 72 - } 73 - 74 - func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error { 75 - ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent") 76 - defer span.End() 77 - 78 - ix.log.Debug("Handling Repo Event!", "uid", evt.User) 79 - 80 - outops := make([]*comatproto.SyncSubscribeRepos_RepoOp, 0, len(evt.Ops)) 81 - for _, op := range evt.Ops { 82 - link := (*lexutil.LexLink)(op.RecCid) 83 - outops = append(outops, &comatproto.SyncSubscribeRepos_RepoOp{ 84 - Path: op.Collection + "/" + op.Rkey, 85 - Action: string(op.Kind), 86 - Cid: link, 87 - }) 88 - } 89 - 90 - did, err := ix.DidForUser(ctx, evt.User) 91 - if err != nil { 92 - return err 93 - } 94 - 95 - toobig := false 96 - slice := evt.RepoSlice 97 - if len(slice) > MaxEventSliceLength || len(outops) > MaxOpsSliceLength { 98 - slice = []byte{} 99 - outops = nil 100 - toobig = true 101 - } 102 - 103 - ix.log.Debug("Sending event", "did", did) 104 - if err := ix.addEvent(ctx, &events.XRPCStreamEvent{ 105 - RepoCommit: &comatproto.SyncSubscribeRepos_Commit{ 106 - Repo: did, 107 - Blocks: slice, 108 - Rev: evt.Rev, 109 - Since: evt.Since, 110 - Commit: lexutil.LexLink(evt.NewRoot), 111 - Time: time.Now().Format(util.ISO8601), 112 - Ops: outops, 113 - TooBig: toobig, 114 - }, 115 - PrivUid: evt.User, 116 - }); err != nil { 117 - return fmt.Errorf("failed to push event: %s", err) 118 - } 119 - 120 - return nil 121 61 } 122 62 123 63 func (ix *Indexer) GetUserOrMissing(ctx context.Context, did string) (*User, error) {
+5 -43
cmd/archit/main.go
··· 4 4 "context" 5 5 "fmt" 6 6 "log/slog" 7 - "net/http" 8 7 _ "net/http/pprof" 9 8 "net/url" 10 9 "os" ··· 15 14 "time" 16 15 17 16 archiver "github.com/bluesky-social/indigo/archiver" 18 - "github.com/bluesky-social/indigo/carstore" 19 17 "github.com/bluesky-social/indigo/did" 20 - "github.com/bluesky-social/indigo/events" 21 - "github.com/bluesky-social/indigo/handles" 22 18 "github.com/bluesky-social/indigo/indexer" 23 19 "github.com/bluesky-social/indigo/plc" 24 20 "github.com/bluesky-social/indigo/repomgr" 21 + "github.com/bluesky-social/indigo/repostore" 25 22 "github.com/bluesky-social/indigo/util" 26 23 "github.com/bluesky-social/indigo/util/cliutil" 27 24 "github.com/bluesky-social/indigo/xrpc" ··· 41 38 ) 42 39 43 40 var log = slog.Default().With("system", "archiver") 44 - 45 - func init() { 46 - // control log level using, eg, GOLOG_LOG_LEVEL=debug 47 - //logging.SetAllLoggers(logging.LevelDebug) 48 - } 49 41 50 42 func main() { 51 43 if err := run(os.Args); err != nil { ··· 185 177 } 186 178 187 179 app.Action = runBigsky 188 - return app.Run(os.Args) 180 + return app.Run(args) 189 181 } 190 182 191 183 func setupOTEL(cctx *cli.Context) error { ··· 296 288 } 297 289 } 298 290 299 - cstore, err := carstore.NewCarStore(db, csdirs) 291 + cstore, err := repostore.NewCarStore(db, csdirs) 300 292 if err != nil { 301 293 return err 302 294 } ··· 335 327 336 328 rf := archiver.NewRepoFetcher(db, repoman, cctx.Int("max-fetch-concurrency")) 337 329 338 - nullfunc := func(ctx context.Context, evt *events.XRPCStreamEvent) error { 339 - return nil 340 - } 341 - 342 - ix, err := archiver.NewIndexer(db, nullfunc, cachedidr, rf, true) 330 + ix, err := archiver.NewIndexer(db, cachedidr, rf, true) 343 331 if err != nil { 344 332 return err 345 333 } ··· 364 352 } 365 353 rf.ApplyPDSClientSettings = ix.ApplyPDSClientSettings 366 354 367 - repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) { 368 - if err := ix.HandleRepoEvent(ctx, evt); err != nil { 369 - slog.Error("failed to handle repo event", "err", err) 370 - } 371 - }, false) 372 - 373 - prodHR, err := handles.NewProdHandleResolver(100_000, cctx.String("resolve-address"), cctx.Bool("force-dns-udp")) 374 - if err != nil { 375 - return fmt.Errorf("failed to set up handle resolver: %w", err) 376 - } 377 - if rlskip != "" { 378 - prodHR.ReqMod = func(req *http.Request, host string) error { 379 - if strings.HasSuffix(host, ".bsky.social") { 380 - req.Header.Set("x-ratelimit-bypass", rlskip) 381 - } 382 - return nil 383 - } 384 - } 385 - 386 - var hr handles.HandleResolver = prodHR 387 - if cctx.StringSlice("handle-resolver-hosts") != nil { 388 - hr = &handles.TestHandleResolver{ 389 - TrialHosts: cctx.StringSlice("handle-resolver-hosts"), 390 - } 391 - } 392 - 393 355 slog.Info("constructing archiver") 394 356 archiverConfig := archiver.DefaultArchiverConfig() 395 357 archiverConfig.SSL = !cctx.Bool("crawl-insecure-ws") ··· 412 374 archiverConfig.NextCrawlers = nextCrawlerUrls 413 375 } 414 376 415 - arc, err := archiver.NewArchiver(db, ix, repoman, cachedidr, rf, hr, archiverConfig) 377 + arc, err := archiver.NewArchiver(db, ix, repoman, cachedidr, rf, archiverConfig) 416 378 if err != nil { 417 379 return err 418 380 }