Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 826 lines 23 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cloudflare" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/email" 21 "tangled.org/core/appview/indexer" 22 "tangled.org/core/appview/mentions" 23 "tangled.org/core/appview/models" 24 "tangled.org/core/appview/notify" 25 dbnotify "tangled.org/core/appview/notify/db" 26 lognotify "tangled.org/core/appview/notify/logging" 27 phnotify "tangled.org/core/appview/notify/posthog" 28 whnotify "tangled.org/core/appview/notify/webhook" 29 "tangled.org/core/appview/oauth" 30 "tangled.org/core/appview/pages" 31 "tangled.org/core/appview/reporesolver" 32 "tangled.org/core/appview/validator" 33 xrpcclient "tangled.org/core/appview/xrpcclient" 34 "tangled.org/core/consts" 35 "tangled.org/core/eventconsumer" 36 "tangled.org/core/idresolver" 37 "tangled.org/core/jetstream" 38 "tangled.org/core/log" 39 tlog "tangled.org/core/log" 40 "tangled.org/core/orm" 41 "tangled.org/core/rbac" 42 "tangled.org/core/tid" 43 44 comatproto "github.com/bluesky-social/indigo/api/atproto" 45 "github.com/bluesky-social/indigo/atproto/atclient" 46 "github.com/bluesky-social/indigo/atproto/syntax" 47 lexutil "github.com/bluesky-social/indigo/lex/util" 48 "github.com/bluesky-social/indigo/xrpc" 49 50 "github.com/go-chi/chi/v5" 51 "github.com/posthog/posthog-go" 52) 53 54type State struct { 55 db *db.DB 56 notifier notify.Notifier 57 indexer *indexer.Indexer 58 oauth *oauth.OAuth 59 enforcer *rbac.Enforcer 60 pages *pages.Pages 61 idResolver *idresolver.Resolver 62 rdb *cache.Cache 63 mentionsResolver *mentions.Resolver 64 posthog posthog.Client 65 jc *jetstream.JetstreamClient 66 config *config.Config 67 repoResolver *reporesolver.RepoResolver 68 knotstream *eventconsumer.Consumer 69 spindlestream *eventconsumer.Consumer 70 logger *slog.Logger 71 validator *validator.Validator 72 cfClient *cloudflare.Client 73} 74 75func Make(ctx context.Context, config *config.Config) (*State, error) { 76 logger := tlog.FromContext(ctx) 77 78 d, err := db.Make(ctx, config.Core.DbPath) 79 if err != nil { 80 return nil, fmt.Errorf("failed to create db: %w", err) 81 } 82 83 indexer := indexer.New(log.SubLogger(logger, "indexer"), d) 84 err = indexer.Init(ctx) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create indexer: %w", err) 87 } 88 89 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 90 if err != nil { 91 return nil, fmt.Errorf("failed to create enforcer: %w", err) 92 } 93 94 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 95 if err != nil { 96 logger.Error("failed to create redis resolver", "err", err) 97 res = idresolver.DefaultResolver(config.Plc.PLCURL) 98 } 99 100 var rdb *cache.Cache 101 if config.Redis.Addr != "" { 102 rdb = cache.New(config.Redis.Addr) 103 } 104 105 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 106 if err != nil { 107 return nil, fmt.Errorf("failed to create posthog client: %w", err) 108 } 109 110 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages")) 111 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 112 if err != nil { 113 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 114 } 115 validator := validator.New(d, res, enforcer) 116 117 repoResolver := reporesolver.New(config, enforcer, d, rdb) 118 119 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 120 121 wrapper := db.DbWrapper{Execer: d} 122 jc, err := jetstream.NewJetstreamClient( 123 config.Jetstream.Endpoint, 124 "appview", 125 []string{ 126 tangled.ActorProfileNSID, 127 tangled.FeedStarNSID, 128 tangled.GraphFollowNSID, 129 tangled.GraphVouchNSID, 130 tangled.KnotMemberNSID, 131 tangled.KnotNSID, 132 tangled.LabelDefinitionNSID, 133 tangled.LabelOpNSID, 134 tangled.PublicKeyNSID, 135 tangled.RepoArtifactNSID, 136 tangled.RepoIssueCommentNSID, 137 tangled.RepoIssueNSID, 138 tangled.RepoPullNSID, 139 tangled.SpindleMemberNSID, 140 tangled.SpindleNSID, 141 tangled.StringNSID, 142 }, 143 nil, 144 tlog.SubLogger(logger, "jetstream"), 145 wrapper, 146 false, 147 148 // in-memory filter is inapplicable to appview so 149 // we'll never log dids anyway. 150 false, 151 ) 152 if err != nil { 153 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 154 } 155 156 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 157 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 158 } 159 160 ingester := appview.Ingester{ 161 Db: wrapper, 162 Enforcer: enforcer, 163 IdResolver: res, 164 Cache: rdb, 165 Config: config, 166 Logger: log.SubLogger(logger, "ingester"), 167 Validator: validator, 168 } 169 err = jc.StartJetstream(ctx, ingester.Ingest()) 170 if err != nil { 171 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 172 } 173 174 var notifiers []notify.Notifier 175 176 // Always add the database notifier 177 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 178 179 // Add other notifiers in production only 180 if !config.Core.Dev { 181 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 182 } 183 notifiers = append(notifiers, indexer) 184 185 notifiers = append(notifiers, whnotify.NewNotifier(d)) 186 187 notifier := notify.NewMergedNotifier(notifiers) 188 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 189 190 var cfClient *cloudflare.Client 191 if config.Cloudflare.ApiToken != "" { 192 cfClient, err = cloudflare.New(config) 193 if err != nil { 194 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 195 cfClient = nil 196 } 197 } 198 199 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 200 if err != nil { 201 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 202 } 203 knotstream.Start(ctx) 204 205 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 206 if err != nil { 207 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 208 } 209 spindlestream.Start(ctx) 210 211 state := &State{ 212 db: d, 213 notifier: notifier, 214 indexer: indexer, 215 oauth: oauth, 216 enforcer: enforcer, 217 pages: pages, 218 idResolver: res, 219 rdb: rdb, 220 mentionsResolver: mentionsResolver, 221 posthog: posthog, 222 jc: jc, 223 config: config, 224 repoResolver: repoResolver, 225 knotstream: knotstream, 226 spindlestream: spindlestream, 227 logger: logger, 228 validator: validator, 229 cfClient: cfClient, 230 } 231 232 // fetch initial bluesky posts if configured 233 go fetchBskyPosts(ctx, res, config, d, logger) 234 235 return state, nil 236} 237 238func (s *State) Close() error { 239 // other close up logic goes here 240 return s.db.Close() 241} 242 243func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 244 w.Header().Set("Content-Type", "text/plain") 245 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 246 247 securityTxt := `Contact: mailto:security@tangled.org 248Preferred-Languages: en 249Canonical: https://tangled.org/.well-known/security.txt 250Expires: 2030-01-01T21:59:00.000Z 251` 252 w.Write([]byte(securityTxt)) 253} 254 255func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 256 w.Header().Set("Content-Type", "text/plain") 257 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 258 259 robotsTxt := `# Hello, Tanglers! 260User-agent: * 261Allow: / 262Disallow: /*/*/settings 263Disallow: /settings 264Disallow: /*/*/compare 265Disallow: /*/*/fork 266 267Crawl-delay: 1 268` 269 w.Write([]byte(robotsTxt)) 270} 271 272func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 273 user := s.oauth.GetMultiAccountUser(r) 274 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 275 LoggedInUser: user, 276 }) 277} 278 279func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 280 user := s.oauth.GetMultiAccountUser(r) 281 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 282 LoggedInUser: user, 283 }) 284} 285 286func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 287 user := s.oauth.GetMultiAccountUser(r) 288 s.pages.Brand(w, pages.BrandParams{ 289 LoggedInUser: user, 290 }) 291} 292 293func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 294 user := s.oauth.GetMultiAccountUser(r) 295 if user == nil { 296 return 297 } 298 299 l := s.logger.With("handler", "UpgradeBanner") 300 l = l.With("did", user.Did) 301 302 regs, err := db.GetRegistrations( 303 s.db, 304 orm.FilterEq("did", user.Did), 305 orm.FilterEq("needs_upgrade", 1), 306 ) 307 if err != nil { 308 l.Error("non-fatal: failed to get registrations", "err", err) 309 } 310 311 spindles, err := db.GetSpindles( 312 r.Context(), 313 s.db, 314 orm.FilterEq("owner", user.Did), 315 orm.FilterEq("needs_upgrade", 1), 316 ) 317 if err != nil { 318 l.Error("non-fatal: failed to get spindles", "err", err) 319 } 320 321 if regs == nil && spindles == nil { 322 return 323 } 324 325 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 326 Registrations: regs, 327 Spindles: spindles, 328 }) 329} 330 331func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) { 332 // target is echoed back from the form via hx-vals so the response span's 333 // id matches the form's hx-target. Fallback keeps the handler useful if 334 // a caller forgets to send it. 335 target := strings.TrimSpace(r.FormValue("target")) 336 if target == "" { 337 target = "home" 338 } 339 340 w.Header().Set("Content-Type", "text/html") 341 342 emailAddr := strings.TrimSpace(r.FormValue("email")) 343 if !email.IsValidEmail(emailAddr) { 344 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{ 345 Id: target, 346 Error: "Invalid email address.", 347 }) 348 return 349 } 350 351 // For logged-in users, persist the signup locally so the widget stays 352 // hidden across devices. The DB row is the render-time source of truth; 353 // Resend still owns the mailing list itself. 354 if user := s.oauth.GetMultiAccountUser(r); user != nil { 355 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil { 356 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err) 357 } 358 } 359 360 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" { 361 go func() { 362 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil { 363 s.logger.Error("failed to add newsletter contact", "error", err) 364 } 365 }() 366 } else { 367 s.logger.Error( 368 "failed to add newsletter contact, missing resend config", 369 "isKeyPresent", s.config.Resend.ApiKey != "", 370 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "", 371 "emailAddr", emailAddr, 372 ) 373 } 374 375 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target}) 376} 377 378// NewsletterDismiss records that a logged-in user has dismissed the newsletter 379// widget so it stays hidden across their devices. Anonymous callers get a 204 380// with no DB write — localStorage handles the per-browser fallback. 381func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) { 382 user := s.oauth.GetMultiAccountUser(r) 383 if user == nil { 384 w.WriteHeader(http.StatusNoContent) 385 return 386 } 387 388 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil { 389 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err) 390 } 391 w.WriteHeader(http.StatusNoContent) 392} 393 394func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 395 user := chi.URLParam(r, "user") 396 user = strings.TrimPrefix(user, "@") 397 398 if user == "" { 399 w.WriteHeader(http.StatusBadRequest) 400 return 401 } 402 403 id, err := s.idResolver.ResolveIdent(r.Context(), user) 404 if err != nil { 405 w.WriteHeader(http.StatusInternalServerError) 406 return 407 } 408 409 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 410 if err != nil { 411 s.logger.Error("failed to get public keys", "err", err) 412 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 413 return 414 } 415 416 if len(pubKeys) == 0 { 417 w.WriteHeader(http.StatusNoContent) 418 return 419 } 420 421 for _, k := range pubKeys { 422 key := strings.TrimRight(k.Key, "\n") 423 fmt.Fprintln(w, key) 424 } 425} 426 427func validateRepoName(name string) error { 428 // check for path traversal attempts 429 if name == "." || name == ".." || 430 strings.Contains(name, "/") || strings.Contains(name, "\\") { 431 return fmt.Errorf("Repository name contains invalid path characters") 432 } 433 434 // check for sequences that could be used for traversal when normalized 435 if strings.Contains(name, "./") || strings.Contains(name, "../") || 436 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 437 return fmt.Errorf("Repository name contains invalid path sequence") 438 } 439 440 // then continue with character validation 441 for _, char := range name { 442 if !((char >= 'a' && char <= 'z') || 443 (char >= 'A' && char <= 'Z') || 444 (char >= '0' && char <= '9') || 445 char == '-' || char == '_' || char == '.') { 446 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 447 } 448 } 449 450 // additional check to prevent multiple sequential dots 451 if strings.Contains(name, "..") { 452 return fmt.Errorf("Repository name cannot contain sequential dots") 453 } 454 455 // if all checks pass 456 return nil 457} 458 459func stripGitExt(name string) string { 460 return strings.TrimSuffix(name, ".git") 461} 462 463func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 464 switch r.Method { 465 case http.MethodGet: 466 user := s.oauth.GetMultiAccountUser(r) 467 knots, err := s.enforcer.GetKnotsForUser(user.Did) 468 if err != nil { 469 s.pages.Notice(w, "repo", "Invalid user account.") 470 return 471 } 472 473 s.pages.NewRepo(w, pages.NewRepoParams{ 474 LoggedInUser: user, 475 Knots: knots, 476 }) 477 478 case http.MethodPost: 479 l := s.logger.With("handler", "NewRepo") 480 481 user := s.oauth.GetMultiAccountUser(r) 482 l = l.With("did", user.Did) 483 484 // form validation 485 domain := r.FormValue("domain") 486 if domain == "" { 487 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 488 return 489 } 490 l = l.With("knot", domain) 491 492 repoName := r.FormValue("name") 493 if repoName == "" { 494 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 495 return 496 } 497 498 if err := validateRepoName(repoName); err != nil { 499 s.pages.Notice(w, "repo", err.Error()) 500 return 501 } 502 repoName = stripGitExt(repoName) 503 l = l.With("repoName", repoName) 504 505 defaultBranch := r.FormValue("branch") 506 if defaultBranch == "" { 507 defaultBranch = "main" 508 } 509 l = l.With("defaultBranch", defaultBranch) 510 511 description := r.FormValue("description") 512 if len([]rune(description)) > 140 { 513 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 514 return 515 } 516 517 // ACL validation 518 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 519 if err != nil || !ok { 520 l.Info("unauthorized") 521 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 522 return 523 } 524 525 // Check for existing repos 526 existingRepo, err := db.GetRepo( 527 s.db, 528 orm.FilterEq("did", user.Did), 529 orm.FilterEq("name", repoName), 530 ) 531 if err == nil && existingRepo != nil { 532 l.Info("repo exists") 533 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 534 return 535 } 536 537 rkey := tid.TID() 538 539 client, err := s.oauth.ServiceClient( 540 r, 541 oauth.WithService(domain), 542 oauth.WithLxm(tangled.RepoCreateNSID), 543 oauth.WithDev(s.config.Core.Dev), 544 ) 545 if err != nil { 546 l.Error("service auth failed", "err", err) 547 s.pages.Notice(w, "repo", "Failed to reach knot server.") 548 return 549 } 550 551 input := &tangled.RepoCreate_Input{ 552 Rkey: rkey, 553 Name: repoName, 554 DefaultBranch: &defaultBranch, 555 } 556 createResp, err := tangled.RepoCreate( 557 r.Context(), 558 client, 559 input, 560 ) 561 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 562 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err) 563 s.pages.Notice(w, "repo", err.Error()) 564 return 565 } 566 567 var repoDid string 568 if createResp != nil && createResp.RepoDid != nil { 569 repoDid = *createResp.RepoDid 570 } 571 if repoDid == "" { 572 l.Error("knot returned empty repo DID") 573 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 574 return 575 } 576 577 repo := &models.Repo{ 578 Did: user.Did, 579 Name: repoName, 580 Knot: domain, 581 Rkey: rkey, 582 Description: description, 583 Created: time.Now(), 584 Labels: s.config.Label.DefaultLabelDefs, 585 RepoDid: repoDid, 586 } 587 record := repo.AsRecord() 588 589 cleanupKnot := func() { 590 go func() { 591 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 592 for attempt, delay := range delays { 593 time.Sleep(delay) 594 deleteClient, dErr := s.oauth.ServiceClient( 595 r, 596 oauth.WithService(domain), 597 oauth.WithLxm(tangled.RepoDeleteNSID), 598 oauth.WithDev(s.config.Core.Dev), 599 ) 600 if dErr != nil { 601 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 602 continue 603 } 604 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 605 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 606 Did: user.Did, 607 Name: repoName, 608 Rkey: rkey, 609 }); dErr != nil { 610 cancel() 611 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 612 continue 613 } 614 cancel() 615 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 616 return 617 } 618 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 619 "did", user.Did, "repo", repoName, "knot", domain) 620 }() 621 } 622 623 atpClient, err := s.oauth.AuthorizedClient(r) 624 if err != nil { 625 l.Info("PDS write failed", "err", err) 626 cleanupKnot() 627 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 628 return 629 } 630 631 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 632 Collection: tangled.RepoNSID, 633 Repo: user.Did, 634 Rkey: rkey, 635 Record: &lexutil.LexiconTypeDecoder{ 636 Val: &record, 637 }, 638 }) 639 if err != nil { 640 l.Info("PDS write failed", "err", err) 641 cleanupKnot() 642 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 643 return 644 } 645 646 aturi := atresp.Uri 647 l = l.With("aturi", aturi) 648 l.Info("wrote to PDS") 649 650 tx, err := s.db.BeginTx(r.Context(), nil) 651 if err != nil { 652 l.Info("txn failed", "err", err) 653 s.pages.Notice(w, "repo", "Failed to save repository information.") 654 return 655 } 656 657 rollback := func() { 658 err1 := tx.Rollback() 659 err2 := s.enforcer.E.LoadPolicy() 660 err3 := rollbackRecord(context.Background(), aturi, atpClient) 661 662 if errors.Is(err1, sql.ErrTxDone) { 663 err1 = nil 664 } 665 666 if errs := errors.Join(err1, err2, err3); errs != nil { 667 l.Error("failed to rollback changes", "errs", errs) 668 } 669 670 if aturi != "" { 671 cleanupKnot() 672 } 673 } 674 defer rollback() 675 676 err = db.AddRepo(tx, repo) 677 if err != nil { 678 l.Error("db write failed", "err", err) 679 s.pages.Notice(w, "repo", "Failed to save repository information.") 680 return 681 } 682 683 rbacPath := repo.RepoIdentifier() 684 err = s.enforcer.AddRepo(user.Did, domain, rbacPath) 685 if err != nil { 686 l.Error("acl setup failed", "err", err) 687 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 688 return 689 } 690 691 err = tx.Commit() 692 if err != nil { 693 l.Error("txn commit failed", "err", err) 694 http.Error(w, err.Error(), http.StatusInternalServerError) 695 return 696 } 697 698 err = s.enforcer.E.SavePolicy() 699 if err != nil { 700 l.Error("acl save failed", "err", err) 701 http.Error(w, err.Error(), http.StatusInternalServerError) 702 return 703 } 704 705 aturi = "" 706 707 s.notifier.NewRepo(r.Context(), repo) 708 switch { 709 case repoDid != "": 710 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 711 default: 712 handle := s.pages.DisplayHandle(r.Context(), user.Did) 713 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, repoName)) 714 } 715 } 716} 717 718// this is used to rollback changes made to the PDS 719// 720// it is a no-op if the provided ATURI is empty 721func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 722 if aturi == "" { 723 return nil 724 } 725 726 parsed := syntax.ATURI(aturi) 727 728 collection := parsed.Collection().String() 729 repo := parsed.Authority().String() 730 rkey := parsed.RecordKey().String() 731 732 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 733 Collection: collection, 734 Repo: repo, 735 Rkey: rkey, 736 }) 737 return err 738} 739 740func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 741 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 742 if err != nil { 743 return err 744 } 745 // already present 746 if len(defaultLabels) == len(defaults) { 747 return nil 748 } 749 750 labelDefs, err := models.FetchLabelDefs(r, defaults) 751 if err != nil { 752 return err 753 } 754 755 // Insert each label definition to the database 756 for _, labelDef := range labelDefs { 757 _, err = db.AddLabelDefinition(e, &labelDef) 758 if err != nil { 759 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 760 } 761 } 762 763 return nil 764} 765 766func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 767 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 768 if err != nil { 769 logger.Error("failed to resolve tangled.org DID", "err", err) 770 return 771 } 772 773 pdsEndpoint := resolved.PDSEndpoint() 774 if pdsEndpoint == "" { 775 logger.Error("no PDS endpoint found for tangled.sh DID") 776 return 777 } 778 779 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 780 if err != nil { 781 logger.Error("failed to create appassword session... skipping fetch", "err", err) 782 return 783 } 784 785 l := log.SubLogger(logger, "bluesky") 786 787 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 788 defer ticker.Stop() 789 790 for { 791 // refresh session if necessary 792 if !session.IsValid() { 793 l.Debug("access token expired, refreshing session") 794 if err := session.RefreshSession(); err != nil { 795 l.Error("failed to refresh session, stopping bluesky updater", "err", err) 796 return 797 } 798 l.Debug("session refreshed") 799 } 800 801 // make client 802 client := xrpc.Client{ 803 Auth: &xrpc.AuthInfo{ 804 AccessJwt: session.AccessJwt, 805 Did: session.Did, 806 }, 807 Host: session.PdsEndpoint, 808 } 809 810 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 811 if err != nil { 812 l.Error("failed to fetch bluesky posts", "err", err) 813 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 814 l.Error("failed to insert bluesky posts", "err", err) 815 } else { 816 l.Info("inserted bluesky posts", "count", len(posts)) 817 } 818 819 select { 820 case <-ticker.C: 821 case <-ctx.Done(): 822 l.Info("stopping bluesky updater") 823 return 824 } 825 } 826}