Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 5b01f9975e7ec4e828e57202bd7cb87ff4e137df 738 lines 20 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cloudflare" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/indexer" 20 "tangled.org/core/appview/mentions" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/appview/notify" 23 dbnotify "tangled.org/core/appview/notify/db" 24 lognotify "tangled.org/core/appview/notify/logging" 25 phnotify "tangled.org/core/appview/notify/posthog" 26 whnotify "tangled.org/core/appview/notify/webhook" 27 "tangled.org/core/appview/oauth" 28 "tangled.org/core/appview/pages" 29 "tangled.org/core/appview/reporesolver" 30 "tangled.org/core/appview/validator" 31 xrpcclient "tangled.org/core/appview/xrpcclient" 32 "tangled.org/core/consts" 33 "tangled.org/core/eventconsumer" 34 "tangled.org/core/idresolver" 35 "tangled.org/core/jetstream" 36 "tangled.org/core/log" 37 tlog "tangled.org/core/log" 38 "tangled.org/core/orm" 39 "tangled.org/core/rbac" 40 "tangled.org/core/tid" 41 42 comatproto "github.com/bluesky-social/indigo/api/atproto" 43 "github.com/bluesky-social/indigo/atproto/atclient" 44 "github.com/bluesky-social/indigo/atproto/syntax" 45 lexutil "github.com/bluesky-social/indigo/lex/util" 46 "github.com/bluesky-social/indigo/xrpc" 47 48 "github.com/go-chi/chi/v5" 49 "github.com/posthog/posthog-go" 50) 51 52type State struct { 53 db *db.DB 54 notifier notify.Notifier 55 indexer *indexer.Indexer 56 oauth *oauth.OAuth 57 enforcer *rbac.Enforcer 58 pages *pages.Pages 59 idResolver *idresolver.Resolver 60 mentionsResolver *mentions.Resolver 61 posthog posthog.Client 62 jc *jetstream.JetstreamClient 63 config *config.Config 64 repoResolver *reporesolver.RepoResolver 65 knotstream *eventconsumer.Consumer 66 spindlestream *eventconsumer.Consumer 67 logger *slog.Logger 68 validator *validator.Validator 69 cfClient *cloudflare.Client 70} 71 72func Make(ctx context.Context, config *config.Config) (*State, error) { 73 logger := tlog.FromContext(ctx) 74 75 d, err := db.Make(ctx, config.Core.DbPath) 76 if err != nil { 77 return nil, fmt.Errorf("failed to create db: %w", err) 78 } 79 80 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 81 err = indexer.Init(ctx) 82 if err != nil { 83 return nil, fmt.Errorf("failed to create indexer: %w", err) 84 } 85 86 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 87 if err != nil { 88 return nil, fmt.Errorf("failed to create enforcer: %w", err) 89 } 90 91 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 92 if err != nil { 93 logger.Error("failed to create redis resolver", "err", err) 94 res = idresolver.DefaultResolver(config.Plc.PLCURL) 95 } 96 97 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 98 if err != nil { 99 return nil, fmt.Errorf("failed to create posthog client: %w", err) 100 } 101 102 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 103 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 104 if err != nil { 105 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 106 } 107 validator := validator.New(d, res, enforcer) 108 109 repoResolver := reporesolver.New(config, enforcer, d) 110 111 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 112 113 wrapper := db.DbWrapper{Execer: d} 114 jc, err := jetstream.NewJetstreamClient( 115 config.Jetstream.Endpoint, 116 "appview", 117 []string{ 118 tangled.GraphFollowNSID, 119 tangled.FeedStarNSID, 120 tangled.PublicKeyNSID, 121 tangled.RepoArtifactNSID, 122 tangled.ActorProfileNSID, 123 tangled.KnotMemberNSID, 124 tangled.SpindleMemberNSID, 125 tangled.SpindleNSID, 126 tangled.KnotNSID, 127 tangled.StringNSID, 128 tangled.RepoIssueNSID, 129 tangled.RepoIssueCommentNSID, 130 tangled.LabelDefinitionNSID, 131 tangled.LabelOpNSID, 132 }, 133 nil, 134 tlog.SubLogger(logger, "jetstream"), 135 wrapper, 136 false, 137 138 // in-memory filter is inapplicable to appview so 139 // we'll never log dids anyway. 140 false, 141 ) 142 if err != nil { 143 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 144 } 145 146 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 147 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 148 } 149 150 ingester := appview.Ingester{ 151 Db: wrapper, 152 Enforcer: enforcer, 153 IdResolver: res, 154 Config: config, 155 Logger: log.SubLogger(logger, "ingester"), 156 Validator: validator, 157 } 158 err = jc.StartJetstream(ctx, ingester.Ingest()) 159 if err != nil { 160 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 161 } 162 163 var notifiers []notify.Notifier 164 165 // Always add the database notifier 166 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 167 168 // Add other notifiers in production only 169 if !config.Core.Dev { 170 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 171 } 172 notifiers = append(notifiers, indexer) 173 174 notifiers = append(notifiers, whnotify.NewNotifier(d)) 175 176 notifier := notify.NewMergedNotifier(notifiers) 177 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 178 179 var cfClient *cloudflare.Client 180 if config.Cloudflare.ApiToken != "" { 181 cfClient, err = cloudflare.New(config) 182 if err != nil { 183 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 184 cfClient = nil 185 } 186 } 187 188 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 189 if err != nil { 190 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 191 } 192 knotstream.Start(ctx) 193 194 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 195 if err != nil { 196 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 197 } 198 spindlestream.Start(ctx) 199 200 state := &State{ 201 db: d, 202 notifier: notifier, 203 indexer: indexer, 204 oauth: oauth, 205 enforcer: enforcer, 206 pages: pages, 207 idResolver: res, 208 mentionsResolver: mentionsResolver, 209 posthog: posthog, 210 jc: jc, 211 config: config, 212 repoResolver: repoResolver, 213 knotstream: knotstream, 214 spindlestream: spindlestream, 215 logger: logger, 216 validator: validator, 217 cfClient: cfClient, 218 } 219 220 // fetch initial bluesky posts if configured 221 go fetchBskyPosts(ctx, res, config, d, logger) 222 223 return state, nil 224} 225 226func (s *State) Close() error { 227 // other close up logic goes here 228 return s.db.Close() 229} 230 231func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 232 w.Header().Set("Content-Type", "text/plain") 233 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 234 235 securityTxt := `Contact: mailto:security@tangled.org 236Preferred-Languages: en 237Canonical: https://tangled.org/.well-known/security.txt 238Expires: 2030-01-01T21:59:00.000Z 239` 240 w.Write([]byte(securityTxt)) 241} 242 243func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 244 w.Header().Set("Content-Type", "text/plain") 245 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 246 247 robotsTxt := `# Hello, Tanglers! 248User-agent: * 249Allow: / 250Disallow: /*/*/settings 251Disallow: /settings 252Disallow: /*/*/compare 253Disallow: /*/*/fork 254 255Crawl-delay: 1 256` 257 w.Write([]byte(robotsTxt)) 258} 259 260func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 261 user := s.oauth.GetMultiAccountUser(r) 262 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 263 LoggedInUser: user, 264 }) 265} 266 267func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 268 user := s.oauth.GetMultiAccountUser(r) 269 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 270 LoggedInUser: user, 271 }) 272} 273 274func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 275 user := s.oauth.GetMultiAccountUser(r) 276 s.pages.Brand(w, pages.BrandParams{ 277 LoggedInUser: user, 278 }) 279} 280 281func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 282 user := s.oauth.GetMultiAccountUser(r) 283 if user == nil { 284 return 285 } 286 287 l := s.logger.With("handler", "UpgradeBanner") 288 l = l.With("did", user.Active.Did) 289 290 regs, err := db.GetRegistrations( 291 s.db, 292 orm.FilterEq("did", user.Active.Did), 293 orm.FilterEq("needs_upgrade", 1), 294 ) 295 if err != nil { 296 l.Error("non-fatal: failed to get registrations", "err", err) 297 } 298 299 spindles, err := db.GetSpindles( 300 r.Context(), 301 s.db, 302 orm.FilterEq("owner", user.Active.Did), 303 orm.FilterEq("needs_upgrade", 1), 304 ) 305 if err != nil { 306 l.Error("non-fatal: failed to get spindles", "err", err) 307 } 308 309 if regs == nil && spindles == nil { 310 return 311 } 312 313 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 314 Registrations: regs, 315 Spindles: spindles, 316 }) 317} 318 319func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 320 user := chi.URLParam(r, "user") 321 user = strings.TrimPrefix(user, "@") 322 323 if user == "" { 324 w.WriteHeader(http.StatusBadRequest) 325 return 326 } 327 328 id, err := s.idResolver.ResolveIdent(r.Context(), user) 329 if err != nil { 330 w.WriteHeader(http.StatusInternalServerError) 331 return 332 } 333 334 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 335 if err != nil { 336 s.logger.Error("failed to get public keys", "err", err) 337 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 338 return 339 } 340 341 if len(pubKeys) == 0 { 342 w.WriteHeader(http.StatusNoContent) 343 return 344 } 345 346 for _, k := range pubKeys { 347 key := strings.TrimRight(k.Key, "\n") 348 fmt.Fprintln(w, key) 349 } 350} 351 352func validateRepoName(name string) error { 353 // check for path traversal attempts 354 if name == "." || name == ".." || 355 strings.Contains(name, "/") || strings.Contains(name, "\\") { 356 return fmt.Errorf("Repository name contains invalid path characters") 357 } 358 359 // check for sequences that could be used for traversal when normalized 360 if strings.Contains(name, "./") || strings.Contains(name, "../") || 361 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 362 return fmt.Errorf("Repository name contains invalid path sequence") 363 } 364 365 // then continue with character validation 366 for _, char := range name { 367 if !((char >= 'a' && char <= 'z') || 368 (char >= 'A' && char <= 'Z') || 369 (char >= '0' && char <= '9') || 370 char == '-' || char == '_' || char == '.') { 371 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 372 } 373 } 374 375 // additional check to prevent multiple sequential dots 376 if strings.Contains(name, "..") { 377 return fmt.Errorf("Repository name cannot contain sequential dots") 378 } 379 380 // if all checks pass 381 return nil 382} 383 384func stripGitExt(name string) string { 385 return strings.TrimSuffix(name, ".git") 386} 387 388func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 389 switch r.Method { 390 case http.MethodGet: 391 user := s.oauth.GetMultiAccountUser(r) 392 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 393 if err != nil { 394 s.pages.Notice(w, "repo", "Invalid user account.") 395 return 396 } 397 398 s.pages.NewRepo(w, pages.NewRepoParams{ 399 LoggedInUser: user, 400 Knots: knots, 401 }) 402 403 case http.MethodPost: 404 l := s.logger.With("handler", "NewRepo") 405 406 user := s.oauth.GetMultiAccountUser(r) 407 l = l.With("did", user.Active.Did) 408 409 // form validation 410 domain := r.FormValue("domain") 411 if domain == "" { 412 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 413 return 414 } 415 l = l.With("knot", domain) 416 417 repoName := r.FormValue("name") 418 if repoName == "" { 419 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 420 return 421 } 422 423 if err := validateRepoName(repoName); err != nil { 424 s.pages.Notice(w, "repo", err.Error()) 425 return 426 } 427 repoName = stripGitExt(repoName) 428 l = l.With("repoName", repoName) 429 430 defaultBranch := r.FormValue("branch") 431 if defaultBranch == "" { 432 defaultBranch = "main" 433 } 434 l = l.With("defaultBranch", defaultBranch) 435 436 description := r.FormValue("description") 437 if len([]rune(description)) > 140 { 438 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 439 return 440 } 441 442 // ACL validation 443 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 444 if err != nil || !ok { 445 l.Info("unauthorized") 446 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 447 return 448 } 449 450 // Check for existing repos 451 existingRepo, err := db.GetRepo( 452 s.db, 453 orm.FilterEq("did", user.Active.Did), 454 orm.FilterEq("name", repoName), 455 ) 456 if err == nil && existingRepo != nil { 457 l.Info("repo exists") 458 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 459 return 460 } 461 462 rkey := tid.TID() 463 464 client, err := s.oauth.ServiceClient( 465 r, 466 oauth.WithService(domain), 467 oauth.WithLxm(tangled.RepoCreateNSID), 468 oauth.WithDev(s.config.Core.Dev), 469 ) 470 if err != nil { 471 l.Error("service auth failed", "err", err) 472 s.pages.Notice(w, "repo", "Failed to reach knot server.") 473 return 474 } 475 476 input := &tangled.RepoCreate_Input{ 477 Rkey: rkey, 478 Name: repoName, 479 DefaultBranch: &defaultBranch, 480 } 481 createResp, xe := tangled.RepoCreate( 482 r.Context(), 483 client, 484 input, 485 ) 486 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 487 l.Error("xrpc error", "xe", xe) 488 s.pages.Notice(w, "repo", err.Error()) 489 return 490 } 491 492 var repoDid string 493 if createResp != nil && createResp.RepoDid != nil { 494 repoDid = *createResp.RepoDid 495 } 496 if repoDid == "" { 497 l.Error("knot returned empty repo DID") 498 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 499 return 500 } 501 502 repo := &models.Repo{ 503 Did: user.Active.Did, 504 Name: repoName, 505 Knot: domain, 506 Rkey: rkey, 507 Description: description, 508 Created: time.Now(), 509 Labels: s.config.Label.DefaultLabelDefs, 510 RepoDid: repoDid, 511 } 512 record := repo.AsRecord() 513 514 cleanupKnot := func() { 515 go func() { 516 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 517 for attempt, delay := range delays { 518 time.Sleep(delay) 519 deleteClient, dErr := s.oauth.ServiceClient( 520 r, 521 oauth.WithService(domain), 522 oauth.WithLxm(tangled.RepoDeleteNSID), 523 oauth.WithDev(s.config.Core.Dev), 524 ) 525 if dErr != nil { 526 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 527 continue 528 } 529 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 530 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 531 Did: user.Active.Did, 532 Name: repoName, 533 Rkey: rkey, 534 }); dErr != nil { 535 cancel() 536 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 537 continue 538 } 539 cancel() 540 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 541 return 542 } 543 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 544 "did", user.Active.Did, "repo", repoName, "knot", domain) 545 }() 546 } 547 548 atpClient, err := s.oauth.AuthorizedClient(r) 549 if err != nil { 550 l.Info("PDS write failed", "err", err) 551 cleanupKnot() 552 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 553 return 554 } 555 556 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 557 Collection: tangled.RepoNSID, 558 Repo: user.Active.Did, 559 Rkey: rkey, 560 Record: &lexutil.LexiconTypeDecoder{ 561 Val: &record, 562 }, 563 }) 564 if err != nil { 565 l.Info("PDS write failed", "err", err) 566 cleanupKnot() 567 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 568 return 569 } 570 571 aturi := atresp.Uri 572 l = l.With("aturi", aturi) 573 l.Info("wrote to PDS") 574 575 tx, err := s.db.BeginTx(r.Context(), nil) 576 if err != nil { 577 l.Info("txn failed", "err", err) 578 s.pages.Notice(w, "repo", "Failed to save repository information.") 579 return 580 } 581 582 rollback := func() { 583 err1 := tx.Rollback() 584 err2 := s.enforcer.E.LoadPolicy() 585 err3 := rollbackRecord(context.Background(), aturi, atpClient) 586 587 if errors.Is(err1, sql.ErrTxDone) { 588 err1 = nil 589 } 590 591 if errs := errors.Join(err1, err2, err3); errs != nil { 592 l.Error("failed to rollback changes", "errs", errs) 593 } 594 595 if aturi != "" { 596 cleanupKnot() 597 } 598 } 599 defer rollback() 600 601 err = db.AddRepo(tx, repo) 602 if err != nil { 603 l.Error("db write failed", "err", err) 604 s.pages.Notice(w, "repo", "Failed to save repository information.") 605 return 606 } 607 608 rbacPath := repo.RepoIdentifier() 609 err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath) 610 if err != nil { 611 l.Error("acl setup failed", "err", err) 612 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 613 return 614 } 615 616 err = tx.Commit() 617 if err != nil { 618 l.Error("txn commit failed", "err", err) 619 http.Error(w, err.Error(), http.StatusInternalServerError) 620 return 621 } 622 623 err = s.enforcer.E.SavePolicy() 624 if err != nil { 625 l.Error("acl save failed", "err", err) 626 http.Error(w, err.Error(), http.StatusInternalServerError) 627 return 628 } 629 630 aturi = "" 631 632 s.notifier.NewRepo(r.Context(), repo) 633 if repoDid != "" { 634 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 635 } else { 636 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 637 } 638 } 639} 640 641// this is used to rollback changes made to the PDS 642// 643// it is a no-op if the provided ATURI is empty 644func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 645 if aturi == "" { 646 return nil 647 } 648 649 parsed := syntax.ATURI(aturi) 650 651 collection := parsed.Collection().String() 652 repo := parsed.Authority().String() 653 rkey := parsed.RecordKey().String() 654 655 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 656 Collection: collection, 657 Repo: repo, 658 Rkey: rkey, 659 }) 660 return err 661} 662 663func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 664 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 665 if err != nil { 666 return err 667 } 668 // already present 669 if len(defaultLabels) == len(defaults) { 670 return nil 671 } 672 673 labelDefs, err := models.FetchLabelDefs(r, defaults) 674 if err != nil { 675 return err 676 } 677 678 // Insert each label definition to the database 679 for _, labelDef := range labelDefs { 680 _, err = db.AddLabelDefinition(e, &labelDef) 681 if err != nil { 682 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 683 } 684 } 685 686 return nil 687} 688 689func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 690 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 691 if err != nil { 692 logger.Error("failed to resolve tangled.org DID", "err", err) 693 return 694 } 695 696 pdsEndpoint := resolved.PDSEndpoint() 697 if pdsEndpoint == "" { 698 logger.Error("no PDS endpoint found for tangled.sh DID") 699 return 700 } 701 702 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 703 if err != nil { 704 logger.Error("failed to create appassword session... skipping fetch", "err", err) 705 return 706 } 707 708 client := xrpc.Client{ 709 Auth: &xrpc.AuthInfo{ 710 AccessJwt: session.AccessJwt, 711 Did: session.Did, 712 }, 713 Host: session.PdsEndpoint, 714 } 715 716 l := log.SubLogger(logger, "bluesky") 717 718 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 719 defer ticker.Stop() 720 721 for { 722 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 723 if err != nil { 724 l.Error("failed to fetch bluesky posts", "err", err) 725 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 726 l.Error("failed to insert bluesky posts", "err", err) 727 } else { 728 l.Info("inserted bluesky posts", "count", len(posts)) 729 } 730 731 select { 732 case <-ticker.C: 733 case <-ctx.Done(): 734 l.Info("stopping bluesky updater") 735 return 736 } 737 } 738}