forked from
tangled.org/core
Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cloudflare"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/email"
21 "tangled.org/core/appview/indexer"
22 "tangled.org/core/appview/mentions"
23 "tangled.org/core/appview/models"
24 "tangled.org/core/appview/notify"
25 dbnotify "tangled.org/core/appview/notify/db"
26 lognotify "tangled.org/core/appview/notify/logging"
27 phnotify "tangled.org/core/appview/notify/posthog"
28 whnotify "tangled.org/core/appview/notify/webhook"
29 "tangled.org/core/appview/oauth"
30 "tangled.org/core/appview/pages"
31 "tangled.org/core/appview/reporesolver"
32 "tangled.org/core/appview/validator"
33 xrpcclient "tangled.org/core/appview/xrpcclient"
34 "tangled.org/core/consts"
35 "tangled.org/core/eventconsumer"
36 "tangled.org/core/idresolver"
37 "tangled.org/core/jetstream"
38 "tangled.org/core/log"
39 tlog "tangled.org/core/log"
40 "tangled.org/core/orm"
41 "tangled.org/core/rbac"
42 "tangled.org/core/tid"
43
44 comatproto "github.com/bluesky-social/indigo/api/atproto"
45 "github.com/bluesky-social/indigo/atproto/atclient"
46 "github.com/bluesky-social/indigo/atproto/syntax"
47 lexutil "github.com/bluesky-social/indigo/lex/util"
48 "github.com/bluesky-social/indigo/xrpc"
49
50 "github.com/go-chi/chi/v5"
51 "github.com/posthog/posthog-go"
52)
53
54type State struct {
55 db *db.DB
56 notifier notify.Notifier
57 indexer *indexer.Indexer
58 oauth *oauth.OAuth
59 enforcer *rbac.Enforcer
60 pages *pages.Pages
61 idResolver *idresolver.Resolver
62 rdb *cache.Cache
63 mentionsResolver *mentions.Resolver
64 posthog posthog.Client
65 jc *jetstream.JetstreamClient
66 config *config.Config
67 repoResolver *reporesolver.RepoResolver
68 knotstream *eventconsumer.Consumer
69 spindlestream *eventconsumer.Consumer
70 logger *slog.Logger
71 validator *validator.Validator
72 cfClient *cloudflare.Client
73}
74
75func Make(ctx context.Context, config *config.Config) (*State, error) {
76 logger := tlog.FromContext(ctx)
77
78 d, err := db.Make(ctx, config.Core.DbPath)
79 if err != nil {
80 return nil, fmt.Errorf("failed to create db: %w", err)
81 }
82
83 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
84 err = indexer.Init(ctx)
85 if err != nil {
86 return nil, fmt.Errorf("failed to create indexer: %w", err)
87 }
88
89 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
90 if err != nil {
91 return nil, fmt.Errorf("failed to create enforcer: %w", err)
92 }
93
94 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
95 if err != nil {
96 logger.Error("failed to create redis resolver", "err", err)
97 res = idresolver.DefaultResolver(config.Plc.PLCURL)
98 }
99
100 var rdb *cache.Cache
101 if config.Redis.Addr != "" {
102 rdb = cache.New(config.Redis.Addr)
103 }
104
105 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
106 if err != nil {
107 return nil, fmt.Errorf("failed to create posthog client: %w", err)
108 }
109
110 pages := pages.NewPages(config, res, d, rdb, log.SubLogger(logger, "pages"))
111 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
112 if err != nil {
113 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
114 }
115 validator := validator.New(d, res, enforcer)
116
117 repoResolver := reporesolver.New(config, enforcer, d, rdb)
118
119 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
120
121 wrapper := db.DbWrapper{Execer: d}
122 jc, err := jetstream.NewJetstreamClient(
123 config.Jetstream.Endpoint,
124 "appview",
125 []string{
126 tangled.GraphFollowNSID,
127 tangled.FeedStarNSID,
128 tangled.PublicKeyNSID,
129 tangled.RepoArtifactNSID,
130 tangled.ActorProfileNSID,
131 tangled.KnotMemberNSID,
132 tangled.SpindleMemberNSID,
133 tangled.SpindleNSID,
134 tangled.KnotNSID,
135 tangled.StringNSID,
136 tangled.RepoPullNSID,
137 tangled.RepoIssueNSID,
138 tangled.RepoIssueCommentNSID,
139 tangled.LabelDefinitionNSID,
140 tangled.LabelOpNSID,
141 },
142 nil,
143 tlog.SubLogger(logger, "jetstream"),
144 wrapper,
145 false,
146
147 // in-memory filter is inapplicable to appview so
148 // we'll never log dids anyway.
149 false,
150 )
151 if err != nil {
152 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
153 }
154
155 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
156 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
157 }
158
159 ingester := appview.Ingester{
160 Db: wrapper,
161 Enforcer: enforcer,
162 IdResolver: res,
163 Cache: rdb,
164 Config: config,
165 Logger: log.SubLogger(logger, "ingester"),
166 Validator: validator,
167 }
168 err = jc.StartJetstream(ctx, ingester.Ingest())
169 if err != nil {
170 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
171 }
172
173 var notifiers []notify.Notifier
174
175 // Always add the database notifier
176 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
177
178 // Add other notifiers in production only
179 if !config.Core.Dev {
180 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
181 }
182 notifiers = append(notifiers, indexer)
183
184 notifiers = append(notifiers, whnotify.NewNotifier(d))
185
186 notifier := notify.NewMergedNotifier(notifiers)
187 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
188
189 var cfClient *cloudflare.Client
190 if config.Cloudflare.ApiToken != "" {
191 cfClient, err = cloudflare.New(config)
192 if err != nil {
193 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
194 cfClient = nil
195 }
196 }
197
198 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
199 if err != nil {
200 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
201 }
202 knotstream.Start(ctx)
203
204 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
205 if err != nil {
206 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
207 }
208 spindlestream.Start(ctx)
209
210 state := &State{
211 db: d,
212 notifier: notifier,
213 indexer: indexer,
214 oauth: oauth,
215 enforcer: enforcer,
216 pages: pages,
217 idResolver: res,
218 rdb: rdb,
219 mentionsResolver: mentionsResolver,
220 posthog: posthog,
221 jc: jc,
222 config: config,
223 repoResolver: repoResolver,
224 knotstream: knotstream,
225 spindlestream: spindlestream,
226 logger: logger,
227 validator: validator,
228 cfClient: cfClient,
229 }
230
231 // fetch initial bluesky posts if configured
232 go fetchBskyPosts(ctx, res, config, d, logger)
233
234 return state, nil
235}
236
237func (s *State) Close() error {
238 // other close up logic goes here
239 return s.db.Close()
240}
241
242func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
243 w.Header().Set("Content-Type", "text/plain")
244 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
245
246 securityTxt := `Contact: mailto:security@tangled.org
247Preferred-Languages: en
248Canonical: https://tangled.org/.well-known/security.txt
249Expires: 2030-01-01T21:59:00.000Z
250`
251 w.Write([]byte(securityTxt))
252}
253
254func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
255 w.Header().Set("Content-Type", "text/plain")
256 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
257
258 robotsTxt := `# Hello, Tanglers!
259User-agent: *
260Allow: /
261Disallow: /*/*/settings
262Disallow: /settings
263Disallow: /*/*/compare
264Disallow: /*/*/fork
265
266Crawl-delay: 1
267`
268 w.Write([]byte(robotsTxt))
269}
270
271func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
272 user := s.oauth.GetMultiAccountUser(r)
273 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
274 LoggedInUser: user,
275 })
276}
277
278func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
279 user := s.oauth.GetMultiAccountUser(r)
280 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
281 LoggedInUser: user,
282 })
283}
284
285func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
286 user := s.oauth.GetMultiAccountUser(r)
287 s.pages.Brand(w, pages.BrandParams{
288 LoggedInUser: user,
289 })
290}
291
292func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
293 user := s.oauth.GetMultiAccountUser(r)
294 if user == nil {
295 return
296 }
297
298 l := s.logger.With("handler", "UpgradeBanner")
299 l = l.With("did", user.Did)
300
301 regs, err := db.GetRegistrations(
302 s.db,
303 orm.FilterEq("did", user.Did),
304 orm.FilterEq("needs_upgrade", 1),
305 )
306 if err != nil {
307 l.Error("non-fatal: failed to get registrations", "err", err)
308 }
309
310 spindles, err := db.GetSpindles(
311 r.Context(),
312 s.db,
313 orm.FilterEq("owner", user.Did),
314 orm.FilterEq("needs_upgrade", 1),
315 )
316 if err != nil {
317 l.Error("non-fatal: failed to get spindles", "err", err)
318 }
319
320 if regs == nil && spindles == nil {
321 return
322 }
323
324 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
325 Registrations: regs,
326 Spindles: spindles,
327 })
328}
329
330func (s *State) NewsletterSignup(w http.ResponseWriter, r *http.Request) {
331 // target is echoed back from the form via hx-vals so the response span's
332 // id matches the form's hx-target. Fallback keeps the handler useful if
333 // a caller forgets to send it.
334 target := strings.TrimSpace(r.FormValue("target"))
335 if target == "" {
336 target = "home"
337 }
338
339 w.Header().Set("Content-Type", "text/html")
340
341 emailAddr := strings.TrimSpace(r.FormValue("email"))
342 if !email.IsValidEmail(emailAddr) {
343 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{
344 Id: target,
345 Error: "Invalid email address.",
346 })
347 return
348 }
349
350 // For logged-in users, persist the signup locally so the widget stays
351 // hidden across devices. The DB row is the render-time source of truth;
352 // Resend still owns the mailing list itself.
353 if user := s.oauth.GetMultiAccountUser(r); user != nil {
354 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusSubscribed, emailAddr); err != nil {
355 s.logger.Error("failed to persist newsletter preference", "did", user.Did, "err", err)
356 }
357 }
358
359 if s.config.Resend.ApiKey != "" && s.config.Resend.NewsletterSegmentId != "" {
360 go func() {
361 if err := email.AddNewsletterContact(s.config.Resend.ApiKey, s.config.Resend.NewsletterSegmentId, emailAddr); err != nil {
362 s.logger.Error("failed to add newsletter contact", "error", err)
363 }
364 }()
365 } else {
366 s.logger.Error(
367 "failed to add newsletter contact, missing resend config",
368 "isKeyPresent", s.config.Resend.ApiKey != "",
369 "isSegmentIdPresent", s.config.Resend.NewsletterSegmentId != "",
370 "emailAddr", emailAddr,
371 )
372 }
373
374 s.pages.NewsletterResponse(w, pages.NewsletterResponseParams{Id: target})
375}
376
377// NewsletterDismiss records that a logged-in user has dismissed the newsletter
378// widget so it stays hidden across their devices. Anonymous callers get a 204
379// with no DB write — localStorage handles the per-browser fallback.
380func (s *State) NewsletterDismiss(w http.ResponseWriter, r *http.Request) {
381 user := s.oauth.GetMultiAccountUser(r)
382 if user == nil {
383 w.WriteHeader(http.StatusNoContent)
384 return
385 }
386
387 if err := db.UpsertNewsletterPref(s.db, user.Did, db.NewsletterStatusDismissed, ""); err != nil {
388 s.logger.Error("failed to persist newsletter dismissal", "did", user.Did, "err", err)
389 }
390 w.WriteHeader(http.StatusNoContent)
391}
392
393func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
394 user := chi.URLParam(r, "user")
395 user = strings.TrimPrefix(user, "@")
396
397 if user == "" {
398 w.WriteHeader(http.StatusBadRequest)
399 return
400 }
401
402 id, err := s.idResolver.ResolveIdent(r.Context(), user)
403 if err != nil {
404 w.WriteHeader(http.StatusInternalServerError)
405 return
406 }
407
408 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
409 if err != nil {
410 s.logger.Error("failed to get public keys", "err", err)
411 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
412 return
413 }
414
415 if len(pubKeys) == 0 {
416 w.WriteHeader(http.StatusNoContent)
417 return
418 }
419
420 for _, k := range pubKeys {
421 key := strings.TrimRight(k.Key, "\n")
422 fmt.Fprintln(w, key)
423 }
424}
425
426func validateRepoName(name string) error {
427 // check for path traversal attempts
428 if name == "." || name == ".." ||
429 strings.Contains(name, "/") || strings.Contains(name, "\\") {
430 return fmt.Errorf("Repository name contains invalid path characters")
431 }
432
433 // check for sequences that could be used for traversal when normalized
434 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
435 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
436 return fmt.Errorf("Repository name contains invalid path sequence")
437 }
438
439 // then continue with character validation
440 for _, char := range name {
441 if !((char >= 'a' && char <= 'z') ||
442 (char >= 'A' && char <= 'Z') ||
443 (char >= '0' && char <= '9') ||
444 char == '-' || char == '_' || char == '.') {
445 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
446 }
447 }
448
449 // additional check to prevent multiple sequential dots
450 if strings.Contains(name, "..") {
451 return fmt.Errorf("Repository name cannot contain sequential dots")
452 }
453
454 // if all checks pass
455 return nil
456}
457
458func stripGitExt(name string) string {
459 return strings.TrimSuffix(name, ".git")
460}
461
462func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
463 switch r.Method {
464 case http.MethodGet:
465 user := s.oauth.GetMultiAccountUser(r)
466 knots, err := s.enforcer.GetKnotsForUser(user.Did)
467 if err != nil {
468 s.pages.Notice(w, "repo", "Invalid user account.")
469 return
470 }
471
472 s.pages.NewRepo(w, pages.NewRepoParams{
473 LoggedInUser: user,
474 Knots: knots,
475 })
476
477 case http.MethodPost:
478 l := s.logger.With("handler", "NewRepo")
479
480 user := s.oauth.GetMultiAccountUser(r)
481 l = l.With("did", user.Did)
482
483 // form validation
484 domain := r.FormValue("domain")
485 if domain == "" {
486 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
487 return
488 }
489 l = l.With("knot", domain)
490
491 repoName := r.FormValue("name")
492 if repoName == "" {
493 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
494 return
495 }
496
497 if err := validateRepoName(repoName); err != nil {
498 s.pages.Notice(w, "repo", err.Error())
499 return
500 }
501 repoName = stripGitExt(repoName)
502 l = l.With("repoName", repoName)
503
504 defaultBranch := r.FormValue("branch")
505 if defaultBranch == "" {
506 defaultBranch = "main"
507 }
508 l = l.With("defaultBranch", defaultBranch)
509
510 description := r.FormValue("description")
511 if len([]rune(description)) > 140 {
512 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
513 return
514 }
515
516 // ACL validation
517 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
518 if err != nil || !ok {
519 l.Info("unauthorized")
520 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
521 return
522 }
523
524 // Check for existing repos
525 existingRepo, err := db.GetRepo(
526 s.db,
527 orm.FilterEq("did", user.Did),
528 orm.FilterEq("name", repoName),
529 )
530 if err == nil && existingRepo != nil {
531 l.Info("repo exists")
532 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
533 return
534 }
535
536 rkey := tid.TID()
537
538 client, err := s.oauth.ServiceClient(
539 r,
540 oauth.WithService(domain),
541 oauth.WithLxm(tangled.RepoCreateNSID),
542 oauth.WithDev(s.config.Core.Dev),
543 )
544 if err != nil {
545 l.Error("service auth failed", "err", err)
546 s.pages.Notice(w, "repo", "Failed to reach knot server.")
547 return
548 }
549
550 input := &tangled.RepoCreate_Input{
551 Rkey: rkey,
552 Name: repoName,
553 DefaultBranch: &defaultBranch,
554 }
555 createResp, err := tangled.RepoCreate(
556 r.Context(),
557 client,
558 input,
559 )
560 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
561 l.Error("failed to call XRPC repo.create", "xrpcerr", xrpcerr, "err", err)
562 s.pages.Notice(w, "repo", err.Error())
563 return
564 }
565
566 var repoDid string
567 if createResp != nil && createResp.RepoDid != nil {
568 repoDid = *createResp.RepoDid
569 }
570 if repoDid == "" {
571 l.Error("knot returned empty repo DID")
572 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
573 return
574 }
575
576 repo := &models.Repo{
577 Did: user.Did,
578 Name: repoName,
579 Knot: domain,
580 Rkey: rkey,
581 Description: description,
582 Created: time.Now(),
583 Labels: s.config.Label.DefaultLabelDefs,
584 RepoDid: repoDid,
585 }
586 record := repo.AsRecord()
587
588 cleanupKnot := func() {
589 go func() {
590 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
591 for attempt, delay := range delays {
592 time.Sleep(delay)
593 deleteClient, dErr := s.oauth.ServiceClient(
594 r,
595 oauth.WithService(domain),
596 oauth.WithLxm(tangled.RepoDeleteNSID),
597 oauth.WithDev(s.config.Core.Dev),
598 )
599 if dErr != nil {
600 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
601 continue
602 }
603 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
604 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
605 Did: user.Did,
606 Name: repoName,
607 Rkey: rkey,
608 }); dErr != nil {
609 cancel()
610 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
611 continue
612 }
613 cancel()
614 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
615 return
616 }
617 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
618 "did", user.Did, "repo", repoName, "knot", domain)
619 }()
620 }
621
622 atpClient, err := s.oauth.AuthorizedClient(r)
623 if err != nil {
624 l.Info("PDS write failed", "err", err)
625 cleanupKnot()
626 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
627 return
628 }
629
630 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
631 Collection: tangled.RepoNSID,
632 Repo: user.Did,
633 Rkey: rkey,
634 Record: &lexutil.LexiconTypeDecoder{
635 Val: &record,
636 },
637 })
638 if err != nil {
639 l.Info("PDS write failed", "err", err)
640 cleanupKnot()
641 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
642 return
643 }
644
645 aturi := atresp.Uri
646 l = l.With("aturi", aturi)
647 l.Info("wrote to PDS")
648
649 tx, err := s.db.BeginTx(r.Context(), nil)
650 if err != nil {
651 l.Info("txn failed", "err", err)
652 s.pages.Notice(w, "repo", "Failed to save repository information.")
653 return
654 }
655
656 rollback := func() {
657 err1 := tx.Rollback()
658 err2 := s.enforcer.E.LoadPolicy()
659 err3 := rollbackRecord(context.Background(), aturi, atpClient)
660
661 if errors.Is(err1, sql.ErrTxDone) {
662 err1 = nil
663 }
664
665 if errs := errors.Join(err1, err2, err3); errs != nil {
666 l.Error("failed to rollback changes", "errs", errs)
667 }
668
669 if aturi != "" {
670 cleanupKnot()
671 }
672 }
673 defer rollback()
674
675 err = db.AddRepo(tx, repo)
676 if err != nil {
677 l.Error("db write failed", "err", err)
678 s.pages.Notice(w, "repo", "Failed to save repository information.")
679 return
680 }
681
682 rbacPath := repo.RepoIdentifier()
683 err = s.enforcer.AddRepo(user.Did, domain, rbacPath)
684 if err != nil {
685 l.Error("acl setup failed", "err", err)
686 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
687 return
688 }
689
690 err = tx.Commit()
691 if err != nil {
692 l.Error("txn commit failed", "err", err)
693 http.Error(w, err.Error(), http.StatusInternalServerError)
694 return
695 }
696
697 err = s.enforcer.E.SavePolicy()
698 if err != nil {
699 l.Error("acl save failed", "err", err)
700 http.Error(w, err.Error(), http.StatusInternalServerError)
701 return
702 }
703
704 aturi = ""
705
706 s.notifier.NewRepo(r.Context(), repo)
707 switch {
708 case repoDid != "":
709 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
710 default:
711 handle := s.pages.DisplayHandle(r.Context(), user.Did)
712 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", handle, repoName))
713 }
714 }
715}
716
717// this is used to rollback changes made to the PDS
718//
719// it is a no-op if the provided ATURI is empty
720func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
721 if aturi == "" {
722 return nil
723 }
724
725 parsed := syntax.ATURI(aturi)
726
727 collection := parsed.Collection().String()
728 repo := parsed.Authority().String()
729 rkey := parsed.RecordKey().String()
730
731 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
732 Collection: collection,
733 Repo: repo,
734 Rkey: rkey,
735 })
736 return err
737}
738
739func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
740 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
741 if err != nil {
742 return err
743 }
744 // already present
745 if len(defaultLabels) == len(defaults) {
746 return nil
747 }
748
749 labelDefs, err := models.FetchLabelDefs(r, defaults)
750 if err != nil {
751 return err
752 }
753
754 // Insert each label definition to the database
755 for _, labelDef := range labelDefs {
756 _, err = db.AddLabelDefinition(e, &labelDef)
757 if err != nil {
758 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
759 }
760 }
761
762 return nil
763}
764
765func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
766 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
767 if err != nil {
768 logger.Error("failed to resolve tangled.org DID", "err", err)
769 return
770 }
771
772 pdsEndpoint := resolved.PDSEndpoint()
773 if pdsEndpoint == "" {
774 logger.Error("no PDS endpoint found for tangled.sh DID")
775 return
776 }
777
778 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
779 if err != nil {
780 logger.Error("failed to create appassword session... skipping fetch", "err", err)
781 return
782 }
783
784 l := log.SubLogger(logger, "bluesky")
785
786 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
787 defer ticker.Stop()
788
789 for {
790 // refresh session if necessary
791 if !session.IsValid() {
792 l.Debug("access token expired, refreshing session")
793 if err := session.RefreshSession(); err != nil {
794 l.Error("failed to refresh session, stopping bluesky updater", "err", err)
795 return
796 }
797 l.Debug("session refreshed")
798 }
799
800 // make client
801 client := xrpc.Client{
802 Auth: &xrpc.AuthInfo{
803 AccessJwt: session.AccessJwt,
804 Did: session.Did,
805 },
806 Host: session.PdsEndpoint,
807 }
808
809 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
810 if err != nil {
811 l.Error("failed to fetch bluesky posts", "err", err)
812 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
813 l.Error("failed to insert bluesky posts", "err", err)
814 } else {
815 l.Info("inserted bluesky posts", "count", len(posts))
816 }
817
818 select {
819 case <-ticker.C:
820 case <-ctx.Done():
821 l.Info("stopping bluesky updater")
822 return
823 }
824 }
825}