forked from
tangled.org/core
Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cloudflare"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/indexer"
20 "tangled.org/core/appview/mentions"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/notify"
23 dbnotify "tangled.org/core/appview/notify/db"
24 lognotify "tangled.org/core/appview/notify/logging"
25 phnotify "tangled.org/core/appview/notify/posthog"
26 whnotify "tangled.org/core/appview/notify/webhook"
27 "tangled.org/core/appview/oauth"
28 "tangled.org/core/appview/pages"
29 "tangled.org/core/appview/reporesolver"
30 "tangled.org/core/appview/validator"
31 xrpcclient "tangled.org/core/appview/xrpcclient"
32 "tangled.org/core/consts"
33 "tangled.org/core/eventconsumer"
34 "tangled.org/core/idresolver"
35 "tangled.org/core/jetstream"
36 "tangled.org/core/log"
37 tlog "tangled.org/core/log"
38 "tangled.org/core/orm"
39 "tangled.org/core/rbac"
40 "tangled.org/core/tid"
41
42 comatproto "github.com/bluesky-social/indigo/api/atproto"
43 "github.com/bluesky-social/indigo/atproto/atclient"
44 "github.com/bluesky-social/indigo/atproto/syntax"
45 lexutil "github.com/bluesky-social/indigo/lex/util"
46 "github.com/bluesky-social/indigo/xrpc"
47
48 "github.com/go-chi/chi/v5"
49 "github.com/posthog/posthog-go"
50)
51
52type State struct {
53 db *db.DB
54 notifier notify.Notifier
55 indexer *indexer.Indexer
56 oauth *oauth.OAuth
57 enforcer *rbac.Enforcer
58 pages *pages.Pages
59 idResolver *idresolver.Resolver
60 mentionsResolver *mentions.Resolver
61 posthog posthog.Client
62 jc *jetstream.JetstreamClient
63 config *config.Config
64 repoResolver *reporesolver.RepoResolver
65 knotstream *eventconsumer.Consumer
66 spindlestream *eventconsumer.Consumer
67 logger *slog.Logger
68 validator *validator.Validator
69 cfClient *cloudflare.Client
70}
71
72func Make(ctx context.Context, config *config.Config) (*State, error) {
73 logger := tlog.FromContext(ctx)
74
75 d, err := db.Make(ctx, config.Core.DbPath)
76 if err != nil {
77 return nil, fmt.Errorf("failed to create db: %w", err)
78 }
79
80 indexer := indexer.New(log.SubLogger(logger, "indexer"))
81 err = indexer.Init(ctx, d)
82 if err != nil {
83 return nil, fmt.Errorf("failed to create indexer: %w", err)
84 }
85
86 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
87 if err != nil {
88 return nil, fmt.Errorf("failed to create enforcer: %w", err)
89 }
90
91 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
92 if err != nil {
93 logger.Error("failed to create redis resolver", "err", err)
94 res = idresolver.DefaultResolver(config.Plc.PLCURL)
95 }
96
97 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
98 if err != nil {
99 return nil, fmt.Errorf("failed to create posthog client: %w", err)
100 }
101
102 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
103 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
104 if err != nil {
105 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
106 }
107 validator := validator.New(d, res, enforcer)
108
109 repoResolver := reporesolver.New(config, enforcer, d)
110
111 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
112
113 wrapper := db.DbWrapper{Execer: d}
114 jc, err := jetstream.NewJetstreamClient(
115 config.Jetstream.Endpoint,
116 "appview",
117 []string{
118 tangled.GraphFollowNSID,
119 tangled.FeedStarNSID,
120 tangled.PublicKeyNSID,
121 tangled.RepoArtifactNSID,
122 tangled.ActorProfileNSID,
123 tangled.KnotMemberNSID,
124 tangled.SpindleMemberNSID,
125 tangled.SpindleNSID,
126 tangled.StringNSID,
127 tangled.RepoIssueNSID,
128 tangled.RepoIssueCommentNSID,
129 tangled.LabelDefinitionNSID,
130 tangled.LabelOpNSID,
131 },
132 nil,
133 tlog.SubLogger(logger, "jetstream"),
134 wrapper,
135 false,
136
137 // in-memory filter is inapplicable to appview so
138 // we'll never log dids anyway.
139 false,
140 )
141 if err != nil {
142 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
143 }
144
145 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
146 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
147 }
148
149 ingester := appview.Ingester{
150 Db: wrapper,
151 Enforcer: enforcer,
152 IdResolver: res,
153 Config: config,
154 Logger: log.SubLogger(logger, "ingester"),
155 Validator: validator,
156 }
157 err = jc.StartJetstream(ctx, ingester.Ingest())
158 if err != nil {
159 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
160 }
161
162 var notifiers []notify.Notifier
163
164 // Always add the database notifier
165 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
166
167 // Add other notifiers in production only
168 if !config.Core.Dev {
169 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
170 }
171 notifiers = append(notifiers, indexer)
172
173 notifiers = append(notifiers, whnotify.NewNotifier(d))
174
175 notifier := notify.NewMergedNotifier(notifiers)
176 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
177
178 var cfClient *cloudflare.Client
179 if config.Cloudflare.ApiToken != "" {
180 cfClient, err = cloudflare.New(config)
181 if err != nil {
182 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
183 cfClient = nil
184 }
185 }
186
187 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
188 if err != nil {
189 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
190 }
191 knotstream.Start(ctx)
192
193 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
194 if err != nil {
195 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
196 }
197 spindlestream.Start(ctx)
198
199 state := &State{
200 db: d,
201 notifier: notifier,
202 indexer: indexer,
203 oauth: oauth,
204 enforcer: enforcer,
205 pages: pages,
206 idResolver: res,
207 mentionsResolver: mentionsResolver,
208 posthog: posthog,
209 jc: jc,
210 config: config,
211 repoResolver: repoResolver,
212 knotstream: knotstream,
213 spindlestream: spindlestream,
214 logger: logger,
215 validator: validator,
216 cfClient: cfClient,
217 }
218
219 // fetch initial bluesky posts if configured
220 go fetchBskyPosts(ctx, res, config, d, logger)
221
222 return state, nil
223}
224
225func (s *State) Close() error {
226 // other close up logic goes here
227 return s.db.Close()
228}
229
230func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
231 w.Header().Set("Content-Type", "text/plain")
232 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
233
234 securityTxt := `Contact: mailto:security@tangled.org
235Preferred-Languages: en
236Canonical: https://tangled.org/.well-known/security.txt
237Expires: 2030-01-01T21:59:00.000Z
238`
239 w.Write([]byte(securityTxt))
240}
241
242func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
243 w.Header().Set("Content-Type", "text/plain")
244 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
245
246 robotsTxt := `# Hello, Tanglers!
247User-agent: *
248Allow: /
249Disallow: /*/*/settings
250Disallow: /settings
251Disallow: /*/*/compare
252Disallow: /*/*/fork
253
254Crawl-delay: 1
255`
256 w.Write([]byte(robotsTxt))
257}
258
259func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
260 user := s.oauth.GetMultiAccountUser(r)
261 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
262 LoggedInUser: user,
263 })
264}
265
266func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
267 user := s.oauth.GetMultiAccountUser(r)
268 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
269 LoggedInUser: user,
270 })
271}
272
273func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
274 user := s.oauth.GetMultiAccountUser(r)
275 s.pages.Brand(w, pages.BrandParams{
276 LoggedInUser: user,
277 })
278}
279
280func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
281 user := s.oauth.GetMultiAccountUser(r)
282 if user == nil {
283 return
284 }
285
286 l := s.logger.With("handler", "UpgradeBanner")
287 l = l.With("did", user.Active.Did)
288
289 regs, err := db.GetRegistrations(
290 s.db,
291 orm.FilterEq("did", user.Active.Did),
292 orm.FilterEq("needs_upgrade", 1),
293 )
294 if err != nil {
295 l.Error("non-fatal: failed to get registrations", "err", err)
296 }
297
298 spindles, err := db.GetSpindles(
299 r.Context(),
300 s.db,
301 orm.FilterEq("owner", user.Active.Did),
302 orm.FilterEq("needs_upgrade", 1),
303 )
304 if err != nil {
305 l.Error("non-fatal: failed to get spindles", "err", err)
306 }
307
308 if regs == nil && spindles == nil {
309 return
310 }
311
312 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
313 Registrations: regs,
314 Spindles: spindles,
315 })
316}
317
318func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
319 user := chi.URLParam(r, "user")
320 user = strings.TrimPrefix(user, "@")
321
322 if user == "" {
323 w.WriteHeader(http.StatusBadRequest)
324 return
325 }
326
327 id, err := s.idResolver.ResolveIdent(r.Context(), user)
328 if err != nil {
329 w.WriteHeader(http.StatusInternalServerError)
330 return
331 }
332
333 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
334 if err != nil {
335 s.logger.Error("failed to get public keys", "err", err)
336 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
337 return
338 }
339
340 if len(pubKeys) == 0 {
341 w.WriteHeader(http.StatusNoContent)
342 return
343 }
344
345 for _, k := range pubKeys {
346 key := strings.TrimRight(k.Key, "\n")
347 fmt.Fprintln(w, key)
348 }
349}
350
// validateRepoName checks that name is safe to use as a repository name and
// returns an error carrying a user-facing message when it is not.
//
// A name is rejected when it is "." or "..", contains a slash or backslash,
// starts or ends with a dot, contains any character other than ASCII
// letters, digits, '-', '_' and '.', or contains two dots in a row. The
// checks run in that order, so the first matching rule decides the message.
// The empty string is not rejected here.
func validateRepoName(name string) error {
	// anything that could be read as a path or a traversal attempt
	switch {
	case name == ".", name == "..", strings.ContainsAny(name, "/\\"):
		return fmt.Errorf("Repository name contains invalid path characters")
	case strings.HasPrefix(name, "."), strings.HasSuffix(name, "."):
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	// character whitelist
	disallowed := func(c rune) bool {
		switch {
		case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
			return false
		case c == '-', c == '_', c == '.':
			return false
		}
		return true
	}
	if strings.IndexFunc(name, disallowed) >= 0 {
		return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
	}

	// dots may not appear back to back
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
382
// stripGitExt removes a single trailing ".git" from name, if present.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
386
387func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
388 switch r.Method {
389 case http.MethodGet:
390 user := s.oauth.GetMultiAccountUser(r)
391 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
392 if err != nil {
393 s.pages.Notice(w, "repo", "Invalid user account.")
394 return
395 }
396
397 s.pages.NewRepo(w, pages.NewRepoParams{
398 LoggedInUser: user,
399 Knots: knots,
400 })
401
402 case http.MethodPost:
403 l := s.logger.With("handler", "NewRepo")
404
405 user := s.oauth.GetMultiAccountUser(r)
406 l = l.With("did", user.Active.Did)
407
408 // form validation
409 domain := r.FormValue("domain")
410 if domain == "" {
411 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
412 return
413 }
414 l = l.With("knot", domain)
415
416 repoName := r.FormValue("name")
417 if repoName == "" {
418 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
419 return
420 }
421
422 if err := validateRepoName(repoName); err != nil {
423 s.pages.Notice(w, "repo", err.Error())
424 return
425 }
426 repoName = stripGitExt(repoName)
427 l = l.With("repoName", repoName)
428
429 defaultBranch := r.FormValue("branch")
430 if defaultBranch == "" {
431 defaultBranch = "main"
432 }
433 l = l.With("defaultBranch", defaultBranch)
434
435 description := r.FormValue("description")
436 if len([]rune(description)) > 140 {
437 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
438 return
439 }
440
441 // ACL validation
442 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
443 if err != nil || !ok {
444 l.Info("unauthorized")
445 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
446 return
447 }
448
449 // Check for existing repos
450 existingRepo, err := db.GetRepo(
451 s.db,
452 orm.FilterEq("did", user.Active.Did),
453 orm.FilterEq("name", repoName),
454 )
455 if err == nil && existingRepo != nil {
456 l.Info("repo exists")
457 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
458 return
459 }
460
461 rkey := tid.TID()
462
463 client, err := s.oauth.ServiceClient(
464 r,
465 oauth.WithService(domain),
466 oauth.WithLxm(tangled.RepoCreateNSID),
467 oauth.WithDev(s.config.Core.Dev),
468 )
469 if err != nil {
470 l.Error("service auth failed", "err", err)
471 s.pages.Notice(w, "repo", "Failed to reach knot server.")
472 return
473 }
474
475 input := &tangled.RepoCreate_Input{
476 Rkey: rkey,
477 Name: repoName,
478 DefaultBranch: &defaultBranch,
479 }
480 createResp, xe := tangled.RepoCreate(
481 r.Context(),
482 client,
483 input,
484 )
485 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
486 l.Error("xrpc error", "xe", xe)
487 s.pages.Notice(w, "repo", err.Error())
488 return
489 }
490
491 var repoDid string
492 if createResp != nil && createResp.RepoDid != nil {
493 repoDid = *createResp.RepoDid
494 }
495 if repoDid == "" {
496 l.Error("knot returned empty repo DID")
497 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
498 return
499 }
500
501 repo := &models.Repo{
502 Did: user.Active.Did,
503 Name: repoName,
504 Knot: domain,
505 Rkey: rkey,
506 Description: description,
507 Created: time.Now(),
508 Labels: s.config.Label.DefaultLabelDefs,
509 RepoDid: repoDid,
510 }
511 record := repo.AsRecord()
512
513 cleanupKnot := func() {
514 go func() {
515 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
516 for attempt, delay := range delays {
517 time.Sleep(delay)
518 deleteClient, dErr := s.oauth.ServiceClient(
519 r,
520 oauth.WithService(domain),
521 oauth.WithLxm(tangled.RepoDeleteNSID),
522 oauth.WithDev(s.config.Core.Dev),
523 )
524 if dErr != nil {
525 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
526 continue
527 }
528 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
529 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
530 Did: user.Active.Did,
531 Name: repoName,
532 Rkey: rkey,
533 }); dErr != nil {
534 cancel()
535 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
536 continue
537 }
538 cancel()
539 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
540 return
541 }
542 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
543 "did", user.Active.Did, "repo", repoName, "knot", domain)
544 }()
545 }
546
547 atpClient, err := s.oauth.AuthorizedClient(r)
548 if err != nil {
549 l.Info("PDS write failed", "err", err)
550 cleanupKnot()
551 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
552 return
553 }
554
555 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
556 Collection: tangled.RepoNSID,
557 Repo: user.Active.Did,
558 Rkey: rkey,
559 Record: &lexutil.LexiconTypeDecoder{
560 Val: &record,
561 },
562 })
563 if err != nil {
564 l.Info("PDS write failed", "err", err)
565 cleanupKnot()
566 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
567 return
568 }
569
570 aturi := atresp.Uri
571 l = l.With("aturi", aturi)
572 l.Info("wrote to PDS")
573
574 tx, err := s.db.BeginTx(r.Context(), nil)
575 if err != nil {
576 l.Info("txn failed", "err", err)
577 s.pages.Notice(w, "repo", "Failed to save repository information.")
578 return
579 }
580
581 rollback := func() {
582 err1 := tx.Rollback()
583 err2 := s.enforcer.E.LoadPolicy()
584 err3 := rollbackRecord(context.Background(), aturi, atpClient)
585
586 if errors.Is(err1, sql.ErrTxDone) {
587 err1 = nil
588 }
589
590 if errs := errors.Join(err1, err2, err3); errs != nil {
591 l.Error("failed to rollback changes", "errs", errs)
592 }
593
594 if aturi != "" {
595 cleanupKnot()
596 }
597 }
598 defer rollback()
599
600 err = db.AddRepo(tx, repo)
601 if err != nil {
602 l.Error("db write failed", "err", err)
603 s.pages.Notice(w, "repo", "Failed to save repository information.")
604 return
605 }
606
607 rbacPath := repo.RepoIdentifier()
608 err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath)
609 if err != nil {
610 l.Error("acl setup failed", "err", err)
611 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
612 return
613 }
614
615 err = tx.Commit()
616 if err != nil {
617 l.Error("txn commit failed", "err", err)
618 http.Error(w, err.Error(), http.StatusInternalServerError)
619 return
620 }
621
622 err = s.enforcer.E.SavePolicy()
623 if err != nil {
624 l.Error("acl save failed", "err", err)
625 http.Error(w, err.Error(), http.StatusInternalServerError)
626 return
627 }
628
629 aturi = ""
630
631 s.notifier.NewRepo(r.Context(), repo)
632 if repoDid != "" {
633 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
634 } else {
635 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
636 }
637 }
638}
639
640// this is used to rollback changes made to the PDS
641//
642// it is a no-op if the provided ATURI is empty
643func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
644 if aturi == "" {
645 return nil
646 }
647
648 parsed := syntax.ATURI(aturi)
649
650 collection := parsed.Collection().String()
651 repo := parsed.Authority().String()
652 rkey := parsed.RecordKey().String()
653
654 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
655 Collection: collection,
656 Repo: repo,
657 Rkey: rkey,
658 })
659 return err
660}
661
662func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
663 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
664 if err != nil {
665 return err
666 }
667 // already present
668 if len(defaultLabels) == len(defaults) {
669 return nil
670 }
671
672 labelDefs, err := models.FetchLabelDefs(r, defaults)
673 if err != nil {
674 return err
675 }
676
677 // Insert each label definition to the database
678 for _, labelDef := range labelDefs {
679 _, err = db.AddLabelDefinition(e, &labelDef)
680 if err != nil {
681 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
682 }
683 }
684
685 return nil
686}
687
688func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
689 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
690 if err != nil {
691 logger.Error("failed to resolve tangled.org DID", "err", err)
692 return
693 }
694
695 pdsEndpoint := resolved.PDSEndpoint()
696 if pdsEndpoint == "" {
697 logger.Error("no PDS endpoint found for tangled.sh DID")
698 return
699 }
700
701 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
702 if err != nil {
703 logger.Error("failed to create appassword session... skipping fetch", "err", err)
704 return
705 }
706
707 client := xrpc.Client{
708 Auth: &xrpc.AuthInfo{
709 AccessJwt: session.AccessJwt,
710 Did: session.Did,
711 },
712 Host: session.PdsEndpoint,
713 }
714
715 l := log.SubLogger(logger, "bluesky")
716
717 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
718 defer ticker.Stop()
719
720 for {
721 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
722 if err != nil {
723 l.Error("failed to fetch bluesky posts", "err", err)
724 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
725 l.Error("failed to insert bluesky posts", "err", err)
726 } else {
727 l.Info("inserted bluesky posts", "count", len(posts))
728 }
729
730 select {
731 case <-ticker.C:
732 case <-ctx.Done():
733 l.Info("stopping bluesky updater")
734 return
735 }
736 }
737}