forked from
tangled.org/core
Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/mentions"
23 "tangled.org/core/appview/models"
24 "tangled.org/core/appview/notify"
25 dbnotify "tangled.org/core/appview/notify/db"
26 lognotify "tangled.org/core/appview/notify/logging"
27 phnotify "tangled.org/core/appview/notify/posthog"
28 whnotify "tangled.org/core/appview/notify/webhook"
29 "tangled.org/core/appview/oauth"
30 "tangled.org/core/appview/pages"
31 "tangled.org/core/appview/reporesolver"
32 "tangled.org/core/appview/validator"
33 xrpcclient "tangled.org/core/appview/xrpcclient"
34 "tangled.org/core/consts"
35 "tangled.org/core/eventconsumer"
36 "tangled.org/core/idresolver"
37 "tangled.org/core/jetstream"
38 "tangled.org/core/log"
39 tlog "tangled.org/core/log"
40 "tangled.org/core/orm"
41 "tangled.org/core/rbac"
42 "tangled.org/core/tid"
43
44 comatproto "github.com/bluesky-social/indigo/api/atproto"
45 "github.com/bluesky-social/indigo/atproto/atclient"
46 "github.com/bluesky-social/indigo/atproto/syntax"
47 lexutil "github.com/bluesky-social/indigo/lex/util"
48 "github.com/bluesky-social/indigo/xrpc"
49
50 "github.com/go-chi/chi/v5"
51 "github.com/posthog/posthog-go"
52)
53
54type State struct {
55 db *db.DB
56 notifier notify.Notifier
57 indexer *indexer.Indexer
58 oauth *oauth.OAuth
59 enforcer *rbac.Enforcer
60 pages *pages.Pages
61 idResolver *idresolver.Resolver
62 rdb *cache.Cache
63 mentionsResolver *mentions.Resolver
64 posthog posthog.Client
65 jc *jetstream.JetstreamClient
66 config *config.Config
67 repoResolver *reporesolver.RepoResolver
68 knotstream *eventconsumer.Consumer
69 spindlestream *eventconsumer.Consumer
70 logger *slog.Logger
71 validator *validator.Validator
72 cfClient *cloudflare.Client
73}
74
75func Make(ctx context.Context, config *config.Config) (*State, error) {
76 logger := tlog.FromContext(ctx)
77
78 d, err := db.Make(ctx, config.Core.DbPath)
79 if err != nil {
80 return nil, fmt.Errorf("failed to create db: %w", err)
81 }
82
83 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
84 err = indexer.Init(ctx)
85 if err != nil {
86 return nil, fmt.Errorf("failed to create indexer: %w", err)
87 }
88
89 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
90 if err != nil {
91 return nil, fmt.Errorf("failed to create enforcer: %w", err)
92 }
93
94 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
95 if err != nil {
96 logger.Error("failed to create redis resolver", "err", err)
97 res = idresolver.DefaultResolver(config.Plc.PLCURL)
98 }
99
100 var rdb *cache.Cache
101 if config.Redis.Addr != "" {
102 rdb = cache.New(config.Redis.Addr)
103 }
104
105 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
106 if err != nil {
107 return nil, fmt.Errorf("failed to create posthog client: %w", err)
108 }
109
110 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
111 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
112 if err != nil {
113 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
114 }
115 validator := validator.New(d, res, enforcer)
116
117 repoResolver := reporesolver.New(config, enforcer, d, rdb)
118
119 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
120
121 wrapper := db.DbWrapper{Execer: d}
122 jc, err := jetstream.NewJetstreamClient(
123 config.Jetstream.Endpoint,
124 "appview",
125 []string{
126 tangled.ActorProfileNSID,
127 tangled.FeedStarNSID,
128 tangled.GraphFollowNSID,
129 tangled.GraphVouchNSID,
130 tangled.KnotMemberNSID,
131 tangled.KnotNSID,
132 tangled.LabelDefinitionNSID,
133 tangled.LabelOpNSID,
134 tangled.PublicKeyNSID,
135 tangled.RepoArtifactNSID,
136 tangled.RepoIssueCommentNSID,
137 tangled.RepoIssueNSID,
138 tangled.RepoPullNSID,
139 tangled.SpindleMemberNSID,
140 tangled.SpindleNSID,
141 tangled.StringNSID,
142 },
143 nil,
144 tlog.SubLogger(logger, "jetstream"),
145 wrapper,
146 false,
147
148 // in-memory filter is inapplicable to appview so
149 // we'll never log dids anyway.
150 false,
151 )
152 if err != nil {
153 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
154 }
155
156 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
157 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
158 }
159
160 ingester := appview.Ingester{
161 Db: wrapper,
162 Enforcer: enforcer,
163 IdResolver: res,
164 Cache: rdb,
165 Config: config,
166 Logger: log.SubLogger(logger, "ingester"),
167 Validator: validator,
168 }
169 err = jc.StartJetstream(ctx, ingester.Ingest())
170 if err != nil {
171 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
172 }
173
174 var notifiers []notify.Notifier
175
176 // Always add the database notifier
177 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
178
179 // Add other notifiers in production only
180 if !config.Core.Dev {
181 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
182 }
183 notifiers = append(notifiers, indexer)
184
185 notifiers = append(notifiers, whnotify.NewNotifier(d))
186
187 notifier := notify.NewMergedNotifier(notifiers)
188 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
189
190 var cfClient *cloudflare.Client
191 if config.Cloudflare.ApiToken != "" {
192 cfClient, err = cloudflare.New(config)
193 if err != nil {
194 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
195 cfClient = nil
196 }
197 }
198
199 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
200 if err != nil {
201 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
202 }
203 knotstream.Start(ctx)
204
205 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
206 if err != nil {
207 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
208 }
209 spindlestream.Start(ctx)
210
211 state := &State{
212 db: d,
213 notifier: notifier,
214 indexer: indexer,
215 oauth: oauth,
216 enforcer: enforcer,
217 pages: pages,
218 idResolver: res,
219 rdb: rdb,
220 mentionsResolver: mentionsResolver,
221 posthog: posthog,
222 jc: jc,
223 config: config,
224 repoResolver: repoResolver,
225 knotstream: knotstream,
226 spindlestream: spindlestream,
227 logger: logger,
228 validator: validator,
229 cfClient: cfClient,
230 }
231
232 // fetch initial bluesky posts if configured
233 go fetchBskyPosts(ctx, res, config, d, logger)
234
235 return state, nil
236}
237
238func (s *State) Close() error {
239 // other close up logic goes here
240 return s.db.Close()
241}
242
243func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
244 w.Header().Set("Content-Type", "text/plain")
245 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
246
247 securityTxt := `Contact: mailto:security@tangled.org
248Preferred-Languages: en
249Canonical: https://tangled.org/.well-known/security.txt
250Expires: 2030-01-01T21:59:00.000Z
251`
252 w.Write([]byte(securityTxt))
253}
254
255func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
256 w.Header().Set("Content-Type", "text/plain")
257 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
258
259 robotsTxt := `# Hello, Tanglers!
260User-agent: *
261Allow: /
262Disallow: /*/*/settings
263Disallow: /settings
264Disallow: /*/*/compare
265Disallow: /*/*/fork
266
267Crawl-delay: 1
268`
269 w.Write([]byte(robotsTxt))
270}
271
272func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
273 user := s.oauth.GetMultiAccountUser(r)
274 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
275 LoggedInUser: user,
276 })
277}
278
279func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
280 user := s.oauth.GetMultiAccountUser(r)
281 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
282 LoggedInUser: user,
283 })
284}
285
286func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
287 user := s.oauth.GetMultiAccountUser(r)
288 s.pages.Brand(w, pages.BrandParams{
289 LoggedInUser: user,
290 })
291}
292
293func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
294 user := s.oauth.GetMultiAccountUser(r)
295 if user == nil {
296 return
297 }
298
299 l := s.logger.With("handler", "UpgradeBanner")
300 l = l.With("did", user.Did)
301
302 regs, err := db.GetRegistrations(
303 s.db,
304 orm.FilterEq("did", user.Did),
305 orm.FilterEq("needs_upgrade", 1),
306 )
307 if err != nil {
308 l.Error("non-fatal: failed to get registrations", "err", err)
309 }
310
311 spindles, err := db.GetSpindles(
312 r.Context(),
313 s.db,
314 orm.FilterEq("owner", user.Did),
315 orm.FilterEq("needs_upgrade", 1),
316 )
317 if err != nil {
318 l.Error("non-fatal: failed to get spindles", "err", err)
319 }
320
321 if regs == nil && spindles == nil {
322 return
323 }
324
325 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
326 Registrations: regs,
327 Spindles: spindles,
328 })
329}
330
331func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
332 // target is echoed back from the form via hx-vals so the response span's
333 // id matches the form's hx-target. Fallback keeps the handler useful if
334 // a caller forgets to send it.
335 target := strings.TrimSpace(r.FormValue("target"))
336 if target == "" {
337 target = "home"
338 }
339
340 w.Header().Set("Content-Type", "text/html")
341
342 emailAddr := strings.TrimSpace(r.FormValue("email"))
343 if !email.IsValidEmail(emailAddr) {
344 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
345 Id: target,
346 Error: "Invalid email address.",
347 })
348 return
349 }
350
351 // For logged-in users, persist the signup locally so the widget stays
352 // hidden across devices. The DB row is the render-time source of truth;
353 // Resend still owns the mailing list itself.
354 if user := s.oauth.GetMultiAccountUser(r); user != nil {
355 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
356 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
357 }
358 }
359
360 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
361 go func() {
362 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
363 s.logger.Error("failed to add newsletter contact", "error", err)
364 }
365 }()
366 } else {
367 s.logger.Error(
368 "failed to add newsletter contact, missing resend config",
369 "isKeyPresent", s.config.Resend.ApiKey != "",
370 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
371 "emailAddr", emailAddr,
372 )
373 }
374
375 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
376}
377
378// NewsletterDismiss records that a logged-in user has dismissed the newsletter
379// widget so it stays hidden across their devices. Anonymous callers get a 204
380// with no DB write — localStorage handles the per-browser fallback.
381func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
382 user := s.oauth.GetMultiAccountUser(r)
383 if user == nil {
384 w.WriteHeader(http.StatusNoContent)
385 return
386 }
387
388 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
389 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
390 }
391 w.WriteHeader(http.StatusNoContent)
392}
393
394func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
395 user := chi.URLParam(r, "user")
396 user = strings.TrimPrefix(user, "@")
397
398 if user == "" {
399 w.WriteHeader(http.StatusBadRequest)
400 return
401 }
402
403 id, err := s.idResolver.ResolveIdent(r.Context(), user)
404 if err != nil {
405 w.WriteHeader(http.StatusInternalServerError)
406 return
407 }
408
409 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
410 if err != nil {
411 s.logger.Error("failed to get public keys", "err", err)
412 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
413 return
414 }
415
416 if len(pubKeys) == 0 {
417 w.WriteHeader(http.StatusNoContent)
418 return
419 }
420
421 for _, k := range pubKeys {
422 key := strings.TrimRight(k.Key, "\n")
423 fmt.Fprintln(w, key)
424 }
425}
426
427func validateRepoName(name string) error {
428 // check for path traversal attempts
429 if name == "." || name == ".." ||
430 strings.Contains(name, "/") || strings.Contains(name, "\\") {
431 return fmt.Errorf("Repository name contains invalid path characters")
432 }
433
434 // check for sequences that could be used for traversal when normalized
435 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
436 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
437 return fmt.Errorf("Repository name contains invalid path sequence")
438 }
439
440 // then continue with character validation
441 for _, char := range name {
442 if !((char >= 'a' && char <= 'z') ||
443 (char >= 'A' && char <= 'Z') ||
444 (char >= '0' && char <= '9') ||
445 char == '-' || char == '_' || char == '.') {
446 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
447 }
448 }
449
450 // additional check to prevent multiple sequential dots
451 if strings.Contains(name, "..") {
452 return fmt.Errorf("Repository name cannot contain sequential dots")
453 }
454
455 // if all checks pass
456 return nil
457}
458
459func stripGitExt(name string) string {
460 return strings.TrimSuffix(name, ".git")
461}
462
463func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
464 switch r.Method {
465 case http.MethodGet:
466 user := s.oauth.GetMultiAccountUser(r)
467 knots, err := s.enforcer.GetKnotsForUser(user.Did)
468 if err != nil {
469 s.pages.Notice(w, "repo", "Invalid user account.")
470 return
471 }
472
473 s.pages.NewRepo(w, pages.NewRepoParams{
474 LoggedInUser: user,
475 Knots: knots,
476 })
477
478 case http.MethodPost:
479 l := s.logger.With("handler", "NewRepo")
480
481 user := s.oauth.GetMultiAccountUser(r)
482 l = l.With("did", user.Did)
483
484 // form validation
485 domain := r.FormValue("domain")
486 if domain == "" {
487 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
488 return
489 }
490 l = l.With("knot", domain)
491
492 repoName := r.FormValue("name")
493 if repoName == "" {
494 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
495 return
496 }
497
498 if err := validateRepoName(repoName); err != nil {
499 s.pages.Notice(w, "repo", err.Error())
500 return
501 }
502 repoName = stripGitExt(repoName)
503 l = l.With("repoName", repoName)
504
505 defaultBranch := r.FormValue("branch")
506 if defaultBranch == "" {
507 defaultBranch = "main"
508 }
509 l = l.With("defaultBranch", defaultBranch)
510
511 description := r.FormValue("description")
512 if len([]rune(description)) > 140 {
513 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
514 return
515 }
516
517 // ACL validation
518 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
519 if err != nil || !ok {
520 l.Info("unauthorized")
521 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
522 return
523 }
524
525 // Check for existing repos
526 existingRepo, err := db.GetRepo(
527 s.db,
528 orm.FilterEq("did", user.Did),
529 orm.FilterEq("name", repoName),
530 )
531 if err == nil && existingRepo != nil {
532 l.Info("repo exists")
533 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
534 return
535 }
536
537 rkey := tid.TID()
538
539 client, err := s.oauth.ServiceClient(
540 r,
541 oauth.WithService(domain),
542 oauth.WithLxm(tangled.RepoCreateNSID),
543 oauth.WithDev(s.config.Core.Dev),
544 )
545 if err != nil {
546 l.Error("service auth failed", "err", err)
547 s.pages.Notice(w, "repo", "Failed to reach knot server.")
548 return
549 }
550
551 input := &tangled.RepoCreate_Input{
552 Rkey: rkey,
553 Name: repoName,
554 DefaultBranch: &defaultBranch,
555 }
556 createResp, err := tangled.RepoCreate(
557 r.Context(),
558 client,
559 input,
560 )
561 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
562 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
563 s.pages.Notice(w, "repo", err.Error())
564 return
565 }
566
567 var repoDid string
568 if createResp != nil && createResp.RepoDid != nil {
569 repoDid = *createResp.RepoDid
570 }
571 if repoDid == "" {
572 l.Error("knot returned empty repo DID")
573 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
574 return
575 }
576
577 repo := &models.Repo{
578 Did: user.Did,
579 Name: repoName,
580 Knot: domain,
581 Rkey: rkey,
582 Description: description,
583 Created: time.Now(),
584 Labels: s.config.Label.DefaultLabelDefs,
585 RepoDid: repoDid,
586 }
587 record := repo.AsRecord()
588
589 cleanupKnot := func() {
590 go func() {
591 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
592 for attempt, delay := range delays {
593 time.Sleep(delay)
594 deleteClient, dErr := s.oauth.ServiceClient(
595 r,
596 oauth.WithService(domain),
597 oauth.WithLxm(tangled.RepoDeleteNSID),
598 oauth.WithDev(s.config.Core.Dev),
599 )
600 if dErr != nil {
601 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
602 continue
603 }
604 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
605 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
606 Did: user.Did,
607 Name: repoName,
608 Rkey: rkey,
609 }); dErr != nil {
610 cancel()
611 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
612 continue
613 }
614 cancel()
615 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
616 return
617 }
618 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
619 "did", user.Did, "repo", repoName, "knot", domain)
620 }()
621 }
622
623 atpClient, err := s.oauth.AuthorizedClient(r)
624 if err != nil {
625 l.Info("PDS write failed", "err", err)
626 cleanupKnot()
627 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
628 return
629 }
630
631 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
632 Collection: tangled.RepoNSID,
633 Repo: user.Did,
634 Rkey: rkey,
635 Record: &lexutil.LexiconTypeDecoder{
636 Val: &record,
637 },
638 })
639 if err != nil {
640 l.Info("PDS write failed", "err", err)
641 cleanupKnot()
642 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
643 return
644 }
645
646 aturi := atresp.Uri
647 l = l.With("aturi", aturi)
648 l.Info("wrote to PDS")
649
650 tx, err := s.db.BeginTx(r.Context(), nil)
651 if err != nil {
652 l.Info("txn failed", "err", err)
653 s.pages.Notice(w, "repo", "Failed to save repository information.")
654 return
655 }
656
657 rollback := func() {
658 err1 := tx.Rollback()
659 err2 := s.enforcer.E.LoadPolicy()
660 err3 := rollbackRecord(context.Background(), aturi, atpClient)
661
662 if errors.Is(err1, sql.ErrTxDone) {
663 err1 = nil
664 }
665
666 if errs := errors.Join(err1, err2, err3); errs != nil {
667 l.Error("failed to rollback changes", "errs", errs)
668 }
669
670 if aturi != "" {
671 cleanupKnot()
672 }
673 }
674 defer rollback()
675
676 err = db.AddRepo(tx, repo)
677 if err != nil {
678 l.Error("db write failed", "err", err)
679 s.pages.Notice(w, "repo", "Failed to save repository information.")
680 return
681 }
682
683 rbacPath := repo.RepoIdentifier()
684 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
685 if err != nil {
686 l.Error("acl setup failed", "err", err)
687 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
688 return
689 }
690
691 err = tx.Commit()
692 if err != nil {
693 l.Error("txn commit failed", "err", err)
694 http.Error(w, err.Error(), http.StatusInternalServerError)
695 return
696 }
697
698 err = s.enforcer.E.SavePolicy()
699 if err != nil {
700 l.Error("acl save failed", "err", err)
701 http.Error(w, err.Error(), http.StatusInternalServerError)
702 return
703 }
704
705 aturi = ""
706
707 s.notifier.NewRepo(r.Context(), repo)
708 switch {
709 case repoDid != "":
710 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
711 default:
712 handle := s.pages.DisplayHandle(r.Context(), user.Did)
713 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, repoName))
714 }
715 }
716}
717
718// this is used to rollback changes made to the PDS
719//
720// it is a no-op if the provided ATURI is empty
721func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
722 if aturi == "" {
723 return nil
724 }
725
726 parsed := syntax.ATURI(aturi)
727
728 collection := parsed.Collection().String()
729 repo := parsed.Authority().String()
730 rkey := parsed.RecordKey().String()
731
732 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
733 Collection: collection,
734 Repo: repo,
735 Rkey: rkey,
736 })
737 return err
738}
739
740func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
741 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
742 if err != nil {
743 return err
744 }
745 // already present
746 if len(defaultLabels) == len(defaults) {
747 return nil
748 }
749
750 labelDefs, err := models.FetchLabelDefs(r, defaults)
751 if err != nil {
752 return err
753 }
754
755 // Insert each label definition to the database
756 for _, labelDef := range labelDefs {
757 _, err = db.AddLabelDefinition(e, &labelDef)
758 if err != nil {
759 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
760 }
761 }
762
763 return nil
764}
765
766func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
767 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
768 if err != nil {
769 logger.Error("failed to resolve tangled.org DID", "err", err)
770 return
771 }
772
773 pdsEndpoint := resolved.PDSEndpoint()
774 if pdsEndpoint == "" {
775 logger.Error("no PDS endpoint found for tangled.sh DID")
776 return
777 }
778
779 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
780 if err != nil {
781 logger.Error("failed to create appassword session... skipping fetch", "err", err)
782 return
783 }
784
785 l := log.SubLogger(logger, "bluesky")
786
787 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
788 defer ticker.Stop()
789
790 for {
791 // refresh session if necessary
792 if !session.IsValid() {
793 l.Debug("access token expired, refreshing session")
794 if err := session.RefreshSession(); err != nil {
795 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
796 return
797 }
798 l.Debug("session refreshed")
799 }
800
801 // make client
802 client := xrpc.Client{
803 Auth: &xrpc.AuthInfo{
804 AccessJwt: session.AccessJwt,
805 Did: session.Did,
806 },
807 Host: session.PdsEndpoint,
808 }
809
810 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
811 if err != nil {
812 l.Error("failed to fetch bluesky posts", "err", err)
813 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
814 l.Error("failed to insert bluesky posts", "err", err)
815 } else {
816 l.Info("inserted bluesky posts", "count", len(posts))
817 }
818
819 select {
820 case <-ticker.C:
821 case <-ctx.Done():
822 l.Info("stopping bluesky updater")
823 return
824 }
825 }
826}