Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at fix/knot-version-string 825 lines 23 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/mentions" 23 "tangled.org/core/appview/models" 24 "tangled.org/core/appview/notify" 25 dbnotify "tangled.org/core/appview/notify/db" 26 lognotify "tangled.org/core/appview/notify/logging" 27 phnotify "tangled.org/core/appview/notify/posthog" 28 whnotify "tangled.org/core/appview/notify/webhook" 29 "tangled.org/core/appview/oauth" 30 "tangled.org/core/appview/pages" 31 "tangled.org/core/appview/reporesolver" 32 "tangled.org/core/appview/validator" 33 xrpcclient "tangled.org/core/appview/xrpcclient" 34 "tangled.org/core/consts" 35 "tangled.org/core/eventconsumer" 36 "tangled.org/core/idresolver" 37 "tangled.org/core/jetstream" 38 "tangled.org/core/log" 39 tlog "tangled.org/core/log" 40 "tangled.org/core/orm" 41 "tangled.org/core/rbac" 42 "tangled.org/core/tid" 43 44 comatproto "github.com/bluesky-social/indigo/api/atproto" 45 "github.com/bluesky-social/indigo/atproto/atclient" 46 "github.com/bluesky-social/indigo/atproto/syntax" 47 lexutil "github.com/bluesky-social/indigo/lex/util" 48 "github.com/bluesky-social/indigo/xrpc" 49 50 "github.com/go-chi/chi/v5" 51 "github.com/posthog/posthog-go" 52) 53 54type State struct { 55 db *db.DB 56 notifier notify.Notifier 57 indexer *indexer.Indexer 58 oauth *oauth.OAuth 59 enforcer *rbac.Enforcer 60 pages *pages.Pages 61 idResolver *idresolver.Resolver 62 rdb *cache.Cache 63 mentionsResolver *mentions.Resolver 64 posthog posthog.Client 65 jc *jetstream.JetstreamClient 66 config *config.Config 67 repoResolver *reporesolver.RepoResolver 68 knotstream *eventconsumer.Consumer 69 spindlestream *eventconsumer.Consumer 70 logger *slog.Logger 71 validator *validator.Validator 72 cfClient *cloudflare.Client 73} 74 75func Make(ctx context.Context, config *config.Config) (*State, error) { 76 logger := tlog.FromContext(ctx) 77 78 d, err := db.Make(ctx, config.Core.DbPath) 79 if err != nil { 80 return nil, fmt.Errorf("failed to create db: %w", err) 81 } 82 83 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 84 err = indexer.Init(ctx) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create indexer: %w", err) 87 } 88 89 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 90 if err != nil { 91 return nil, fmt.Errorf("failed to create enforcer: %w", err) 92 } 93 94 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 95 if err != nil { 96 logger.Error("failed to create redis resolver", "err", err) 97 res = idresolver.DefaultResolver(config.Plc.PLCURL) 98 } 99 100 var rdb *cache.Cache 101 if config.Redis.Addr != "" { 102 rdb = cache.New(config.Redis.Addr) 103 } 104 105 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 106 if err != nil { 107 return nil, fmt.Errorf("failed to create posthog client: %w", err) 108 } 109 110 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 111 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 112 if err != nil { 113 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 114 } 115 validator := validator.New(d, res, enforcer) 116 117 repoResolver := reporesolver.New(config, enforcer, d, rdb) 118 119 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 120 121 wrapper := db.DbWrapper{Execer: d} 122 jc, err := jetstream.NewJetstreamClient( 123 config.Jetstream.Endpoint, 124 "appview", 125 []string{ 126 tangled.GraphFollowNSID, 127 tangled.FeedStarNSID, 128 tangled.PublicKeyNSID, 129 tangled.RepoArtifactNSID, 130 tangled.ActorProfileNSID, 131 tangled.KnotMemberNSID, 132 tangled.SpindleMemberNSID, 133 tangled.SpindleNSID, 134 tangled.KnotNSID, 135 tangled.StringNSID, 136 tangled.RepoPullNSID, 137 tangled.RepoIssueNSID, 138 tangled.RepoIssueCommentNSID, 139 tangled.LabelDefinitionNSID, 140 tangled.LabelOpNSID, 141 }, 142 nil, 143 tlog.SubLogger(logger, "jetstream"), 144 wrapper, 145 false, 146 147 // in-memory filter is inapplicable to appview so 148 // we'll never log dids anyway. 149 false, 150 ) 151 if err != nil { 152 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 153 } 154 155 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 156 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 157 } 158 159 ingester := appview.Ingester{ 160 Db: wrapper, 161 Enforcer: enforcer, 162 IdResolver: res, 163 Cache: rdb, 164 Config: config, 165 Logger: log.SubLogger(logger, "ingester"), 166 Validator: validator, 167 } 168 err = jc.StartJetstream(ctx, ingester.Ingest()) 169 if err != nil { 170 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 171 } 172 173 var notifiers []notify.Notifier 174 175 // Always add the database notifier 176 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 177 178 // Add other notifiers in production only 179 if !config.Core.Dev { 180 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 181 } 182 notifiers = append(notifiers, indexer) 183 184 notifiers = append(notifiers, whnotify.NewNotifier(d)) 185 186 notifier := notify.NewMergedNotifier(notifiers) 187 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 188 189 var cfClient *cloudflare.Client 190 if config.Cloudflare.ApiToken != "" { 191 cfClient, err = cloudflare.New(config) 192 if err != nil { 193 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 194 cfClient = nil 195 } 196 } 197 198 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 199 if err != nil { 200 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 201 } 202 knotstream.Start(ctx) 203 204 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 205 if err != nil { 206 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 207 } 208 spindlestream.Start(ctx) 209 210 state := &State{ 211 db: d, 212 notifier: notifier, 213 indexer: indexer, 214 oauth: oauth, 215 enforcer: enforcer, 216 pages: pages, 217 idResolver: res, 218 rdb: rdb, 219 mentionsResolver: mentionsResolver, 220 posthog: posthog, 221 jc: jc, 222 config: config, 223 repoResolver: repoResolver, 224 knotstream: knotstream, 225 spindlestream: spindlestream, 226 logger: logger, 227 validator: validator, 228 cfClient: cfClient, 229 } 230 231 // fetch initial bluesky posts if configured 232 go fetchBskyPosts(ctx, res, config, d, logger) 233 234 return state, nil 235} 236 237func (s *State) Close() error { 238 // other close up logic goes here 239 return s.db.Close() 240} 241 242func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 243 w.Header().Set("Content-Type", "text/plain") 244 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 245 246 securityTxt := `Contact: mailto:security@tangled.org 247Preferred-Languages: en 248Canonical: https://tangled.org/.well-known/security.txt 249Expires: 2030-01-01T21:59:00.000Z 250` 251 w.Write([]byte(securityTxt)) 252} 253 254func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 255 w.Header().Set("Content-Type", "text/plain") 256 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 257 258 robotsTxt := `# Hello, Tanglers! 259User-agent: * 260Allow: / 261Disallow: /*/*/settings 262Disallow: /settings 263Disallow: /*/*/compare 264Disallow: /*/*/fork 265 266Crawl-delay: 1 267` 268 w.Write([]byte(robotsTxt)) 269} 270 271func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 272 user := s.oauth.GetMultiAccountUser(r) 273 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 274 LoggedInUser: user, 275 }) 276} 277 278func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 279 user := s.oauth.GetMultiAccountUser(r) 280 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 281 LoggedInUser: user, 282 }) 283} 284 285func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 286 user := s.oauth.GetMultiAccountUser(r) 287 s.pages.Brand(w, pages.BrandParams{ 288 LoggedInUser: user, 289 }) 290} 291 292func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 293 user := s.oauth.GetMultiAccountUser(r) 294 if user == nil { 295 return 296 } 297 298 l := s.logger.With("handler", "UpgradeBanner") 299 l = l.With("did", user.Did) 300 301 regs, err := db.GetRegistrations( 302 s.db, 303 orm.FilterEq("did", user.Did), 304 orm.FilterEq("needs_upgrade", 1), 305 ) 306 if err != nil { 307 l.Error("non-fatal: failed to get registrations", "err", err) 308 } 309 310 spindles, err := db.GetSpindles( 311 r.Context(), 312 s.db, 313 orm.FilterEq("owner", user.Did), 314 orm.FilterEq("needs_upgrade", 1), 315 ) 316 if err != nil { 317 l.Error("non-fatal: failed to get spindles", "err", err) 318 } 319 320 if regs == nil && spindles == nil { 321 return 322 } 323 324 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 325 Registrations: regs, 326 Spindles: spindles, 327 }) 328} 329 330func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 331 // target is echoed back from the form via hx-vals so the response span's 332 // id matches the form's hx-target. Fallback keeps the handler useful if 333 // a caller forgets to send it. 334 target := strings.TrimSpace(r.FormValue("target")) 335 if target == "" { 336 target = "home" 337 } 338 339 w.Header().Set("Content-Type", "text/html") 340 341 emailAddr := strings.TrimSpace(r.FormValue("email")) 342 if !email.IsValidEmail(emailAddr) { 343 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 344 Id: target, 345 Error: "Invalid email address.", 346 }) 347 return 348 } 349 350 // For logged-in users, persist the signup locally so the widget stays 351 // hidden across devices. The DB row is the render-time source of truth; 352 // Resend still owns the mailing list itself. 353 if user := s.oauth.GetMultiAccountUser(r); user != nil { 354 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 355 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 356 } 357 } 358 359 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 360 go func() { 361 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 362 s.logger.Error("failed to add newsletter contact", "error", err) 363 } 364 }() 365 } else { 366 s.logger.Error( 367 "failed to add newsletter contact, missing resend config", 368 "isKeyPresent", s.config.Resend.ApiKey != "", 369 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 370 "emailAddr", emailAddr, 371 ) 372 } 373 374 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 375} 376 377// NewsletterDismiss records that a logged-in user has dismissed the newsletter 378// widget so it stays hidden across their devices. Anonymous callers get a 204 379// with no DB write — localStorage handles the per-browser fallback. 380func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 381 user := s.oauth.GetMultiAccountUser(r) 382 if user == nil { 383 w.WriteHeader(http.StatusNoContent) 384 return 385 } 386 387 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 388 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 389 } 390 w.WriteHeader(http.StatusNoContent) 391} 392 393func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 394 user := chi.URLParam(r, "user") 395 user = strings.TrimPrefix(user, "@") 396 397 if user == "" { 398 w.WriteHeader(http.StatusBadRequest) 399 return 400 } 401 402 id, err := s.idResolver.ResolveIdent(r.Context(), user) 403 if err != nil { 404 w.WriteHeader(http.StatusInternalServerError) 405 return 406 } 407 408 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 409 if err != nil { 410 s.logger.Error("failed to get public keys", "err", err) 411 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 412 return 413 } 414 415 if len(pubKeys) == 0 { 416 w.WriteHeader(http.StatusNoContent) 417 return 418 } 419 420 for _, k := range pubKeys { 421 key := strings.TrimRight(k.Key, "\n") 422 fmt.Fprintln(w, key) 423 } 424} 425 426func validateRepoName(name string) error { 427 // check for path traversal attempts 428 if name == "." || name == ".." || 429 strings.Contains(name, "/") || strings.Contains(name, "\\") { 430 return fmt.Errorf("Repository name contains invalid path characters") 431 } 432 433 // check for sequences that could be used for traversal when normalized 434 if strings.Contains(name, "./") || strings.Contains(name, "../") || 435 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 436 return fmt.Errorf("Repository name contains invalid path sequence") 437 } 438 439 // then continue with character validation 440 for _, char := range name { 441 if !((char >= 'a' && char <= 'z') || 442 (char >= 'A' && char <= 'Z') || 443 (char >= '0' && char <= '9') || 444 char == '-' || char == '_' || char == '.') { 445 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 446 } 447 } 448 449 // additional check to prevent multiple sequential dots 450 if strings.Contains(name, "..") { 451 return fmt.Errorf("Repository name cannot contain sequential dots") 452 } 453 454 // if all checks pass 455 return nil 456} 457 458func stripGitExt(name string) string { 459 return strings.TrimSuffix(name, ".git") 460} 461 462func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 463 switch r.Method { 464 case http.MethodGet: 465 user := s.oauth.GetMultiAccountUser(r) 466 knots, err := s.enforcer.GetKnotsForUser(user.Did) 467 if err != nil { 468 s.pages.Notice(w, "repo", "Invalid user account.") 469 return 470 } 471 472 s.pages.NewRepo(w, pages.NewRepoParams{ 473 LoggedInUser: user, 474 Knots: knots, 475 }) 476 477 case http.MethodPost: 478 l := s.logger.With("handler", "NewRepo") 479 480 user := s.oauth.GetMultiAccountUser(r) 481 l = l.With("did", user.Did) 482 483 // form validation 484 domain := r.FormValue("domain") 485 if domain == "" { 486 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 487 return 488 } 489 l = l.With("knot", domain) 490 491 repoName := r.FormValue("name") 492 if repoName == "" { 493 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 494 return 495 } 496 497 if err := validateRepoName(repoName); err != nil { 498 s.pages.Notice(w, "repo", err.Error()) 499 return 500 } 501 repoName = stripGitExt(repoName) 502 l = l.With("repoName", repoName) 503 504 defaultBranch := r.FormValue("branch") 505 if defaultBranch == "" { 506 defaultBranch = "main" 507 } 508 l = l.With("defaultBranch", defaultBranch) 509 510 description := r.FormValue("description") 511 if len([]rune(description)) > 140 { 512 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 513 return 514 } 515 516 // ACL validation 517 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 518 if err != nil || !ok { 519 l.Info("unauthorized") 520 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 521 return 522 } 523 524 // Check for existing repos 525 existingRepo, err := db.GetRepo( 526 s.db, 527 orm.FilterEq("did", user.Did), 528 orm.FilterEq("name", repoName), 529 ) 530 if err == nil && existingRepo != nil { 531 l.Info("repo exists") 532 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 533 return 534 } 535 536 rkey := tid.TID() 537 538 client, err := s.oauth.ServiceClient( 539 r, 540 oauth.WithService(domain), 541 oauth.WithLxm(tangled.RepoCreateNSID), 542 oauth.WithDev(s.config.Core.Dev), 543 ) 544 if err != nil { 545 l.Error("service auth failed", "err", err) 546 s.pages.Notice(w, "repo", "Failed to reach knot server.") 547 return 548 } 549 550 input := &tangled.RepoCreate_Input{ 551 Rkey: rkey, 552 Name: repoName, 553 DefaultBranch: &defaultBranch, 554 } 555 createResp, err := tangled.RepoCreate( 556 r.Context(), 557 client, 558 input, 559 ) 560 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 561 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 562 s.pages.Notice(w, "repo", err.Error()) 563 return 564 } 565 566 var repoDid string 567 if createResp != nil && createResp.RepoDid != nil { 568 repoDid = *createResp.RepoDid 569 } 570 if repoDid == "" { 571 l.Error("knot returned empty repo DID") 572 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 573 return 574 } 575 576 repo := &models.Repo{ 577 Did: user.Did, 578 Name: repoName, 579 Knot: domain, 580 Rkey: rkey, 581 Description: description, 582 Created: time.Now(), 583 Labels: s.config.Label.DefaultLabelDefs, 584 RepoDid: repoDid, 585 } 586 record := repo.AsRecord() 587 588 cleanupKnot := func() { 589 go func() { 590 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 591 for attempt, delay := range delays { 592 time.Sleep(delay) 593 deleteClient, dErr := s.oauth.ServiceClient( 594 r, 595 oauth.WithService(domain), 596 oauth.WithLxm(tangled.RepoDeleteNSID), 597 oauth.WithDev(s.config.Core.Dev), 598 ) 599 if dErr != nil { 600 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 601 continue 602 } 603 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 604 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 605 Did: user.Did, 606 Name: repoName, 607 Rkey: rkey, 608 }); dErr != nil { 609 cancel() 610 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 611 continue 612 } 613 cancel() 614 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 615 return 616 } 617 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 618 "did", user.Did, "repo", repoName, "knot", domain) 619 }() 620 } 621 622 atpClient, err := s.oauth.AuthorizedClient(r) 623 if err != nil { 624 l.Info("PDS write failed", "err", err) 625 cleanupKnot() 626 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 627 return 628 } 629 630 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 631 Collection: tangled.RepoNSID, 632 Repo: user.Did, 633 Rkey: rkey, 634 Record: &lexutil.LexiconTypeDecoder{ 635 Val: &record, 636 }, 637 }) 638 if err != nil { 639 l.Info("PDS write failed", "err", err) 640 cleanupKnot() 641 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 642 return 643 } 644 645 aturi := atresp.Uri 646 l = l.With("aturi", aturi) 647 l.Info("wrote to PDS") 648 649 tx, err := s.db.BeginTx(r.Context(), nil) 650 if err != nil { 651 l.Info("txn failed", "err", err) 652 s.pages.Notice(w, "repo", "Failed to save repository information.") 653 return 654 } 655 656 rollback := func() { 657 err1 := tx.Rollback() 658 err2 := s.enforcer.E.LoadPolicy() 659 err3 := rollbackRecord(context.Background(), aturi, atpClient) 660 661 if errors.Is(err1, sql.ErrTxDone) { 662 err1 = nil 663 } 664 665 if errs := errors.Join(err1, err2, err3); errs != nil { 666 l.Error("failed to rollback changes", "errs", errs) 667 } 668 669 if aturi != "" { 670 cleanupKnot() 671 } 672 } 673 defer rollback() 674 675 err = db.AddRepo(tx, repo) 676 if err != nil { 677 l.Error("db write failed", "err", err) 678 s.pages.Notice(w, "repo", "Failed to save repository information.") 679 return 680 } 681 682 rbacPath := repo.RepoIdentifier() 683 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 684 if err != nil { 685 l.Error("acl setup failed", "err", err) 686 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 687 return 688 } 689 690 err = tx.Commit() 691 if err != nil { 692 l.Error("txn commit failed", "err", err) 693 http.Error(w, err.Error(), http.StatusInternalServerError) 694 return 695 } 696 697 err = s.enforcer.E.SavePolicy() 698 if err != nil { 699 l.Error("acl save failed", "err", err) 700 http.Error(w, err.Error(), http.StatusInternalServerError) 701 return 702 } 703 704 aturi = "" 705 706 s.notifier.NewRepo(r.Context(), repo) 707 switch { 708 case repoDid != "": 709 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 710 default: 711 handle := s.pages.DisplayHandle(r.Context(), user.Did) 712 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, repoName)) 713 } 714 } 715} 716 717// this is used to rollback changes made to the PDS 718// 719// it is a no-op if the provided ATURI is empty 720func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 721 if aturi == "" { 722 return nil 723 } 724 725 parsed := syntax.ATURI(aturi) 726 727 collection := parsed.Collection().String() 728 repo := parsed.Authority().String() 729 rkey := parsed.RecordKey().String() 730 731 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 732 Collection: collection, 733 Repo: repo, 734 Rkey: rkey, 735 }) 736 return err 737} 738 739func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 740 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 741 if err != nil { 742 return err 743 } 744 // already present 745 if len(defaultLabels) == len(defaults) { 746 return nil 747 } 748 749 labelDefs, err := models.FetchLabelDefs(r, defaults) 750 if err != nil { 751 return err 752 } 753 754 // Insert each label definition to the database 755 for _, labelDef := range labelDefs { 756 _, err = db.AddLabelDefinition(e, &labelDef) 757 if err != nil { 758 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 759 } 760 } 761 762 return nil 763} 764 765func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 766 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 767 if err != nil { 768 logger.Error("failed to resolve tangled.org DID", "err", err) 769 return 770 } 771 772 pdsEndpoint := resolved.PDSEndpoint() 773 if pdsEndpoint == "" { 774 logger.Error("no PDS endpoint found for tangled.sh DID") 775 return 776 } 777 778 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 779 if err != nil { 780 logger.Error("failed to create appassword session... skipping fetch", "err", err) 781 return 782 } 783 784 l := log.SubLogger(logger, "bluesky") 785 786 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 787 defer ticker.Stop() 788 789 for { 790 // refresh session if necessary 791 if !session.IsValid() { 792 l.Debug("access token expired, refreshing session") 793 if err := session.RefreshSession(); err != nil { 794 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 795 return 796 } 797 l.Debug("session refreshed") 798 } 799 800 // make client 801 client := xrpc.Client{ 802 Auth: &xrpc.AuthInfo{ 803 AccessJwt: session.AccessJwt, 804 Did: session.Did, 805 }, 806 Host: session.PdsEndpoint, 807 } 808 809 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 810 if err != nil { 811 l.Error("failed to fetch bluesky posts", "err", err) 812 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 813 l.Error("failed to insert bluesky posts", "err", err) 814 } else { 815 l.Info("inserted bluesky posts", "count", len(posts)) 816 } 817 818 select { 819 case <-ticker.C: 820 case <-ctx.Done(): 821 l.Info("stopping bluesky updater") 822 return 823 } 824 } 825}