Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 737 lines 20 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cloudflare" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/indexer" 20 "tangled.org/core/appview/mentions" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/appview/notify" 23 dbnotify "tangled.org/core/appview/notify/db" 24 lognotify "tangled.org/core/appview/notify/logging" 25 phnotify "tangled.org/core/appview/notify/posthog" 26 whnotify "tangled.org/core/appview/notify/webhook" 27 "tangled.org/core/appview/oauth" 28 "tangled.org/core/appview/pages" 29 "tangled.org/core/appview/reporesolver" 30 "tangled.org/core/appview/validator" 31 xrpcclient "tangled.org/core/appview/xrpcclient" 32 "tangled.org/core/consts" 33 "tangled.org/core/eventconsumer" 34 "tangled.org/core/idresolver" 35 "tangled.org/core/jetstream" 36 "tangled.org/core/log" 37 tlog "tangled.org/core/log" 38 "tangled.org/core/orm" 39 "tangled.org/core/rbac" 40 "tangled.org/core/tid" 41 42 comatproto "github.com/bluesky-social/indigo/api/atproto" 43 "github.com/bluesky-social/indigo/atproto/atclient" 44 "github.com/bluesky-social/indigo/atproto/syntax" 45 lexutil "github.com/bluesky-social/indigo/lex/util" 46 "github.com/bluesky-social/indigo/xrpc" 47 48 "github.com/go-chi/chi/v5" 49 "github.com/posthog/posthog-go" 50) 51 52type State struct { 53 db *db.DB 54 notifier notify.Notifier 55 indexer *indexer.Indexer 56 oauth *oauth.OAuth 57 enforcer *rbac.Enforcer 58 pages *pages.Pages 59 idResolver *idresolver.Resolver 60 mentionsResolver *mentions.Resolver 61 posthog posthog.Client 62 jc *jetstream.JetstreamClient 63 config *config.Config 64 repoResolver *reporesolver.RepoResolver 65 knotstream *eventconsumer.Consumer 66 spindlestream *eventconsumer.Consumer 67 logger *slog.Logger 68 validator *validator.Validator 69 cfClient *cloudflare.Client 70} 71 72func Make(ctx context.Context, config *config.Config) (*State, error) { 73 logger := tlog.FromContext(ctx) 74 75 d, err := db.Make(ctx, config.Core.DbPath) 76 if err != nil { 77 return nil, fmt.Errorf("failed to create db: %w", err) 78 } 79 80 indexer := indexer.New(log.SubLogger(logger, "indexer")) 81 err = indexer.Init(ctx, d) 82 if err != nil { 83 return nil, fmt.Errorf("failed to create indexer: %w", err) 84 } 85 86 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 87 if err != nil { 88 return nil, fmt.Errorf("failed to create enforcer: %w", err) 89 } 90 91 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 92 if err != nil { 93 logger.Error("failed to create redis resolver", "err", err) 94 res = idresolver.DefaultResolver(config.Plc.PLCURL) 95 } 96 97 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 98 if err != nil { 99 return nil, fmt.Errorf("failed to create posthog client: %w", err) 100 } 101 102 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 103 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 104 if err != nil { 105 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 106 } 107 validator := validator.New(d, res, enforcer) 108 109 repoResolver := reporesolver.New(config, enforcer, d) 110 111 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 112 113 wrapper := db.DbWrapper{Execer: d} 114 jc, err := jetstream.NewJetstreamClient( 115 config.Jetstream.Endpoint, 116 "appview", 117 []string{ 118 tangled.GraphFollowNSID, 119 tangled.FeedStarNSID, 120 tangled.PublicKeyNSID, 121 tangled.RepoArtifactNSID, 122 tangled.ActorProfileNSID, 123 tangled.KnotMemberNSID, 124 tangled.SpindleMemberNSID, 125 tangled.SpindleNSID, 126 tangled.StringNSID, 127 tangled.RepoIssueNSID, 128 tangled.RepoIssueCommentNSID, 129 tangled.LabelDefinitionNSID, 130 tangled.LabelOpNSID, 131 }, 132 nil, 133 tlog.SubLogger(logger, "jetstream"), 134 wrapper, 135 false, 136 137 // in-memory filter is inapplicable to appview so 138 // we'll never log dids anyway. 139 false, 140 ) 141 if err != nil { 142 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 143 } 144 145 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 146 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 147 } 148 149 ingester := appview.Ingester{ 150 Db: wrapper, 151 Enforcer: enforcer, 152 IdResolver: res, 153 Config: config, 154 Logger: log.SubLogger(logger, "ingester"), 155 Validator: validator, 156 } 157 err = jc.StartJetstream(ctx, ingester.Ingest()) 158 if err != nil { 159 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 160 } 161 162 var notifiers []notify.Notifier 163 164 // Always add the database notifier 165 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 166 167 // Add other notifiers in production only 168 if !config.Core.Dev { 169 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 170 } 171 notifiers = append(notifiers, indexer) 172 173 notifiers = append(notifiers, whnotify.NewNotifier(d)) 174 175 notifier := notify.NewMergedNotifier(notifiers) 176 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 177 178 var cfClient *cloudflare.Client 179 if config.Cloudflare.ApiToken != "" { 180 cfClient, err = cloudflare.New(config) 181 if err != nil { 182 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 183 cfClient = nil 184 } 185 } 186 187 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 188 if err != nil { 189 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 190 } 191 knotstream.Start(ctx) 192 193 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 194 if err != nil { 195 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 196 } 197 spindlestream.Start(ctx) 198 199 state := &State{ 200 db: d, 201 notifier: notifier, 202 indexer: indexer, 203 oauth: oauth, 204 enforcer: enforcer, 205 pages: pages, 206 idResolver: res, 207 mentionsResolver: mentionsResolver, 208 posthog: posthog, 209 jc: jc, 210 config: config, 211 repoResolver: repoResolver, 212 knotstream: knotstream, 213 spindlestream: spindlestream, 214 logger: logger, 215 validator: validator, 216 cfClient: cfClient, 217 } 218 219 // fetch initial bluesky posts if configured 220 go fetchBskyPosts(ctx, res, config, d, logger) 221 222 return state, nil 223} 224 225func (s *State) Close() error { 226 // other close up logic goes here 227 return s.db.Close() 228} 229 230func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 231 w.Header().Set("Content-Type", "text/plain") 232 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 233 234 securityTxt := `Contact: mailto:security@tangled.org 235Preferred-Languages: en 236Canonical: https://tangled.org/.well-known/security.txt 237Expires: 2030-01-01T21:59:00.000Z 238` 239 w.Write([]byte(securityTxt)) 240} 241 242func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 243 w.Header().Set("Content-Type", "text/plain") 244 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 245 246 robotsTxt := `# Hello, Tanglers! 247User-agent: * 248Allow: / 249Disallow: /*/*/settings 250Disallow: /settings 251Disallow: /*/*/compare 252Disallow: /*/*/fork 253 254Crawl-delay: 1 255` 256 w.Write([]byte(robotsTxt)) 257} 258 259func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 260 user := s.oauth.GetMultiAccountUser(r) 261 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 262 LoggedInUser: user, 263 }) 264} 265 266func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 267 user := s.oauth.GetMultiAccountUser(r) 268 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 269 LoggedInUser: user, 270 }) 271} 272 273func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 274 user := s.oauth.GetMultiAccountUser(r) 275 s.pages.Brand(w, pages.BrandParams{ 276 LoggedInUser: user, 277 }) 278} 279 280func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 281 user := s.oauth.GetMultiAccountUser(r) 282 if user == nil { 283 return 284 } 285 286 l := s.logger.With("handler", "UpgradeBanner") 287 l = l.With("did", user.Active.Did) 288 289 regs, err := db.GetRegistrations( 290 s.db, 291 orm.FilterEq("did", user.Active.Did), 292 orm.FilterEq("needs_upgrade", 1), 293 ) 294 if err != nil { 295 l.Error("non-fatal: failed to get registrations", "err", err) 296 } 297 298 spindles, err := db.GetSpindles( 299 r.Context(), 300 s.db, 301 orm.FilterEq("owner", user.Active.Did), 302 orm.FilterEq("needs_upgrade", 1), 303 ) 304 if err != nil { 305 l.Error("non-fatal: failed to get spindles", "err", err) 306 } 307 308 if regs == nil && spindles == nil { 309 return 310 } 311 312 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 313 Registrations: regs, 314 Spindles: spindles, 315 }) 316} 317 318func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 319 user := chi.URLParam(r, "user") 320 user = strings.TrimPrefix(user, "@") 321 322 if user == "" { 323 w.WriteHeader(http.StatusBadRequest) 324 return 325 } 326 327 id, err := s.idResolver.ResolveIdent(r.Context(), user) 328 if err != nil { 329 w.WriteHeader(http.StatusInternalServerError) 330 return 331 } 332 333 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 334 if err != nil { 335 s.logger.Error("failed to get public keys", "err", err) 336 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 337 return 338 } 339 340 if len(pubKeys) == 0 { 341 w.WriteHeader(http.StatusNoContent) 342 return 343 } 344 345 for _, k := range pubKeys { 346 key := strings.TrimRight(k.Key, "\n") 347 fmt.Fprintln(w, key) 348 } 349} 350 351func validateRepoName(name string) error { 352 // check for path traversal attempts 353 if name == "." || name == ".." || 354 strings.Contains(name, "/") || strings.Contains(name, "\\") { 355 return fmt.Errorf("Repository name contains invalid path characters") 356 } 357 358 // check for sequences that could be used for traversal when normalized 359 if strings.Contains(name, "./") || strings.Contains(name, "../") || 360 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 361 return fmt.Errorf("Repository name contains invalid path sequence") 362 } 363 364 // then continue with character validation 365 for _, char := range name { 366 if !((char >= 'a' && char <= 'z') || 367 (char >= 'A' && char <= 'Z') || 368 (char >= '0' && char <= '9') || 369 char == '-' || char == '_' || char == '.') { 370 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 371 } 372 } 373 374 // additional check to prevent multiple sequential dots 375 if strings.Contains(name, "..") { 376 return fmt.Errorf("Repository name cannot contain sequential dots") 377 } 378 379 // if all checks pass 380 return nil 381} 382 383func stripGitExt(name string) string { 384 return strings.TrimSuffix(name, ".git") 385} 386 387func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 388 switch r.Method { 389 case http.MethodGet: 390 user := s.oauth.GetMultiAccountUser(r) 391 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 392 if err != nil { 393 s.pages.Notice(w, "repo", "Invalid user account.") 394 return 395 } 396 397 s.pages.NewRepo(w, pages.NewRepoParams{ 398 LoggedInUser: user, 399 Knots: knots, 400 }) 401 402 case http.MethodPost: 403 l := s.logger.With("handler", "NewRepo") 404 405 user := s.oauth.GetMultiAccountUser(r) 406 l = l.With("did", user.Active.Did) 407 408 // form validation 409 domain := r.FormValue("domain") 410 if domain == "" { 411 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 412 return 413 } 414 l = l.With("knot", domain) 415 416 repoName := r.FormValue("name") 417 if repoName == "" { 418 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 419 return 420 } 421 422 if err := validateRepoName(repoName); err != nil { 423 s.pages.Notice(w, "repo", err.Error()) 424 return 425 } 426 repoName = stripGitExt(repoName) 427 l = l.With("repoName", repoName) 428 429 defaultBranch := r.FormValue("branch") 430 if defaultBranch == "" { 431 defaultBranch = "main" 432 } 433 l = l.With("defaultBranch", defaultBranch) 434 435 description := r.FormValue("description") 436 if len([]rune(description)) > 140 { 437 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 438 return 439 } 440 441 // ACL validation 442 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 443 if err != nil || !ok { 444 l.Info("unauthorized") 445 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 446 return 447 } 448 449 // Check for existing repos 450 existingRepo, err := db.GetRepo( 451 s.db, 452 orm.FilterEq("did", user.Active.Did), 453 orm.FilterEq("name", repoName), 454 ) 455 if err == nil && existingRepo != nil { 456 l.Info("repo exists") 457 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 458 return 459 } 460 461 rkey := tid.TID() 462 463 client, err := s.oauth.ServiceClient( 464 r, 465 oauth.WithService(domain), 466 oauth.WithLxm(tangled.RepoCreateNSID), 467 oauth.WithDev(s.config.Core.Dev), 468 ) 469 if err != nil { 470 l.Error("service auth failed", "err", err) 471 s.pages.Notice(w, "repo", "Failed to reach knot server.") 472 return 473 } 474 475 input := &tangled.RepoCreate_Input{ 476 Rkey: rkey, 477 Name: repoName, 478 DefaultBranch: &defaultBranch, 479 } 480 createResp, xe := tangled.RepoCreate( 481 r.Context(), 482 client, 483 input, 484 ) 485 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 486 l.Error("xrpc error", "xe", xe) 487 s.pages.Notice(w, "repo", err.Error()) 488 return 489 } 490 491 var repoDid string 492 if createResp != nil && createResp.RepoDid != nil { 493 repoDid = *createResp.RepoDid 494 } 495 if repoDid == "" { 496 l.Error("knot returned empty repo DID") 497 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 498 return 499 } 500 501 repo := &models.Repo{ 502 Did: user.Active.Did, 503 Name: repoName, 504 Knot: domain, 505 Rkey: rkey, 506 Description: description, 507 Created: time.Now(), 508 Labels: s.config.Label.DefaultLabelDefs, 509 RepoDid: repoDid, 510 } 511 record := repo.AsRecord() 512 513 cleanupKnot := func() { 514 go func() { 515 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 516 for attempt, delay := range delays { 517 time.Sleep(delay) 518 deleteClient, dErr := s.oauth.ServiceClient( 519 r, 520 oauth.WithService(domain), 521 oauth.WithLxm(tangled.RepoDeleteNSID), 522 oauth.WithDev(s.config.Core.Dev), 523 ) 524 if dErr != nil { 525 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 526 continue 527 } 528 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 529 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 530 Did: user.Active.Did, 531 Name: repoName, 532 Rkey: rkey, 533 }); dErr != nil { 534 cancel() 535 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 536 continue 537 } 538 cancel() 539 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 540 return 541 } 542 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 543 "did", user.Active.Did, "repo", repoName, "knot", domain) 544 }() 545 } 546 547 atpClient, err := s.oauth.AuthorizedClient(r) 548 if err != nil { 549 l.Info("PDS write failed", "err", err) 550 cleanupKnot() 551 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 552 return 553 } 554 555 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 556 Collection: tangled.RepoNSID, 557 Repo: user.Active.Did, 558 Rkey: rkey, 559 Record: &lexutil.LexiconTypeDecoder{ 560 Val: &record, 561 }, 562 }) 563 if err != nil { 564 l.Info("PDS write failed", "err", err) 565 cleanupKnot() 566 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 567 return 568 } 569 570 aturi := atresp.Uri 571 l = l.With("aturi", aturi) 572 l.Info("wrote to PDS") 573 574 tx, err := s.db.BeginTx(r.Context(), nil) 575 if err != nil { 576 l.Info("txn failed", "err", err) 577 s.pages.Notice(w, "repo", "Failed to save repository information.") 578 return 579 } 580 581 rollback := func() { 582 err1 := tx.Rollback() 583 err2 := s.enforcer.E.LoadPolicy() 584 err3 := rollbackRecord(context.Background(), aturi, atpClient) 585 586 if errors.Is(err1, sql.ErrTxDone) { 587 err1 = nil 588 } 589 590 if errs := errors.Join(err1, err2, err3); errs != nil { 591 l.Error("failed to rollback changes", "errs", errs) 592 } 593 594 if aturi != "" { 595 cleanupKnot() 596 } 597 } 598 defer rollback() 599 600 err = db.AddRepo(tx, repo) 601 if err != nil { 602 l.Error("db write failed", "err", err) 603 s.pages.Notice(w, "repo", "Failed to save repository information.") 604 return 605 } 606 607 rbacPath := repo.RepoIdentifier() 608 err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath) 609 if err != nil { 610 l.Error("acl setup failed", "err", err) 611 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 612 return 613 } 614 615 err = tx.Commit() 616 if err != nil { 617 l.Error("txn commit failed", "err", err) 618 http.Error(w, err.Error(), http.StatusInternalServerError) 619 return 620 } 621 622 err = s.enforcer.E.SavePolicy() 623 if err != nil { 624 l.Error("acl save failed", "err", err) 625 http.Error(w, err.Error(), http.StatusInternalServerError) 626 return 627 } 628 629 aturi = "" 630 631 s.notifier.NewRepo(r.Context(), repo) 632 if repoDid != "" { 633 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 634 } else { 635 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 636 } 637 } 638} 639 640// this is used to rollback changes made to the PDS 641// 642// it is a no-op if the provided ATURI is empty 643func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 644 if aturi == "" { 645 return nil 646 } 647 648 parsed := syntax.ATURI(aturi) 649 650 collection := parsed.Collection().String() 651 repo := parsed.Authority().String() 652 rkey := parsed.RecordKey().String() 653 654 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 655 Collection: collection, 656 Repo: repo, 657 Rkey: rkey, 658 }) 659 return err 660} 661 662func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 663 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 664 if err != nil { 665 return err 666 } 667 // already present 668 if len(defaultLabels) == len(defaults) { 669 return nil 670 } 671 672 labelDefs, err := models.FetchLabelDefs(r, defaults) 673 if err != nil { 674 return err 675 } 676 677 // Insert each label definition to the database 678 for _, labelDef := range labelDefs { 679 _, err = db.AddLabelDefinition(e, &labelDef) 680 if err != nil { 681 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 682 } 683 } 684 685 return nil 686} 687 688func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 689 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 690 if err != nil { 691 logger.Error("failed to resolve tangled.org DID", "err", err) 692 return 693 } 694 695 pdsEndpoint := resolved.PDSEndpoint() 696 if pdsEndpoint == "" { 697 logger.Error("no PDS endpoint found for tangled.sh DID") 698 return 699 } 700 701 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 702 if err != nil { 703 logger.Error("failed to create appassword session... skipping fetch", "err", err) 704 return 705 } 706 707 client := xrpc.Client{ 708 Auth: &xrpc.AuthInfo{ 709 AccessJwt: session.AccessJwt, 710 Did: session.Did, 711 }, 712 Host: session.PdsEndpoint, 713 } 714 715 l := log.SubLogger(logger, "bluesky") 716 717 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 718 defer ticker.Stop() 719 720 for { 721 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 722 if err != nil { 723 l.Error("failed to fetch bluesky posts", "err", err) 724 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 725 l.Error("failed to insert bluesky posts", "err", err) 726 } else { 727 l.Info("inserted bluesky posts", "count", len(posts)) 728 } 729 730 select { 731 case <-ticker.C: 732 case <-ctx.Done(): 733 l.Info("stopping bluesky updater") 734 return 735 } 736 } 737}